Commit ed06cf33 authored by Daniel Lehmberg's avatar Daniel Lehmberg
Browse files

refactoring from comments

parent 84f289fc
Pipeline #135171 failed with stages
in 39 minutes and 25 seconds
......@@ -16,12 +16,8 @@
# limitations under the License.
#
from nomad.app.v1.routers.landscape import OasisInfo
import nomad.config as cfg
from typing import Any
import time
import requests
from fastapi import FastAPI, status, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
......@@ -29,15 +25,12 @@ import traceback
import orjson
from nomad import config, utils
from nomad.app.v1.routers.federation import SubmitInstallationInfoToCentral, default_tag
from .common import root_path
from .routers import users, entries, materials, auth, info, datasets, uploads, suggestions, landscape
import nest_asyncio # TODO: I needed this for testing
from .routers import users, entries, materials, auth, info, datasets, uploads, suggestions, federation
logger = utils.get_logger(__name__)
nest_asyncio.apply()
class ORJSONResponse(JSONResponse):
......@@ -77,86 +70,18 @@ async def redirect_to_docs(req: Request):
# app.add_route(f'{root_path}', redirect_to_docs, include_in_schema=False)
app.add_route('/', redirect_to_docs, include_in_schema=False)
local_to_central_link = SubmitInstallationInfoToCentral()
class OasisSubmitToCentral(object):
last_timestamp = None
interval_submit_sec = 2
running_since = None
# TODO: later this needs to be a client to send information to the central Oasis
# Currently I am setting this to the TestClient in the tests
client = None
def _collect_data_from_local_oasis(self):
# TODO: can reuse the class specification?
info = OasisInfo(
oasis_id=cfg.meta["deployment_id"],
oasis_url=cfg.client["url"],
oasis_maintainer_email=cfg.meta["maintainer_email"],
homepage=cfg.meta["homepage"],
nomad_version=cfg.meta["version"],
first_submit=self.running_since,
telemetry={"nr_of_datasets": 5, "nr_of_entries": 100} # TODO: how to obtain telemetry data properly?
)
return info
def _submit_to_central_oasis(self, message) -> requests.models.Response:
print("submit content")
post = self.client.post(f"{self.client.base_url}landscape", json=dict(message))
return post
async def check_and_submit_to_central(self):
now = time.time()
if self.last_timestamp is None or now - self.last_timestamp >= OasisSubmitToCentral.interval_submit_sec:
if self.last_timestamp is None:
print("initial submit")
from datetime import datetime
self.running_since = datetime.utcfromtimestamp(time.time()).strftime("UTC-%Y-%M-%dT%H:%M:%S")
else:
print("re-submit")
self.last_timestamp = now
try:
post_content = self._collect_data_from_local_oasis()
failed_to_collect = False
except Exception as e:
# TODO: probably good to log if it fails...
post_content = None
failed_to_collect = True
if not failed_to_collect:
try:
ret_request = self._submit_to_central_oasis(post_content)
if not isinstance(ret_request, requests.models.Response) or ret_request.status_code != 200:
raise ValueError("") # TODO: better error
failed_to_submit = False
except Exception as e:
# TODO: what to do if only the submission failed?
# -- drop content or store last X cases and try to submit later?
failed_to_submit = True
print(f"set self.last_timestamp={self.last_timestamp}")
else:
print("Too early to re-submit") # TODO: (remove, only for debugging)
return None
oasis_to_central = OasisSubmitToCentral()
@app.middleware("http")
@app.middleware('http')
async def submit_oasis_info(request: Request, call_next):
"""FastAPI middleware that checks to submit information of the local to the central Oasis.
"""
if "landscape" not in str(request.url): # avoid recursion if using same client when using post/landscape
print("--------------------------")
print(f"Execute oasis submit on request{request}")
await oasis_to_central.check_and_submit_to_central()
else:
print("performed middleware for landscape post")
'''FastAPI middleware that checks to submit information of the local to the central Oasis.'''
if default_tag not in str(request.url): # avoid recursion if using same client when using post/federation
is_successful = await local_to_central_link.check_and_submit_to_central()
logger.info('Attempted to submit installation information to central Nomad.', is_successful=is_successful)
return await call_next(request)
......@@ -176,7 +101,7 @@ async def unicorn_exception_handler(request: Request, e: Exception):
)
app.include_router(info.router, prefix='/info')
app.include_router(landscape.router, prefix='/landscape')
app.include_router(federation.router, prefix='/federation')
app.include_router(auth.router, prefix='/auth')
app.include_router(materials.router, prefix='/materials')
app.include_router(entries.router, prefix='/entries')
......
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
'''
API endpoint to send information from local to central Nomad where the data can be processed/stored.
'''
from fastapi.routing import APIRouter
from pydantic.fields import Field
from pydantic.main import BaseModel
import nomad.infrastructure as infrastructure
import requests
import time
import nomad.config as config
from datetime import datetime
router = APIRouter()
default_tag = 'federation'
class InstallationInfo(BaseModel):
'''Information to be sent from local installations to central Nomad.'''
oasis_id: str = Field(None, description='Oasis ID')
oasis_url: str = Field(None, description='Oasis URL')
oasis_maintainer_email: str = Field(None, description='Email of oasis maintainer / admin.')
homepage: str = Field(None, description='Associated homepage (e.g. of institution).')
nomad_version: str = Field(None, description='Installed Nomad version.')
first_submit: str = Field(None, description='First submit from current oasis deployment')
telemetry: dict = Field(None, description='Telemetry data on the Oasis (e.g. number of uploads or entries).')
class SubmitInstallationInfoToCentral(object):
last_timestamp = None
running_since = None
_timestamp_strptime = "UTC-%Y-%m-%dT%H:%M:%S"
@staticmethod
def initial_submit():
return SubmitInstallationInfoToCentral.running_since is None
@staticmethod
def _utc_str_timestamp(time: float):
return datetime.utcfromtimestamp(time).strftime(SubmitInstallationInfoToCentral._timestamp_strptime)
def _collect_data_from_local_oasis(self):
info = InstallationInfo(
oasis_id=config.meta.deployment_id,
oasis_url=config.client.url,
oasis_maintainer_email=config.meta.maintainer_email,
homepage=config.meta.homepage,
nomad_version=config.meta.version,
first_submit=SubmitInstallationInfoToCentral.running_since,
telemetry={'nr_of_datasets': 5, 'nr_of_entries': 100} # TODO: how to obtain telemetry data properly?
)
return info
def _submit_info_to_central_oasis(self, post_message: BaseModel) -> requests.models.Response:
central_post_federation_url = f'{config.oasis.central_nomad_api_url}/federation/installation_info/'
return requests.post(central_post_federation_url, post_message.json())
async def check_and_submit_to_central(self):
successful = True # re-set to False if not successful
time_now = time.time()
is_submit = self.initial_submit() or \
time_now - SubmitInstallationInfoToCentral.last_timestamp >= config.oasis.sec_submit_info_to_central
if is_submit:
if self.initial_submit():
SubmitInstallationInfoToCentral.running_since = self._utc_str_timestamp(time_now)
# always set timestamp, even if the transfer fails at some point in the following part
# (then try next time after time interval set in config.oasis.sec_submit_info_to_central)
SubmitInstallationInfoToCentral.last_timestamp = time_now
try:
post_content = self._collect_data_from_local_oasis()
failed_to_collect = False
except Exception:
post_content = None
failed_to_collect = True
failed_to_submit = True # re-set if submit is successful
if not failed_to_collect:
try:
ret_request = self._submit_info_to_central_oasis(post_content)
if ret_request.status_code == 200:
failed_to_submit = False
except Exception:
pass
successful = not (failed_to_submit or failed_to_collect)
return successful
@router.post('/installation_info/', tags=[default_tag], response_model=InstallationInfo,
summary='Receive information from local Nomad installations and store it into MongoDB collection.')
async def installation_info(info: InstallationInfo):
db = infrastructure.mongo_client[config.mongo.db_name]
# use separate collection "installation_info" to store the infos into
db.installation_info.insert_one(dict(info))
return info
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
'''
API endpoint that store/send information between local/central OASIS.
'''
from fastapi.routing import APIRouter
from pydantic.fields import Field
from pydantic.main import BaseModel
from nomad.infrastructure import setup_mongo
router = APIRouter()
default_tag = 'landscape'
class OasisInfo(BaseModel):
# TODO: can also include a counter of how often a local oasis has sent information so far
oasis_id: str = Field(None, description="Oasis ID")
oasis_url: str = Field(None, description="Oasis URL")
oasis_maintainer_email: str = Field(None, description="Email of oasis maintainer / admin.")
homepage: str = Field(None, description="Associated homepage (e.g. of institution).")
nomad_version: str = Field(None, description="Installed Nomad version.")
first_submit: str = Field(None, description="First submit from current oasis deployment")
telemetry: dict = Field(None, description="Telemetry data on the Oasis (e.g. number of uploads or entries).")
def __repr__(self):
# TODO: remove, just for debugging
return f"self.oasis_id={self.oasis_id} -- " \
f"self.oasis_url{self.oasis_url} -- " \
f"self.nomad_version={self.nomad_version} -- " \
f"self.telemetry{self.telemetry} "
mongodb = setup_mongo()
@router.post("", tags=[default_tag], response_model=OasisInfo,
summary='Receives information from a local OASIS and stores it.')
async def local_oasis_info(info: OasisInfo):
"""Receives information from local Oasis installations and stores it into MongoDB.
# TODO: only provide this for the central Oasis (the local Oasis should not have this...)
Args:
info:
Returns:
"""
print(f"Received information on landscape with info{info}")
db = mongodb.nomad_v1_test.landscape # set up own separate Collection landscape to store the data to.
result = db.insert_one(dict(info))
print(f"Stored in mongodb result.acknowledged={result.acknowledged} with result.inserted_id={result.inserted_id}")
return info
def print_mongodb_content():
mongodb = setup_mongo()
db = mongodb.nomad_v1_test.landscape
print(f"There are len(db.find())={len(list(db.find()))} objects stored")
for s in db.find():
print(s)
......@@ -195,7 +195,8 @@ oasis = NomadConfig(
central_nomad_deployment_id='nomad-lab.eu/prod/v1',
allowed_users=None, # a list of usernames or user account emails
uses_central_user_management=False,
is_oasis=False
is_oasis=False,
sec_submit_info_to_central=60 * 60 * 24 # submit every day once
)
tests = NomadConfig(
......
import time
from nomad.app.v1.main import SubmitInstallationInfoToCentral
import nomad.config as config
import nomad.infrastructure as infrastructure
from datetime import datetime
import requests
from nomad.app.v1.routers.federation import InstallationInfo
import nest_asyncio
nest_asyncio.apply()
# TODO: without nest_asyncio the following error comes up:
# TODO: self = <_UnixSelectorEventLoop running=False closed=False debug=False>
# def _check_running(self):
# if self.is_running():
# > raise RuntimeError('This event loop is already running')
def test_federation(federation, client, api_v1, mongo, print_collection=False):
time_start = time.time() # use for a test
time_pause = 1.01 # assuming an interval of 1 sec set in federation pytest.fixture
# request users via REST to invoke the FastAPI middleware
path_get_users = "api/v1/users"
r1 = client.get(path_get_users) # initial submit
r2 = client.get(path_get_users) # it is too soon to re-submit
time.sleep(time_pause)
r3 = client.get(path_get_users) # resubmit new data (assuming interval is 1 sec. for testing)
for r in [r1, r2, r3]:
# sanity check: all example requests to trigger the federation middleware should be successful
assert r.status_code == 200
db = infrastructure.mongo_client[config.mongo.db_name]
install_collection = db.installation_info
assert isinstance(SubmitInstallationInfoToCentral.last_timestamp, float)
assert isinstance(SubmitInstallationInfoToCentral.running_since, str)
assert not SubmitInstallationInfoToCentral.initial_submit()
assert SubmitInstallationInfoToCentral.last_timestamp - time_start >= time_pause
assert len(list(install_collection.find())) == 2
actual = datetime.strptime(SubmitInstallationInfoToCentral.running_since, SubmitInstallationInfoToCentral._timestamp_strptime)
expected = datetime.utcfromtimestamp(time_start)
assert actual.year == expected.year
assert actual.month == expected.month
assert actual.day == expected.day
if print_collection: # useful for interactive debugging sessions
for c in install_collection.find():
print(c)
def test_post_installation_info(federation, api_v1, mongo):
# dummy installation info
expected = InstallationInfo(
oasis_id=999,
oasis_url='www.testoasis.org',
oasis_maintainer_email='test@test.de',
homepage='www.invalid.url',
nomad_version="v1.0test",
first_submit=SubmitInstallationInfoToCentral._utc_str_timestamp(time.time()),
telemetry=None
)
actual_return = requests.post(f'{config.oasis.central_nomad_api_url}/federation/installation_info/', expected.json())
# test that returned object is the same from post
assert actual_return.status_code == 200
assert actual_return.json() == dict(expected)
db = infrastructure.mongo_client[config.mongo.db_name]
federation_collection = db.installation_info
# test that the stored object in mongo collection is the same object
assert len(list(federation_collection.find())) == 1
actual_db = federation_collection.find()[0]
actual_db.pop("_id") # remove key inserted in from mongodb
assert actual_db == dict(expected)
......@@ -42,6 +42,7 @@ from nomad.utils import structlogging
from nomad.archive import write_archive, read_archive, write_partial_archive_to_mongo
from nomad.processing import ProcessStatus
from nomad.app.main import app
from nomad.app.v1.routers.federation import SubmitInstallationInfoToCentral
from nomad.utils.exampledata import ExampleData
from tests.parsing import test_parsing
......@@ -209,6 +210,20 @@ def mongo(mongo_infra):
return clear_mongo(mongo_infra)
@pytest.fixture(scope='session')
def federation_infra(monkeysession):
'''Overwrites the Oasis configuration to test the federation API.'''
# decrease the interval value to only 1 second to re-trigger the "SubmitInstallationInfoToCentral" more frequently
monkeysession.setattr('nomad.config.oasis.sec_submit_info_to_central', 1)
@pytest.fixture(scope="function")
def federation(federation_infra):
'''Reset attrs in SubmitInstallationInfoToCentral (within federation API).'''
SubmitInstallationInfoToCentral.last_timestamp = None
SubmitInstallationInfoToCentral.running_since = None
@pytest.fixture(scope='session')
def elastic_infra(monkeysession):
''' Provides elastic infrastructure to the session '''
......@@ -980,7 +995,8 @@ def api_v1(monkeysession):
test_client = TestClient(app, base_url='http://testserver/api/v1/')
def call_test_client(method, url, *args, **kwargs):
url = url.replace(f'{config.client.url}/v1/', '')
url = url.replace(f'{config.client.url}/', '') # uses http
url = url.replace(f'{config.oasis.central_nomad_api_url}/', '') # uses https
return getattr(test_client, method)(url, *args, **kwargs)
monkeysession.setattr('requests.get', lambda *args, **kwargs: call_test_client('get', *args, **kwargs))
......
import time
def test_landscape(api_v1):
from nomad.app.v1.main import OasisSubmitToCentral
from nomad.app.v1.routers.landscape import print_mongodb_content
OasisSubmitToCentral.client = api_v1
# TODO: why does api_v1 not prepend the base_url itself?
api_v1.get(f"{api_v1.base_url}users") # get on users invokes the FastAPI middleware
api_v1.get(f"{api_v1.base_url}users") # it is too soon to re-submit data
time.sleep(3)
api_v1.get(f"{api_v1.base_url}users") # now resubmit new data (assuming the interval is 2 sec. for testing)
print_mongodb_content()
print("Finished successfully")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment