Commit 7e6375b3 authored by Markus Scheidgen

Added proper multiprocessing for packaging.

parent 355fece9
Pipeline #45765 passed with stages in 46 minutes and 23 seconds
@@ -19,6 +19,8 @@ import os
 import os.path
 import re
 import shutil
+import multiprocessing
+import queue
 
 from nomad import config, infrastructure
 from nomad.migration import NomadCOEMigration, SourceCalc, Package
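Both new imports are needed together: multiprocessing.Queue signals exhaustion by raising queue.Empty from the standard queue module rather than defining its own exception. A quick standalone check (not part of the commit):

import multiprocessing
import queue

q = multiprocessing.Queue()
try:
    q.get_nowait()  # nothing was put, so this raises immediately
except queue.Empty:
    print('multiprocessing reuses queue.Empty')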
@@ -42,10 +44,9 @@ def _setup():
 @click.option('-db', '--dbname', default=config.migration_source_db.dbname, help='The migration repository source db name, default is %s.' % config.migration_source_db.dbname)
 @click.option('--migration-version', default=0, type=int, help='The version number, only packages with lower or no number will be migrated.')
 @click.option('--package-directory', default=config.fs.migration_packages, help='The directory used as bucket for upload packages, default is %s.' % config.fs.migration_packages)
-@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
 @click.option('--compress-packages', is_flag=True, help='Turn on compression for creating migration packages')
 def migration(
-        host, port, user, password, dbname, migration_version, package_directory, parallel, compress_packages):
+        host, port, user, password, dbname, migration_version, package_directory, compress_packages):
 
     global _setup
     def _setup():
@@ -59,7 +60,7 @@ def migration(
     def _Migration(**kwargs):
         return NomadCOEMigration(
             migration_version=migration_version, package_directory=package_directory,
-            compress_packages=compress_packages, threads=parallel, **kwargs)
+            compress_packages=compress_packages, **kwargs)
 
 
 @migration.command(help='Create/update the coe repository db migration index')
@@ -114,11 +115,33 @@ def determine_upload_paths(paths, pattern=None):
 @migration.command(help='Add an upload folder to the package index.')
 @click.argument('upload-paths', nargs=-1)
 @click.option('--pattern', default=None, type=str, help='Interpret the paths as directory and migrate those subdirectories that match the given regexp')
-def package(upload_paths, pattern):
-    infrastructure.setup_logging()
-    infrastructure.setup_mongo()
-    _Migration().package_index(*determine_upload_paths(upload_paths, pattern))
+@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
+def package(upload_paths, pattern, parallel):
+    upload_path_queue = multiprocessing.Queue()
+    for upload_path in determine_upload_paths(upload_paths, pattern):
+        upload_path_queue.put(upload_path)
+
+    def package_paths():
+        infrastructure.setup_logging()
+        infrastructure.setup_mongo()
+        migration = _Migration()
+
+        try:
+            while True:
+                upload_path = upload_path_queue.get_nowait()
+                migration.package_index(upload_path)
+        except queue.Empty:
+            pass
+
+    processes = []
+    for _ in range(0, parallel):
+        process = multiprocessing.Process(target=package_paths)
+        process.start()
+        processes.append(process)
+
+    for process in processes:
+        process.join()
 
 
 @migration.command(help='Get a report over all migrated packages.')
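The worker pattern introduced here pre-fills a multiprocessing.Queue before any process starts, then lets each worker drain it until queue.Empty. A minimal, self-contained sketch of the same pattern, with a placeholder work function standing in for package_index and invented item names:

import multiprocessing
import queue


def drain(work_queue):
    # Each worker performs its own per-process setup first (the commit
    # calls infrastructure.setup_logging/setup_mongo here, since child
    # processes need their own connections), then pulls items until the
    # pre-filled queue is exhausted.
    try:
        while True:
            item = work_queue.get_nowait()
            print('packaging %s' % item)  # placeholder for package_index(item)
    except queue.Empty:
        pass


if __name__ == '__main__':
    work_queue = multiprocessing.Queue()
    for item in ['upload-1', 'upload-2']:  # hypothetical upload paths
        work_queue.put(item)

    processes = [
        multiprocessing.Process(target=drain, args=(work_queue,))
        for _ in range(2)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

Two caveats of this design: the commit's version reads upload_path_queue through a closure rather than an argument, which relies on fork-style process start; and because get_nowait raising Empty is the only shutdown signal, the queue must be fully populated before any worker starts.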
@@ -147,9 +170,10 @@ def pid_prefix(prefix: int):
 @click.argument('upload-paths', nargs=-1)
 @click.option('--pattern', default=None, type=str, help='Interpret the paths as directory and migrate those subdirectories that match the given regexp')
 @click.option('--delete-failed', default='', type=str, help='String from N, U, P to determine if empty (N), failed (U), or failed to publish (P) uploads should be deleted or kept for debugging.')
+@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
 def upload(upload_paths: list, pattern: str, parallel: int, delete_failed: str):
     infrastructure.setup_logging()
     infrastructure.setup_mongo()
-    _Migration().migrate(*determine_upload_paths(upload_paths, pattern), delete_failed=delete_failed)
+    _Migration(threads=parallel).migrate(*determine_upload_paths(upload_paths, pattern), delete_failed=delete_failed)
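With these changes, parallelism is chosen per command rather than once for the whole migration group. Assuming the CLI is exposed through the package's nomad entry point, an invocation could look like the following (the command prefix and paths are illustrative, not from the commit):

nomad migration package --parallel 4 /data/uploads --pattern '.*'
nomad migration upload --parallel 4 /data/uploads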