Commit 48a6280d authored by Markus Scheidgen

Added upload put with direct publish after processing. Updated external db upload example.

parent 7554fc1d
Pipeline #90849 passed with stages in 42 minutes and 43 seconds
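In short, direct publishing is requested by adding a 'publish_directly' query parameter to the existing upload PUT, and the endpoint now also accepts HTTP basic authentication. A minimal sketch using plain requests (the URL, credentials and file name are placeholders; the updated bravado-based example below shows the canonical usage):

import requests

nomad_url = 'http://nomad-lab.eu/prod/rae/api'  # assumed deployment, same as in the example below
with open('example.zip', 'rb') as f:  # placeholder file name
    # 'publish_directly' is sent as a query parameter; 'file' matches the upload_metadata_parser argument
    response = requests.put(
        '%s/uploads/' % nomad_url,
        params={'publish_directly': 'true'},
        auth=('youruser', 'yourpassword'),  # basic auth is now accepted by the endpoint
        files={'file': f})
response.raise_for_status()
print(response.json()['upload_id'])  # the returned upload model carries the upload_id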
@@ -3,29 +3,59 @@ This is a brief example demonstrating the public nomad@FAIRDI API for doing operations
that might be necessary to integrate external project data.
"""
from bravado.requests_client import RequestsClient
from bravado.requests_client import RequestsClient, Authenticator
from bravado.client import SwaggerClient
from keycloak import KeycloakOpenID
from urllib.parse import urlparse
import time
import os.path
import sys
nomad_url = 'http://nomad-lab.eu/prod/rae/api'
user = 'leonard.hofstadter@nomad-fairdi.tests.de'
password = 'password'
user = 'youruser'
password = 'yourpassword'
upload_file = os.path.join(os.path.dirname(__file__), 'example.zip')
# an authenticator for NOMAD's keycloak user management
class KeycloakAuthenticator(Authenticator):
def __init__(self, user, password):
super().__init__(host=urlparse(nomad_url).netloc)
self.user = user
self.password = password
self.token = None
self.__oidc = KeycloakOpenID(
server_url='https://nomad-lab.eu/fairdi/keycloak/auth/',
realm_name='fairdi_nomad_test',
client_id='nomad_public')
def apply(self, request):
if self.token is None:
self.token = self.__oidc.token(username=self.user, password=self.password)
self.token['time'] = time.time()
elif self.token['expires_in'] < int(time.time()) - self.token['time'] + 10:
try:
self.token = self.__oidc.refresh_token(self.token['refresh_token'])
self.token['time'] = time.time()
except Exception:
self.token = self.__oidc.token(username=self.user, password=self.password)
self.token['time'] = time.time()
request.headers.setdefault('Authorization', 'Bearer %s' % self.token['access_token'])
return request
upload_file = os.path.join(os.path.dirname(__file__), 'external_project_example.zip')
# create the bravado client
host = urlparse(nomad_url).netloc.split(':')[0]
http_client = RequestsClient()
http_client.set_basic_auth(host, user, password)
http_client.authenticator = KeycloakAuthenticator(user=user, password=password)
client = SwaggerClient.from_url('%s/swagger.json' % nomad_url, http_client=http_client)
# upload data
print('uploading a file with "external_id/AcAg/vasp.xml" inside ...')
with open(upload_file, 'rb') as f:
upload = client.uploads.upload(file=f).response().result
upload = client.uploads.upload(file=f, publish_directly=True).response().result
print('processing ...')
while upload.tasks_running:
@@ -37,70 +37,6 @@ while upload.tasks_running:
if upload.tasks_status != 'SUCCESS':
print('something went wrong')
print('errors: %s' % str(upload.errors))
# delete the unsuccessful upload
# try to delete the unsuccessful upload
client.uploads.delete_upload(upload_id=upload.upload_id).response().result
sys.exit(1)
# publish data
print('publishing ...')
client.uploads.exec_upload_operation(upload_id=upload.upload_id, payload={
'operation': 'publish',
'metadata': {
# these metadata are applied to all calcs in the upload
'comment': 'Data from a cool external project',
'references': ['http://external.project.eu'],
'calculations': [
{
# these metadata are only applied to the calc identified by its 'mainfile'
'mainfile': 'external_id/AcAg/vasp.xml',
# 'coauthors': ['sheldon.cooper@ucla.edu'], this does not YET work with emails,
# Currently you have to use user_ids: leonard (the uploader, who is automatically an author) is 2 and sheldon is 1.
# Ask NOMAD developers about how to find out about user_ids.
'coauthors': [1],
# If users demand, we can implement specific metadata keys (e.g. 'external_id', 'external_url') for external projects.
# This would allow searching directly for external_ids, or even API endpoints that work with them
# 'external_id': 'external_id',
# 'external_url': 'http://external.project.eu/data/calc/external_id/'
}
]
}
}).response().result
while upload.process_running:
upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
time.sleep(1)
if upload.tasks_status != 'SUCCESS' or len(upload.errors) > 0:
print('something went wrong')
print('errors: %s' % str(upload.errors))
# delete the unsuccessful upload
client.uploads.delete_upload(upload_id=upload.upload_id).response().result
sys.exit(1)
# search for data
result = client.repo.search(paths=['external_id']).response().result
if result.pagination.total == 0:
print('not found')
sys.exit(1)
elif result.pagination.total > 1:
print('my ids are not specific enough, bummer ... or did I upload stuff multiple times?')
# The results key holds an array with the current page data
print('Found the following calcs for my "external_id".')
print(', '.join(calc['calc_id'] for calc in result.results))
# download data
calc = result.results[0]
client.raw.get(upload_id=calc['upload_id'], path=calc['mainfile']).response()
print('Download of first calc works.')
# download urls, e.g. for curl
print('Possible download URLs are:')
print('%s/raw/%s/%s' % (nomad_url, calc['upload_id'], calc['mainfile']))
print('%s/raw/%s/%s/*' % (nomad_url, calc['upload_id'], os.path.dirname(calc['mainfile'])))
# direct download urls without having to search before
print('%s/raw/query?paths=external_id' % nomad_url)
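The last URL above can also be fetched programmatically; a minimal sketch with requests, assuming the raw query endpoint streams a zip archive of all matching files (authentication may be needed for data under embargo):

import requests  # not used by the bravado example itself, only by this sketch

response = requests.get('%s/raw/query?paths=external_id' % nomad_url, stream=True)
response.raise_for_status()
with open('external_project_raw_files.zip', 'wb') as f:  # placeholder output name
    for chunk in response.iter_content(chunk_size=1024 * 1024):
        f.write(chunk)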
@@ -75,7 +75,6 @@ export default function RepoEntryView({uploadId, calcId}) {
const loading = !state.calcData
const quantityProps = {data: calcData, loading: loading}
const authors = loading ? null : calcData.authors
const domain = calcData.domain && domains[calcData.domain]
let entryHeader = 'Entry metadata'
@@ -117,7 +116,7 @@ export default function RepoEntryView({uploadId, calcId}) {
</Quantity>
<Quantity quantity='authors' {...quantityProps}>
<Typography>
{authorList(authors || [])}
{authorList(loading ? null : calcData)}
</Typography>
</Quantity>
<Quantity quantity='datasets' placeholder='no datasets' {...quantityProps}>
@@ -256,14 +256,7 @@ class DatasetListUnstyled extends React.Component {
authors: {
label: 'Authors',
description: 'Authors including the uploader and the co-authors',
render: (dataset) => {
const authors = dataset.example.authors
if (authors.length > 3) {
return authorList(authors.filter((_, index) => index < 2)) + ' et al'
} else {
return authorList(authors)
}
}
render: (dataset) => authorList(dataset.example)
}
}
@@ -32,7 +32,7 @@ import SharedIcon from '@material-ui/icons/SupervisedUserCircle'
import PrivateIcon from '@material-ui/icons/VisibilityOff'
import { domains } from '../domains'
import { apiContext, withApi } from '../api'
import { authorList } from '../../utils'
import { authorList, nameList } from '../../utils'
export function Published(props) {
const api = useContext(apiContext)
@@ -147,19 +147,19 @@ export class EntryListUnstyled extends React.Component {
},
authors: {
label: 'Authors',
render: entry => authorList(entry.authors),
render: entry => authorList(entry),
supportsSort: true,
description: 'The authors of this entry. This includes the uploader and the co-authors.'
},
co_authors: {
label: 'co-Authors',
render: entry => authorList(entry.authors),
render: entry => nameList(entry.authors),
supportsSort: false,
description: 'The people that this entry was co-authored with'
},
shared_with: {
label: 'Shared with',
render: entry => authorList(entry.authors),
render: entry => nameList(entry.authors),
supportsSort: false,
description: 'The people that this entry was shared with'
},
@@ -280,7 +280,7 @@ export class EntryListUnstyled extends React.Component {
</Quantity>
<Quantity quantity='authors' data={row}>
<Typography>
{authorList(row.authors || [])}
{authorList(row)}
</Typography>
</Quantity>
<Quantity quantity='datasets' placeholder='no datasets' data={row}>
@@ -309,6 +309,23 @@ export function titleCase(str) {
return splitStr.join(' ')
}
export function authorList(authors) {
return authors.map(author => titleCase(author.name)).filter(name => name !== '').join(', ')
export function nameList(users) {
const names = users.map(user => titleCase(user.name)).filter(name => name !== '')
if (names.length > 3) {
return names.slice(0, 2).join(', ') + ' et al'
} else {
return names.join(', ')
}
}
export function authorList(entry) {
if (!entry) {
return ''
}
if (entry.external_db) {
return entry.external_db
} else {
return nameList(entry.authors || [])
}
}
@@ -125,6 +125,7 @@ upload_metadata_parser.add_argument('name', type=str, help='An optional name for
upload_metadata_parser.add_argument('local_path', type=str, help='Use a local file on the server.', location='args')
upload_metadata_parser.add_argument('token', type=str, help='Upload token to authenticate with curl command.', location='args')
upload_metadata_parser.add_argument('file', type=FileStorage, help='The file to upload.', location='files')
upload_metadata_parser.add_argument('publish_directly', type=bool, help='Set this parameter to publish the upload directly after processing.', location='args')
upload_metadata_parser.add_argument('oasis_upload_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the upload_id.', location='args')
upload_metadata_parser.add_argument('oasis_uploader_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the uploader\'s id.', location='args')
upload_metadata_parser.add_argument('oasis_deployment_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the OASIS\' deployment id.', location='args')
@@ -224,7 +225,7 @@ class UploadListResource(Resource):
@api.expect(upload_metadata_parser)
@api.response(400, 'Too many uploads')
@marshal_with(upload_model, skip_none=True, code=200, description='Upload received')
@authenticate(required=True, upload_token=True)
@authenticate(required=True, upload_token=True, basic=True)
def put(self):
'''
Upload a file and automatically create a new upload in the process.
@@ -255,6 +256,9 @@ class UploadListResource(Resource):
if Upload.user_uploads(g.user, published=False).count() >= config.services.upload_limit:
abort(400, 'Limit of unpublished uploads exceeded for user.')
# check if the upload is to be published directly
publish_directly = request.args.get('publish_directly') is not None
# check if allowed to perform oasis upload
oasis_upload_id = request.args.get('oasis_upload_id')
oasis_uploader_id = request.args.get('oasis_uploader_id')
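Note that only the presence of the parameter is checked above, so any value, including 'false', enables direct publishing; omit the parameter entirely to keep the previous behaviour. A tiny illustration of the resulting query strings (hypothetical values):

from urllib.parse import urlencode

print(urlencode(dict(publish_directly=True)))     # 'publish_directly=True'  -> upload is published after processing
print(urlencode(dict(publish_directly='false')))  # 'publish_directly=false' -> still published, only presence is checked
print(urlencode(dict()))                          # ''                       -> not published directly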
@@ -344,6 +348,7 @@ class UploadListResource(Resource):
upload_time=datetime.utcnow(),
upload_path=upload_path,
temporary=local_path != upload_path,
publish_directly=publish_directly or from_oasis,
from_oasis=from_oasis,
oasis_deployment_id=oasis_deployment_id)
@@ -424,7 +424,7 @@ class EntryMetadata(metainfo.MSection):
a_search=Search())
external_db = metainfo.Quantity(
type=metainfo.MEnum('EELSDB'), categories=[MongoMetadata, UserProvidableMetadata],
type=metainfo.MEnum('EELSDB', 'Materials Project'), categories=[MongoMetadata, UserProvidableMetadata],
description='The repository or external database where the original data resides',
a_search=Search())
@@ -29,7 +29,8 @@ calculations, and files
'''
from typing import cast, List, Any, Tuple, Iterator, Dict, cast, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField, ListField
from mongoengine import (
StringField, DateTimeField, DictField, BooleanField, IntField, ListField)
import logging
from structlog import wrap_logger
from contextlib import contextmanager
@@ -44,15 +45,17 @@ from functools import lru_cache
import urllib.parse
import requests
from nomad import utils, config, infrastructure, search, datamodel
from nomad import utils, config, infrastructure, search, datamodel, metainfo
from nomad.files import (
PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
PublicUploadFiles, StagingUploadFiles)
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive, EditableUserMetadata, OasisMetadata
from nomad.archive import query_archive, write_partial_archive_to_mongo, delete_partial_archives_from_mongo
from nomad.datamodel import (
EntryArchive, EditableUserMetadata, OasisMetadata, UserProvidableMetadata)
from nomad.archive import (
query_archive, write_partial_archive_to_mongo, delete_partial_archives_from_mongo)
from nomad.datamodel.encyclopedia import EncyclopediaMetadata
@@ -60,8 +63,12 @@ section_metadata = datamodel.EntryArchive.section_metadata.name
section_workflow = datamodel.EntryArchive.section_workflow.name
_editable_metadata = {
quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}
_editable_metadata: Dict[str, metainfo.Definition] = {}
_editable_metadata.update(**{
quantity.name: quantity for quantity in UserProvidableMetadata.m_def.definitions})
_editable_metadata.update(**{
quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions})
_oasis_metadata = {
quantity.name: quantity for quantity in OasisMetadata.m_def.definitions}
@@ -600,6 +607,9 @@ class Calc(Proc):
self._entry_metadata.apply_domain_metadata(self._parser_results)
self._entry_metadata.processed = True
if self.upload.publish_directly:
self._entry_metadata.published |= True
self._read_metadata_from_file(logger)
# persist the calc metadata
@@ -680,6 +690,7 @@ class Upload(Proc):
publish_time: Datetime when the upload was initially published on this NOMAD deployment.
last_update: Datetime of the last modifying process run (publish, re-processing, upload).
publish_directly: Boolean indicating that this upload should be published after initial processing.
from_oasis: Boolean indicating that this upload is coming from another NOMAD deployment.
oasis_id: The deployment id of the NOMAD that uploaded the upload.
published_to: A list of deployment ids where this upload has been successfully uploaded to.
@@ -700,6 +711,7 @@ class Upload(Proc):
publish_time = DateTimeField()
last_update = DateTimeField()
publish_directly = BooleanField(default=False)
from_oasis = BooleanField(default=False)
oasis_deployment_id = StringField(default=None)
published_to = ListField(StringField())
@@ -715,6 +727,7 @@ class Upload(Proc):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.publish_directly = self.publish_directly or self.from_oasis
self._upload_files: ArchiveBasedStagingUploadFiles = None
@lru_cache()
@@ -1297,36 +1310,33 @@ class Upload(Proc):
def _cleanup_after_processing(self):
# send email about process finish
user = self.uploader
name = '%s %s' % (user.first_name, user.last_name)
message = '\n'.join([
'Dear %s,' % name,
'',
'your data %suploaded at %s has completed processing.' % (
'"%s" ' % self.name if self.name else '', self.upload_time.isoformat()), # pylint: disable=no-member
'You can review your data on your upload page: %s' % config.gui_url(page='uploads'),
'',
'If you encounter any issues with your upload, please let us know and reply to this email.',
'',
'The nomad team'
])
try:
infrastructure.send_mail(
name=name, email=user.email, message=message, subject='Processing completed')
except Exception as e:
# probably due to email configuration problems
# don't fail or present this error to clients
self.logger.error('could not send after processing email', exc_info=e)
if not self.publish_directly:
user = self.uploader
name = '%s %s' % (user.first_name, user.last_name)
message = '\n'.join([
'Dear %s,' % name,
'',
'your data %suploaded at %s has completed processing.' % (
'"%s" ' % self.name if self.name else '', self.upload_time.isoformat()), # pylint: disable=no-member
'You can review your data on your upload page: %s' % config.gui_url(page='uploads'),
'',
'If you encounter any issues with your upload, please let us know and reply to this email.',
'',
'The nomad team'
])
try:
infrastructure.send_mail(
name=name, email=user.email, message=message, subject='Processing completed')
except Exception as e:
# probably due to email configuration problems
# don't fail or present this error to clients
self.logger.error('could not send after processing email', exc_info=e)
def _cleanup_after_processing_oasis_upload(self):
'''
Moves the upload out of staging to the public area. It will
pack the staging upload files into public upload files.
'''
assert self.processed_calcs > 0
if not self.publish_directly or self.processed_calcs == 0:
return
logger = self.get_logger()
logger.info('started to publish oasis upload')
logger.info('started to publish upload directly')
with utils.lnr(logger, 'publish failed'):
metadata = self.metadata_file_cached(
@@ -1343,12 +1353,13 @@ class Upload(Proc):
upload_size=self.upload_files.size):
self.upload_files.delete()
if metadata is not None:
self.upload_time = metadata.get('upload_time')
if self.from_oasis:
if metadata is not None:
self.upload_time = metadata.get('upload_time')
if self.upload_time is None:
self.upload_time = datetime.utcnow()
logger.warn('oasis upload without upload time')
if self.upload_time is None:
self.upload_time = datetime.utcnow()
logger.warn('oasis upload without upload time')
self.publish_time = datetime.utcnow()
self.published = True
@@ -1389,10 +1400,7 @@ class Upload(Proc):
if self.current_process == 're_process_upload':
self._cleanup_after_re_processing()
else:
if self.from_oasis:
self._cleanup_after_processing_oasis_upload()
else:
self._cleanup_after_processing()
self._cleanup_after_processing()
def get_calc(self, calc_id) -> Calc:
''' Returns the upload calc with the given id or ``None``. '''
@@ -37,6 +37,7 @@ Depending on the configuration all logs will also be sent to a central logstash.
.. autofunc::nomad.utils.lnr
'''
from typing import cast, Any
import logging
import structlog
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper, JSONRenderer
@@ -277,7 +278,7 @@ def logger_factory(*args):
structlog.configure(
processors=log_processors,
processors=cast(Any, log_processors),
logger_factory=logger_factory,
wrapper_class=structlog.stdlib.BoundLogger)
@@ -532,6 +532,29 @@ class TestUploads:
rv = api.get('/raw/%s/examples_potcar/POTCAR%s.stripped' % (upload_id, ending))
assert rv.status_code == 200
def test_put_publish_directly(self, api, test_user_auth, non_empty_example_upload, proc_infra, no_warn):
rv = api.put('/uploads/?%s' % urlencode(dict(
local_path=non_empty_example_upload,
publish_directly=True)), headers=test_user_auth)
assert rv.status_code == 200, rv.data
upload = self.assert_upload(rv.data)
upload_id = upload['upload_id']
# poll until completed
upload = self.block_until_completed(api, upload_id, test_user_auth)
assert len(upload['tasks']) == 4
assert upload['tasks_status'] == SUCCESS
assert upload['current_task'] == 'cleanup'
assert not upload['process_running']
upload_proc = Upload.objects(upload_id=upload_id).first()
assert upload_proc.published
entries = get_upload_entries_metadata(upload)
assert_upload_files(upload_id, entries, files.PublicUploadFiles)
assert_search_upload(entries, additional_keys=['atoms', 'dft.system'])
def test_post_from_oasis_admin(self, api, non_empty_uploaded, other_test_user_auth, test_user, no_warn):
url = '/uploads/?%s' % urlencode(dict(
local_path=non_empty_uploaded[1], oasis_upload_id='oasis_upload_id',
@@ -53,10 +53,10 @@ def uploaded_id_with_warning(raw_files) -> Generator[Tuple[str, str], None, None
yield example_upload_id, example_file
def run_processing(uploaded: Tuple[str, str], test_user) -> Upload:
def run_processing(uploaded: Tuple[str, str], test_user, **kwargs) -> Upload:
uploaded_id, uploaded_path = uploaded
upload = Upload.create(
upload_id=uploaded_id, user=test_user, upload_path=uploaded_path)
upload_id=uploaded_id, user=test_user, upload_path=uploaded_path, **kwargs)
upload.upload_time = datetime.utcnow()
assert upload.tasks_status == 'RUNNING'
@@ -165,6 +165,16 @@ def test_publish(non_empty_processed: Upload, no_warn, internal_example_user_met
assert_processing(Upload.get(processed.upload_id, include_published=True), published=True)
def test_publish_directly(non_empty_uploaded, test_user, proc_infra, no_warn, monkeypatch):
processed = run_processing(non_empty_uploaded, test_user, publish_directly=True)
with processed.entries_metadata() as entries:
assert_upload_files(processed.upload_id, entries, PublicUploadFiles, published=True)
assert_search_upload(entries, [], published=True)
assert_processing(Upload.get(processed.upload_id, include_published=True), published=True)
def test_republish(non_empty_processed: Upload, no_warn, internal_example_user_metadata, monkeypatch):
processed = non_empty_processed
processed.compress_and_set_metadata(internal_example_user_metadata)