Commit a365f8a8 authored by Markus Scheidgen

Refactored migration packaging. Added support for upload archives.

parent 84eebe94
Pipeline #45730 passed with stages in 27 minutes and 21 seconds
@@ -26,7 +26,7 @@ from .main import cli
 def _Migration(**kwargs) -> NomadCOEMigration:
-    return NomadCOEMigration()
+    return NomadCOEMigration(**kwargs)
 def _setup():
@@ -40,7 +40,8 @@ def _setup():
 @click.option('-w', '--password', default=config.migration_source_db.password, help='The migration repository source db password.')
 @click.option('-db', '--dbname', default=config.migration_source_db.dbname, help='The migration repository source db name, default is %s.' % config.migration_source_db.dbname)
 @click.option('--migration-version', default=0, type=int, help='The version number, only packages with lower or no number will be migrated.')
-def migration(host, port, user, password, dbname, migration_version):
+@click.option('--package-directory', default=config.fs.migration_packages, help='The directory used as bucket for upload packages, default is %s.' % config.fs.migration_packages)
+def migration(host, port, user, password, dbname, migration_version, package_directory):
     global _setup
     def _setup():
@@ -52,7 +53,7 @@ def migration(host, port, user, password, dbname, migration_version):
     global _Migration
     def _Migration(**kwargs):
-        return NomadCOEMigration(migration_version=migration_version, **kwargs)
+        return NomadCOEMigration(migration_version=migration_version, package_directory=package_directory, **kwargs)
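The hunks above use a pattern where the group command rebinds a module-level factory, so options parsed once on the `migration` group (such as the new `--package-directory`) flow into every `NomadCOEMigration` instance the subcommands create. A minimal self-contained sketch of the same click pattern; the names `tool` and `Tool` are illustrative, not from the nomad codebase:

import click

class Tool:
    def __init__(self, version: int = 0):
        self.version = version

def _Tool(**kwargs) -> Tool:
    # default factory; the group callback below rebinds it
    return Tool(**kwargs)

@click.group()
@click.option('--version', default=0, type=int, help='Shared option for all subcommands.')
def tool(version):
    global _Tool

    def _Tool(**kwargs):
        # closes over the option parsed on the group command
        return Tool(version=version, **kwargs)

@tool.command()
def show():
    # uses whatever factory is currently bound
    click.echo(_Tool().version)

if __name__ == '__main__':
    tool()  # e.g. `python sketch.py --version 1 show` prints 1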
 @migration.command(help='Create/update the coe repository db migration index')
@@ -64,7 +65,7 @@ def index(drop, with_metadata, per_query):
     start = time.time()
     indexed_total = 0
     indexed_calcs = 0
-    for calc, total in _Migration().index(drop=drop, with_metadata=with_metadata, per_query=int(per_query)):
+    for calc, total in _Migration().source_calc_index(drop=drop, with_metadata=with_metadata, per_query=int(per_query)):
         indexed_total += 1
         indexed_calcs += 1 if calc is not None else 0
         eta = total * ((time.time() - start) / indexed_total)
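The loop above consumes a generator of `(calc, total)` pairs and derives a progress estimate. Note that the formula projects the total runtime (average time per indexed item times the total item count), not the time remaining. The same arithmetic in isolation:

import time

start = time.time()
total = 1000  # assumed: total number of source calcs to index

def eta(indexed_total: int) -> float:
    # average seconds per item so far, scaled to all items:
    # the projected overall duration of the run
    return total * ((time.time() - start) / indexed_total)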
@@ -80,7 +81,7 @@ def package(upload_paths):
     infrastructure.setup_logging()
     infrastructure.setup_mongo()
-    _Migration().package(*upload_paths)
+    _Migration().package_index(*upload_paths)
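`package` becomes `package_index`, and the tests further down indicate that an upload is now split into zip packages capped by `nomad.migration.max_package_size`. The following is only a hedged sketch of that idea; the helper names and the grouping logic are assumptions for illustration, not the actual `NomadCOEMigration.package_index` implementation:

import os
import zipfile

max_package_size = 32 * 1024 ** 3  # assumed size cap per package, in bytes

def package_upload(upload_path: str, target_dir: str) -> int:
    """ Splits the files of one upload directory into zip packages that
    each stay below max_package_size. Returns the number of packages. """
    package_no, size, paths = 0, 0, []
    for root, _, files in os.walk(upload_path):
        for name in files:
            path = os.path.join(root, name)
            paths.append(path)
            size += os.path.getsize(path)
            if size >= max_package_size:
                _write_package(upload_path, target_dir, package_no, paths)
                package_no, size, paths = package_no + 1, 0, []
    if paths:
        _write_package(upload_path, target_dir, package_no, paths)
        package_no += 1
    return package_no

def _write_package(upload_path, target_dir, no, paths):
    # store each file under its path relative to the upload root
    package_path = os.path.join(target_dir, 'package_%d.zip' % no)
    with zipfile.ZipFile(package_path, 'w') as zf:
        for path in paths:
            zf.write(path, arcname=os.path.relpath(path, upload_path))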
 @migration.command(help='Get an report over all migrated packages.')
@@ -108,14 +109,9 @@ def pid_prefix(prefix: int):
 @migration.command(help='Upload the given upload locations. Uses the existing index to provide user metadata')
 @click.argument('paths', nargs=-1)
 @click.option('--pattern', default=None, type=str, help='Interpret the paths as directory and migrate those subdirectory that match the given regexp')
-@click.option('--create-packages', help='Allow migration to create package entries on the fly.', is_flag=True)
-@click.option('--local', help='Create local upload files.', is_flag=True)
-@click.option('--delete-local', help='Delete created local upload files after upload.', is_flag=True)
 @click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
 @click.option('--delete-failed', default='', type=str, help='String from N, U, P to determine if empty (N), failed (U), or failed to publish (P) uploads should be deleted or kept for debugging.')
-def upload(
-        paths: list, pattern: str, create_packages: bool, local: bool, delete_local: bool,
-        parallel: int, delete_failed: str):
+def upload(paths: list, pattern: str, parallel: int, delete_failed: str):
     infrastructure.setup_logging()
     infrastructure.setup_mongo()
@@ -131,6 +127,4 @@ def upload(
         if re.fullmatch(compiled_pattern, sub_directory):
             paths.append(os.path.join(path, sub_directory))
-    _Migration(threads=parallel).migrate(
-        *paths, local=local, delete_local=delete_local, create_packages=create_packages,
-        delete_failed=delete_failed)
+    _Migration(threads=parallel).migrate(*paths, delete_failed=delete_failed)
@@ -67,6 +67,7 @@ fs = NomadConfig(
     tmp='.volumes/fs/tmp',
     staging='.volumes/fs/staging',
     public='.volumes/fs/public',
+    migration_packages='.volumes/fs/migration_packages',
     prefix_size=2
 )
@@ -72,7 +72,7 @@ class PathObject:
         bucket: The bucket to store this object in
         object_id: The object id (i.e. directory path)
         os_path: Override the "object storage" path with the given path.
-        prefix: Add a 3-digit prefix directory, e.g. foo/test/ -> foo/tes/test
+        prefix: Add a x-digit prefix directory, e.g. foo/test/ -> foo/tes/test
         create_prefix: Create the prefix right away
     """
     def __init__(
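The corrected docstring refers to the configurable `prefix_size` (2 in the default config above): the first characters of the object id become an extra directory level. A minimal sketch of the scheme; `prefixed_path` is a hypothetical helper, not the actual `PathObject` code:

import os

def prefixed_path(bucket: str, object_id: str, prefix_size: int = 2) -> str:
    # the first prefix_size characters of the id form an extra directory,
    # which keeps any single directory from accumulating too many entries
    return os.path.join(bucket, object_id[:prefix_size], object_id)

# prefixed_path('foo', 'test', prefix_size=3) == 'foo/tes/test' (on POSIX)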
@@ -51,7 +51,7 @@ repository_db_conn = None
 def setup():
     """
-    Uses the current configuration (nomad/config.py and environemnt) to setup all the
+    Uses the current configuration (nomad/config.py and environment) to setup all the
     infrastructure services (repository db, mongo, elastic search) and logging.
     Will create client instances for the databases and has to be called before they
     can be used.
export NOMAD_CLIENT_URL=http://enc-staging-nomad.esc.rzg.mpg.de/fairdi/nomad/migration/api
export NOMAD_CLIENT_USER=admin
export NOMAD_FS_TMP=/nomad/mscheidg/migration_tmp
export NOMAD_LOGSTASH_TCP_PORT=15000
export NOMAD_FS_MIGRATION_PACKAGES=/nomad/fairdi/migration/fs/migration_packages
export NOMAD_FS_STAGING=/nomad/fairdi/migration/fs/staging
export NOMAD_FS_PUBLIC=/nomad/fairdi/migration/fs/public
export NOMAD_FS_TMP=/nomad/fairdi/migration/fs/tmp
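The `NOMAD_FS_*` exports above override the corresponding `fs` config entries, e.g. `NOMAD_FS_MIGRATION_PACKAGES` sets `config.fs.migration_packages`. A rough sketch of that naming convention; the mapping function here is an assumption for illustration, not the actual nomad config code:

import os

def apply_env_overrides(section_name: str, section: dict) -> None:
    # assumed convention: NOMAD_<SECTION>_<KEY> overrides section[key],
    # e.g. NOMAD_FS_MIGRATION_PACKAGES -> fs['migration_packages']
    for key in list(section):
        value = os.environ.get('NOMAD_%s_%s' % (section_name.upper(), key.upper()))
        if value is not None:
            section[key] = value

fs = dict(tmp='.volumes/fs/tmp', migration_packages='.volumes/fs/migration_packages')
apply_env_overrides('fs', fs)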
proxy:
  nodePort: 30001
  external:
    host: "enc-staging-nomad.esc.rzg.mpg.de"
    path: "/fairdi/nomad/migration"
worker:
  replicas: 2
dbname: fairdi_nomad_migration
uploadurl: 'http://enc-staging-nomad.esc.rzg.mpg.de/fairdi/nomad/migration/upload'
volumes:
  prefixSize: 2
  public: /nomad/fairdi/migration/fs/public
  staging: /nomad/fairdi/migration/fs/staging
  tmp: /nomad/fairdi/migration/fs/tmp
  nomad: /nomad
@@ -60,13 +60,14 @@ def raw_files_infra():
     config.fs.tmp = '.volumes/test_fs/tmp'
     config.fs.staging = '.volumes/test_fs/staging'
     config.fs.public = '.volumes/test_fs/public'
+    config.fs.migration_packages = '.volumes/test_fs/migration_packages'
     config.fs.prefix_size = 2
 @pytest.fixture(scope='function')
 def raw_files(raw_files_infra):
     """ Provides cleaned out files directory structure per function. Clears files after test. """
-    directories = [config.fs.staging, config.fs.public, config.fs.tmp]
+    directories = [config.fs.staging, config.fs.public, config.fs.migration_packages, config.fs.tmp]
     for directory in directories:
         if not os.path.exists(directory):
             os.makedirs(directory)
@@ -18,8 +18,8 @@ import os.path
 from bravado.client import SwaggerClient
 import json
 import glob
-from io import StringIO
 import bravado.exception
+import zipfile
 from nomad import infrastructure, coe_repo, utils, files, processing
@@ -84,7 +84,7 @@ def target_repo(postgres):
 @pytest.fixture(scope='function')
-def migration(source_repo, target_repo):
+def migration(source_repo, target_repo, raw_files):
     Package.objects().delete()  # the mongo fixture drops the db, but we still get old results, probably mongoengine caching
     migration = NomadCOEMigration(quiet=True)
     yield migration
@@ -95,16 +95,25 @@ def source_package(mongo, migration):
     migration.package(*glob.glob('tests/data/migration/*'))
-@pytest.mark.parametrize('n_packages, restriction, upload', [
-    (1, 36, 'baseline'), (2, 0, 'too_big'), (1, 24, 'restriction')])
-def test_package(mongo, migration, monkeypatch, n_packages, restriction, upload):
+@pytest.mark.parametrize('archived', [False, True])
+@pytest.mark.parametrize('n_packages, restriction, upload', [(1, 36, 'baseline'), (2, 0, 'too_big'), (1, 24, 'restriction')])
+def test_package(
+        mongo, migration: NomadCOEMigration, monkeypatch, n_packages, restriction, upload, archived):
     monkeypatch.setattr('nomad.migration.max_package_size', 3)
-    migration.package(*glob.glob(os.path.join('tests/data/migration/packaging', upload)))
+    if archived:
+        upload = os.path.join('tests/data/migration/packaging_archived', upload)
+    else:
+        upload = os.path.join('tests/data/migration/packaging', upload)
+    migration.package_index(upload)
     packages = Package.objects()
     for package in packages:
         assert len(package.filenames) > 0
+        assert os.path.exists(package.package_path)
+        assert package.size > 0
+        assert package.files > 0
         assert package.restricted == restriction
+        with zipfile.ZipFile(package.package_path, 'r') as zf:
+            assert len(zf.filelist) == package.files
     assert packages.count() == n_packages
@@ -155,7 +164,7 @@ def migrate_infra(migration, target_repo, proc_infra, client, monkeypatch):
     All with two calcs, two users (for coauthors)
     """
     # source repo is the infrastructure repo
-    indexed = list(migration.index(drop=True, with_metadata=True))
+    indexed = list(migration.source_calc_index(drop=True, with_metadata=True))
     assert len(indexed) == 2
     # target repo is the infrastructure repo
@@ -198,32 +207,32 @@ def test_copy_users(migrate_infra, target_repo):
 mirgation_test_specs = [
-    ('baseline', 'baseline', dict(migrated=2, source=2), {}),
-    ('local', 'baseline', dict(migrated=2, source=2), dict(local=True)),
-    # ('archive', dict(migrated=2, source=2)),
-    ('new_upload', 'new_upload', dict(new=2), {}),
-    ('new_calc', 'new_calc', dict(migrated=2, source=2, new=1), {}),
-    ('missing_calc', 'missing_calc', dict(migrated=1, source=2, missing=1), {}),
-    ('missmatch', 'missmatch', dict(migrated=2, source=2, diffs=1), {}),
-    ('failed_calc', 'failed_calc', dict(migrated=1, source=2, diffs=0, missing=1, failed=1), {}),
-    ('failed_upload', 'baseline', dict(migrated=0, source=2, missing=2, errors=1), {}),
-    ('failed_publish', 'baseline', dict(migrated=0, source=2, missing=2, failed=2, errors=1), {})
+    ('baseline', 'baseline', dict(migrated=2, source=2)),
+    ('archive', 'baseline', dict(migrated=2, source=2)),
+    ('new_upload', 'new_upload', dict(new=2)),
+    ('new_calc', 'new_calc', dict(migrated=2, source=2, new=1)),
+    ('missing_calc', 'missing_calc', dict(migrated=1, source=2, missing=1)),
+    ('missmatch', 'missmatch', dict(migrated=2, source=2, diffs=1)),
+    ('failed_calc', 'failed_calc', dict(migrated=1, source=2, diffs=0, missing=1, failed=1)),
+    ('failed_upload', 'baseline', dict(migrated=0, source=2, missing=2, errors=1)),
+    ('failed_publish', 'baseline', dict(migrated=0, source=2, missing=2, failed=2, errors=1))
 ]
 @pytest.mark.filterwarnings("ignore:SAWarning")
-@pytest.mark.parametrize('name, test_directory, assertions, kwargs', mirgation_test_specs)
+@pytest.mark.parametrize('name, test_directory, assertions', mirgation_test_specs)
 @pytest.mark.timeout(30)
-def test_migrate(migrate_infra, name, test_directory, assertions, kwargs, monkeypatch, caplog):
-    perform_migration_test(migrate_infra, name, test_directory, assertions, kwargs, monkeypatch, caplog)
+def test_migrate(migrate_infra, name, test_directory, assertions, monkeypatch, caplog):
+    perform_migration_test(migrate_infra, name, test_directory, assertions, monkeypatch, caplog)
-def perform_migration_test(migrate_infra, name, test_directory, assertions, kwargs, monkeypatch, caplog):
+def perform_migration_test(migrate_infra, name, test_directory, assertions, monkeypatch, caplog):
     def with_error(*args, **kwargs):
-        return StringIO('hello, this is not a zip')
+        raise Exception('test error')
     if name == 'failed_upload':
-        monkeypatch.setattr('nomad.migration.Package.open_package_upload_file', with_error)
+        monkeypatch.setattr('nomad.files.ArchiveBasedStagingUploadFiles.extract', with_error)
     if name == 'failed_publish':
         monkeypatch.setattr('nomad.processing.data.Upload.to_upload_with_metadata', with_error)
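For context, `monkeypatch.setattr` with a dotted-path string temporarily replaces the named attribute and restores it after the test, which is how the `failed_upload` and `failed_publish` scenarios inject errors above. A tiny standalone example of the mechanism, with an illustrative target rather than one from the nomad tests:

import json

import pytest

def test_injected_failure(monkeypatch):
    def with_error(*args, **kwargs):
        raise Exception('test error')

    # replaced for this test only; pytest undoes the patch afterwards
    monkeypatch.setattr('json.dumps', with_error)
    with pytest.raises(Exception, match='test error'):
        json.dumps({})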
@@ -233,7 +242,7 @@ def perform_migration_test(migrate_infra, name, test_directory, assertions, kwargs, monkeypatch, caplog):
     pid_prefix = 10
     migrate_infra.migration.set_pid_prefix(pid_prefix)
-    report = migrate_infra.migration.migrate(upload_path, create_packages=True, **kwargs)
+    report = migrate_infra.migration.migrate(upload_path)
     assert report.total_calcs == assertions.get('migrated', 0) + assertions.get('new', 0) + assertions.get('failed', 0)
@@ -318,7 +327,7 @@ def perform_migration_test(migrate_infra, name, test_directory, assertions, kwargs, monkeypatch, caplog):
 def test_skip_on_same_version(migrate_infra, monkeypatch, caplog):
     assertions = dict(migrated=2, source=2, skipped_packages=0)
-    perform_migration_test(migrate_infra, 'baseline', 'baseline', assertions, {}, monkeypatch, caplog)
+    perform_migration_test(migrate_infra, 'baseline', 'baseline', assertions, monkeypatch, caplog)
     assertions = dict(migrated=2, source=2, skipped_packages=1)
-    perform_migration_test(migrate_infra, 'baseline', 'baseline', assertions, {}, monkeypatch, caplog)
+    perform_migration_test(migrate_infra, 'baseline', 'baseline', assertions, monkeypatch, caplog)