Source code for grader_service.main

# Copyright (c) 2022, TU Wien
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import asyncio
import logging
import os
import secrets
import shlex
import signal
import subprocess
import sys

import tornado
from tornado.httpserver import HTTPServer
from tornado_sqlalchemy import SQLAlchemy
from traitlets import config, Bool
from traitlets import log as traitlets_log
from traitlets import Enum, Int, TraitError, Unicode, observe, validate, default

# run __init__.py to register handlers
import grader_service.handlers
from grader_service.autograding.local_grader import LocalAutogradeExecutor
from grader_service.handlers.base_handler import RequestHandlerConfig
from grader_service.registry import HandlerPathRegistry
from grader_service.server import GraderServer
from grader_service.autograding.grader_executor import GraderExecutor
from grader_service._version import __version__


[docs]class GraderService(config.Application): name = "grader-service" version = __version__ description = """Starts the grader service, which can be used to create and distribute assignments, collect submissions and grade them. """ examples = """ generate default config file: grader-service --generate-config -f /etc/grader/grader_service_config.py spawn the grader service: grader-service -f /etc/grader/grader_service_config.py """ service_host = Unicode(os.getenv("GRADER_SERVICE_HOST", "0.0.0.0"), help="The host address of the service").tag( config=True ) service_port = Int(int(os.getenv("GRADER_SERVICE_PORT", "4010")), help="The port the service runs on").tag( config=True) reuse_port = Bool(False, help="Whether to allow for the specified service port to be reused.").tag(config=True) grader_service_dir = Unicode(os.getenv("GRADER_SERVICE_DIRECTORY"), allow_none=False).tag(config=True) db_url = Unicode(allow_none=False).tag(config=True) @default('db_url') def _default_username(self): service_dir_url = f'sqlite:///{os.path.join(self.grader_service_dir, "grader.db")}' return os.getenv("GRADER_DB_URL", service_dir_url) service_git_username = Unicode("grader-service", allow_none=False).tag(config=True) service_git_email = Unicode("", allow_none=False).tag(config=True) config_file = Unicode( "grader_service_config.py", help="The config file to load" ).tag(config=True) base_url_path = Unicode("/services/grader", allow_none=False).tag(config=True) @validate("config_file") def _validate_config_file(self, proposal): if not os.path.isfile(proposal.value): print( "ERROR: Failed to find specified config file: {}".format( proposal.value ), file=sys.stderr, ) sys.exit(1) return proposal.value flags = { "debug": ( { "Application": { "log_level": logging.DEBUG, }, }, "Set log-level to debug, for the most verbose logging.", ), "show-config": ( { "Application": { "show_config": True, }, }, "Show the application's configuration (human-readable format)", ), "show-config-json": ( { "Application": { "show_config_json": True, }, }, "Show the application's configuration (json format)", ), } aliases = { "log-level": "Application.log_level", "f": "GraderService.config_file", "config": "GraderService.config_file", } log_level = Enum( ["CRITICAL", "FATAL", "ERROR", "WARNING", "WARN", "INFO", "DEBUG", "NOTSET"], "INFO", ).tag(config=True)
[docs] def setup_loggers(self, log_level: str): # pragma: no cover """Handles application, Tornado, and SQLAlchemy logging configuration.""" stream_handler = logging.StreamHandler root_logger = logging.getLogger() root_logger.setLevel(log_level) root_logger.removeHandler( root_logger.handlers[0] ) # remove root handler to prevent duplicate logging fmt = "%(color)s%(levelname)-8s %(asctime)s %(module)-13s |%(end_color)s %(message)s" formatter = tornado.log.LogFormatter(fmt=fmt, color=True, datefmt=None) for log in ("access", "application", "general"): logger = logging.getLogger("tornado.{}".format(log)) if len(logger.handlers) > 0: logger.removeHandler(logger.handlers[0]) logger.setLevel(log_level) handler = stream_handler(stream=sys.stdout) handler.setFormatter(formatter) logger.addHandler(handler) sql_logger = logging.getLogger("sqlalchemy") sql_logger.propagate = False sql_logger.setLevel("WARN") sql_handler = stream_handler(stream=sys.stdout) sql_handler.setLevel("WARN") sql_handler.setFormatter(formatter) sql_logger.addHandler(sql_handler) traitlet_logger = traitlets_log.get_logger() traitlet_logger.removeHandler(traitlet_logger.handlers[0]) traitlet_logger.setLevel(log_level) traitlets_handler = stream_handler(stream=sys.stdout) traitlets_handler.setFormatter(formatter) traitlet_logger.addHandler(traitlets_handler)
[docs] def initialize(self, argv, *args, **kwargs): super().initialize(*args, **kwargs) self.setup_loggers(self.log_level) self.log.info("Starting Initialization...") self._start_future = asyncio.Future() self.parse_command_line(argv) self.log.info("Loading config file...") self.load_config_file(self.config_file)
[docs] async def cleanup(self): pass
[docs] async def start(self): self.log.info(f"Config File: {os.path.abspath(self.config_file)}") self.log.info("Starting Grader Service...") self.io_loop = tornado.ioloop.IOLoop.current() if not os.path.exists(os.path.join(self.grader_service_dir, "git")): os.mkdir(os.path.join(self.grader_service_dir, "git")) # check if git config exits so that git commits don't fail subprocess.run(shlex.split("git config --global init.defaultBranch main")) if subprocess.check_output(shlex.split("git config user.name")).decode("utf-8").strip() == "": subprocess.run(shlex.split(f'git config --global user.name "{self.service_git_username}"')) if subprocess.check_output(shlex.split("git config user.email")).decode("utf-8").strip() == "": subprocess.run(shlex.split(f'git config --global user.email "{self.service_git_email}"')) # pass config to DataBaseManager and GraderExecutor GraderExecutor.config = self.config RequestHandlerConfig.config = self.config handlers = HandlerPathRegistry.handler_list(self.base_url_path) # start the webserver self.http_server: HTTPServer = HTTPServer( GraderServer( grader_service_dir=self.grader_service_dir, base_url=self.base_url_path, handlers=handlers, cookie_secret=secrets.token_hex( nbytes=32 ), # generate new cookie secret at startup config=self.config, db=SQLAlchemy(self.db_url), parent=self, ), # ssl_options=ssl_context, xheaders=True, ) self.log.info(f"db_url - {self.db_url}") self.http_server.bind(self.service_port, address=self.service_host, reuse_port=self.reuse_port) self.http_server.start() for s in (signal.SIGTERM, signal.SIGINT): asyncio.get_event_loop().add_signal_handler( s, lambda s=s: asyncio.ensure_future(self.shutdown_cancel_tasks(s)) ) self.log.info( f"Grader service running at {self.service_host}:{self.service_port}" ) # finish start self._start_future.set_result(None)
[docs] async def shutdown_cancel_tasks(self, sig): """Cancel all other tasks of the event loop and initiate cleanup""" self.log.critical("Received signal %s, initiating shutdown...", sig.name) # For compatibility with python versions 3.6 or earlier. # asyncio.Task.all_tasks() is fully moved to asyncio.all_tasks() starting with 3.9. Also applies to current_task. try: asyncio_all_tasks = asyncio.all_tasks asyncio_current_task = asyncio.current_task except AttributeError as e: asyncio_all_tasks = asyncio.Task.all_tasks asyncio_current_task = asyncio.Task.current_task tasks = [t for t in asyncio_all_tasks() if t is not asyncio_current_task()] if tasks: self.log.debug("Cancelling pending tasks") [t.cancel() for t in tasks] try: await asyncio.wait(tasks) except asyncio.CancelledError as e: self.log.debug("Caught Task CancelledError. Ignoring") except StopAsyncIteration as e: self.log.error("Caught StopAsyncIteration Exception", exc_info=True) tasks = [t for t in asyncio_all_tasks()] for t in tasks: self.log.debug("Task status: %s", t) await self.cleanup() asyncio.get_event_loop().stop()
[docs] async def launch_instance_async(self, argv=None): try: self.initialize(argv) await self.start() except Exception as e: self.log.exception(e) self.exit(1)
[docs] @classmethod def launch_instance(cls, argv=None): self = cls.instance() loop = tornado.ioloop.IOLoop.current() task = asyncio.ensure_future(self.launch_instance_async(argv)) try: loop.start() except KeyboardInterrupt: print("\nInterrupted") finally: if task.done(): # re-raise exceptions in launch_instance_async task.result() loop.stop()
@validate("grader_service_dir") def _validate_service_dir(self, proposal): path: str = proposal["value"] if not os.path.isabs(path): raise TraitError("The path is not absolute") if not os.path.isdir(path): raise TraitError("The path has to be an existing directory") return path @observe("grader_service_dir") def _observe_service_dir(self, change): path = change["new"] git_path = os.path.join(path, "git") if not os.path.isdir(git_path): os.mkdir(git_path, mode=664)
main = GraderService.launch_instance if __name__ == "__main__": main(sys.argv)