Skip to content
Snippets Groups Projects

Added a simple route response and some testing

Files
3
@@ -17,14 +17,61 @@
#
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException, Request, Depends
from collections import deque
from datetime import datetime, timedelta
import time
import docker
from docker import DockerClient
from north.app.models import InstanceModel
from north.app.models import InstanceModel, InstanceResponseModel, ChannelUnavailableError
from north import config
router = APIRouter()
router_tag = 'instances'
def get_http_date_in(minutes=1) -> str:
# https://httpwg.org/specs/rfc7231.html#header.date
return time.strftime(
'%a, %d %b %Y %H:%M:%S GMT',
(datetime.utcnow() + timedelta(minutes=minutes))
.timetuple())
def get_docker_client() -> DockerClient:
if config.docker_url:
docker_client = DockerClient(base_url=config.docker_url)
else:
docker_client = docker.from_env()
return docker_client
# Placeholder code for something that retains information of what container channel are available
# Todo: Add channel number tag to the docker container created and use that as a store
available_channel = deque(['0', '1', '2', '3'])
def get_available_channel() -> str:
try:
channel = available_channel.popleft()
return channel
except IndexError as channel_unavailable:
raise HTTPException(
status_code=503,
detail="channel are currently occupied.",
headers={"Retry-After": get_http_date_in()}
) from channel_unavailable
# temp current_user placeholder. I want to use the user id as
# a tag to filter out containers. Docker is source of truth.
# Todo: Implement in the JWT issue and change the return from str to BaseModel
async def get_current_active_user():
return 'test'
@router.get(
'/',
tags=[router_tag],
@@ -39,9 +86,44 @@ async def get_instances():
@router.post(
'/',
tags=[router_tag],
response_model=InstanceModel,
response_model=InstanceResponseModel,
response_model_exclude_unset=True,
response_model_exclude_none=True)
async def post_instances(instance: InstanceModel):
response_model_exclude_none=True,
responses={
200: {
"description": "Returns the path to the launched instance",
"content": {
"application/json": {
"example": {"path": "/container/0/"}
}
}
},
503: {"model": ChannelUnavailableError, "description": "Raises a 503 HTTP error when channels are unavailable"}
})
async def post_instances(request: Request, instance: InstanceModel, current_user: str = Depends(get_current_active_user)):
''' Create a new tool instance. '''
return instance
channel = get_available_channel()
path = f'{request.scope.get("root_path")}/container/{channel}/'
container_name = f'{config.docker_name_prefix}-{current_user}-{instance.name}'
docker_client = get_docker_client()
# Does the user have this service running - Todo: Move to a nice class that handles this in simple funcs
docker_name_prefix_filter = dict(filters=dict(name=container_name))
current_containers = docker_client.containers.list(**docker_name_prefix_filter)
if len(current_containers) != 0:
path = current_containers[0].labels['path']
return InstanceResponseModel(path=path)
docker_client.containers.run(
image="jupyter/datascience-notebook",
command=f'start-notebook.sh --NotebookApp.base_url={path}',
ports={"8888": int(f'1000{channel}')},
detach=True,
name=container_name,
user="1000:1000",
group_add=["1000"],
labels={"path": path}
)
return InstanceResponseModel(path=path)
Loading