Commit 43b37555 authored by Markus Scheidgen

Added functions to processing that allow reprocessing of a published calc. Related to #110, #174

parent 5bbd5da0
Pipeline #52316 passed with stages in 29 minutes and 27 seconds
@@ -229,6 +229,10 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
with open(self._user_metadata_file.os_path, 'wb') as f:
pickle.dump(data, f)
def to_staging_upload_files(self, create: bool = False) -> 'StagingUploadFiles':
""" Casts to or creates corresponding staging upload files or returns None. """
raise NotImplementedError()
@staticmethod
def get(upload_id: str, *args, **kwargs) -> 'UploadFiles':
if DirectoryObject(config.fs.staging, upload_id, prefix=True).exists():
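For orientation, a minimal sketch of the assumed call pattern for the new hook (the upload id and call site are hypothetical; UploadFiles.get is shown above):

from nomad.files import UploadFiles

upload_files = UploadFiles.get('some_upload_id')

# StagingUploadFiles.to_staging_upload_files returns the instance itself;
# PublicUploadFiles returns None unless a staging copy already exists or
# create=True extracts one from the public raw zips.
staging = upload_files.to_staging_upload_files()
if staging is None:
    staging = upload_files.to_staging_upload_files(create=True)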
@@ -315,6 +319,9 @@ class StagingUploadFiles(UploadFiles):
self._size = 0
self._shared = DirectoryObject(config.fs.public, upload_id, create=create)
def to_staging_upload_files(self, create: bool = False) -> 'StagingUploadFiles':
return self
@property
def _user_metadata_file(self):
return self._shared.join_file('user_metadata.pickle')
@@ -421,14 +428,19 @@ class StagingUploadFiles(UploadFiles):
"""
copytree(self._raw_dir.os_path, os.path.join(config.fs.coe_extracted, self.upload_id))
def pack(self, upload: UploadWithMetadata) -> None:
def pack(
self, upload: UploadWithMetadata, target_dir: DirectoryObject = None,
skip_raw: bool = False) -> None:
"""
Replaces the staging upload data with a public upload record by packing all
data into files.
This is potentially a long running operation.
Arguments:
calcs: The calculation metadata of the upload used to determine what files to
pack and what the embargo situation is.
upload: The upload with all calcs and calculation metadata of the upload
used to determine what files to pack and what the embargo situation is.
target_dir: optional DirectoryObject to override where to put the files. Default
is the corresponding public upload files directory.
skip_raw: if True, do not pack the raw data; only archive and user metadata are packed
"""
self.logger.debug('started to pack upload')
@@ -438,9 +450,10 @@ class StagingUploadFiles(UploadFiles):
f.write('frozen')
# create a target dir in the public bucket
target_dir = DirectoryObject(
config.fs.public, self.upload_id, create=True, prefix=True,
create_prefix=True)
if target_dir is None:
target_dir = DirectoryObject(
config.fs.public, self.upload_id, create=True, prefix=True,
create_prefix=True)
assert target_dir.exists()
# copy user metadata
@@ -450,12 +463,35 @@ class StagingUploadFiles(UploadFiles):
self._user_metadata_file.os_path,
target_metadata_file.os_path)
# In prior versions we used bagit on raw files. There was not much purpose for
# it, so it was removed. Check 0.3.x for the implementation
def create_zipfile(kind: str, prefix: str, ext: str) -> zipfile.ZipFile:
file = target_dir.join_file('%s-%s.%s.zip' % (kind, prefix, ext))
return zipfile.ZipFile(file.os_path, mode='w')
# In prior versions we used bagit on raw files. There was not much purpose for
# it, so it was removed. Check 0.3.x for the implementation
# zip archives
archive_public_zip = create_zipfile('archive', 'public', self._archive_ext)
archive_restricted_zip = create_zipfile('archive', 'restricted', self._archive_ext)
for calc in upload.calcs:
archive_zip = archive_restricted_zip if calc.with_embargo else archive_public_zip
archive_filename = '%s.%s' % (calc.calc_id, self._archive_ext)
archive_file = self._archive_dir.join_file(archive_filename)
if archive_file.exists():
archive_zip.write(archive_file.os_path, archive_filename)
archive_log_filename = '%s.%s' % (calc.calc_id, 'log')
log_file = self._archive_dir.join_file(archive_log_filename)
if log_file.exists():
archive_zip.write(log_file.os_path, archive_log_filename)
archive_restricted_zip.close()
archive_public_zip.close()
self.logger.debug('packed archives')
if skip_raw:
return
# zip raw files
raw_public_zip = create_zipfile('raw', 'public', 'plain')
@@ -493,29 +529,6 @@ class StagingUploadFiles(UploadFiles):
raw_public_zip.close()
self.logger.debug('packed raw files')
# zip archives
archive_public_zip = create_zipfile('archive', 'public', self._archive_ext)
archive_restricted_zip = create_zipfile('archive', 'restricted', self._archive_ext)
for calc in upload.calcs:
archive_zip = archive_restricted_zip if calc.with_embargo else archive_public_zip
archive_filename = '%s.%s' % (calc.calc_id, self._archive_ext)
archive_file = self._archive_dir.join_file(archive_filename)
if archive_file.exists():
archive_zip.write(archive_file.os_path, archive_filename)
archive_log_filename = '%s.%s' % (calc.calc_id, 'log')
log_file = self._archive_dir.join_file(archive_log_filename)
if log_file.exists():
archive_zip.write(log_file.os_path, archive_log_filename)
archive_restricted_zip.close()
archive_public_zip.close()
self.logger.debug('packed archives')
self.logger.debug('packed upload')
def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
upload_prefix_len = len(self._raw_dir.os_path) + 1
for root, _, files in os.walk(self._raw_dir.os_path):
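With the new parameters, pack covers both the initial publish and the later repack after re-processing. A hedged sketch of the two call patterns (the upload id and upload_with_metadata are placeholders; passing a PublicUploadFiles instance as target_dir mirrors PublicUploadFilesBasedStagingUploadFiles.pack below):

from nomad.files import PublicUploadFiles, StagingUploadFiles

staging_files = StagingUploadFiles('some_upload_id')
upload_with_metadata = ...  # e.g. as produced by Upload.to_upload_with_metadata()

# initial publish: raw data, archives and user metadata go into a fresh public directory
staging_files.pack(upload_with_metadata)

# after re-processing: only rewrite the archive zips inside the existing public directory
public_files = PublicUploadFiles('some_upload_id')
staging_files.pack(upload_with_metadata, target_dir=public_files, skip_raw=True)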
@@ -649,6 +662,34 @@ class ArchiveBasedStagingUploadFiles(StagingUploadFiles):
assert False, 'do not add_rawfiles to a %s' % self.__class__.__name__
class PublicUploadFilesBasedStagingUploadFiles(StagingUploadFiles):
"""
:class:`StagingUploadFiles` created from the contents of an existing
:class:`PublicUploadFiles` instance, used to re-process a published upload.
Arguments:
public_upload_files: The public upload files to create the staging files from.
"""
def __init__(
self, public_upload_files: 'PublicUploadFiles', *args, **kwargs) -> None:
super().__init__(public_upload_files.upload_id, *args, **kwargs)
self.public_upload_files = public_upload_files
def extract(self) -> None:
assert next(self.raw_file_manifest(), None) is None, 'can only extract once'
for access in ['public', 'restricted']:
super().add_rawfiles(
self.public_upload_files.get_zip_file('raw', access, 'plain').os_path,
force_archive=True)
def add_rawfiles(self, path: str, move: bool = False, prefix: str = None, force_archive: bool = False) -> None:
assert False, 'do not add_rawfiles to a %s' % self.__class__.__name__
def pack(self, upload: UploadWithMetadata, *args, **kwargs) -> None:
""" Packs only the archive contents and stores it in the existing public upload files. """
super().pack(upload, target_dir=self.public_upload_files, skip_raw=True)
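A hedged sketch of the round trip this class enables; it mirrors PublicUploadFiles.to_staging_upload_files and Upload._cleanup_after_re_processing further down in this diff (the upload id and metadata are placeholders):

public_files = PublicUploadFiles('some_upload_id', is_authorized=lambda: True)

staging = PublicUploadFilesBasedStagingUploadFiles(public_files, create=True)
staging.extract()                    # unpacks the raw-public/raw-restricted zips into staging

# ... re-process the calcs against the staged raw files ...

upload_with_metadata = ...           # e.g. Upload.to_upload_with_metadata()
staging.pack(upload_with_metadata)   # rewrites only the archive zips in place
staging.delete()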
class LRUZipFileCache(cachetools.LRUCache):
""" Specialized cache that closes the cached zipfiles on eviction """
def __init__(self, maxsize):
@@ -666,9 +707,12 @@ class PublicUploadFiles(UploadFiles):
def __init__(self, *args, **kwargs) -> None:
super().__init__(config.fs.public, *args, **kwargs)
def get_zip_file(self, prefix: str, access: str, ext: str) -> PathObject:
return self.join_file('%s-%s.%s.zip' % (prefix, access, ext))
@cachetools.cached(cache=__zip_file_cache)
def get_zip_file(self, prefix: str, access: str, ext: str) -> zipfile.ZipFile:
zip_file = self.join_file('%s-%s.%s.zip' % (prefix, access, ext))
def open_zip_file(self, prefix: str, access: str, ext: str) -> zipfile.ZipFile:
zip_file = self.get_zip_file(prefix, access, ext)
return zipfile.ZipFile(zip_file.os_path)
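Splitting get_zip_file (a path lookup) from the cached open_zip_file keeps zip handles open across requests, while LRUZipFileCache above closes them on eviction. A minimal, hedged sketch of that eviction pattern with cachetools, not the original implementation:

import zipfile

import cachetools


class ClosingZipFileCache(cachetools.LRUCache):
    """ LRU cache that closes evicted zipfile handles, as LRUZipFileCache does. """

    def popitem(self):
        key, value = super().popitem()
        value.close()
        return key, value


zip_file_cache = ClosingZipFileCache(maxsize=128)


@cachetools.cached(cache=zip_file_cache)
def open_zip(path: str) -> zipfile.ZipFile:
    return zipfile.ZipFile(path)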
def _file(self, prefix: str, ext: str, path: str, *args, **kwargs) -> IO:
@@ -679,7 +723,7 @@ class PublicUploadFiles(UploadFiles):
for access in ['public', 'restricted']:
try:
zf = self.get_zip_file(prefix, access, ext)
zf = self.open_zip_file(prefix, access, ext)
f = zf.open(path, 'r', **kwargs)
if (access == 'restricted' or always_restricted(path)) and not self._is_authorized():
@@ -697,13 +741,25 @@ class PublicUploadFiles(UploadFiles):
raise KeyError()
def to_staging_upload_files(self, create: bool = False) -> 'StagingUploadFiles':
try:
staging_upload_files = PublicUploadFilesBasedStagingUploadFiles(self)
except KeyError:
if not create:
return None
staging_upload_files = PublicUploadFilesBasedStagingUploadFiles(self, create=True)
staging_upload_files.extract()
return staging_upload_files
def raw_file(self, file_path: str, *args, **kwargs) -> IO:
return self._file('raw', 'plain', file_path, *args, **kwargs)
def raw_file_size(self, file_path: str) -> int:
for access in ['public', 'restricted']:
try:
zf = self.get_zip_file('raw', access, 'plain')
zf = self.open_zip_file('raw', access, 'plain')
info = zf.getinfo(file_path)
if (access == 'restricted' or always_restricted(file_path)) and not self._is_authorized():
raise Restricted
@@ -719,7 +775,7 @@ class PublicUploadFiles(UploadFiles):
def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
for access in ['public', 'restricted']:
try:
zf = self.get_zip_file('raw', access, 'plain')
zf = self.open_zip_file('raw', access, 'plain')
for path in zf.namelist():
if path_prefix is None or path.startswith(path_prefix):
yield path
@@ -737,7 +793,7 @@ class PublicUploadFiles(UploadFiles):
continue
try:
zf = self.get_zip_file('raw', access, 'plain')
zf = self.open_zip_file('raw', access, 'plain')
for path in zf.namelist():
content_path = path[directory_len + (0 if directory_len == 0 else 1):]
if path.startswith(directory) and '/' not in content_path:
@@ -187,6 +187,13 @@ class Proc(Document, metaclass=ProcMetaclass):
return self
def reset(self):
""" Resets the task chain. Assumes there no current running process. """
self.current_task = None
self.tasks_status = PENDING
self.errors = []
self.warnings = []
@classmethod
def get_by_id(cls, id: str, id_field: str):
try:
@@ -261,7 +268,7 @@ class Proc(Document, metaclass=ProcMetaclass):
tasks = self.__class__.tasks
assert task in tasks, 'task %s must be one of the classes tasks %s' % (task, str(tasks)) # pylint: disable=E1135
if self.current_task is None:
assert task == tasks[0], "process has to start with first task" # pylint: disable=E1136
assert task == tasks[0], "process has to start with first task %s" % tasks[0] # pylint: disable=E1136
elif tasks.index(task) <= tasks.index(self.current_task):
# task is repeated, probably the celery task of the process was reschedule
# due to prior worker failure
@@ -143,8 +143,38 @@ class Calc(Proc):
return wrap_logger(logger, processors=[save_to_calc_log])
@process
def re_process_calc(self):
"""
Processes a calculation again. This means metadata already exists and,
instead of creating it from scratch, we are only updating the existing
records.
"""
logger = self.get_logger()
try:
self.metadata['nomad_version'] = config.version
self.metadata['nomad_commit'] = config.commit
self.metadata['last_processing'] = datetime.now()
self.parsing()
self.normalizing()
self.archiving()
finally:
# close loghandler that was not closed due to failures
try:
if self._calc_proc_logwriter is not None:
self._calc_proc_logwriter.close()
self._calc_proc_logwriter = None
except Exception as e:
logger.error('could not close calculation proc log', exc_info=e)
@process
def process_calc(self):
"""
Processes a new calculation that has no prior records in the mongo, elastic,
or filesystem storage. It will create an initial set of (user) metadata.
"""
logger = self.get_logger()
if self.upload is None:
logger.error('calculation upload does not exist')
@@ -215,7 +245,7 @@ class Calc(Proc):
# the save might be necessary to correctly read the join condition from the db
self.save()
# in case of error, the process_name might be unknown
if process_name == 'process_calc' or process_name is None:
if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
self.upload.reload()
self.upload.check_join()
@@ -571,6 +601,35 @@ class Upload(Proc):
self.last_update = datetime.now()
self.save()
@process
def re_process_upload(self):
"""
Runs the distributed process of fully reparsing/renormalizing an existing and
already published upload. Will renew the archive part of the upload and update
mongo and elastic search entries.
TODO this implementation does not do any re-matching. This will be more complex
due to handling of new or missing matches.
"""
assert self.published
self.reset()
# mock the steps of actual processing
self._continue_with('uploading')
# extract the published raw files into a staging upload files instance
self._continue_with('extracting')
public_upload_files = cast(PublicUploadFiles, self.upload_files)
public_upload_files.to_staging_upload_files(create=True)
self._continue_with('parse_all')
for calc in self.calcs:
calc.reset()
calc.re_process_calc()
# the packing and removal of the staging upload files will be triggered by
# the 'cleanup' task after all calcs have been processed
@process
def process_upload(self):
self.extracting()
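A hedged end-to-end usage sketch of the new process, mirroring test_re_processing below (the upload id is a placeholder; Upload.get and block_until_complete are used as in the tests):

upload = Upload.get('some_upload_id', include_published=True)
assert upload.published

# extracts the public raw files into staging, resets and re-processes every calc
upload.re_process_upload()
upload.block_until_complete(interval=0.01)

# once all calcs have joined, the 'cleanup' task repacks the archive zips into the
# existing public upload files and deletes the staging copy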
@@ -705,7 +764,7 @@ class Upload(Proc):
calc.process_calc()
def on_process_complete(self, process_name):
if process_name == 'process_upload':
if process_name == 'process_upload' or process_name == 're_process_upload':
self.check_join()
def check_join(self):
@@ -727,10 +786,7 @@ class Upload(Proc):
base = base[:-1]
return '%s/uploads/' % base
@task
def cleanup(self):
search.refresh()
def _cleanup_after_processing(self):
# send email about process finish
user = self.uploader
name = '%s %s' % (user.first_name, user.last_name)
@@ -753,6 +809,35 @@ class Upload(Proc):
# don't fail or present this error to clients
self.logger.error('could not send after processing email', exc_info=e)
def _cleanup_after_re_processing(self):
logger = self.get_logger()
logger.info('started to repack re-processed upload')
staging_upload_files = self.upload_files.to_staging_upload_files()
with utils.timer(
logger, 'reprocessed staged upload packed', step='pack staged',
upload_size=self.upload_files.size):
staging_upload_files.pack(self.to_upload_with_metadata())
with utils.timer(
logger, 'reprocessed staged upload deleted', step='delete staged',
upload_size=self.upload_files.size):
staging_upload_files.delete()
self.last_update = datetime.now()
self.save()
@task
def cleanup(self):
search.refresh()
if self.current_process == 're_process_upload':
self._cleanup_after_re_processing()
else:
self._cleanup_after_processing()
def get_calc(self, calc_id) -> Calc:
return Calc.objects(upload_id=self.upload_id, calc_id=calc_id).first()
@@ -18,6 +18,7 @@ from datetime import datetime
import os.path
import json
import re
import shutil
from nomad import utils, infrastructure, config, datamodel
from nomad.files import UploadFiles, StagingUploadFiles, PublicUploadFiles
@@ -239,6 +240,84 @@ def test_process_non_existing(proc_infra, test_user, with_error):
assert len(upload.errors) > 0
def test_re_processing(non_empty_processed: Upload, no_warn, example_user_metadata, with_publish_to_coe_repo, monkeypatch):
processed = non_empty_processed
# publish
processed.compress_and_set_metadata(example_user_metadata)
additional_keys = ['with_embargo']
if with_publish_to_coe_repo:
additional_keys.append('pid')
processed.publish_upload()
try:
processed.block_until_complete(interval=.01)
except Exception:
pass
processed.reload()
assert processed.published
assert processed.upload_files.to_staging_upload_files() is None
old_upload_time = processed.last_update
first_calc = processed.calcs.first()
old_calc_time = first_calc.metadata['last_processing']
old_archive_files = list(
archive_file
for archive_file in os.listdir(processed.upload_files.os_path)
if 'archive' in archive_file)
for archive_file in old_archive_files:
with open(processed.upload_files.join_file(archive_file).os_path, 'wt') as f:
f.write('')
shutil.copyfile(
'tests/data/proc/examples_template_different_atoms.zip',
processed.upload_files.join_file('raw-restricted.plain.zip').os_path)
upload = processed.to_upload_with_metadata(example_user_metadata)
# reprocess
monkeypatch.setattr('nomad.config.version', 're_process_test_version')
monkeypatch.setattr('nomad.config.commit', 're_process_test_commit')
processed.re_process_upload()
try:
processed.block_until_complete(interval=.01)
except Exception:
pass
processed.reload()
first_calc.reload()
# assert new process time
assert processed.last_update > old_upload_time
assert first_calc.metadata['last_processing'] > old_calc_time
# assert new process version
assert first_calc.metadata['nomad_version'] == 're_process_test_version'
assert first_calc.metadata['nomad_commit'] == 're_process_test_commit'
# assert changed archive files
for archive_file in old_archive_files:
assert os.path.getsize(processed.upload_files.join_file(archive_file).os_path) > 0
# assert maintained user metadata (mongo+es+coe)
if with_publish_to_coe_repo:
assert_coe_upload(upload.upload_id, user_metadata=example_user_metadata)
assert_upload_files(upload, PublicUploadFiles, published=True)
assert_search_upload(upload, additional_keys, published=True)
if with_publish_to_coe_repo and config.repository_db.mode == 'coe':
assert(os.path.exists(os.path.join(config.fs.coe_extracted, upload.upload_id)))
assert_processing(Upload.get(upload.upload_id, include_published=True), published=True)
# assert changed calc metadata (mongo)
assert first_calc.metadata['atoms'][0] == 'H'
def mock_failure(cls, task, monkeypatch):
def mock(self):
raise Exception('fail for test')
@@ -800,7 +800,8 @@ class TestRepo():
@pytest.mark.parametrize('first, order_by, order', [
('1', 'formula', -1), ('2', 'formula', 1),
('2', 'basis_set', -1), ('1', 'basis_set', 1)])
('2', 'basis_set', -1), ('1', 'basis_set', 1),
(None, 'authors', -1)])
def test_search_order(self, client, example_elastic_calcs, no_warn, first, order_by, order):
rv = client.get('/repo/?order_by=%s&order=%d' % (order_by, order))
assert rv.status_code == 200
@@ -808,7 +809,8 @@ class TestRepo():
results = data.get('results', None)
assert data['pagination']['total'] == 2
assert len(results) == 2
assert results[0]['calc_id'] == first
if first is not None:
assert results[0]['calc_id'] == first
@pytest.mark.parametrize('n_results, size', [(2, None), (2, 5), (1, 1)])
def test_search_scroll(self, client, example_elastic_calcs, no_warn, n_results, size):
@@ -394,8 +394,41 @@ class TestPublicUploadFiles(UploadFilesContract):
calc_specs, protected = request.param
upload, upload_files = create_staging_upload(test_upload_id, calc_specs=calc_specs)
upload_files.pack(upload)
upload_files.delete()
return upload, PublicUploadFiles(test_upload_id, is_authorized=lambda: not protected)
def test_to_staging_upload_files(self, test_upload):
upload, upload_files = test_upload
assert upload_files.to_staging_upload_files() is None
staging_upload_files = upload_files.to_staging_upload_files(create=True)
assert staging_upload_files is not None
assert str(staging_upload_files) == str(upload_files.to_staging_upload_files())
upload_path = upload_files.os_path
all_files = list(
os.path.join(upload_path, f)
for f in os.listdir(upload_path)
if os.path.isfile(os.path.join(upload_path, f)))
# We override the public files before packing to see what packing does to the files
for f in all_files:
with open(f, 'wt') as fh:
fh.write('')
staging_upload_files.pack(upload)
staging_upload_files.delete()
# We do a very simple check: we made all files empty, so those that are re-zipped
# by pack should not be empty anymore.
new_sizes = list(os.path.getsize(f) for f in all_files)
for f, new in zip(all_files, new_sizes):
if 'archive' in f:
assert new > 0
else:
assert new == 0
assert upload_files.to_staging_upload_files() is None
def assert_upload_files(
upload: UploadWithMetadata, cls, no_archive: bool = False, **kwargs):