Commit aa723bde authored by Markus Scheidgen
Browse files

Added a process to push an upload from oasis to central nomad.

parent 5dca758c
Pipeline #89226 passed with stages
in 26 minutes and 31 seconds
......@@ -31,7 +31,7 @@ import os
import io
from functools import wraps
from nomad import config, utils, files, search
from nomad import config, utils, files, search, datamodel
from nomad.processing import Upload, FAILURE
from nomad.processing import ProcessAlreadyRunning
from nomad.app import common
......@@ -123,6 +123,7 @@ upload_metadata_parser.add_argument('name', type=str, help='An optional name for
upload_metadata_parser.add_argument('local_path', type=str, help='Use a local file on the server.', location='args')
upload_metadata_parser.add_argument('token', type=str, help='Upload token to authenticate with curl command.', location='args')
upload_metadata_parser.add_argument('oasis_upload_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the upload_id.', location='args')
upload_metadata_parser.add_argument('oasis_uploader', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the uploader\' id.', location='args')
upload_metadata_parser.add_argument('file', type=FileStorage, help='The file to upload.', location='files')
......@@ -253,10 +254,17 @@ class UploadListResource(Resource):
# check if allowed to perform oasis upload
oasis_upload_id = request.args.get('oasis_upload_id')
oasis_uploader_id = request.args.get('oasis_uploader')
user = g.user
from_oasis = oasis_upload_id is not None
if from_oasis is not None:
if from_oasis:
if not g.user.is_oasis_admin:
abort(401, 'Only an oasis admin can perform an oasis upload.')
if oasis_uploader_id is None:
abort(400, 'You must provide the original uploader for an oasis upload.')
user = datamodel.User.get(user_id=oasis_uploader_id)
if user is None:
abort(400, 'The given original uploader does not exist.')
upload_name = request.args.get('name')
if oasis_upload_id is not None:
......@@ -323,7 +331,7 @@ class UploadListResource(Resource):
upload = Upload.create(
upload_id=upload_id,
user=g.user,
user=user,
name=upload_name,
upload_time=datetime.utcnow(),
upload_path=upload_path,
......
......@@ -41,6 +41,7 @@ def _create_client(*args, **kwargs):
def __create_client(
user: str = nomad_config.client.user,
password: str = nomad_config.client.password,
api_base_url: str = nomad_config.client.url,
ssl_verify: bool = True, use_token: bool = True):
''' A factory method to create the client. '''
if not ssl_verify:
......@@ -50,8 +51,7 @@ def __create_client(
http_client = bravado_requests_client.RequestsClient(ssl_verify=ssl_verify)
client = bravado_client.SwaggerClient.from_url(
'%s/swagger.json' % nomad_config.client.url,
http_client=http_client)
'%s/swagger.json' % api_base_url, http_client=http_client)
utils.get_logger(__name__).info('created bravado client', user=user)
......
......@@ -159,7 +159,8 @@ services = NomadConfig(
upload_limit=10,
force_raw_file_decoding=False,
download_scan_size=500,
download_scan_timeout=u'30m'
download_scan_timeout=u'30m',
central_nomad_api_url='https://nomad-lab.eu/prod/rae/api'
)
tests = NomadConfig(
......
......@@ -312,7 +312,6 @@ class EntryMetadata(metainfo.MSection):
upload_id = metainfo.Quantity(
type=str,
description='The persistent and globally unique identifier for the upload of the entry',
categories=[OasisMetadata],
a_search=Search(
many_or='append', group='uploads_grouped', metric_name='uploads', metric='cardinality'))
......
......@@ -62,6 +62,7 @@ import tarfile
import hashlib
import io
import pickle
import json
from nomad import config, utils, datamodel
from nomad.archive import write_archive, read_archive, ArchiveReader
......@@ -805,6 +806,16 @@ class PublicUploadFiles(UploadFiles):
return staging_upload_files
def add_metadata_file(self, metadata: dict):
    '''
    Adds the given metadata as a ``nomad.json`` member to the public plain raw
    file zip of this upload.

    Arguments:
        metadata: A JSON serializable dictionary; it is written UTF-8 encoded.
    '''
    raw_zip_path = self._zip_file_object('raw', 'public', 'plain').os_path
    # mode 'a' appends the new member to the existing zip
    with zipfile.ZipFile(raw_zip_path, 'a') as archive, archive.open('nomad.json', 'w') as target:
        target.write(json.dumps(metadata).encode())
@property
def public_raw_data_file(self):
    ''' The os path of the zip file object for the 'raw', 'public', 'plain' files. '''
    raw_zip = self._zip_file_object('raw', 'public', 'plain')
    return raw_zip.os_path
def raw_file(self, file_path: str, *args, **kwargs) -> IO:
    '''
    Returns the file object for the given path, as provided by ``_file_in_zip``
    for the 'raw', 'plain' files.

    Arguments:
        file_path: The path of the file within the raw files.
        *args: Additional positional arguments, handed on to ``_file_in_zip``.
        **kwargs: Additional keyword arguments, handed on to ``_file_in_zip``.
    '''
    # Keyword arguments have to be unpacked with ``**``. A single ``*`` would hand
    # the keyword names on as extra positional arguments and drop their values.
    return self._file_in_zip('raw', 'plain', file_path, *args, **kwargs)
......
......@@ -41,6 +41,8 @@ from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import yaml
import json
from functools import lru_cache
import urllib.parse
import requests
from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import (
......@@ -86,6 +88,11 @@ _log_processors = [
TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]
def _normalize_oasis_upload_metadata(upload_id, metadata):
    '''
    Hook that is applied to the upload id and the ``nomad.json`` metadata before an
    upload is pushed to another NOMAD. It hands both back unchanged; the tests
    overwrite it to do the necessary id manipulations.
    '''
    normalized = (upload_id, metadata)
    return normalized
class Calc(Proc):
'''
Instances of this class represent calculations. This class manages the elastic
......@@ -875,6 +882,67 @@ class Upload(Proc):
self.last_update = datetime.utcnow()
self.save()
@process
def publish_from_oasis(self):
    '''
    Uploads the already published upload to a different NOMAD deployment. This allows
    to push uploads from an OASIS to the central NOMAD.

    The metadata of all entries without embargo is added as ``nomad.json`` to the
    public raw file zip. This file is then sent via PUT to the uploads endpoint of
    ``config.services.central_nomad_api_url``, authenticated with the configured
    keycloak user and with this upload's user given as ``oasis_uploader``.

    Raises:
        AssertionError: If the upload is not published or has no entries without
            embargo.
        Exception: Everything that is raised while creating the client or sending
            the file is logged and raised again.
    '''
    assert self.published, 'Only published uploads can be published to the central NOMAD.'

    # TODO check if it might be there

    # create a nomad.json with all necessary metadata that is not determined by
    # processing the raw data
    metadata = dict(
        upload_time=str(self.upload_time))

    entries = {}
    for calc in self.calcs:
        # datetimes are turned into strings to keep the metadata JSON serializable
        entry_metadata = {
            key: str(value) if isinstance(value, datetime) else value
            for key, value in calc.metadata.items()
            if key in _editable_metadata or key in _oasis_metadata}
        entry_metadata['calc_id'] = calc.calc_id
        # entries with embargo are left out
        if entry_metadata.get('with_embargo'):
            continue
        entries[calc.mainfile] = entry_metadata

    metadata['entries'] = entries
    upload_id, metadata = _normalize_oasis_upload_metadata(self.upload_id, metadata)

    assert len(entries) > 0, 'Only uploads with public contents can be published to the central NOMAD.'

    public_upload_files = cast(PublicUploadFiles, self.upload_files)
    public_upload_files.add_metadata_file(metadata)

    # upload the file
    # imported here and not at module level; presumably to avoid a circular import
    from nomad.cli.client.client import _create_client as create_client
    try:
        client = create_client(
            user=config.keycloak.username,
            password=config.keycloak.password,
            api_base_url=config.services.central_nomad_api_url)
        oasis_admin = client.auth.get_auth().response().result
        oasis_admin_token = oasis_admin.access_token
        upload_url = '%s/uploads/?%s' % (
            config.services.central_nomad_api_url,
            urllib.parse.urlencode(dict(oasis_upload_id=upload_id, oasis_uploader=self.user_id)))
        with open(public_upload_files.public_raw_data_file, 'rb') as f:
            response = requests.put(
                upload_url, headers={'Authorization': 'Bearer %s' % oasis_admin_token},
                data=f)

        # NOTE(review): an unexpected status code is only logged; nothing is raised
        # here. Confirm that this is the intended behavior.
        if response.status_code != 200:
            self.get_logger().error('Could not upload to central NOMAD', status_code=response.status_code)
    except Exception as e:
        self.get_logger().error('Could not upload to central NOMAD', exc_info=e)
        # bare raise keeps the original traceback
        raise

    # TODO record the publication at the other NOMAD deployment
@process
def re_process_upload(self):
'''
......
......@@ -344,7 +344,7 @@ class TestUploads:
url = '/uploads/?token=%s&local_path=%s&name=test_upload' % (
generate_upload_token(test_user), non_empty_example_upload)
rv = api.put(url)
assert rv.status_code == 200
assert rv.status_code == 200, rv.data
assert 'Thanks for uploading' in rv.data.decode('utf-8')
@pytest.mark.parametrize('mode', ['multipart', 'stream', 'local_path'])
......@@ -544,11 +544,12 @@ class TestUploads:
headers=test_user_auth)
assert rv.status_code == 400
def test_post_from_oasis(self, api, test_user_auth, oasis_example_upload, proc_infra, no_warn):
rv = api.put(
'/uploads/?local_path=%s&oasis_upload_id=oasis_upload_id' % oasis_example_upload,
headers=test_user_auth)
assert rv.status_code == 200
def test_post_from_oasis(self, api, test_user_auth, test_user, oasis_example_upload, proc_infra, no_warn):
rv = api.put('/uploads/?%s' % urlencode(dict(
local_path=oasis_example_upload,
oasis_upload_id='oasis_upload_id',
oasis_uploader=test_user.user_id)), headers=test_user_auth)
assert rv.status_code == 200, rv.data
upload = self.assert_upload(rv.data)
upload_id = upload['upload_id']
assert upload_id == 'oasis_upload_id'
......
......@@ -360,6 +360,15 @@ def test_user_bravado_client(client, test_user_auth, monkeypatch):
monkeypatch.setattr('nomad.cli.client.create_client', create_client)
@pytest.fixture(scope='function')
def oasis_central_nomad_client(client, test_user_auth, monkeypatch):
    '''
    Replaces ``nomad.cli.client.client._create_client`` with a factory that ignores
    its arguments and creates a bravado client on top of the flask test client,
    using the test user's auth headers.
    '''
    def create_test_client(*_args, **_kwargs):
        return SwaggerClient.from_url(
            '/api/swagger.json',
            http_client=FlaskTestHttpClient(client, headers=test_user_auth))

    monkeypatch.setattr('nomad.cli.client.client._create_client', create_test_client)
@pytest.fixture(scope='function')
def no_warn(caplog):
caplog.handler.formatter = structlogging.ConsoleFormatter()
......@@ -588,7 +597,6 @@ def oasis_example_upload(non_empty_example_upload: str, raw_files) -> str:
shutil.copyfile(uploaded_path, uploaded_path_modified)
metadata = {
'upload_id': 'oasis_upload_id',
'upload_time': '2020-01-01 00:00:00',
'published': True,
'entries': {
......
......@@ -226,6 +226,46 @@ def test_oasis_upload_processing(proc_infra, oasis_example_uploaded: Tuple[str,
assert calc.metadata['published']
@pytest.mark.timeout(config.tests.default_timeout)
def test_publish_from_oasis(
        client, proc_infra, non_empty_uploaded: Tuple[str, str], oasis_central_nomad_client,
        monkeypatch, test_user, other_test_user, no_warn):
    '''
    Processes and publishes an upload of ``other_test_user``, pushes it with
    ``publish_from_oasis`` to the same NOMAD via the flask test client, and checks
    that the resulting upload is published and belongs to the same user.
    '''
    upload = run_processing(non_empty_uploaded, other_test_user)
    upload.publish_upload()
    upload.block_until_complete(interval=.01)

    cn_upload_id = 'cn_' + upload.upload_id

    # We need to alter the ids, because we test this by uploading to the same NOMAD
    def with_altered_ids(upload_id, metadata):
        for entry in metadata['entries'].values():
            entry['calc_id'] = utils.create_uuid()
        return 'cn_' + upload_id, metadata

    # Sends the request through the flask test client, with the content of the
    # given file object as data
    def put_via_test_client(url, headers, data):
        return client.put(url, headers=headers, data=data.read())

    replacements = [
        ('nomad.processing.data._normalize_oasis_upload_metadata', with_altered_ids),
        ('requests.put', put_via_test_client),
        ('nomad.config.services.central_nomad_api_url', '/api')]
    for target, replacement in replacements:
        monkeypatch.setattr(target, replacement)

    upload.publish_from_oasis()
    upload.block_until_complete()
    assert_processing(upload, published=True)

    cn_upload = Upload.objects(upload_id=cn_upload_id).first()
    cn_upload.block_until_complete()
    assert_processing(cn_upload, published=True)
    assert cn_upload.user_id == other_test_user.user_id
@pytest.mark.timeout(config.tests.default_timeout)
def test_processing_with_warning(proc_infra, test_user, with_warn):
example_file = 'tests/data/proc/examples_with_warning_template.zip'
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment