# Copyright (c) 2022, TU Wien
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import os
import shlex
import subprocess
from typing import Optional, List
from urllib.parse import unquote
from grader_service.autograding.grader_executor import GraderExecutor
from grader_service.autograding.local_feedback import GenerateFeedbackExecutor
from grader_service.handlers.base_handler import GraderBaseHandler, RequestHandlerConfig
from grader_service.orm.assignment import Assignment, AutoGradingBehaviour
from grader_service.orm.group import Group
from grader_service.orm.lecture import Lecture
from grader_service.orm.submission import Submission
from grader_service.orm.takepart import Role, Scope
from grader_service.registry import VersionSpecifier, register_handler
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from tornado.ioloop import IOLoop
from tornado.process import Subprocess
from tornado.web import HTTPError, stream_request_body
from grader_service.server import GraderServer
[docs]class GitBaseHandler(GraderBaseHandler):
[docs] async def data_received(self, chunk: bytes):
return self.process.stdin.write(chunk)
[docs] def write_error(self, status_code: int, **kwargs) -> None:
self.clear()
if status_code == 401:
self.set_header("WWW-Authenticate", 'Basic realm="User Visible Realm"')
self.set_status(status_code)
[docs] def on_finish(self):
if hasattr(
self, "process"
): # if we exit from super prepare (authentication) the process is not created
if self.process.stdin is not None:
self.process.stdin.close()
if self.process.stdout is not None:
self.process.stdout.close()
if self.process.stderr is not None:
self.process.stderr.close()
IOLoop.current().spawn_callback(self.process.wait_for_exit)
[docs] async def git_response(self):
try:
while data := await self.process.stdout.read_bytes(8192, partial=True):
self.write(data)
await self.flush()
except:
pass
def _check_git_repo_permissions(self, rpc: str, role: Role, pathlets: List[str]):
repo_type = pathlets[2]
if role.role == Scope.student:
# 1. no source or release interaction with the source repo for students
# 2. no pull allowed for autograde for students
if (repo_type in ["source", "release"]) or \
(repo_type == "autograde" and rpc == "upload-pack"):
raise HTTPError(403)
# 3. students should not be able to pull other submissions -> add query param for sub_id
if repo_type == "feedback" and rpc == "upload-pack":
try:
sub_id = int(pathlets[3])
except (ValueError, IndexError):
raise HTTPError(403)
submission = self.session.query(Submission).get(sub_id)
if submission is None or submission.username != self.user.name:
raise HTTPError(403)
# 4. no push allowed for autograde and feedback -> the autograder executor can push locally (will bypass this)
if repo_type in ["autograde", "feedback"] and rpc in ["send-pack", "receive-pack"]:
raise HTTPError(403)
[docs] def gitlookup(self, rpc: str):
pathlets = self.request.path.strip("/").split("/")
# pathlets = ['services', 'grader', 'git', 'lecture_code', 'assignment_id', 'repo_type', ...]
if len(pathlets) < 6:
return None
pathlets = pathlets[3:]
lecture_path = os.path.abspath(os.path.join(self.gitbase, pathlets[0]))
assignment_path = os.path.abspath(
os.path.join(self.gitbase, pathlets[0], pathlets[1])
)
repo_type = pathlets[2]
if repo_type not in {
"source",
"release",
"assignment",
"autograde",
"feedback",
}:
return None
# get lecture and assignment if they exist
try:
lecture = (
self.session.query(Lecture).filter(Lecture.code == pathlets[0]).one()
)
except NoResultFound:
raise HTTPError(404)
except MultipleResultsFound:
raise HTTPError(400)
role = self.session.query(Role).get((self.user.name, lecture.id))
self._check_git_repo_permissions(rpc, role, pathlets)
try:
assignment = self.get_assignment(lecture.id, int(pathlets[1]))
except ValueError:
raise HTTPError(404)
if repo_type == "assignment":
repo_type: str = assignment.type
# create directories once we know they exist in the database
if not os.path.exists(lecture_path):
os.mkdir(lecture_path)
if not os.path.exists(assignment_path):
os.mkdir(assignment_path)
path = self.construct_git_dir(repo_type, lecture, assignment)
if path is None:
return None
os.makedirs(os.path.dirname(path), exist_ok=True)
is_git = self.is_base_git_dir(path)
# return git repo
if os.path.exists(path) and is_git:
return path
else:
os.mkdir(path)
# this path has to be a git dir -> call git init
try:
self.log.info("Running: git init --bare")
subprocess.run(["git", "init", "--bare", path], check=True)
except subprocess.CalledProcessError:
return None
if repo_type in ["user", "group"]:
repo_path_release = self.construct_git_dir('release', assignment.lecture, assignment)
if not os.path.exists(repo_path_release):
return None
self.duplicate_release_repo(repo_path_release=repo_path_release, repo_path_user=path,
assignment=assignment, message="Initialize with Release",
checkout_main=True)
return path
@staticmethod
def _create_path(path):
if not os.path.exists(path):
os.mkdir(path)
[docs] def get_gitdir(self, rpc: str):
"""Determine the git repository for this request"""
gitdir = self.gitlookup(rpc)
if gitdir is None:
raise HTTPError(404, "unable to find repository")
self.log.info("Accessing git at: %s", gitdir)
return gitdir
[docs]@register_handler(path="/.*/git-(.*)", version_specifier=VersionSpecifier.NONE)
@stream_request_body
class RPCHandler(GitBaseHandler):
"""Request handler for RPC calls
Use this handler to handle example.git/git-upload-pack and example.git/git-receive-pack URLs"""
[docs] async def prepare(self):
await super().prepare()
self.rpc = self.path_args[0]
self.gitdir = self.get_gitdir(rpc=self.rpc)
self.cmd = f'git {self.rpc} --stateless-rpc "{self.gitdir}"'
self.log.info(f"Running command: {self.cmd}")
self.process = Subprocess(
shlex.split(self.cmd),
stdin=Subprocess.STREAM,
stderr=Subprocess.STREAM,
stdout=Subprocess.STREAM,
)
[docs] async def post(self, rpc):
self.set_header("Content-Type", "application/x-git-%s-result" % rpc)
self.set_header(
"Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
)
await self.git_response()
await self.finish()
[docs]@register_handler(path="/.*/info/refs", version_specifier=VersionSpecifier.NONE)
class InfoRefsHandler(GitBaseHandler):
"""Request handler for info/refs
Use this handler to handle example.git/info/refs?service= URLs"""
[docs] async def prepare(self):
await super().prepare()
if self.get_status() != 200:
return
self.rpc = self.get_argument("service")[4:]
self.cmd = f'git {self.rpc} --stateless-rpc --advertise-refs "{self.get_gitdir(self.rpc)}"'
self.log.info(f"Running command: {self.cmd}")
self.process = Subprocess(
shlex.split(self.cmd),
stdin=Subprocess.STREAM,
stderr=Subprocess.STREAM,
stdout=Subprocess.STREAM,
)
[docs] async def get(self):
self.set_header("Content-Type", "application/x-git-%s-advertisement" % self.rpc)
self.set_header(
"Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
)
prelude = f"# service=git-{self.rpc}\n0000"
size = str(hex(len(prelude))[2:].rjust(4, "0"))
self.write(size)
self.write(prelude)
await self.flush()
await self.git_response()
await self.finish()