Commit 3f79b6e1 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Turned commit and delete of uploads into processes. GUI still needs adaptation.

parent 38291b9e
......@@ -188,12 +188,12 @@ async function deleteUpload(uploadId) {
.then(response => response.body)
}
async function unstageUpload(uploadId) {
async function commitUpload(uploadId) {
const client = await swaggerPromise
return client.apis.uploads.exec_upload_command({
upload_id: uploadId,
payload: {
operation: 'unstage'
operation: 'commit'
}
})
.catch(networkError)
......@@ -252,7 +252,7 @@ const api = {
getUploadCommand: getUploadCommand,
createUpload: createUpload,
deleteUpload: deleteUpload,
unstageUpload: unstageUpload,
commitUpload: commitUpload,
getUploads: getUploads,
archive: archive,
calcProcLog: calcProcLog,
......
......@@ -108,7 +108,7 @@ class Uploads extends React.Component {
handleAccept() {
this.setState({loading: true})
Promise.all(this.state.selectedUploads.map(upload => api.unstageUpload(upload.upload_id)))
Promise.all(this.state.selectedUploads.map(upload => api.commitUpload(upload.upload_id)))
.then(() => {
this.setState({showAccept: false})
return this.update()
......
......@@ -23,10 +23,10 @@ from datetime import datetime
from werkzeug.datastructures import FileStorage
import os.path
from nomad import config, utils
from nomad import config
from nomad.processing import Upload
from nomad.processing import NotAllowedDuringProcessing
from nomad.files import ArchiveBasedStagingUploadFiles, StagingUploadFiles, UploadFiles
from nomad.processing import ProcessAlreadyRunning
from nomad.files import ArchiveBasedStagingUploadFiles
from .app import api, with_logger
from .auth import login_really_required
......@@ -41,13 +41,34 @@ ns = api.namespace(
proc_model = api.model('Processing', {
'tasks': fields.List(fields.String),
'current_task': fields.String,
'status': fields.String,
'completed': fields.Boolean,
'tasks_completed': fields.Boolean,
'tasks_status': fields.String,
'errors': fields.List(fields.String),
'warnings': fields.List(fields.String),
'create_time': fields.DateTime(dt_format='iso8601'),
'complete_time': fields.DateTime(dt_format='iso8601'),
'_async_status': fields.String(description='Only for debugging nomad')
'current_process': fields.String,
'process_running': fields.Boolean,
})
metadata_model = api.model('MetaData', {
'with_embargo': fields.Boolean(default=False, description='Data with embargo is only visible to the upload until the embargo period ended.'),
'comment': fields.String(description='The comment are shown in the repository for each calculation.'),
'references': fields.List(fields.String, descriptions='References allow to link calculations to external source, e.g. URLs.'),
'coauthors': fields.List(fields.String, description='A list of co-authors given by user_id.'),
'shared_with': fields.List(fields.String, description='A list of users to share calculations with given by user_id.'),
'_upload_time': fields.List(fields.DateTime(dt_format='iso8601'), description='Overrride the upload time.'),
'_uploader': fields.List(fields.String, description='Override the uploader with the given user id.')
})
calc_metadata_model = api.inherit('CalcMetaData', metadata_model, {
'mainfile': fields.String(description='The calculation main output file is used to identify the calculation in the upload.'),
'_checksum': fields.String(description='Override the calculation checksum'),
'_pid': fields.String(description='Assign a specific pid. It must be unique.')
})
upload_metadata_model = api.inherit('UploadMetaData', metadata_model, {
'calculations': fields.List(fields.Nested(model=calc_metadata_model), description='Specific per calculation data that will override the upload data.')
})
upload_model = api.inherit('UploadProcessing', proc_model, {
......@@ -56,7 +77,7 @@ upload_model = api.inherit('UploadProcessing', proc_model, {
'using the name query parameter.'),
'upload_id': fields.String(
description='The unique id for the upload.'),
'additional_metadata': fields.Arbitrary,
'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.'),
'local_path': fields.String,
'upload_time': fields.DateTime(dt_format='iso8601'),
})
......@@ -82,29 +103,9 @@ upload_with_calcs_model = api.inherit('UploadWithPaginatedCalculations', upload_
}))
})
meta_data_model = api.model('MetaData', {
'with_embargo': fields.Boolean(default=False, description='Data with embargo is only visible to the upload until the embargo period ended.'),
'comment': fields.List(fields.String, description='The comment are shown in the repository for each calculation.'),
'references': fields.List(fields.String, descriptions='References allow to link calculations to external source, e.g. URLs.'),
'coauthors': fields.List(fields.String, description='A list of co-authors given by user_id.'),
'shared_with': fields.List(fields.String, description='A list of users to share calculations with given by user_id.'),
'_upload_time': fields.List(fields.DateTime(dt_format='iso8601'), description='Overrride the upload time.'),
'_uploader': fields.List(fields.String, description='Override the uploader with the given user id.')
})
calc_meta_data_model = api.inherit('CalcMetaData', meta_data_model, {
'mainfile': fields.String(description='The calculation main output file is used to identify the calculation in the upload.'),
'_checksum': fields.String(description='Override the calculation checksum'),
'_pid': fields.String(description='Assign a specific pid. It must be unique.')
})
upload_meta_data_model = api.inherit('UploadMetaData', meta_data_model, {
'calculations': fields.List(fields.Nested(model=calc_meta_data_model), description='Specific per calculation data that will override the upload data.')
})
upload_operation_model = api.model('UploadOperation', {
'operation': fields.String(description='Currently unstage is the only operation.'),
'metadata': fields.Nested(model=upload_meta_data_model, description='Additional upload and calculation meta data that should be considered for the operation')
'operation': fields.String(description='Currently commit is the only operation.'),
'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data. Will replace previously given metadata.')
})
......@@ -200,13 +201,13 @@ class UploadListResource(Resource):
abort(400, message='Some IO went wrong, download probably aborted/disrupted.')
except Exception as e:
upload_files.delete()
upload.delete(force=True)
upload.delete()
logger.info('Invalid or aborted upload')
raise e
logger.info('received uploaded file')
upload.upload_time = datetime.now()
upload.process()
upload.process_upload()
logger.info('initiated processing')
return upload, 200
......@@ -258,7 +259,7 @@ class UploadResource(Resource):
except AssertionError:
abort(400, message='invalid pagination')
if order_by not in ['mainfile', 'status', 'parser']:
if order_by not in ['mainfile', 'tasks_status', 'parser']:
abort(400, message='invalid order_by field %s' % order_by)
order_by = ('-%s' if order == -1 else '+%s') % order_by
......@@ -277,7 +278,7 @@ class UploadResource(Resource):
@api.doc('delete_upload')
@api.response(404, 'Upload does not exist')
@api.response(401, 'Upload does not belong to authenticated user.')
@api.response(400, 'Not allowed during processing')
@api.response(400, 'The upload is still/already processed')
@api.marshal_with(upload_model, skip_none=True, code=200, description='Upload deleted')
@login_really_required
@with_logger
......@@ -296,31 +297,29 @@ class UploadResource(Resource):
if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
abort(401, message='Upload with id %s does not belong to you.' % upload_id)
with utils.lnr(logger, 'delete processing upload'):
try:
upload.delete()
except NotAllowedDuringProcessing:
abort(400, message='You must not delete an upload during processing.')
if not upload.tasks_completed:
abort(400, message='The upload is not processed yet')
with utils.lnr(logger, 'delete upload files'):
upload_files = UploadFiles.get(upload_id)
assert upload_files is not None, 'Uploads existing in staging must have files.'
if upload_files is not None:
assert isinstance(upload_files, StagingUploadFiles), 'Uploads in staging must have staging files.'
upload_files.delete()
try:
upload.delete_upload()
except ProcessAlreadyRunning:
abort(400, message='The upload is still/already processed')
except Exception as e:
logger.error('could not delete processing upload', exc_info=e)
raise e
return upload, 200
@api.doc('exec_upload_command')
@api.response(404, 'Upload does not exist or not in staging')
@api.response(400, 'Operation is not supported')
@api.response(400, 'Operation is not supported or the upload is still/already processed')
@api.response(401, 'If the operation is not allowed for the current user')
@api.marshal_with(upload_model, skip_none=True, code=200, description='Upload unstaged successfully')
@api.marshal_with(upload_model, skip_none=True, code=200, description='Upload commited successfully')
@api.expect(upload_operation_model)
@login_really_required
def post(self, upload_id):
"""
Execute an upload operation. Available operations: ``unstage``
Execute an upload operation. Available operations: ``commit``
Commit accepts further meta data that allows providing coauthors, comments,
external references, etc. See the model for details. The fields that start with
......@@ -343,18 +342,21 @@ class UploadResource(Resource):
operation = json_data.get('operation')
meta_data = json_data.get('meta_data', {})
for key in meta_data:
metadata = json_data.get('metadata', {})
for key in metadata:
if key.startswith('_'):
if not g.user.is_admin:
abort(401, message='Only admin users can use _meta_data_keys.')
abort(401, message='Only admin users can use _metadata_keys.')
break
if operation == 'unstage':
if operation == 'commit':
if not upload.tasks_completed:
abort(400, message='The upload is not processed yet')
try:
upload.unstage(meta_data)
except NotAllowedDuringProcessing:
abort(400, message='You must not unstage an upload during processing.')
upload.metadata = metadata
upload.commit_upload()
except ProcessAlreadyRunning:
abort(400, message='The upload is still/already processed')
return upload, 200
......
......@@ -72,7 +72,7 @@ def handle_common_errors(func):
return wrapper
def upload_file(file_path: str, name: str = None, offline: bool = False, unstage: bool = False, client=None):
def upload_file(file_path: str, name: str = None, offline: bool = False, commit: bool = False, client=None):
"""
Upload a file to nomad.
......@@ -80,7 +80,7 @@ def upload_file(file_path: str, name: str = None, offline: bool = False, unstage
file_path: path to the file, absolute or relative to call directory
name: optional name, default is the file_path's basename
offline: allows to process data without upload, requires client to be run on the server
unstage: automatically unstage after successful processing
commit: automatically commit after successful processing
Returns: The upload_id
"""
......@@ -95,7 +95,7 @@ def upload_file(file_path: str, name: str = None, offline: bool = False, unstage
upload = client.uploads.upload(file=f, name=name).response().result
click.echo('process online: %s' % file_path)
while upload.status not in ['SUCCESS', 'FAILURE']:
while upload.tasks_status not in ['SUCCESS', 'FAILURE']:
upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
calcs = upload.calcs.pagination
if calcs is None:
......@@ -103,20 +103,20 @@ def upload_file(file_path: str, name: str = None, offline: bool = False, unstage
else:
total, successes, failures = (calcs.total, calcs.successes, calcs.failures)
ret = '\n' if upload.status in ('SUCCESS', 'FAILURE') else '\r'
ret = '\n' if upload.tasks_status in ('SUCCESS', 'FAILURE') else '\r'
print(
'status: %s; task: %s; parsing: %d/%d/%d %s' %
(upload.status, upload.current_task, successes, failures, total, ret), end='')
(upload.tasks_status, upload.current_task, successes, failures, total, ret), end='')
time.sleep(3)
if upload.status == 'FAILURE':
if upload.tasks_status == 'FAILURE':
click.echo('There have been errors:')
for error in upload.errors:
click.echo(' %s' % error)
elif unstage:
client.uploads.exec_upload_command(upload_id=upload.upload_id, operation='unstage').reponse()
elif commit:
client.uploads.exec_upload_command(upload_id=upload.upload_id, operation='commit').reponse()
return upload.upload_id
......@@ -271,9 +271,9 @@ def cli(host: str, port: int, verbose: bool, user: str, password: str):
help='Upload files "offline": files will not be uploaded, but processed were they are. '
'Only works when run on the nomad host.')
@click.option(
'--unstage', is_flag=True, default=False,
'--commit', is_flag=True, default=False,
help='Automatically move upload out of the staging area after successful processing')
def upload(path, name: str, offline: bool, unstage: bool):
def upload(path, name: str, offline: bool, commit: bool):
utils.configure_logging()
paths = path
click.echo('uploading files from %s paths' % len(paths))
......@@ -281,7 +281,7 @@ def upload(path, name: str, offline: bool, unstage: bool):
click.echo('uploading %s' % path)
if os.path.isfile(path):
name = name if name is not None else os.path.basename(path)
upload_file(path, name, offline, unstage)
upload_file(path, name, offline, commit)
elif os.path.isdir(path):
for (dirpath, _, filenames) in os.walk(path):
......@@ -289,7 +289,7 @@ def upload(path, name: str, offline: bool, unstage: bool):
if filename.endswith('.zip'):
file_path = os.path.abspath(os.path.join(dirpath, filename))
name = os.path.basename(file_path)
upload_file(file_path, name, offline, unstage)
upload_file(file_path, name, offline, commit)
else:
click.echo('Unknown path type %s.' % path)
......
......@@ -33,8 +33,8 @@ class Calc(Base, datamodel.Calc): # type: ignore
upload = relationship('Upload')
checksum = Column(String)
calc_meta_data = relationship('CalcMetaData', uselist=False, lazy='joined')
user_meta_data = relationship('UserMetaData', uselist=False, lazy='joined')
calc_metadata = relationship('CalcMetaData', uselist=False, lazy='joined')
user_metadata = relationship('UserMetaData', uselist=False, lazy='joined')
citations = relationship('Citation', secondary=calc_citation_association, lazy='joined')
owners = relationship('User', secondary=ownership, lazy='joined')
coauthors = relationship('User', secondary=co_authorship, lazy='joined')
......@@ -54,7 +54,7 @@ class Calc(Base, datamodel.Calc): # type: ignore
@property
def mainfile(self) -> str:
return self.calc_meta_data.location
return self.calc_metadata.location
@property
def pid(self):
......@@ -62,7 +62,7 @@ class Calc(Base, datamodel.Calc): # type: ignore
@property
def comment(self) -> str:
return self.user_meta_data.label
return self.user_metadata.label
@property
def calc_id(self) -> str:
......@@ -79,15 +79,15 @@ class Calc(Base, datamodel.Calc): # type: ignore
@property
def with_embargo(self) -> bool:
return self.user_meta_data.permission == 1
return self.user_metadata.permission == 1
@property
def chemical_formula(self) -> str:
return self.calc_meta_data.chemical_formula
return self.calc_metadata.chemical_formula
@property
def filenames(self) -> List[str]:
filenames = self.calc_meta_data.filenames.decode('utf-8')
filenames = self.calc_metadata.filenames.decode('utf-8')
return json.loads(filenames)
@property
......@@ -140,4 +140,4 @@ class DataSet:
@property
def name(self):
return self._dataset_calc.calc_meta_data.chemical_formula
return self._dataset_calc.calc_metadata.chemical_formula
......@@ -68,15 +68,15 @@ class UploadMetaData:
Utility class that provides per upload meta data and overriding per calculation
meta data. For a given *mainfile* data is first read from the `calculations` key
(a list of calculation dict with a matching `mainfile` key), before it is read
from :param:`meta_data_dict` it self.
from :param:`metadata_dict` itself.
The class is used to deal with user provided meta-data.
Arguments:
meta_data_dict: The python dict with the meta-data.
metadata_dict: The python dict with the meta-data.
"""
def __init__(self, meta_data_dict: dict) -> None:
self._upload_data = meta_data_dict
def __init__(self, metadata_dict: dict) -> None:
self._upload_data = metadata_dict
self._calc_data: dict = {
calc['mainfile']: calc
for calc in self._upload_data.get('calculations', [])}
......@@ -121,7 +121,7 @@ class Upload(Base, datamodel.Upload): # type: ignore
return self.created
@staticmethod
def add(upload: datamodel.Upload, meta_data: dict = {}) -> int:
def add(upload: datamodel.Upload, metadata: dict = {}) -> int:
"""
Add the upload to the NOMAD-coe repository db. It creates an
uploads-entry, respective calculation and property entries. Everything in one
......@@ -132,10 +132,10 @@ class Upload(Base, datamodel.Upload): # type: ignore
Arguments:
upload: The upload to add.
upload_meta_data: A dictionary with additional meta data (e.g. user provided
upload_metadata: A dictionary with additional meta data (e.g. user provided
meta data) that should be added to upload and calculations.
"""
upload_meta_data = UploadMetaData(meta_data)
upload_metadata = UploadMetaData(metadata)
repo_db = infrastructure.repository_db
repo_db.begin()
......@@ -147,7 +147,7 @@ class Upload(Base, datamodel.Upload): # type: ignore
# create upload
coe_upload = Upload(
upload_name=upload.upload_id,
created=meta_data.get('_upload_time', upload.upload_time),
created=metadata.get('_upload_time', upload.upload_time),
user=upload.uploader,
is_processed=True)
repo_db.add(coe_upload)
......@@ -156,7 +156,7 @@ class Upload(Base, datamodel.Upload): # type: ignore
has_calcs = False
for calc in upload.calcs:
has_calcs = True
coe_upload._add_calculation(calc.to(files.Calc), upload_meta_data.get(calc.mainfile))
coe_upload._add_calculation(calc.to(files.Calc), upload_metadata.get(calc.mainfile))
# commit
if has_calcs:
......@@ -175,13 +175,13 @@ class Upload(Base, datamodel.Upload): # type: ignore
return result
def _add_calculation(self, calc: files.Calc, calc_meta_data: dict) -> None:
def _add_calculation(self, calc: files.Calc, calc_metadata: dict) -> None:
repo_db = infrastructure.repository_db
# table based properties
coe_calc = Calc(
coe_calc_id=calc_meta_data.get('_pid', None),
checksum=calc_meta_data.get('_checksum', calc.calc_id),
coe_calc_id=calc_metadata.get('_pid', None),
checksum=calc_metadata.get('_checksum', calc.calc_id),
upload=self)
repo_db.add(coe_calc)
......@@ -193,7 +193,7 @@ class Upload(Base, datamodel.Upload): # type: ignore
metadata = CalcMetaData(
calc=coe_calc,
added=calc_meta_data.get('_upload_time', self.upload_time),
added=calc_metadata.get('_upload_time', self.upload_time),
chemical_formula=calc.chemical_composition,
filenames=('[%s]' % ','.join(['"%s"' % filename for filename in calc.files])).encode('utf-8'),
location=calc.mainfile,
......@@ -208,8 +208,8 @@ class Upload(Base, datamodel.Upload): # type: ignore
user_metadata = UserMetaData(
calc=coe_calc,
label=calc_meta_data.get('comment', None),
permission=(1 if calc_meta_data.get('with_embargo', False) else 0))
label=calc_metadata.get('comment', None),
permission=(1 if calc_metadata.get('with_embargo', False) else 0))
repo_db.add(user_metadata)
spacegroup = Spacegroup(
......@@ -228,22 +228,22 @@ class Upload(Base, datamodel.Upload): # type: ignore
coe_calc.set_value(topic_basis_set_type, calc.basis_set_type)
# user relations
owner_user_id = calc_meta_data.get('_uploader', int(self.user_id))
owner_user_id = calc_metadata.get('_uploader', int(self.user_id))
coe_calc.owners.append(repo_db.query(User).get(owner_user_id))
for coauthor_id in calc_meta_data.get('coauthors', []):
for coauthor_id in calc_metadata.get('coauthors', []):
coe_calc.coauthors.append(repo_db.query(User).get(coauthor_id))
for shared_with_id in calc_meta_data.get('shared_with', []):
for shared_with_id in calc_metadata.get('shared_with', []):
coe_calc.shared_with.append(repo_db.query(User).get(shared_with_id))
# datasets
for dataset_id in calc_meta_data.get('datasets', []):
for dataset_id in calc_metadata.get('datasets', []):
dataset = CalcSet(parent_calc_id=dataset_id, children_calc_id=coe_calc.coe_calc_id)
repo_db.add(dataset)
# references
for reference in calc_meta_data.get('references', []):
for reference in calc_metadata.get('references', []):
citation = repo_db.query(Citation).filter_by(
value=reference,
kind='EXTERNAL').first()
......
......@@ -67,5 +67,6 @@ classes do represent the processing state, as well as the respective entity.
:members:
"""
from nomad.processing.base import app, InvalidId, ProcNotRegistered, SUCCESS, FAILURE, RUNNING, PENDING
from nomad.processing.data import Upload, Calc, NotAllowedDuringProcessing
from nomad.processing.base import app, InvalidId, ProcNotRegistered, SUCCESS, FAILURE, \
RUNNING, PENDING, PROCESS_COMPLETED, PROCESS_RUNNING, ProcessAlreadyRunning
from nomad.processing.data import Upload, Calc
......@@ -47,11 +47,16 @@ def setup(**kwargs):
app = Celery('nomad.processing', broker=config.celery.broker_url)
app.conf.update(worker_hijack_root_logger=False)
CREATED = 'CREATED'
PENDING = 'PENDING'
RUNNING = 'RUNNING'
FAILURE = 'FAILURE'
SUCCESS = 'SUCCESS'
PROCESS_CALLED = 'CALLD'
PROCESS_RUNNING = 'RUNNING'
PROCESS_COMPLETED = 'COMPLETED'
class InvalidId(Exception): pass
......@@ -59,6 +64,9 @@ class InvalidId(Exception): pass
class ProcNotRegistered(Exception): pass
class ProcessAlreadyRunning(Exception): pass
class ProcMetaclass(TopLevelDocumentMetaclass):
def __new__(cls, name, bases, attrs):
cls = super().__new__(cls, name, bases, attrs)
......@@ -92,13 +100,15 @@ class Proc(Document, metaclass=ProcMetaclass):
Attributes:
current_task: the currently running or last completed task
status: the overall status of the processing
tasks_status: the overall status of the processing
errors: a list of errors that happened during processing. Error fail a processing
run
warnings: a list of warnings that happened during processing. Warnings do not
fail a processing run
create_time: the time of creation (not the start of processing)
proc_time: the time that processing completed (successfully or not)
complete_time: the time that processing completed (successfully or not)
current_process: the currently or last run asynchronous process
process_status: the status of the currently or last run asynchronous process
"""
meta: Any = {
......@@ -109,37 +119,43 @@ class Proc(Document, metaclass=ProcMetaclass):
""" the ordered list of tasks that comprise a processing run """
current_task = StringField(default=None)
status = StringField(default='CREATED')
tasks_status = StringField(default=CREATED)
create_time = DateTimeField(required=True)
complete_time = DateTimeField()
errors = ListField(StringField())
warnings = ListField(StringField())
create_time = DateTimeField(required=True)
complete_time = DateTimeField()
_async_status = StringField(default='UNCALLED')
current_process = StringField(default=None)
process_status = StringField(default=None)
@property
def completed(self) -> bool:
def tasks_completed(self) -> bool:
""" Returns True of the process has failed or succeeded. """
return self.status in [SUCCESS, FAILURE]
return self.tasks_status in [SUCCESS, FAILURE]
@property
def process_running(self) -> bool:
""" Returns True of an asynchrounous process is currently running. """
return self.process_status is not None and self.process_status != PROCESS_COMPLETED
def get_logger(self):
return utils.get_logger(
'nomad.processing', current_task=self.current_task, process=self.__class__.__name__,
status=self.status)
'nomad.processing', current_task=self.current_task, proc=self.__class__.__name__,
current_process=self.current_process, process_status=self.process_status,
tasks_status=self.tasks_status)
@classmethod
def create(cls, **kwargs):
""" Factory method that must be used instead of regular constructor. """
assert cls.tasks is not None and len(cls.tasks) > 0, \
""" the class attribute tasks must be overwritten with an actual list """
assert 'status' not in kwargs, \
assert 'tasks_status' not in kwargs, \
""" do not set the status manually, its managed """
kwargs.setdefault('create_time', datetime.now())
self = cls(**kwargs)
self.status = PENDING if self.current_task is None else RUNNING
self.tasks_status = PENDING if self.current_task is None else RUNNING
self.save()
return self
......@@ -178,11 +194,11 @@ class Proc(Document, metaclass=ProcMetaclass):
def fail(self, *errors, log_level=logging.ERROR, **kwargs):
""" Allows to fail the process. Takes strings or exceptions as args. """
assert not self.completed, 'Cannot fail a completed process.'
assert not self.tasks_completed, 'Cannot fail a completed process.'
failed_with_exception = False
self.status = FAILURE
self.tasks_status = FAILURE