Commit 8100b926 authored by Markus Scheidgen

Added repack functionality.

parent 5ddd305d
Pipeline #64007 passed with stages in 21 minutes and 59 seconds
# One-off ops script: re-pack a single published upload after its embargo
# settings were changed.
from nomad import infrastructure, files, processing as proc

infrastructure.setup_logging()
infrastructure.setup_mongo()

upload_id = 'NvVyk3gATxCJW6dWS4cRWw'
upload = proc.Upload.get(upload_id)
upload_with_metadata = upload.to_upload_with_metadata()

# re-create the public raw/archive zip pairs in place
upload_files = files.PublicUploadFiles(upload_id)
upload_files.re_pack(upload_with_metadata)

# alternative: delete the public files and fully re-pack from staging
# try:
#     public_upload_files = files.PublicUploadFiles(upload_id)
#     public_upload_files.delete()
# except Exception:
#     pass
#
# staging_upload_files = files.StagingUploadFiles(upload_id)
# staging_upload_files.pack(upload_with_metadata)
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from typing import List, Callable
import click
from tabulate import tabulate
from mongoengine import Q
@@ -216,11 +216,8 @@ def rm(ctx, uploads, with_coe_repo, skip_es, skip_mongo, skip_files):
upload.delete()
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
def __run_processing(
ctx, uploads, parallel: int, process: Callable[[proc.Upload], None], label: str):
_, uploads = query_uploads(ctx, uploads)
uploads_count = uploads.count()
uploads = list(uploads) # copy the whole mongo query set to avoid cursor timeouts
@@ -235,29 +232,29 @@ def re_process(ctx, uploads, parallel: int):
logger = utils.get_logger(__name__)
print('%d uploads selected, re-processing ...' % uploads_count)
print('%d uploads selected, %s ...' % (uploads_count, label))
def re_process_upload(upload: proc.Upload):
logger.info('re-processing started', upload_id=upload.upload_id)
def process_upload(upload: proc.Upload):
logger.info('%s started' % label, upload_id=upload.upload_id)
completed = False
if upload.process_running:
logger.warn(
'cannot trigger re-process, since the upload is already/still processing',
'cannot trigger %s, since the upload is already/still processing' % label,
current_process=upload.current_process,
current_task=upload.current_task, upload_id=upload.upload_id)
else:
upload.reset()
upload.re_process_upload()
process(upload)
upload.block_until_complete(interval=.5)
if upload.tasks_status == proc.FAILURE:
logger.info('re-processing with failure', upload_id=upload.upload_id)
logger.info('%s with failure' % label, upload_id=upload.upload_id)
completed = True
logger.info('re-processing complete', upload_id=upload.upload_id)
logger.info('%s complete' % label, upload_id=upload.upload_id)
with cv:
state['completed_count'] += 1 if completed else 0
@@ -265,8 +262,8 @@ def re_process(ctx, uploads, parallel: int):
state['available_threads_count'] += 1
print(
' re-processed %s and skipped %s of %s uploads' %
(state['completed_count'], state['skipped_count'], uploads_count))
' %s %s and skipped %s of %s uploads' %
(label, state['completed_count'], state['skipped_count'], uploads_count))
cv.notify()
@@ -274,7 +271,7 @@ def re_process(ctx, uploads, parallel: int):
with cv:
cv.wait_for(lambda: state['available_threads_count'] > 0)
state['available_threads_count'] -= 1
thread = threading.Thread(target=lambda: re_process_upload(upload))
thread = threading.Thread(target=lambda: process_upload(upload))
threads.append(thread)
thread.start()
@@ -282,6 +279,22 @@ def re_process(ctx, uploads, parallel: int):
thread.join()
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
__run_processing(ctx, uploads, parallel, lambda upload: upload.re_process_upload(), 're-processing')
@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
__run_processing(ctx, uploads, parallel, lambda upload: upload.re_pack(), 're-packing')
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
......
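For reference, a minimal sketch of exercising the new re-pack command through click's test runner, mirroring the admin CLI test further below; the upload id is a placeholder:

import click.testing

from nomad import utils
from nomad.cli import cli

# invoke the new command with two parallel workers; 'some_upload_id' stands
# in for the id of a real published upload
result = click.testing.CliRunner().invoke(
    cli, ['admin', 'uploads', 're-pack', '--parallel', '2', 'some_upload_id'],
    catch_exceptions=False, obj=utils.POPO())
assert result.exit_code == 0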
@@ -368,7 +368,9 @@ class StagingUploadFiles(UploadFiles):
def archive_log_file_object(self, calc_id: str) -> PathObject:
return self._archive_dir.join_file('%s.log' % calc_id)
def add_rawfiles(self, path: str, move: bool = False, prefix: str = None, force_archive: bool = False) -> None:
def add_rawfiles(
self, path: str, move: bool = False, prefix: str = None,
force_archive: bool = False, target_dir: DirectoryObject = None) -> None:
"""
Add rawfiles to the upload. The given file will be copied, moved, or extracted.
@@ -378,11 +380,12 @@ class StagingUploadFiles(UploadFiles):
prefix: Optional path prefix for the added files.
force_archive: Expect the file to be a zip or other supported archive file.
Usually such files are only extracted if possible and copied otherwise.
target_dir: Override the directory to extract into. Default is the raw directory of this upload.
"""
assert not self.is_frozen
assert os.path.exists(path)
self._size += os.stat(path).st_size
target_dir = self._raw_dir
target_dir = self._raw_dir if target_dir is None else target_dir
if prefix is not None:
target_dir = target_dir.join_dir(prefix, create=True)
ext = os.path.splitext(path)[1]
@@ -431,7 +434,7 @@ class StagingUploadFiles(UploadFiles):
def pack(
self, upload: UploadWithMetadata, target_dir: DirectoryObject = None,
skip_raw: bool = False) -> None:
skip_raw: bool = False, skip_archive: bool = False) -> None:
"""
Replaces the staging upload data with a public upload record by packing all
data into files. It is only available if upload *is_bag*.
@@ -442,6 +445,7 @@ class StagingUploadFiles(UploadFiles):
target_dir: optional DirectoryObject to override where to put the files. Default
is the corresponding public upload files directory.
skip_raw: determine to not pack the raw data, only archive and user metadata
skip_archive: determine to not pack the archive data, only raw and user metadata
"""
self.logger.info('started to pack upload')
@@ -471,6 +475,17 @@ class StagingUploadFiles(UploadFiles):
return zipfile.ZipFile(file.os_path, mode='w')
# zip archives
if not skip_archive:
self._pack_archive_files(upload, create_zipfile)
self.logger.info('packed archives')
# zip raw files
if not skip_raw:
self._pack_raw_files(upload, create_zipfile)
self.logger.info('packed raw files')
def _pack_archive_files(self, upload: UploadWithMetadata, create_zipfile):
archive_public_zip = create_zipfile('archive', 'public', self._archive_ext)
archive_restricted_zip = create_zipfile('archive', 'restricted', self._archive_ext)
@@ -495,12 +510,7 @@ class StagingUploadFiles(UploadFiles):
archive_restricted_zip.close()
archive_public_zip.close()
self.logger.info('packed archives')
if skip_raw:
return
# zip raw files
def _pack_raw_files(self, upload: UploadWithMetadata, create_zipfile):
raw_public_zip = create_zipfile('raw', 'public', 'plain')
raw_restricted_zip = create_zipfile('raw', 'restricted', 'plain')
@@ -540,8 +550,6 @@ class StagingUploadFiles(UploadFiles):
raw_restricted_zip.close()
raw_public_zip.close()
self.logger.info('packed raw files')
def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
upload_prefix_len = len(self._raw_dir.os_path) + 1
for root, _, files in os.walk(self._raw_dir.os_path):
@@ -671,7 +679,9 @@ class ArchiveBasedStagingUploadFiles(StagingUploadFiles):
assert next(self.raw_file_manifest(), None) is None, 'can only extract once'
super().add_rawfiles(self.upload_path, force_archive=True)
def add_rawfiles(self, path: str, move: bool = False, prefix: str = None, force_archive: bool = False) -> None:
def add_rawfiles(
self, path: str, move: bool = False, prefix: str = None,
force_archive: bool = False, target_dir: DirectoryObject = None) -> None:
assert False, 'do not add_rawfiles to a %s' % self.__class__.__name__
@@ -688,14 +698,19 @@ class PublicUploadFilesBasedStagingUploadFiles(StagingUploadFiles):
super().__init__(public_upload_files.upload_id, *args, **kwargs)
self.public_upload_files = public_upload_files
def extract(self) -> None:
def extract(self, include_archive: bool = False) -> None:
assert next(self.raw_file_manifest(), None) is None, 'can only extract once'
for access in ['public', 'restricted']:
super().add_rawfiles(
self.public_upload_files.get_zip_file('raw', access, 'plain').os_path,
force_archive=True)
def add_rawfiles(self, path: str, move: bool = False, prefix: str = None, force_archive: bool = False) -> None:
if include_archive:
super().add_rawfiles(
self.public_upload_files.get_zip_file('archive', access, self._archive_ext).os_path,
force_archive=True, target_dir=self._archive_dir)
def add_rawfiles(self, *args, **kwargs) -> None:
assert False, 'do not add_rawfiles to a %s' % self.__class__.__name__
def pack(self, upload: UploadWithMetadata, *args, **kwargs) -> None:
@@ -751,15 +766,20 @@ class PublicUploadFiles(UploadFiles):
raise KeyError()
def to_staging_upload_files(self, create: bool = False) -> 'StagingUploadFiles':
def to_staging_upload_files(self, create: bool = False, **kwargs) -> 'StagingUploadFiles':
exists = False
try:
staging_upload_files = PublicUploadFilesBasedStagingUploadFiles(self)
exists = True
except KeyError:
if not create:
return None
staging_upload_files = PublicUploadFilesBasedStagingUploadFiles(self, create=True)
staging_upload_files.extract()
staging_upload_files.extract(**kwargs)
if exists and create:
raise FileExistsError('Staging upload already exists')
return staging_upload_files
@@ -826,13 +846,53 @@ class PublicUploadFiles(UploadFiles):
def archive_log_file(self, calc_id: str, *args, **kwargs) -> IO:
return self._file('archive', self._archive_ext, '%s.log' % calc_id, *args, **kwargs)
def repack(self) -> None:
def re_pack(
self, upload: UploadWithMetadata, skip_raw: bool = False,
skip_archive: bool = False) -> None:
"""
Replaces the existing public/restricted data file pairs with new ones, based
on the current restriction information in the metadata. Should be used after
updating the restrictions on calculations. This is potentially a long-running operation.
"""
raise NotImplementedError()
# compute a list of files to repack
files = []
kinds = []
if not skip_archive:
kinds.append(('archive', self._archive_ext))
if not skip_raw:
kinds.append(('raw', 'plain'))
for kind, ext in kinds:
for prefix in ['public', 'restricted']:
files.append((
self.join_file('%s-%s.%s.repacked.zip' % (kind, prefix, ext)),
self.join_file('%s-%s.%s.zip' % (kind, prefix, ext))))
# check if there already is a running repack
for repacked_file, _ in files:
if repacked_file.exists():
raise FileExistsError('Repacked files already exist')
# create staging files
staging_upload = self.to_staging_upload_files(create=True, include_archive=True)
def create_zipfile(kind: str, prefix: str, ext: str) -> zipfile.ZipFile:
file = self.join_file('%s-%s.%s.repacked.zip' % (kind, prefix, ext))
return zipfile.ZipFile(file.os_path, mode='w')
# perform the repacking
try:
if not skip_archive:
staging_upload._pack_archive_files(upload, create_zipfile)
if not skip_raw:
staging_upload._pack_raw_files(upload, create_zipfile)
finally:
staging_upload.delete()
# replace the original files with the repacked ones
for repacked_file, public_file in files:
shutil.move(
repacked_file.os_path,
public_file.os_path)
@contextmanager
def zipfile_cache(self):
......
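As context for the new PublicUploadFiles.re_pack, a minimal usage sketch, assuming a published upload whose embargo settings were just changed (this mirrors the script at the top of the commit; the upload id is a placeholder):

from nomad import files, processing as proc

upload = proc.Upload.get('some_upload_id')  # hypothetical upload id

# re-create the public/restricted raw and archive zip pairs in place,
# according to the current embargo information in the metadata
upload_files = files.PublicUploadFiles(upload.upload_id)
upload_files.re_pack(upload.to_upload_with_metadata())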
SM_all08.db
\ No newline at end of file
@@ -705,6 +705,21 @@ class Upload(Proc):
# the packing and removal of the staging upload files will be triggered by
# the 'cleanup' task after processing all calcs
@process
def re_pack(self):
""" A *process* that repacks the raw and archive data based on the current embargo data. """
assert self.published
# mock the steps of actual processing
self._continue_with('uploading')
self._continue_with('extracting')
self._continue_with('parse_all')
self._continue_with('cleanup')
self.upload_files.re_pack(self.to_upload_with_metadata())
self.joined = True
self._complete()
@process
def process_upload(self):
""" A *process* that performs the initial upload processing. """
......
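A sketch of triggering the new *process* from code rather than through the CLI, following the pattern used in the tests below (assumes a published upload; the id is a placeholder):

from nomad.processing import Upload

upload = Upload.get('some_upload_id')  # hypothetical upload id
upload.re_pack()  # starts the asynchronous re-pack process
upload.block_until_complete(interval=.5)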
@@ -325,6 +325,30 @@ def test_re_processing(published: Upload, example_user_metadata, monkeypatch, wi
assert first_calc.metadata['atoms'][0] == 'Br'
@pytest.mark.timeout(config.tests.default_timeout)
@pytest.mark.parametrize('with_failure', [None, 'before', 'after'])
def test_re_pack(published: Upload, example_user_metadata, monkeypatch, with_failure):
upload_id = published.upload_id
calc = Calc.objects(upload_id=upload_id).first()
assert calc.metadata['with_embargo']
calc.metadata['with_embargo'] = False
calc.save()
published.re_pack()
try:
published.block_until_complete(interval=.01)
except Exception:
pass
upload_files = PublicUploadFiles(upload_id)
for raw_file in upload_files.raw_file_manifest():
with upload_files.raw_file(raw_file) as f:
f.read()
for calc in Calc.objects(upload_id=upload_id):
with upload_files.archive_file(calc.calc_id) as f:
f.read()
def mock_failure(cls, task, monkeypatch):
def mock(self):
raise Exception('fail for test')
......
@@ -17,7 +17,7 @@ import pytest
import click.testing
import json
from nomad import utils, search, processing as proc
from nomad import utils, search, processing as proc, files
from nomad.cli import cli
from nomad.processing import Upload, Calc
@@ -125,6 +125,27 @@ class TestAdminUploads:
calc.reload()
assert calc.metadata['nomad_version'] == 'test_version'
def test_re_pack(self, published, monkeypatch):
upload_id = published.upload_id
calc = Calc.objects(upload_id=upload_id).first()
assert calc.metadata['with_embargo']
calc.metadata['with_embargo'] = False
calc.save()
result = click.testing.CliRunner().invoke(
cli, ['admin', 'uploads', 're-pack', '--parallel', '2', upload_id], catch_exceptions=False, obj=utils.POPO())
assert result.exit_code == 0
assert 're-pack' in result.stdout
calc.reload()
upload_files = files.PublicUploadFiles(upload_id)
for raw_file in upload_files.raw_file_manifest():
with upload_files.raw_file(raw_file) as f:
f.read()
for calc in Calc.objects(upload_id=upload_id):
with upload_files.archive_file(calc.calc_id) as f:
f.read()
@pytest.mark.usefixtures('reset_config')
class TestClient:
......
@@ -28,6 +28,8 @@ from nomad.files import DirectoryObject, PathObject
from nomad.files import StagingUploadFiles, PublicUploadFiles, UploadFiles, Restricted, \
ArchiveBasedStagingUploadFiles
from tests.utils import assert_exception
CalcWithFiles = Tuple[CalcWithMetadata, str]
UploadWithFiles = Tuple[UploadWithMetadata, UploadFiles]
@@ -138,6 +140,7 @@ def generate_example_calc(
example_calc.update(**kwargs)
example_file = os.path.join(config.fs.tmp, 'example.zip')
example_calc.files = []
with zipfile.ZipFile(example_file, 'w', zipfile.ZIP_DEFLATED) as zf:
for filepath in example_file_contents:
filename = os.path.basename(filepath)
@@ -147,6 +150,7 @@
if subdirectory is not None:
arcname = os.path.join(subdirectory, arcname)
example_calc.files.append(arcname)
zf.write(os.path.join(example_directory, filename), arcname)
return example_calc, example_file
@@ -207,10 +211,11 @@ class UploadFilesContract(UploadFilesFixtures):
upload, upload_files = test_upload
for calc in upload.calcs:
try:
with upload_files.raw_file(calc.mainfile) as f:
assert len(f.read()) > 0
if not upload_files._is_authorized():
assert not calc.with_embargo
for file_path in calc.files:
with upload_files.raw_file(file_path) as f:
assert len(f.read()) > 0
if not upload_files._is_authorized():
assert not calc.with_embargo
except Restricted:
assert not upload_files._is_authorized()
assert calc.with_embargo
@@ -231,7 +236,7 @@ class UploadFilesContract(UploadFilesFixtures):
assert '1.aux' in list(path for path, _ in raw_files)
for file, size in raw_files:
if file.endswith('.aux'):
assert size == 0
assert size == 8
else:
assert size > 0
assert_example_files([os.path.join(prefix, path) for path, _ in raw_files])
@@ -429,6 +434,16 @@ class TestPublicUploadFiles(UploadFilesContract):
assert upload_files.to_staging_upload_files() is None
def test_repack(self, test_upload):
upload, upload_files = test_upload
for calc in upload.calcs:
calc.with_embargo = False
upload_files.re_pack(upload)
assert_upload_files(upload, PublicUploadFiles, with_embargo=False)
assert len(os.listdir(upload_files.os_path)) == 4
with assert_exception(KeyError):
StagingUploadFiles(upload_files.upload_id)
def assert_upload_files(
upload: UploadWithMetadata, cls, no_archive: bool = False, **kwargs):
......