Commit 1b0cb08e authored by Markus Scheidgen

Merge branch 'mp' into 'v0.9.9'

Added upload PUT with direct publish after processing. Updated external db upload example.

See merge request !237
parents 5d37c347 48a6280d
Pipeline #90855 passed with stages
in 26 minutes and 44 seconds
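A rough client-side sketch of the new direct-publish upload described in the commit message: it assumes the /uploads/ endpoint shown further down, a multipart file upload, and a placeholder upload token (obtain a real one from NOMAD); it is an illustration, not the exact command used by the project.

import requests

nomad_url = 'http://nomad-lab.eu/prod/rae/api'
token = 'your-upload-token'  # placeholder upload token

# PUT the file and let NOMAD publish it directly after successful processing
with open('example.zip', 'rb') as f:
    response = requests.put(
        '%s/uploads/?token=%s&publish_directly=true' % (nomad_url, token),
        files=dict(file=f))
response.raise_for_status()
print('created upload %s' % response.json()['upload_id'])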
......@@ -3,29 +3,59 @@ This is a brief example demonstrating the public nomad@FAIRDI API for doing oper
that might be necessary to integrate external project data.
"""
from bravado.requests_client import RequestsClient
from bravado.requests_client import RequestsClient, Authenticator
from bravado.client import SwaggerClient
from keycloak import KeycloakOpenID
from urllib.parse import urlparse
import time
import os.path
import sys
nomad_url = 'http://nomad-lab.eu/prod/rae/api'
user = 'leonard.hofstadter@nomad-fairdi.tests.de'
password = 'password'
user = 'youruser'
password = 'yourpassword'
upload_file = os.path.join(os.path.dirname(__file__), 'example.zip')
# an authenticator for NOMAD's keycloak user management
class KeycloakAuthenticator(Authenticator):
def __init__(self, user, password):
super().__init__(host=urlparse(nomad_url).netloc)
self.user = user
self.password = password
self.token = None
self.__oidc = KeycloakOpenID(
server_url='https://nomad-lab.eu/fairdi/keycloak/auth/',
realm_name='fairdi_nomad_test',
client_id='nomad_public')
def apply(self, request):
if self.token is None:
self.token = self.__oidc.token(username=self.user, password=self.password)
self.token['time'] = time.time()
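# refresh the token when it has less than about 10 seconds of lifetime left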
elif self.token['expires_in'] < int(time.time()) - self.token['time'] + 10:
try:
self.token = self.__oidc.refresh_token(self.token['refresh_token'])
self.token['time'] = time.time()
except Exception:
self.token = self.__oidc.token(username=self.user, password=self.password)
self.token['time'] = time.time()
request.headers.setdefault('Authorization', 'Bearer %s' % self.token['access_token'])
return request
upload_file = os.path.join(os.path.dirname(__file__), 'external_project_example.zip')
# create the bravado client
host = urlparse(nomad_url).netloc.split(':')[0]
http_client = RequestsClient()
http_client.set_basic_auth(host, user, password)
http_client.authenticator = KeycloakAuthenticator(user=user, password=password)
client = SwaggerClient.from_url('%s/swagger.json' % nomad_url, http_client=http_client)
# upload data
print('uploading a file with "external_id/AcAg/vasp.xml" inside ...')
with open(upload_file, 'rb') as f:
upload = client.uploads.upload(file=f).response().result
upload = client.uploads.upload(file=f, publish_directly=True).response().result
print('processing ...')
while upload.tasks_running:
......@@ -37,70 +67,6 @@ while upload.tasks_running:
if upload.tasks_status != 'SUCCESS':
print('something went wrong')
print('errors: %s' % str(upload.errors))
# delete the unsuccessful upload
# try to delete the unsuccessful upload
client.uploads.delete_upload(upload_id=upload.upload_id).response().result
sys.exit(1)
# publish data
print('publishing ...')
client.uploads.exec_upload_operation(upload_id=upload.upload_id, payload={
'operation': 'publish',
'metadata': {
# these metadata are applied to all calcs in the upload
'comment': 'Data from a cool external project',
'references': ['http://external.project.eu'],
'calculations': [
{
# these metadata are only applied to the calc identified by its 'mainfile'
'mainfile': 'external_id/AcAg/vasp.xml',
# 'coauthors': ['sheldon.cooper@ucla.edu'], this does not YET work with emails;
# currently you have to use user_ids: leonard (the uploader, who is automatically an author) is 2 and sheldon is 1.
# Ask NOMAD developers about how to find out about user_ids.
'coauthors': [1],
# If users demand, we can implement specific metadata keys (e.g. 'external_id', 'external_url') for external projects.
# This would allow searching directly for external_ids, or even providing API endpoints that work with them.
# 'external_id': 'external_id',
# 'external_url': 'http://external.project.eu/data/calc/external_id/'
}
]
}
}).response().result
while upload.process_running:
upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
time.sleep(1)
if upload.tasks_status != 'SUCCESS' or len(upload.errors) > 0:
print('something went wrong')
print('errors: %s' % str(upload.errors))
# delete the unsuccessful upload
client.uploads.delete_upload(upload_id=upload.upload_id).response().result
sys.exit(1)
# search for data
result = client.repo.search(paths=['external_id']).response().result
if result.pagination.total == 0:
print('not found')
sys.exit(1)
elif result.pagination.total > 1:
print('my ids are not specific enough, bummer ... or did I upload stuff multiple times?')
# The results key holds an array with the current page data
print('Found the following calcs for my "external_id".')
print(', '.join(calc['calc_id'] for calc in result.results))
# download data
calc = result.results[0]
client.raw.get(upload_id=calc['upload_id'], path=calc['mainfile']).response()
print('Download of first calc works.')
# download urls, e.g. for curl
print('Possible download URLs are:')
print('%s/raw/%s/%s' % (nomad_url, calc['upload_id'], calc['mainfile']))
print('%s/raw/%s/%s/*' % (nomad_url, calc['upload_id'], os.path.dirname(calc['mainfile'])))
# direct download urls without having to search before
print('%s/raw/query?paths=external_id' % nomad_url)
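Continuing the example, a minimal sketch of fetching the first of these raw URLs with plain requests (assuming the upload is published and publicly visible; for restricted data the Keycloak access token would have to be sent as a Bearer header):

import requests

raw_url = '%s/raw/%s/%s' % (nomad_url, calc['upload_id'], calc['mainfile'])
response = requests.get(raw_url)
response.raise_for_status()
# store the mainfile under its own name in the current directory
with open(os.path.basename(calc['mainfile']), 'wb') as f:
    f.write(response.content)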
......@@ -75,7 +75,6 @@ export default function RepoEntryView({uploadId, calcId}) {
const loading = !state.calcData
const quantityProps = {data: calcData, loading: loading}
const authors = loading ? null : calcData.authors
const domain = calcData.domain && domains[calcData.domain]
let entryHeader = 'Entry metadata'
......@@ -117,7 +116,7 @@ export default function RepoEntryView({uploadId, calcId}) {
</Quantity>
<Quantity quantity='authors' {...quantityProps}>
<Typography>
{authorList(authors || [])}
{authorList(loading ? null : calcData)}
</Typography>
</Quantity>
<Quantity quantity='datasets' placeholder='no datasets' {...quantityProps}>
......
......@@ -256,14 +256,7 @@ class DatasetListUnstyled extends React.Component {
authors: {
label: 'Authors',
description: 'Authors including the uploader and the co-authors',
render: (dataset) => {
const authors = dataset.example.authors
if (authors.length > 3) {
return authorList(authors.filter((_, index) => index < 2)) + ' et al'
} else {
return authorList(authors)
}
}
render: (dataset) => authorList(dataset.example)
}
}
......
......@@ -32,7 +32,7 @@ import SharedIcon from '@material-ui/icons/SupervisedUserCircle'
import PrivateIcon from '@material-ui/icons/VisibilityOff'
import { domains } from '../domains'
import { apiContext, withApi } from '../api'
import { authorList } from '../../utils'
import { authorList, nameList } from '../../utils'
export function Published(props) {
const api = useContext(apiContext)
......@@ -147,19 +147,19 @@ export class EntryListUnstyled extends React.Component {
},
authors: {
label: 'Authors',
render: entry => authorList(entry.authors),
render: entry => authorList(entry),
supportsSort: true,
description: 'The authors of this entry. This includes the uploader and the co-authors.'
},
co_authors: {
label: 'co-Authors',
render: entry => authorList(entry.authors),
render: entry => nameList(entry.authors),
supportsSort: false,
description: 'The people that this entry was co-authored with'
},
shared_with: {
label: 'Shared with',
render: entry => authorList(entry.authors),
render: entry => nameList(entry.authors),
supportsSort: false,
description: 'The people that this entry was shared with'
},
......@@ -280,7 +280,7 @@ export class EntryListUnstyled extends React.Component {
</Quantity>
<Quantity quantity='authors' data={row}>
<Typography>
{authorList(row.authors || [])}
{authorList(row)}
</Typography>
</Quantity>
<Quantity quantity='datasets' placeholder='no datasets' data={row}>
......
......@@ -309,6 +309,23 @@ export function titleCase(str) {
return splitStr.join(' ')
}
export function authorList(authors) {
return authors.map(author => titleCase(author.name)).filter(name => name !== '').join(', ')
export function nameList(users) {
const names = users.map(user => titleCase(user.name)).filter(name => name !== '')
if (names.length > 3) {
return names.slice(0, 2).join(', ') + ' et al'
} else {
return names.join(', ')
}
}
export function authorList(entry) {
if (!entry) {
return ''
}
if (entry.external_db) {
return entry.external_db
} else {
return nameList(entry.authors || [])
}
}
......@@ -125,6 +125,7 @@ upload_metadata_parser.add_argument('name', type=str, help='An optional name for
upload_metadata_parser.add_argument('local_path', type=str, help='Use a local file on the server.', location='args')
upload_metadata_parser.add_argument('token', type=str, help='Upload token to authenticate with curl command.', location='args')
upload_metadata_parser.add_argument('file', type=FileStorage, help='The file to upload.', location='files')
upload_metadata_parser.add_argument('publish_directly', type=bool, help='Set this parameter to publish the upload directly after processing.', location='args')
upload_metadata_parser.add_argument('oasis_upload_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the upload_id.', location='args')
upload_metadata_parser.add_argument('oasis_uploader_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the uploader\'s id.', location='args')
upload_metadata_parser.add_argument('oasis_deployment_id', type=str, help='Use if this is an upload from an OASIS to the central NOMAD and set it to the OASIS\' deployment id.', location='args')
......@@ -224,7 +225,7 @@ class UploadListResource(Resource):
@api.expect(upload_metadata_parser)
@api.response(400, 'Too many uploads')
@marshal_with(upload_model, skip_none=True, code=200, description='Upload received')
@authenticate(required=True, upload_token=True)
@authenticate(required=True, upload_token=True, basic=True)
def put(self):
'''
Upload a file and automatically create a new upload in the process.
......@@ -255,6 +256,9 @@ class UploadListResource(Resource):
if Upload.user_uploads(g.user, published=False).count() >= config.services.upload_limit:
abort(400, 'Limit of unpublished uploads exceeded for user.')
# check if the upload is to be published directly
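# note: only the presence of the query parameter matters here, its value is not parsed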
publish_directly = request.args.get('publish_directly') is not None
# check if allowed to perform oasis upload
oasis_upload_id = request.args.get('oasis_upload_id')
oasis_uploader_id = request.args.get('oasis_uploader_id')
......@@ -344,6 +348,7 @@ class UploadListResource(Resource):
upload_time=datetime.utcnow(),
upload_path=upload_path,
temporary=local_path != upload_path,
publish_directly=publish_directly or from_oasis,
from_oasis=from_oasis,
oasis_deployment_id=oasis_deployment_id)
......
......@@ -424,7 +424,7 @@ class EntryMetadata(metainfo.MSection):
a_search=Search())
external_db = metainfo.Quantity(
type=metainfo.MEnum('EELSDB'), categories=[MongoMetadata, UserProvidableMetadata],
type=metainfo.MEnum('EELSDB', 'Materials Project'), categories=[MongoMetadata, UserProvidableMetadata],
description='The repository or external database where the original data resides',
a_search=Search())
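Since external_db is in the UserProvidableMetadata category and the processing change below adds user-providable quantities to _editable_metadata, an external project could in principle set the new value together with its other publish metadata. A hedged sketch, assuming the key is accepted in the same publish payload as in the example script above:

client.uploads.exec_upload_operation(upload_id=upload.upload_id, payload={
    'operation': 'publish',
    'metadata': {
        'comment': 'Data from an external database',
        'external_db': 'Materials Project'  # must be one of the enum values above
    }
}).response().result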
......
......@@ -29,7 +29,8 @@ calculations, and files
'''
from typing import cast, List, Any, Tuple, Iterator, Dict, cast, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField, ListField
from mongoengine import (
StringField, DateTimeField, DictField, BooleanField, IntField, ListField)
import logging
from structlog import wrap_logger
from contextlib import contextmanager
......@@ -44,15 +45,17 @@ from functools import lru_cache
import urllib.parse
import requests
from nomad import utils, config, infrastructure, search, datamodel
from nomad import utils, config, infrastructure, search, datamodel, metainfo
from nomad.files import (
PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
PublicUploadFiles, StagingUploadFiles)
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive, EditableUserMetadata, OasisMetadata
from nomad.archive import query_archive, write_partial_archive_to_mongo, delete_partial_archives_from_mongo
from nomad.datamodel import (
EntryArchive, EditableUserMetadata, OasisMetadata, UserProvidableMetadata)
from nomad.archive import (
query_archive, write_partial_archive_to_mongo, delete_partial_archives_from_mongo)
from nomad.datamodel.encyclopedia import EncyclopediaMetadata
......@@ -60,8 +63,12 @@ section_metadata = datamodel.EntryArchive.section_metadata.name
section_workflow = datamodel.EntryArchive.section_workflow.name
_editable_metadata = {
quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}
_editable_metadata: Dict[str, metainfo.Definition] = {}
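# user-providable quantities (e.g. external_db) are now accepted here alongside the editable user metadata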
_editable_metadata.update(**{
quantity.name: quantity for quantity in UserProvidableMetadata.m_def.definitions})
_editable_metadata.update(**{
quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions})
_oasis_metadata = {
quantity.name: quantity for quantity in OasisMetadata.m_def.definitions}
......@@ -600,6 +607,9 @@ class Calc(Proc):
self._entry_metadata.apply_domain_metadata(self._parser_results)
self._entry_metadata.processed = True
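# uploads flagged for direct publishing get their entries marked as published right away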
if self.upload.publish_directly:
self._entry_metadata.published |= True
self._read_metadata_from_file(logger)
# persist the calc metadata
......@@ -680,6 +690,7 @@ class Upload(Proc):
publish_time: Datetime when the upload was initially published on this NOMAD deployment.
last_update: Datetime of the last modifying process run (publish, re-processing, upload).
publish_directly: Boolean indicating that this upload should be published after initial processing.
from_oasis: Boolean indicating that this upload is coming from another NOMAD deployment.
oasis_id: The deployment id of the NOMAD that uploaded the upload.
published_to: A list of deployment ids where this upload has been successfully uploaded to.
......@@ -700,6 +711,7 @@ class Upload(Proc):
publish_time = DateTimeField()
last_update = DateTimeField()
publish_directly = BooleanField(default=False)
from_oasis = BooleanField(default=False)
oasis_deployment_id = StringField(default=None)
published_to = ListField(StringField())
......@@ -715,6 +727,7 @@ class Upload(Proc):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.publish_directly = self.publish_directly or self.from_oasis
self._upload_files: ArchiveBasedStagingUploadFiles = None
@lru_cache()
......@@ -1297,36 +1310,33 @@ class Upload(Proc):
def _cleanup_after_processing(self):
# send email about process finish
user = self.uploader
name = '%s %s' % (user.first_name, user.last_name)
message = '\n'.join([
'Dear %s,' % name,
'',
'your data %suploaded at %s has completed processing.' % (
'"%s" ' % self.name if self.name else '', self.upload_time.isoformat()), # pylint: disable=no-member
'You can review your data on your upload page: %s' % config.gui_url(page='uploads'),
'',
'If you encounter any issues with your upload, please let us know and reply to this email.',
'',
'The nomad team'
])
try:
infrastructure.send_mail(
name=name, email=user.email, message=message, subject='Processing completed')
except Exception as e:
# probably due to email configuration problems
# don't fail or present this error to clients
self.logger.error('could not send after processing email', exc_info=e)
if not self.publish_directly:
user = self.uploader
name = '%s %s' % (user.first_name, user.last_name)
message = '\n'.join([
'Dear %s,' % name,
'',
'your data %suploaded at %s has completed processing.' % (
'"%s" ' % self.name if self.name else '', self.upload_time.isoformat()), # pylint: disable=no-member
'You can review your data on your upload page: %s' % config.gui_url(page='uploads'),
'',
'If you encounter any issues with your upload, please let us know and reply to this email.',
'',
'The nomad team'
])
try:
infrastructure.send_mail(
name=name, email=user.email, message=message, subject='Processing completed')
except Exception as e:
# probably due to email configuration problems
# don't fail or present this error to clients
self.logger.error('could not send after processing email', exc_info=e)
def _cleanup_after_processing_oasis_upload(self):
'''
Moves the upload out of staging to the public area. It will
pack the staging upload files into public upload files.
'''
assert self.processed_calcs > 0
if not self.publish_directly or self.processed_calcs == 0:
return
logger = self.get_logger()
logger.info('started to publish oasis upload')
logger.info('started to publish upload directly')
with utils.lnr(logger, 'publish failed'):
metadata = self.metadata_file_cached(
......@@ -1343,12 +1353,13 @@ class Upload(Proc):
upload_size=self.upload_files.size):
self.upload_files.delete()
if metadata is not None:
self.upload_time = metadata.get('upload_time')
if self.from_oasis:
if metadata is not None:
self.upload_time = metadata.get('upload_time')
if self.upload_time is None:
self.upload_time = datetime.utcnow()
logger.warn('oasis upload without upload time')
if self.upload_time is None:
self.upload_time = datetime.utcnow()
logger.warn('oasis upload without upload time')
self.publish_time = datetime.utcnow()
self.published = True
......@@ -1389,10 +1400,7 @@ class Upload(Proc):
if self.current_process == 're_process_upload':
self._cleanup_after_re_processing()
else:
if self.from_oasis:
self._cleanup_after_processing_oasis_upload()
else:
self._cleanup_after_processing()
self._cleanup_after_processing()
def get_calc(self, calc_id) -> Calc:
''' Returns the upload calc with the given id or ``None``. '''
......
......@@ -37,6 +37,7 @@ Depending on the configuration all logs will also be sent to a central logstash.
.. autofunc::nomad.utils.lnr
'''
from typing import cast, Any
import logging
import structlog
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper, JSONRenderer
......@@ -277,7 +278,7 @@ def logger_factory(*args):
structlog.configure(
processors=log_processors,
processors=cast(Any, log_processors),
logger_factory=logger_factory,
wrapper_class=structlog.stdlib.BoundLogger)
......
......@@ -532,6 +532,29 @@ class TestUploads:
rv = api.get('/raw/%s/examples_potcar/POTCAR%s.stripped' % (upload_id, ending))
assert rv.status_code == 200
def test_put_publish_directly(self, api, test_user_auth, non_empty_example_upload, proc_infra, no_warn):
rv = api.put('/uploads/?%s' % urlencode(dict(
local_path=non_empty_example_upload,
publish_directly=True)), headers=test_user_auth)
assert rv.status_code == 200, rv.data
upload = self.assert_upload(rv.data)
upload_id = upload['upload_id']
# poll until completed
upload = self.block_until_completed(api, upload_id, test_user_auth)
assert len(upload['tasks']) == 4
assert upload['tasks_status'] == SUCCESS
assert upload['current_task'] == 'cleanup'
assert not upload['process_running']
upload_proc = Upload.objects(upload_id=upload_id).first()
assert upload_proc.published
entries = get_upload_entries_metadata(upload)
assert_upload_files(upload_id, entries, files.PublicUploadFiles)
assert_search_upload(entries, additional_keys=['atoms', 'dft.system'])
def test_post_from_oasis_admin(self, api, non_empty_uploaded, other_test_user_auth, test_user, no_warn):
url = '/uploads/?%s' % urlencode(dict(
local_path=non_empty_uploaded[1], oasis_upload_id='oasis_upload_id',
......
......@@ -53,10 +53,10 @@ def uploaded_id_with_warning(raw_files) -> Generator[Tuple[str, str], None, None
yield example_upload_id, example_file
def run_processing(uploaded: Tuple[str, str], test_user) -> Upload:
def run_processing(uploaded: Tuple[str, str], test_user, **kwargs) -> Upload:
uploaded_id, uploaded_path = uploaded
upload = Upload.create(
upload_id=uploaded_id, user=test_user, upload_path=uploaded_path)
upload_id=uploaded_id, user=test_user, upload_path=uploaded_path, **kwargs)
upload.upload_time = datetime.utcnow()
assert upload.tasks_status == 'RUNNING'
......@@ -165,6 +165,16 @@ def test_publish(non_empty_processed: Upload, no_warn, internal_example_user_met
assert_processing(Upload.get(processed.upload_id, include_published=True), published=True)
def test_publish_directly(non_empty_uploaded, test_user, proc_infra, no_warn, monkeypatch):
processed = run_processing(non_empty_uploaded, test_user, publish_directly=True)
with processed.entries_metadata() as entries:
assert_upload_files(processed.upload_id, entries, PublicUploadFiles, published=True)
assert_search_upload(entries, [], published=True)
assert_processing(Upload.get(processed.upload_id, include_published=True), published=True)
def test_republish(non_empty_processed: Upload, no_warn, internal_example_user_metadata, monkeypatch):
processed = non_empty_processed
processed.compress_and_set_metadata(internal_example_user_metadata)
......