Commit ccf8fcd4 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added oasis-nomad-publish metadata to uploads.

parent aa723bde
Pipeline #89297 passed with stages
in 29 minutes and 15 seconds
......@@ -122,9 +122,10 @@ upload_metadata_parser = api.parser()
upload_metadata_parser.add_argument('name', type=str, help='An optional name for the upload.', location='args')
upload_metadata_parser.add_argument('local_path', type=str, help='Use a local file on the server.', location='args')
upload_metadata_parser.add_argument('token', type=str, help='Upload token to authenticate with curl command.', location='args')
upload_metadata_parser.add_argument('oasis_upload_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the upload_id.', location='args')
upload_metadata_parser.add_argument('oasis_uploader', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the uploader\' id.', location='args')
upload_metadata_parser.add_argument('file', type=FileStorage, help='The file to upload.', location='files')
upload_metadata_parser.add_argument('oasis_upload_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the upload_id.', location='args')
upload_metadata_parser.add_argument('oasis_uploader_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the uploader\' id.', location='args')
upload_metadata_parser.add_argument('oasis_deployment_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the OASIS\' deployment id.', location='args')
upload_list_parser = pagination_request_parser.copy()
......@@ -254,7 +255,8 @@ class UploadListResource(Resource):
# check if allowed to perform oasis upload
oasis_upload_id = request.args.get('oasis_upload_id')
oasis_uploader_id = request.args.get('oasis_uploader')
oasis_uploader_id = request.args.get('oasis_uploader_id')
oasis_deployment_id = request.args.get('oasis_deployment_id')
user = g.user
from_oasis = oasis_upload_id is not None
if from_oasis:
......@@ -262,9 +264,13 @@ class UploadListResource(Resource):
abort(401, 'Only an oasis admin can perform an oasis upload.')
if oasis_uploader_id is None:
abort(400, 'You must provide the original uploader for an oasis upload.')
if oasis_deployment_id is None:
abort(400, 'You must provide the oasis deployment id for an oasis upload.')
user = datamodel.User.get(user_id=oasis_uploader_id)
if user is None:
abort(400, 'The given original uploader does not exist.')
elif oasis_uploader_id is not None or oasis_deployment_id is not None:
abort(400, 'For an oasis upload you must provide an oasis_upload_id.')
upload_name = request.args.get('name')
if oasis_upload_id is not None:
......@@ -336,7 +342,8 @@ class UploadListResource(Resource):
upload_time=datetime.utcnow(),
upload_path=upload_path,
temporary=local_path != upload_path,
from_oasis=from_oasis)
from_oasis=from_oasis,
oasis_deployment_id=oasis_deployment_id)
upload.process_upload()
logger.info('initiated processing')
......
......@@ -159,8 +159,12 @@ services = NomadConfig(
upload_limit=10,
force_raw_file_decoding=False,
download_scan_size=500,
download_scan_timeout=u'30m',
central_nomad_api_url='https://nomad-lab.eu/prod/rae/api'
download_scan_timeout=u'30m'
)
oasis = NomadConfig(
central_nomad_api_url='https://nomad-lab.eu/prod/rae/api',
central_nomad_deployment_id='nomad-lab.eu/prod/rae'
)
tests = NomadConfig(
......@@ -285,7 +289,8 @@ meta = NomadConfig(
description='A FAIR data sharing platform for materials science data',
homepage='https://nomad-lab.eu',
source_url='https://gitlab.mpcdf.mpg.de/nomad-lab/nomad-FAIR',
maintainer_email='markus.scheidgen@physik.hu-berlin.de'
maintainer_email='markus.scheidgen@physik.hu-berlin.de',
deployment_id='nomad-lab.eu/prod/rae'
)
auxfile_cutoff = 100
......
......@@ -29,7 +29,7 @@ calculations, and files
'''
from typing import cast, List, Any, Tuple, Iterator, Dict, cast, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField, ListField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
......@@ -665,16 +665,23 @@ class Upload(Proc):
and processing state.
Attributes:
name: optional user provided upload name
upload_path: the path were the uploaded files was stored
temporary: True if the uploaded file should be removed after extraction
upload_id: the upload id generated by the database
upload_time: the timestamp when the system realized the upload
user_id: the id of the user that created this upload
published: Boolean that indicates the publish status
publish_time: Date when the upload was initially published
last_update: Date of the last publishing/re-processing
joined: Boolean indicates if the running processing has joined (:func:`check_join`)
name: Optional user provided upload name.
upload_path: The fs path were the uploaded files was stored during upload.
temporary: True if the uploaded file should be removed after extraction.
upload_id: The upload id generated by the database or the uploaded NOMAD deployment.
upload_time: Datetime of the original upload independent of the NOMAD deployment
it was first uploaded to.
user_id: The id of the user that created this upload.
published: Boolean that indicates that the upload is published on this NOMAD deployment.
publish_time: Datetime when the upload was initially published on this NOMAD deployment.
last_update: Datetime of the last modifying process run (publish, re-processing, upload).
from_oasis: Boolean indicating that this upload is coming from another NOMAD deployment.
oasis_id: The deployment id of the NOMAD that uploaded the upload.
published_to: A list of deployment ids where this upload has been successfully uploaded to.
joined: Boolean indicates if the running processing has joined (:func:`check_join`).
'''
id_field = 'upload_id'
......@@ -691,6 +698,9 @@ class Upload(Proc):
last_update = DateTimeField()
from_oasis = BooleanField(default=False)
oasis_deployment_id = StringField(default=None)
published_to = ListField(StringField())
joined = BooleanField(default=False)
meta: Any = {
......@@ -888,16 +898,20 @@ class Upload(Proc):
Uploads the already published upload to a different NOMAD deployment. This allows
to push uploads from an OASIS to the central NOMAD.
'''
assert self.published, 'Only published uploads can be published to the central NOMAD.'
# TODO check if it might be there
assert self.published, \
'Only published uploads can be published to the central NOMAD.'
assert config.oasis.central_nomad_deployment_id not in self.published_to, \
'Upload is already published to the central NOMAD.'
# create a nomad.json with all necessary metadata that is not determined by
# processing the raw data
metadata = dict(
upload_time=str(self.upload_time))
entries = {}
from nomad.cli.client.client import _create_client as create_client
central_nomad_client = create_client(
user=config.keycloak.username,
password=config.keycloak.password,
api_base_url=config.oasis.central_nomad_api_url)
# compile oasis metadata for the upload
upload_metadata = dict(upload_time=str(self.upload_time))
upload_metadata_entries = {}
for calc in self.calcs:
entry_metadata = dict(**{
key: str(value) if isinstance(value, datetime) else value
......@@ -906,42 +920,38 @@ class Upload(Proc):
entry_metadata['calc_id'] = calc.calc_id
if entry_metadata.get('with_embargo'):
continue
entries[calc.mainfile] = entry_metadata
metadata['entries'] = entries
upload_metadata_entries[calc.mainfile] = entry_metadata
upload_metadata['entries'] = upload_metadata_entries
oasis_upload_id, upload_metadata = _normalize_oasis_upload_metadata(
self.upload_id, upload_metadata)
upload_id, metadata = _normalize_oasis_upload_metadata(self.upload_id, metadata)
assert len(entries) > 0, 'Only uploads with public contents can be published to the central NOMAD.'
assert len(upload_metadata_entries) > 0, \
'Only uploads with public contents can be published to the central NOMAD.'
# add oasis metadata to the upload
public_upload_files = cast(PublicUploadFiles, self.upload_files)
public_upload_files.add_metadata_file(metadata)
# upload the file
from nomad.cli.client.client import _create_client as create_client
try:
client = create_client(
user=config.keycloak.username,
password=config.keycloak.password,
api_base_url=config.services.central_nomad_api_url)
oasis_admin = client.auth.get_auth().response().result
oasis_admin_token = oasis_admin.access_token
upload_url = '%s/uploads/?%s' % (
config.services.central_nomad_api_url,
urllib.parse.urlencode(dict(oasis_upload_id=upload_id, oasis_uploader=self.user_id)))
with open(public_upload_files.public_raw_data_file, 'rb') as f:
response = requests.put(
upload_url, headers={'Authorization': 'Bearer %s' % oasis_admin_token},
data=f)
if response.status_code != 200:
self.get_logger().error('Could not upload to central NOMAD', status_code=response.status_code)
except Exception as e:
self.get_logger().error('Could not upload to central NOMAD', exc_info=e)
raise e
public_upload_files.add_metadata_file(upload_metadata)
file_to_upload = public_upload_files.public_raw_data_file
# upload to central NOMAD
oasis_admin_token = central_nomad_client.auth.get_auth().response().result.access_token
upload_headers = dict(Authorization='Bearer %s' % oasis_admin_token)
upload_parameters = dict(
oasis_upload_id=oasis_upload_id,
oasis_uploader_id=self.user_id,
oasis_deployment_id=config.meta.deployment_id)
upload_url = '%s/uploads/?%s' % (
config.oasis.central_nomad_api_url,
urllib.parse.urlencode(upload_parameters))
with open(file_to_upload, 'rb') as f:
response = requests.put(upload_url, headers=upload_headers, data=f)
if response.status_code != 200:
self.get_logger().error(
'Could not upload to central NOMAD', status_code=response.status_code)
# TODO record the publication at the other NOMAD deployment
pass
self.published_to.append(config.oasis.central_nomad_deployment_id)
@process
def re_process_upload(self):
......
......@@ -531,24 +531,41 @@ class TestUploads:
rv = api.get('/raw/%s/examples_potcar/POTCAR%s.stripped' % (upload_id, ending))
assert rv.status_code == 200
def test_post_from_oasis_admin(self, api, other_test_user_auth, oasis_example_upload, proc_infra, no_warn):
rv = api.put(
'/uploads/?local_path=%s&oasis_upload_id=oasis_upload_id' % oasis_example_upload,
headers=other_test_user_auth)
assert rv.status_code == 401
def test_post_from_oasis_admin(self, api, non_empty_uploaded, other_test_user_auth, test_user, no_warn):
url = '/uploads/?%s' % urlencode(dict(
local_path=non_empty_uploaded[1], oasis_upload_id='oasis_upload_id',
oasis_uploader_id=test_user.user_id, oasis_deployment_id='an_id'))
assert api.put(url, headers=other_test_user_auth).status_code == 401
def test_post_from_oasis_duplicate(self, api, test_user, test_user_auth, oasis_example_upload, proc_infra, no_warn):
def test_post_from_oasis_duplicate(self, api, non_empty_uploaded, test_user, test_user_auth, no_warn):
Upload.create(upload_id='oasis_upload_id', user=test_user).save()
rv = api.put(
'/uploads/?local_path=%s&oasis_upload_id=oasis_upload_id' % oasis_example_upload,
headers=test_user_auth)
assert rv.status_code == 400
url = '/uploads/?%s' % urlencode(dict(
local_path=non_empty_uploaded[1], oasis_upload_id='oasis_upload_id',
oasis_uploader_id=test_user.user_id, oasis_deployment_id='an_id'))
assert api.put(url, headers=test_user_auth).status_code == 400
def test_post_from_oasis_missing_parameters(self, api, non_empty_uploaded, test_user_auth, test_user, no_warn):
url = '/uploads/?%s' % urlencode(dict(
local_path=non_empty_uploaded[1], oasis_upload_id='oasis_upload_id',
oasis_uploader_id=test_user.user_id))
assert api.put(url, headers=test_user_auth).status_code == 400
url = '/uploads/?%s' % urlencode(dict(
local_path=non_empty_uploaded[1], oasis_upload_id='oasis_upload_id',
oasis_deployment_id='an_id'))
assert api.put(url, headers=test_user_auth).status_code == 400
url = '/uploads/?%s' % urlencode(dict(
local_path=non_empty_uploaded[1],
oasis_uploader_id=test_user.user_id, oasis_deployment_id='an_id'))
assert api.put(url, headers=test_user_auth).status_code == 400
def test_post_from_oasis(self, api, test_user_auth, test_user, oasis_example_upload, proc_infra, no_warn):
rv = api.put('/uploads/?%s' % urlencode(dict(
local_path=oasis_example_upload,
oasis_upload_id='oasis_upload_id',
oasis_uploader=test_user.user_id)), headers=test_user_auth)
oasis_deployment_id='an_id',
oasis_uploader_id=test_user.user_id)), headers=test_user_auth)
assert rv.status_code == 200, rv.data
upload = self.assert_upload(rv.data)
upload_id = upload['upload_id']
......
......@@ -211,6 +211,7 @@ def test_oasis_upload_processing(proc_infra, oasis_example_uploaded: Tuple[str,
upload = Upload.create(
upload_id=uploaded_id, user=test_user, upload_path=uploaded_path)
upload.from_oasis = True
upload.oasis_deployment_id = 'an_oasis_id'
assert upload.tasks_status == 'RUNNING'
assert upload.current_task == 'uploading'
......@@ -219,6 +220,8 @@ def test_oasis_upload_processing(proc_infra, oasis_example_uploaded: Tuple[str,
upload.block_until_complete(interval=.01)
assert upload.published
assert upload.from_oasis
assert upload.oasis_deployment_id == 'an_oasis_id'
assert str(upload.upload_time) == '2020-01-01 00:00:00'
assert_processing(upload, published=True)
calc = Calc.objects(upload_id='oasis_upload_id').first()
......@@ -254,7 +257,7 @@ def test_publish_from_oasis(
monkeypatch.setattr(
'requests.put', put)
monkeypatch.setattr(
'nomad.config.services.central_nomad_api_url', '/api')
'nomad.config.oasis.central_nomad_api_url', '/api')
upload.publish_from_oasis()
upload.block_until_complete()
......@@ -264,6 +267,10 @@ def test_publish_from_oasis(
cn_upload.block_until_complete()
assert_processing(cn_upload, published=True)
assert cn_upload.user_id == other_test_user.user_id
assert len(cn_upload.published_to) == 0
assert cn_upload.from_oasis
assert cn_upload.oasis_deployment_id == config.meta.deployment_id
assert upload.published_to[0] == config.oasis.central_nomad_deployment_id
@pytest.mark.timeout(config.tests.default_timeout)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment