diff --git a/nomad/app/api/upload.py b/nomad/app/api/upload.py index 2e8c1c3e0ba1fc4180292f1a3b848009f5760d8c..e6761a4c056b831104fb32a37a1ac7e7442daf19 100644 --- a/nomad/app/api/upload.py +++ b/nomad/app/api/upload.py @@ -122,9 +122,10 @@ upload_metadata_parser = api.parser() upload_metadata_parser.add_argument('name', type=str, help='An optional name for the upload.', location='args') upload_metadata_parser.add_argument('local_path', type=str, help='Use a local file on the server.', location='args') upload_metadata_parser.add_argument('token', type=str, help='Upload token to authenticate with curl command.', location='args') -upload_metadata_parser.add_argument('oasis_upload_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the upload_id.', location='args') -upload_metadata_parser.add_argument('oasis_uploader', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the uploader\' id.', location='args') upload_metadata_parser.add_argument('file', type=FileStorage, help='The file to upload.', location='files') +upload_metadata_parser.add_argument('oasis_upload_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the upload_id.', location='args') +upload_metadata_parser.add_argument('oasis_uploader_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the uploader\' id.', location='args') +upload_metadata_parser.add_argument('oasis_deployment_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the OASIS\' deployment id.', location='args') upload_list_parser = pagination_request_parser.copy() @@ -254,7 +255,8 @@ class UploadListResource(Resource): # check if allowed to perform oasis upload oasis_upload_id = request.args.get('oasis_upload_id') - oasis_uploader_id = request.args.get('oasis_uploader') + oasis_uploader_id = request.args.get('oasis_uploader_id') + oasis_deployment_id = request.args.get('oasis_deployment_id') user = g.user from_oasis = oasis_upload_id is not None if from_oasis: @@ -262,9 +264,13 @@ class UploadListResource(Resource): abort(401, 'Only an oasis admin can perform an oasis upload.') if oasis_uploader_id is None: abort(400, 'You must provide the original uploader for an oasis upload.') + if oasis_deployment_id is None: + abort(400, 'You must provide the oasis deployment id for an oasis upload.') user = datamodel.User.get(user_id=oasis_uploader_id) if user is None: abort(400, 'The given original uploader does not exist.') + elif oasis_uploader_id is not None or oasis_deployment_id is not None: + abort(400, 'For an oasis upload you must provide an oasis_upload_id.') upload_name = request.args.get('name') if oasis_upload_id is not None: @@ -336,7 +342,8 @@ class UploadListResource(Resource): upload_time=datetime.utcnow(), upload_path=upload_path, temporary=local_path != upload_path, - from_oasis=from_oasis) + from_oasis=from_oasis, + oasis_deployment_id=oasis_deployment_id) upload.process_upload() logger.info('initiated processing') diff --git a/nomad/config.py b/nomad/config.py index af8dc0bd9ee1f6e25afca238e2dfec51e8311715..a332427063b53f14bcaade333ca097fbb0524303 100644 --- a/nomad/config.py +++ b/nomad/config.py @@ -159,8 +159,12 @@ services = NomadConfig( upload_limit=10, force_raw_file_decoding=False, download_scan_size=500, - download_scan_timeout=u'30m', - central_nomad_api_url='https://nomad-lab.eu/prod/rae/api' + download_scan_timeout=u'30m' +) + +oasis = NomadConfig( + central_nomad_api_url='https://nomad-lab.eu/prod/rae/api', + central_nomad_deployment_id='nomad-lab.eu/prod/rae' ) tests = NomadConfig( @@ -285,7 +289,8 @@ meta = NomadConfig( description='A FAIR data sharing platform for materials science data', homepage='https://nomad-lab.eu', source_url='https://gitlab.mpcdf.mpg.de/nomad-lab/nomad-FAIR', - maintainer_email='markus.scheidgen@physik.hu-berlin.de' + maintainer_email='markus.scheidgen@physik.hu-berlin.de', + deployment_id='nomad-lab.eu/prod/rae' ) auxfile_cutoff = 100 diff --git a/nomad/processing/data.py b/nomad/processing/data.py index a7d148f15e2e270ca0148574b89e652409b4403a..1ceaad37716e93d61623b30fa06764623acc59f8 100644 --- a/nomad/processing/data.py +++ b/nomad/processing/data.py @@ -29,7 +29,7 @@ calculations, and files ''' from typing import cast, List, Any, Tuple, Iterator, Dict, cast, Iterable -from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField +from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField, ListField import logging from structlog import wrap_logger from contextlib import contextmanager @@ -665,16 +665,23 @@ class Upload(Proc): and processing state. Attributes: - name: optional user provided upload name - upload_path: the path were the uploaded files was stored - temporary: True if the uploaded file should be removed after extraction - upload_id: the upload id generated by the database - upload_time: the timestamp when the system realized the upload - user_id: the id of the user that created this upload - published: Boolean that indicates the publish status - publish_time: Date when the upload was initially published - last_update: Date of the last publishing/re-processing - joined: Boolean indicates if the running processing has joined (:func:`check_join`) + name: Optional user provided upload name. + upload_path: The fs path were the uploaded files was stored during upload. + temporary: True if the uploaded file should be removed after extraction. + + upload_id: The upload id generated by the database or the uploaded NOMAD deployment. + upload_time: Datetime of the original upload independent of the NOMAD deployment + it was first uploaded to. + user_id: The id of the user that created this upload. + published: Boolean that indicates that the upload is published on this NOMAD deployment. + publish_time: Datetime when the upload was initially published on this NOMAD deployment. + last_update: Datetime of the last modifying process run (publish, re-processing, upload). + + from_oasis: Boolean indicating that this upload is coming from another NOMAD deployment. + oasis_id: The deployment id of the NOMAD that uploaded the upload. + published_to: A list of deployment ids where this upload has been successfully uploaded to. + + joined: Boolean indicates if the running processing has joined (:func:`check_join`). ''' id_field = 'upload_id' @@ -691,6 +698,9 @@ class Upload(Proc): last_update = DateTimeField() from_oasis = BooleanField(default=False) + oasis_deployment_id = StringField(default=None) + published_to = ListField(StringField()) + joined = BooleanField(default=False) meta: Any = { @@ -888,16 +898,20 @@ class Upload(Proc): Uploads the already published upload to a different NOMAD deployment. This allows to push uploads from an OASIS to the central NOMAD. ''' - assert self.published, 'Only published uploads can be published to the central NOMAD.' - - # TODO check if it might be there + assert self.published, \ + 'Only published uploads can be published to the central NOMAD.' + assert config.oasis.central_nomad_deployment_id not in self.published_to, \ + 'Upload is already published to the central NOMAD.' - # create a nomad.json with all necessary metadata that is not determined by - # processing the raw data - metadata = dict( - upload_time=str(self.upload_time)) - - entries = {} + from nomad.cli.client.client import _create_client as create_client + central_nomad_client = create_client( + user=config.keycloak.username, + password=config.keycloak.password, + api_base_url=config.oasis.central_nomad_api_url) + + # compile oasis metadata for the upload + upload_metadata = dict(upload_time=str(self.upload_time)) + upload_metadata_entries = {} for calc in self.calcs: entry_metadata = dict(**{ key: str(value) if isinstance(value, datetime) else value @@ -906,42 +920,38 @@ class Upload(Proc): entry_metadata['calc_id'] = calc.calc_id if entry_metadata.get('with_embargo'): continue - entries[calc.mainfile] = entry_metadata - metadata['entries'] = entries + upload_metadata_entries[calc.mainfile] = entry_metadata + upload_metadata['entries'] = upload_metadata_entries + oasis_upload_id, upload_metadata = _normalize_oasis_upload_metadata( + self.upload_id, upload_metadata) - upload_id, metadata = _normalize_oasis_upload_metadata(self.upload_id, metadata) - - assert len(entries) > 0, 'Only uploads with public contents can be published to the central NOMAD.' + assert len(upload_metadata_entries) > 0, \ + 'Only uploads with public contents can be published to the central NOMAD.' + # add oasis metadata to the upload public_upload_files = cast(PublicUploadFiles, self.upload_files) - public_upload_files.add_metadata_file(metadata) - - # upload the file - from nomad.cli.client.client import _create_client as create_client - - try: - client = create_client( - user=config.keycloak.username, - password=config.keycloak.password, - api_base_url=config.services.central_nomad_api_url) - - oasis_admin = client.auth.get_auth().response().result - oasis_admin_token = oasis_admin.access_token - upload_url = '%s/uploads/?%s' % ( - config.services.central_nomad_api_url, - urllib.parse.urlencode(dict(oasis_upload_id=upload_id, oasis_uploader=self.user_id))) - with open(public_upload_files.public_raw_data_file, 'rb') as f: - response = requests.put( - upload_url, headers={'Authorization': 'Bearer %s' % oasis_admin_token}, - data=f) - if response.status_code != 200: - self.get_logger().error('Could not upload to central NOMAD', status_code=response.status_code) - except Exception as e: - self.get_logger().error('Could not upload to central NOMAD', exc_info=e) - raise e + public_upload_files.add_metadata_file(upload_metadata) + file_to_upload = public_upload_files.public_raw_data_file + + # upload to central NOMAD + oasis_admin_token = central_nomad_client.auth.get_auth().response().result.access_token + upload_headers = dict(Authorization='Bearer %s' % oasis_admin_token) + upload_parameters = dict( + oasis_upload_id=oasis_upload_id, + oasis_uploader_id=self.user_id, + oasis_deployment_id=config.meta.deployment_id) + upload_url = '%s/uploads/?%s' % ( + config.oasis.central_nomad_api_url, + urllib.parse.urlencode(upload_parameters)) + + with open(file_to_upload, 'rb') as f: + response = requests.put(upload_url, headers=upload_headers, data=f) + + if response.status_code != 200: + self.get_logger().error( + 'Could not upload to central NOMAD', status_code=response.status_code) - # TODO record the publication at the other NOMAD deployment - pass + self.published_to.append(config.oasis.central_nomad_deployment_id) @process def re_process_upload(self): diff --git a/tests/app/test_api.py b/tests/app/test_api.py index 3527bbae17fccf8f26a07d863a773f091bfe4b5d..817d1054f893af6e05a7ac9e9c8a8067bc169881 100644 --- a/tests/app/test_api.py +++ b/tests/app/test_api.py @@ -531,24 +531,41 @@ class TestUploads: rv = api.get('/raw/%s/examples_potcar/POTCAR%s.stripped' % (upload_id, ending)) assert rv.status_code == 200 - def test_post_from_oasis_admin(self, api, other_test_user_auth, oasis_example_upload, proc_infra, no_warn): - rv = api.put( - '/uploads/?local_path=%s&oasis_upload_id=oasis_upload_id' % oasis_example_upload, - headers=other_test_user_auth) - assert rv.status_code == 401 + def test_post_from_oasis_admin(self, api, non_empty_uploaded, other_test_user_auth, test_user, no_warn): + url = '/uploads/?%s' % urlencode(dict( + local_path=non_empty_uploaded[1], oasis_upload_id='oasis_upload_id', + oasis_uploader_id=test_user.user_id, oasis_deployment_id='an_id')) + assert api.put(url, headers=other_test_user_auth).status_code == 401 - def test_post_from_oasis_duplicate(self, api, test_user, test_user_auth, oasis_example_upload, proc_infra, no_warn): + def test_post_from_oasis_duplicate(self, api, non_empty_uploaded, test_user, test_user_auth, no_warn): Upload.create(upload_id='oasis_upload_id', user=test_user).save() - rv = api.put( - '/uploads/?local_path=%s&oasis_upload_id=oasis_upload_id' % oasis_example_upload, - headers=test_user_auth) - assert rv.status_code == 400 + url = '/uploads/?%s' % urlencode(dict( + local_path=non_empty_uploaded[1], oasis_upload_id='oasis_upload_id', + oasis_uploader_id=test_user.user_id, oasis_deployment_id='an_id')) + assert api.put(url, headers=test_user_auth).status_code == 400 + + def test_post_from_oasis_missing_parameters(self, api, non_empty_uploaded, test_user_auth, test_user, no_warn): + url = '/uploads/?%s' % urlencode(dict( + local_path=non_empty_uploaded[1], oasis_upload_id='oasis_upload_id', + oasis_uploader_id=test_user.user_id)) + assert api.put(url, headers=test_user_auth).status_code == 400 + + url = '/uploads/?%s' % urlencode(dict( + local_path=non_empty_uploaded[1], oasis_upload_id='oasis_upload_id', + oasis_deployment_id='an_id')) + assert api.put(url, headers=test_user_auth).status_code == 400 + + url = '/uploads/?%s' % urlencode(dict( + local_path=non_empty_uploaded[1], + oasis_uploader_id=test_user.user_id, oasis_deployment_id='an_id')) + assert api.put(url, headers=test_user_auth).status_code == 400 def test_post_from_oasis(self, api, test_user_auth, test_user, oasis_example_upload, proc_infra, no_warn): rv = api.put('/uploads/?%s' % urlencode(dict( local_path=oasis_example_upload, oasis_upload_id='oasis_upload_id', - oasis_uploader=test_user.user_id)), headers=test_user_auth) + oasis_deployment_id='an_id', + oasis_uploader_id=test_user.user_id)), headers=test_user_auth) assert rv.status_code == 200, rv.data upload = self.assert_upload(rv.data) upload_id = upload['upload_id'] diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py index 99d9f1b9a91bdb6e82fe60217960936a553d0649..2a6aaedf3f2bb02f9d2d1d732f9c214edc2cc89f 100644 --- a/tests/processing/test_data.py +++ b/tests/processing/test_data.py @@ -211,6 +211,7 @@ def test_oasis_upload_processing(proc_infra, oasis_example_uploaded: Tuple[str, upload = Upload.create( upload_id=uploaded_id, user=test_user, upload_path=uploaded_path) upload.from_oasis = True + upload.oasis_deployment_id = 'an_oasis_id' assert upload.tasks_status == 'RUNNING' assert upload.current_task == 'uploading' @@ -219,6 +220,8 @@ def test_oasis_upload_processing(proc_infra, oasis_example_uploaded: Tuple[str, upload.block_until_complete(interval=.01) assert upload.published + assert upload.from_oasis + assert upload.oasis_deployment_id == 'an_oasis_id' assert str(upload.upload_time) == '2020-01-01 00:00:00' assert_processing(upload, published=True) calc = Calc.objects(upload_id='oasis_upload_id').first() @@ -254,7 +257,7 @@ def test_publish_from_oasis( monkeypatch.setattr( 'requests.put', put) monkeypatch.setattr( - 'nomad.config.services.central_nomad_api_url', '/api') + 'nomad.config.oasis.central_nomad_api_url', '/api') upload.publish_from_oasis() upload.block_until_complete() @@ -264,6 +267,10 @@ def test_publish_from_oasis( cn_upload.block_until_complete() assert_processing(cn_upload, published=True) assert cn_upload.user_id == other_test_user.user_id + assert len(cn_upload.published_to) == 0 + assert cn_upload.from_oasis + assert cn_upload.oasis_deployment_id == config.meta.deployment_id + assert upload.published_to[0] == config.oasis.central_nomad_deployment_id @pytest.mark.timeout(config.tests.default_timeout)