Commit 0e5b2e1d authored by Markus Scheidgen

Implemented a 'republish' function #138.

parent 96458feb
Pipeline #47415 passed with stages in 27 minutes and 48 seconds
@@ -122,15 +122,27 @@ upload_metadata_parser.add_argument('name', type=str, help='An optional name for
upload_metadata_parser.add_argument('local_path', type=str, help='Use a local file on the server.', location='args')
upload_metadata_parser.add_argument('file', type=FileStorage, help='The file to upload.', location='files')
upload_list_parser = api.parser()
upload_list_parser.add_argument('all', type=bool, help='List all uploads, including published.', location='args')
upload_list_parser.add_argument('name', type=str, help='Filter for uploads with the given name.', location='args')
@ns.route('/')
class UploadListResource(Resource):
@api.doc('get_uploads')
@api.marshal_list_with(upload_model, skip_none=True, code=200, description='Uploads sent')
@api.expect(upload_list_parser)
@login_really_required
def get(self):
""" Get the list of all uploads from the authenticated user. """
return [upload for upload in Upload.user_uploads(g.user)], 200
all = bool(request.args.get('all', False))
name = request.args.get('name', None)
query_kwargs = {}
if not all:
query_kwargs.update(published=False)
if name is not None:
query_kwargs.update(name=name)
return [upload for upload in Upload.user_uploads(g.user, **query_kwargs)], 200
@api.doc('upload')
@api.marshal_with(upload_model, skip_none=True, code=200, description='Upload received')
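For illustration, a minimal client-side sketch of the extended list endpoint above; the base URL and token header are assumptions, not part of this commit:

import requests

auth_headers = {'X-Token': 'your-token'}  # hypothetical authentication header

# Default: only unpublished (staging) uploads are returned.
requests.get('http://localhost/api/uploads/', headers=auth_headers)

# All uploads, including published ones, filtered by name. Note that the
# endpoint applies bool() to the raw query value, so any non-empty string
# for 'all' (even 'false') enables the flag.
requests.get(
    'http://localhost/api/uploads/',
    params={'all': 'true', 'name': 'my_upload'},
    headers=auth_headers)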
@@ -300,7 +312,7 @@ class UploadResource(Resource):
"""
Delete an existing upload.
Only uploads that are still in staging, not already delete, not still uploaded, and
Only uploads that are still in staging, not already deleted, not still uploaded, and
not currently processed, can be deleted.
"""
try:
@@ -311,6 +323,9 @@ class UploadResource(Resource):
if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
abort(401, message='Upload with id %s does not belong to you.' % upload_id)
if upload.published:
abort(400, message='The upload is already published')
if upload.tasks_running:
abort(400, message='The upload is not processed yet')
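A sketch of how a client sees the new guard; the URL, upload id, and auth header are again assumptions:

import requests

auth_headers = {'X-Token': 'your-token'}  # hypothetical authentication header

# Deleting a published upload now fails with 400 instead of 404, matching
# the adjusted test expectation at the end of this commit.
rv = requests.delete('http://localhost/api/uploads/some_upload_id', headers=auth_headers)
assert rv.status_code == 400  # 'The upload is already published'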
@@ -214,16 +214,17 @@ def pid_prefix(prefix: int):
@click.option('--delete-failed', default='', type=str, help='String from N, U, P to determine if empty (N), failed (U), or failed to publish (P) uploads should be deleted or kept for debugging.')
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--create-packages', is_flag=True, help='Indicate that packages should be created, if they do not already exist.')
@click.option('--republish', is_flag=True, help='Will only republish already published packages.')
def upload(
upload_paths: list, pattern: str, parallel: int, delete_failed: str,
create_packages: bool):
create_packages: bool, republish: bool):
infrastructure.setup_logging()
infrastructure.setup_mongo()
_Migration(threads=parallel).migrate(
*determine_upload_paths(upload_paths, pattern), delete_failed=delete_failed,
create_packages=create_packages)
create_packages=create_packages, only_republish=republish)
@migration.command(help='Get a report about calcs that were not migrated.')
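The new --republish flag of the upload command above can be exercised with click's test runner; a sketch only, assuming the migration infrastructure (mongo, packages) is available, and with a hypothetical upload path:

from click.testing import CliRunner

# Only re-publish already published packages; nothing is migrated or
# created, matching only_republish=True in NomadCOEMigration.migrate().
runner = CliRunner()
result = runner.invoke(
    upload, ['--republish', '--parallel', '2', '/data/uploads/example'],
    catch_exceptions=False)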
@@ -201,18 +201,18 @@ class Upload(Base): # type: ignore
logger.info('filled publish transaction')
upload_id = -1
result = None
if has_calcs:
repo_db.commit()
logger.info('committed publish transaction')
upload_id = coe_upload.coe_upload_id
result = coe_upload
else:
# empty upload case
repo_db.rollback()
return -1
return None
logger.info('added upload')
return upload_id
return result
except Exception as e:
repo_db.rollback()
if last_error != str(e) and retries < 3:
@@ -726,7 +726,9 @@ class NomadCOEMigration:
finally:
NomadCOEMigration._client_lock.release()
def migrate(self, *args, delete_failed: str = '', create_packages: bool = False) -> utils.POPO:
def migrate(
self, *args, delete_failed: str = '',
create_packages: bool = False, only_republish: bool = False) -> utils.POPO:
"""
Migrate the given uploads.
@@ -749,6 +751,8 @@
operation (P) should be deleted after the migration attempt.
create_packages: If True, it will attempt to create upload packages if they
do not exist.
only_republish: If the package exists and is published, it will only be
re-published and nothing else. Useful to re-index or re-create the coe repo, etc.
Returns: Dictionary with statistics on the migration.
"""
@@ -766,9 +770,12 @@
package_id=package.package_id, source_upload_id=package.upload_id)
if package.migration_version is not None and package.migration_version >= self.migration_version:
self.logger.info(
'package already migrated, skip it',
package_id=package.package_id, source_upload_id=package.upload_id)
if only_republish:
self.republish_package(package)
else:
self.logger.info(
'package already migrated, skip it',
package_id=package.package_id, source_upload_id=package.upload_id)
package_report = package.report
overall_report.skipped_packages += 1
@@ -836,6 +843,40 @@ class NomadCOEMigration:
_client_lock = threading.Lock()
def republish_package(self, package: Package) -> None:
source_upload_id = package.upload_id
package_id = package.package_id
logger = self.logger.bind(package_id=package_id, source_upload_id=source_upload_id)
uploads = self.call_api('uploads.get_uploads', name=package_id)
if len(uploads) > 1:
logger.warning('upload name is not unique')
if len(uploads) == 0:
logger.info('upload does not exist')
return
for upload in uploads:
if not upload.published:
logger.info('upload is not published, therefore cannot be re-published')
continue
upload = self.call_api(
'uploads.exec_upload_operation', upload_id=upload.upload_id,
payload=dict(operation='publish'))
sleep = utils.SleepTimeBackoff()
while upload.process_running:
upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
sleep()
if upload.tasks_status == FAILURE:
event = 'could not re-publish upload'
logger.error(event, process_errors=upload.errors)
else:
logger.info('republished upload')
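Usage sketch for the new method, assuming Package exposes the usual mongoengine-style query interface and a hypothetical source upload id:

# Re-publish every package that belongs to one source upload. Uploads that
# are missing or not yet published are skipped with a log message.
for package in Package.objects(upload_id='some_source_upload'):
    migration.republish_package(package)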
def migrate_package(self, package: Package, delete_failed: str = '') -> 'Report':
""" Migrates the given package. For other params see :func:`migrate`. """
@@ -851,14 +892,18 @@
# check if the package is already uploaded
upload = None
try:
uploads = self.call_api('uploads.get_uploads')
for a_upload in uploads:
if a_upload.name == package_id and len(a_upload.errors) == 0:
assert upload is None, 'duplicate upload name'
upload = a_upload
uploads = self.call_api('uploads.get_uploads', name=package_id)
if len(uploads) > 1:
event = 'duplicate upload name'
package.migration_failure(event)
report.failed_packages += 1
return report
elif len(uploads) == 1:
upload = uploads[0]
except Exception as e:
event = 'could verify if upload already exists'
self.logger.error(event, exc_info=e)
event = 'could not verify if upload already exists'
logger.error(event, exc_info=e)
package.migration_failure(event)
report.failed_packages += 1
return report
@@ -871,12 +916,12 @@
'uploads.upload', name=package_id, local_path=package.package_path)
except Exception as e:
event = 'could not upload package'
self.logger.error(event, exc_info=e)
logger.error(event, exc_info=e)
package.migration_failure = event + ': ' + str(e)
report.failed_packages += 1
return report
else:
self.logger.info('package was already uploaded')
logger.info('package was already uploaded')
logger = logger.bind(
source_upload_id=source_upload_id, upload_id=upload.upload_id)
@@ -1044,12 +1089,8 @@ class NomadCOEMigration:
sleep = utils.SleepTimeBackoff()
while upload.process_running:
try:
upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
sleep()
except HTTPNotFound:
# the proc upload will be deleted by the publish operation
break
upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
sleep()
if upload.tasks_status == FAILURE:
event = 'could not publish upload'
@@ -174,14 +174,15 @@ class Proc(Document, metaclass=ProcMetaclass):
@classmethod
def create(cls, **kwargs):
""" Factory method that must be used instead of regular constructor. """
assert cls.tasks is not None and len(cls.tasks) > 0, \
""" the class attribute tasks must be overwritten with an actual list """
assert 'tasks_status' not in kwargs, \
""" do not set the status manually, its managed """
kwargs.setdefault('create_time', datetime.now())
self = cls(**kwargs)
self.tasks_status = PENDING if self.current_task is None else RUNNING
if len(cls.tasks) == 0:
self.tasks_status = SUCCESS
else:
self.tasks_status = PENDING if self.current_task is None else RUNNING
self.save()
return self
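The effect of the new branch as a minimal sketch; this assumes a Proc subclass without any @task methods is permitted, as the ProcTwice test added below suggests:

class NoTasksProc(Proc):
    @process
    def do_work(self):
        pass  # a @process method, but no @task methods

p = NoTasksProc.create()
assert p.tasks_status == SUCCESS  # previously it would start as PENDING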
@@ -424,7 +425,7 @@ def unwarp_task(task, cls_name, self_id, *args, **kwargs):
logger.warning('called object is missing')
raise task.retry(exc=e, countdown=3)
except KeyError:
logger.critical('called object is missing, retries exceeded')
logger.critical('called object is missing, retries exceeded', proc_id=self_id)
raise ProcObjectDoesNotExist()
return self
@@ -34,7 +34,7 @@ from datetime import datetime
from pymongo import UpdateOne
from nomad import utils, coe_repo, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, LocalBackend
from nomad.normalizing import normalizers
@@ -377,6 +377,7 @@ class Upload(Proc):
user_id = StringField(required=True)
published = BooleanField(default=False)
publish_time = DateTimeField()
last_update = DateTimeField()
meta: Any = {
'indexes': [
@@ -404,18 +405,17 @@
upload_files.user_metadata = data
@classmethod
def get(cls, id: str, include_published: bool = False) -> 'Upload':
def get(cls, id: str, include_published: bool = True) -> 'Upload':
upload = cls.get_by_id(id, 'upload_id')
# TODO published uploads should not be hidden by this method and the API
if upload is not None and (not upload.published or include_published):
if upload is not None:
return upload
raise KeyError()
@classmethod
def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
""" Returns all uploads for the given user. Currently returns all uploads. """
return cls.objects(user_id=str(user.user_id), published=False)
def user_uploads(cls, user: coe_repo.User, **kwargs) -> List['Upload']:
""" Returns all uploads for the given user. Kwargs are passed to mongo query. """
return cls.objects(user_id=str(user.user_id), **kwargs)
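Usage sketch; the kwargs are passed straight into the mongoengine query:

Upload.user_uploads(user)                   # all uploads of the user
Upload.user_uploads(user, published=False)  # staging uploads only (the old behaviour)
Upload.user_uploads(user, name='a_name')    # filter by name, as the API endpoint does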
@property
def uploader(self):
@@ -480,20 +480,6 @@ class Upload(Proc):
return True # do not save the process status on the delete upload
@process
def enforce_consistency(self):
"""
Takes the proc data of this upload as truth and updates coe repository db and
ES index accordingly. It takes user data from the coe repository db, if it exists, as truth.
"""
# retrieve data from coe repository
# diff coe repository data with proc data
# update coe repository data with diffs
# update the elastic search index (with diffs?)
@process
def publish_upload(self):
"""
@@ -501,27 +487,32 @@
pack the staging upload files in to public upload files, add entries to the
coe repository db and remove this instance and its calculation from the
processing state db.
If the upload is already published (i.e. a re-publish), it will update the user metadata from
the repository db, publish to the repository db if it does not yet exist there, and update the search index.
"""
assert self.processed_calcs > 0
logger = self.get_logger()
logger.info('started to publish')
with utils.lnr(logger, 'publish failed'):
upload_with_metadata = self.to_upload_with_metadata()
with utils.lnr(logger, '(re-)publish failed'):
upload_with_metadata = self.to_upload_with_metadata(self.metadata)
if config.repository_db.publish_enabled:
coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is None:
with utils.timer(
logger, 'upload added to repository', step='repo',
upload_size=self.upload_files.size):
coe_upload = coe_repo.Upload.publish(upload_with_metadata)
with utils.timer(
logger, 'upload added to repository', step='repo',
logger, 'upload read from repository', step='repo',
upload_size=self.upload_files.size):
coe_repo.Upload.publish(upload_with_metadata)
if config.repository_db.publish_enabled:
coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is not None:
calcs = [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs]
else:
calcs = []
calcs = [
coe_calc.to_calc_with_metadata()
for coe_calc in coe_upload.calcs]
else:
calcs = upload_with_metadata.calcs
@@ -537,22 +528,28 @@
Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])
with utils.timer(
logger, 'staged upload files packed', step='pack',
upload_size=self.upload_files.size):
self.upload_files.pack(upload_with_metadata)
if isinstance(self.upload_files, StagingUploadFiles):
with utils.timer(
logger, 'staged upload files packed', step='pack',
upload_size=self.upload_files.size):
self.upload_files.pack(upload_with_metadata)
with utils.timer(
logger, 'index updated', step='index',
upload_size=self.upload_files.size):
search.publish(calcs)
with utils.timer(
logger, 'staged upload deleted', step='delete staged',
upload_size=self.upload_files.size):
self.upload_files.delete()
self.published = True
self.publish_time = datetime.now()
if isinstance(self.upload_files, StagingUploadFiles):
with utils.timer(
logger, 'staged upload deleted', step='delete staged',
upload_size=self.upload_files.size):
self.upload_files.delete()
self.published = True
self.publish_time = datetime.now()
self.last_update = datetime.now()
self.save()
else:
self.last_update = datetime.now()
self.save()
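Taken together, the (re-)publish logic now reads roughly like the following paraphrased sketch; names are from the diff, while the timers, the publish_enabled switch, and the empty-upload edge case are omitted:

coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is None:  # first publish
    coe_upload = coe_repo.Upload.publish(upload_with_metadata)
calcs = [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs]

if isinstance(self.upload_files, StagingUploadFiles):  # not yet packed
    self.upload_files.pack(upload_with_metadata)

search.publish(calcs)  # the index is always refreshed

if isinstance(self.upload_files, StagingUploadFiles):  # first publish only
    self.upload_files.delete()
    self.published = True
    self.publish_time = datetime.now()
self.last_update = datetime.now()
self.save()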
@process
@@ -723,38 +720,44 @@ class Upload(Proc):
def calcs(self):
return Calc.objects(upload_id=self.upload_id, tasks_status=SUCCESS)
def to_upload_with_metadata(self) -> UploadWithMetadata:
def to_upload_with_metadata(self, user_metadata: dict = None) -> UploadWithMetadata:
# prepare user metadata per upload and per calc
calc_metadatas: Dict[str, Any] = dict()
upload_metadata: Dict[str, Any] = dict()
if user_metadata is not None:
calc_metadatas: Dict[str, Any] = dict()
upload_metadata: Dict[str, Any] = dict()
if self.metadata is not None:
upload_metadata.update(self.metadata)
upload_metadata.update(user_metadata)
if 'calculations' in upload_metadata:
del(upload_metadata['calculations'])
for calc in self.metadata.get('calculations', []): # pylint: disable=no-member
for calc in user_metadata.get('calculations', []): # pylint: disable=no-member
calc_metadatas[calc['mainfile']] = calc
user_upload_time = upload_metadata.get('_upload_time', None)
user_upload_time = upload_metadata.get('_upload_time', None)
def get_metadata(calc: Calc):
"""
Assemble metadata from calc's processed calc metadata and the uploads
user metadata.
"""
calc_data = calc.metadata
calc_with_metadata = datamodel.CalcWithMetadata(**calc_data)
calc_metadata = dict(upload_metadata)
calc_metadata.update(calc_metadatas.get(calc.mainfile, {}))
calc_with_metadata.apply_user_metadata(calc_metadata)
return calc_with_metadata
else:
user_upload_time = None
def get_metadata(calc: Calc):
return datamodel.CalcWithMetadata(**calc.metadata)
result = UploadWithMetadata(
upload_id=self.upload_id,
uploader=utils.POPO(id=int(self.user_id)),
upload_time=self.upload_time if user_upload_time is None else user_upload_time)
def get_metadata(calc: Calc):
"""
Assemble metadata from calc's processed calc metadata and the uploads
user metadata.
"""
calc_data = calc.metadata
calc_with_metadata = datamodel.CalcWithMetadata(**calc_data)
calc_metadata = dict(upload_metadata)
calc_metadata.update(calc_metadatas.get(calc.mainfile, {}))
calc_with_metadata.apply_user_metadata(calc_metadata)
return calc_with_metadata
result.calcs = [get_metadata(calc) for calc in Calc.objects(upload_id=self.upload_id)]
return result
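A usage sketch; the metadata keys follow the structure the method reads ('_upload_time', 'calculations', 'mainfile'), while the concrete values are hypothetical:

from datetime import datetime

upload_with_metadata = processed.to_upload_with_metadata({
    '_upload_time': datetime.now(),  # overrides the stored upload time
    'calculations': [                # per-calc user metadata, keyed by mainfile
        {'mainfile': 'vasp.xml', 'with_embargo': True},
    ],
})

# Without user metadata, the processed calc metadata is used unchanged:
upload_with_metadata = processed.to_upload_with_metadata()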
@@ -88,6 +88,21 @@ def test_simple_process(worker, mongo, no_warn):
assert_proc(p, 'two')
class ProcTwice(Proc):
@process
def process(self):
pass
def test_process_twice(worker, mongo, no_warn):
p = ProcTwice.create()
p.process()
p.block_until_complete()
p.process()
p.block_until_complete()
assert_proc(p, None)
class TaskInProc(Proc):
@process
@task
@@ -103,7 +103,7 @@ def test_processing(processed, no_warn, mails, monkeypatch):
def test_publish(non_empty_processed: Upload, no_warn, example_user_metadata, monkeypatch, with_publish_to_coe_repo):
processed = non_empty_processed
processed.metadata = example_user_metadata
processed.compress_and_set_metadata(example_user_metadata)
additional_keys = ['with_embargo']
if with_publish_to_coe_repo:
@@ -115,7 +115,7 @@ def test_publish(non_empty_processed: Upload, no_warn, example_user_metadata, mo
except Exception:
pass
upload = processed.to_upload_with_metadata()
upload = processed.to_upload_with_metadata(example_user_metadata)
if with_publish_to_coe_repo:
assert_coe_upload(upload.upload_id, user_metadata=example_user_metadata)
@@ -123,6 +123,55 @@ def test_publish(non_empty_processed: Upload, no_warn, example_user_metadata, mo
assert_search_upload(upload, additional_keys, published=True)
def test_republish(non_empty_processed: Upload, no_warn, example_user_metadata, monkeypatch, with_publish_to_coe_repo):
processed = non_empty_processed
processed.compress_and_set_metadata(example_user_metadata)
additional_keys = ['with_embargo']
if with_publish_to_coe_repo:
additional_keys.append('pid')
processed.publish_upload()
processed.block_until_complete(interval=.01)
assert Upload.get('examples_template') is not None
processed.publish_upload()
processed.block_until_complete(interval=.01)
upload = processed.to_upload_with_metadata(example_user_metadata)
if with_publish_to_coe_repo:
assert_coe_upload(upload.upload_id, user_metadata=example_user_metadata)
assert_upload_files(upload, PublicUploadFiles, published=True)
assert_search_upload(upload, additional_keys, published=True)
def test_republish_to_coe(non_empty_processed: Upload, no_warn, example_user_metadata, monkeypatch):
"""
Test the following scenario: initial processing + publish without coe repo, then
republishing with coe repo.
"""
monkeypatch.setattr('nomad.config.repository_db.publish_enabled', False)
processed = non_empty_processed
processed.compress_and_set_metadata(example_user_metadata)
processed.publish_upload()
processed.block_until_complete(interval=.01)
assert Upload.get('examples_template') is not None
monkeypatch.setattr('nomad.config.repository_db.publish_enabled', True)
processed.publish_upload()
processed.block_until_complete(interval=.01)
upload = processed.to_upload_with_metadata(example_user_metadata)
additional_keys = ['with_embargo', 'pid']
assert_coe_upload(upload.upload_id, user_metadata=example_user_metadata)
assert_upload_files(upload, PublicUploadFiles, published=True)
assert_search_upload(upload, additional_keys, published=True)
def test_publish_failed(
non_empty_uploaded: Tuple[str, str], example_user_metadata, test_user,
monkeypatch, proc_infra, with_publish_to_coe_repo):
@@ -130,7 +179,7 @@ def test_publish_failed(
mock_failure(Calc, 'parsing', monkeypatch)
processed = run_processing(non_empty_uploaded, test_user)
processed.metadata = example_user_metadata
processed.compress_and_set_metadata(example_user_metadata)
additional_keys = ['with_embargo']
if with_publish_to_coe_repo:
@@ -142,11 +191,11 @@
except Exception:
pass
upload = processed.to_upload_with_metadata()
upload = processed.to_upload_with_metadata(example_user_metadata)
if with_publish_to_coe_repo:
assert_coe_upload(upload.upload_id, user_metadata=example_user_metadata)
assert_upload_files(upload, PublicUploadFiles, published=True)
assert_upload_files(upload, PublicUploadFiles, published=True, no_archive=True)
assert_search_upload(upload, additional_keys, published=True, processed=False)
@@ -234,7 +234,7 @@ class TestUploads:
assert_upload_files(upload_with_metadata, files.StagingUploadFiles)
assert_search_upload(upload_with_metadata, additional_keys=['atoms', 'system'])
def assert_published(self, client, test_user_auth, upload_id, proc_infra, with_coe_repo=True, metadata={}):
def assert_published(self, client, test_user_auth, upload_id, proc_infra, with_coe_repo=True, metadata={}, publish_with_metadata: bool = True):
rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
upload = self.assert_upload(rv.data)
@@ -243,7 +243,7 @@ class TestUploads:
rv = client.post(
'/uploads/%s' % upload_id,
headers=test_user_auth,
data=json.dumps(dict(operation='publish', metadata=metadata)),
data=json.dumps(dict(operation='publish', metadata=metadata if publish_with_metadata else {})),
content_type='application/json')
assert rv.status_code == 200
upload = self.assert_upload(rv.data)
@@ -359,7 +359,7 @@ class TestUploads:
self.assert_processing(client, test_user_auth, upload['upload_id'])
self.assert_published(client, test_user_auth, upload['upload_id'], proc_infra, with_coe_repo=with_publish_to_coe_repo)
rv = client.delete('/uploads/%s' % upload['upload_id'], headers=test_user_auth)
assert rv.status_code == 404
assert rv.status_code == 400