# Copyright (c) 2022, TU Wien
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import asyncio
import json
import os
import shlex
import shutil
from asyncio import Future, Task
from contextlib import contextmanager
from pathlib import Path
from kubernetes.client import V1Pod, CoreV1Api, V1ObjectMeta, V1PodStatus, ApiException
from traitlets import Callable, Unicode, Integer, Dict, List
from traitlets.config import LoggingConfigurable
from urllib3.exceptions import MaxRetryError
from grader_service.autograding.kube.util import make_pod
from grader_service.autograding.local_grader import LocalAutogradeExecutor, rm_error
from kubernetes import config, client
from grader_service.orm import Lecture, Submission
from grader_service.orm import Assignment
[docs]class GraderPod(LoggingConfigurable):
"""
Wrapper for a kubernetes pod that supports polling of the pod's status.
"""
poll_interval = Integer(default_value=1000, allow_none=False,
help="Time in ms to wait before status is polled again.").tag(config=True)
def __init__(self, pod: V1Pod, api: CoreV1Api, **kwargs):
super().__init__(**kwargs)
self.pod = pod
self._client = api
self.loop = asyncio.get_event_loop()
self._polling_task = self.loop.create_task(self._poll_status())
[docs] def stop_polling(self) -> None:
self._polling_task.cancel()
@property
def polling(self) -> Task:
return self._polling_task
@property
def name(self) -> str:
return self.pod.metadata.name
@property
def namespace(self) -> str:
return self.pod.metadata.namespace
# https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase
async def _poll_status(self) -> str:
meta: V1ObjectMeta = self.pod.metadata
while True:
status: V1PodStatus = self._client.read_namespaced_pod_status(name=meta.name,
namespace=meta.namespace).status
if status.phase == "Succeeded" or status.phase == "Failed":
return status.phase
# continue for Running, Unknown and Pending
await asyncio.sleep(self.poll_interval / 1000)
def _get_image_name(lecture: Lecture, assignment: Assignment = None) -> str:
"""
Default implementation of the default_image_name method which return the lecture code followed by '_image'.
All the functions have the lecture and assignment available as parameters.
:param lecture: Lecture to build the image name.
:param assignment: Assignment to build the image name.
:return: The image name as a string.
"""
return f"{lecture.code}_image"
[docs]class KubeAutogradeExecutor(LocalAutogradeExecutor):
"""
Runs an autograde job in a kubernetes cluster as a pod. The cluster has to have a shared persistent
volume claim that is mounted in the input and output directories so that both the service and
the executor pods have access to the files. The service account of the grader service has to have
permission to get, update, create and delete pods, pod status and pod logs.
"""
image_config_path = Unicode(default_value=None, allow_none=True).tag(config=True)
default_image_name = Callable(default_value=_get_image_name, allow_none=False).tag(config=True)
kube_context = Unicode(default_value=None, allow_none=True,
help="Kubernetes context to load config from. " +
"If the context is None (default), the incluster config will be used.").tag(config=True)
volumes = List(default_value=[], allow_none=False).tag(config=True)
volume_mounts = List(default_value=[], allow_none=False).tag(config=True)
def __init__(self, grader_service_dir: str, submission: Submission, **kwargs):
super().__init__(grader_service_dir, submission, **kwargs)
self.assignment = self.submission.assignment
self.lecture = self.assignment.lecture
if self.kube_context is None:
self.log.info(f"Loading in-cluster config for kube executor of submission {self.submission.id}")
config.load_incluster_config()
else:
self.log.info(
f"Loading cluster config '{self.kube_context}' for kube executor of submission {self.submission.id}")
config.load_kube_config(context=self.kube_context)
self.client = CoreV1Api()
[docs] def get_image(self) -> str:
"""
Returns the image name based on the lecture and assignment. If an image config file exists and has
been specified it will first be queried for an image name. If the image name cannot be found in the
config file or none has been specified the image name will be determined by the default_image_name function
which takes the lecture and assignment as parameters and is specified in the config.
The default implementation of this function is to return the lecture code followed by '_image'.
:return: The image name as determined by this method.
"""
cfg = {}
if self.image_config_path is not None:
with open(self.image_config_path, "r") as f:
cfg = json.load(f)
try:
lecture_cfg = cfg[self.lecture.code]
if isinstance(lecture_cfg, str):
return lecture_cfg
else:
return lecture_cfg[self.assignment.name]
except KeyError:
return self.default_image_name(self.lecture, self.assignment)
[docs] def start_pod(self) -> GraderPod:
"""
Starts a pod in the default namespace with the commit hash as the name of the pod. The image is
determined by the get_image method.
:return:
"""
# The output path will not exist in the pod
command = f'{self.convert_executable} autograde ' \
f'-i "{self.input_path}" ' \
f'-o "{self.output_path}" ' \
f'-p "*.ipynb" --log-level=INFO'
# command = "sleep 10000"
pod = make_pod(
name=self.submission.commit_hash,
cmd=shlex.split(command),
image=self.get_image(),
image_pull_policy=None,
working_dir="/",
volumes=self.volumes,
volume_mounts=self.volume_mounts,
labels=None,
annotations=None,
tolerations=None,
)
self.log.info(f"Starting pod {pod.metadata.name} with command: {command}")
pod = self.client.create_namespaced_pod(namespace="default", body=pod)
return GraderPod(pod, self.client, config=self.config)
async def _run(self):
"""
Runs the autograding process in a kubernetes pod which has to have access to the files in the
input and output directory through a persistent volume claim.
:return: Coroutine
"""
if os.path.exists(self.output_path):
shutil.rmtree(self.output_path, onerror=rm_error)
os.mkdir(self.output_path)
self._write_gradebook(self.submission.assignment.properties)
grader_pod = None
try:
grader_pod = self.start_pod()
self.log.info(f"Started pod {grader_pod.name} in namespace {grader_pod.namespace}")
status = await grader_pod.polling
self.grading_logs = self._get_pod_logs(grader_pod)
self.log.info("Pod logs:\n" + self.grading_logs)
if status == "Succeeded":
self.log.info("Pod has successfully completed execution!")
else:
self.log.info("Pod has failed execution:")
self._delete_pod(grader_pod)
raise RuntimeError("Pod has failed execution!")
# cleanup
self._delete_pod(grader_pod)
except ApiException as e:
error_message = json.loads(e.body)
if error_message["reason"] != "AlreadyExists" and grader_pod is not None:
try:
self.client.delete_namespaced_pod(name=grader_pod.name, namespace=grader_pod.namespace)
except ApiException:
pass
self.log.error(f'{error_message["reason"]}: {error_message["message"]}')
raise RuntimeError("Pod has failed execution!")
except MaxRetryError:
self.log.error("Kubernetes client could not connect to cluster! Is it running and specified correctly?")
raise RuntimeError("Pod has failed execution!")
def _delete_pod(self, pod: GraderPod):
"""
Deletes the pod from the cluster after successful or failed execution.
:param pod: The pod to delete.
:return: None
"""
self.log.info(
f"Deleting pod '{pod.name}' in namespace '{pod.namespace}' after execution status {pod.polling.result()}")
self.client.delete_namespaced_pod(name=pod.name, namespace=pod.namespace)
def _get_pod_logs(self, pod: GraderPod) -> str:
"""
Returns the logs of the pod that were output during execution.
:param pod: The pod to retrieve the logs from.
:return: The logs as a string.
"""
api_response: str = self.client.read_namespaced_pod_log(name=pod.name, namespace=pod.namespace)
return api_response.strip()