Commit 89cdbb5a authored by Markus Scheidgen
Browse files

Added parallelization to package index. [skip ci]

parent 864361b5
Pipeline #45763 skipped
......@@ -42,9 +42,10 @@ def _setup():
@click.option('-db', '--dbname', default=config.migration_source_db.dbname, help='The migration repository source db name, default is %s.' % config.migration_source_db.dbname)
@click.option('--migration-version', default=0, type=int, help='The version number, only packages with lower or no number will be migrated.')
@click.option('--package-directory', default=config.fs.migration_packages, help='The directory used as bucket for upload packages, default is %s.' % config.fs.migration_packages)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--compress-packages', is_flag=True, help='Turn on compression for creating migration packages')
def migration(
host, port, user, password, dbname, migration_version, package_directory, compress_packages):
host, port, user, password, dbname, migration_version, package_directory, parallel, compress_packages):
global _setup
def _setup():
......@@ -58,7 +59,7 @@ def migration(
def _Migration(**kwargs):
return NomadCOEMigration(
migration_version=migration_version, package_directory=package_directory,
compress_packages=compress_packages, **kwargs)
compress_packages=compress_packages, threads=parallel, **kwargs)
@migration.command(help='Create/update the coe repository db migration index')
......@@ -95,13 +96,29 @@ def reset(delete_packages: bool):
Package.objects(migration_version__ne=-1).update(migration_version=-1)
def determine_upload_paths(paths, pattern=None):
    """Return the upload paths to process, optionally expanding one directory.

    Without a ``pattern`` the given ``paths`` are returned unchanged. With a
    ``pattern``, ``paths`` must hold exactly one directory; the result is then
    the list of entries directly inside that directory whose names fully match
    the regular expression, joined with the directory and sorted by name.

    Arguments:
        paths: Sequence of path strings as given on the command line.
        pattern: Optional regular expression that entry names have to match
            completely (``fullmatch`` semantics).

    Returns:
        ``paths`` itself if ``pattern`` is ``None``, otherwise a new sorted
        list of the matching entries' joined paths.

    Raises:
        ValueError: If ``pattern`` is given and ``paths`` does not contain
            exactly one path.
    """
    if pattern is None:
        return paths

    # Explicit check instead of ``assert``: asserts are removed under
    # ``python -O``, which would silently ignore all but the first path.
    if len(paths) != 1:
        raise ValueError(
            'a pattern requires exactly one path, got %d' % len(paths))

    path = paths[0]
    compiled_pattern = re.compile(pattern)
    # Sorting keeps the resulting order deterministic; os.listdir gives
    # entries in arbitrary order.
    return [
        os.path.join(path, sub_directory)
        for sub_directory in sorted(os.listdir(path))
        if compiled_pattern.fullmatch(sub_directory)]
@migration.command(help='Add an upload folder to the package index.')
@click.argument('upload-paths', nargs=-1)
def package(upload_paths):
@click.option('--pattern', default=None, type=str, help='Interpret the paths as directory and migrate those subdirectory that match the given regexp')
def package(upload_paths, pattern):
infrastructure.setup_logging()
infrastructure.setup_mongo()
_Migration().package_index(*upload_paths)
_Migration().package_index(*determine_upload_paths(upload_paths, pattern))
@migration.command(help='Get an report over all migrated packages.')
......@@ -127,24 +144,12 @@ def pid_prefix(prefix: int):
@migration.command(help='Upload the given upload locations. Uses the existing index to provide user metadata')
@click.argument('paths', nargs=-1)
@click.argument('upload-paths', nargs=-1)
@click.option('--pattern', default=None, type=str, help='Interpret the paths as directory and migrate those subdirectory that match the given regexp')
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--delete-failed', default='', type=str, help='String from N, U, P to determine if empty (N), failed (U), or failed to publish (P) uploads should be deleted or kept for debugging.')
def upload(paths: list, pattern: str, parallel: int, delete_failed: str):
def upload(upload_paths: list, pattern: str, parallel: int, delete_failed: str):
infrastructure.setup_logging()
infrastructure.setup_mongo()
if pattern is not None:
assert len(paths) == 1
path = paths[0]
paths = []
compiled_pattern = re.compile(pattern)
directories = os.listdir(path)
directories.sort()
for sub_directory in directories:
if re.fullmatch(compiled_pattern, sub_directory):
paths.append(os.path.join(path, sub_directory))
_Migration(threads=parallel).migrate(*paths, delete_failed=delete_failed)
_Migration().migrate(*determine_upload_paths(upload_paths, pattern), delete_failed=delete_failed)
......@@ -937,7 +937,10 @@ class NomadCOEMigration:
"""
logger = utils.get_logger(__name__)
for upload_path in upload_paths:
cv = threading.Condition()
threads = []
def package_upload(upload_path):
try:
for package_entry in Package.create_packages(
upload_path, self.package_directory,
......@@ -952,7 +955,21 @@ class NomadCOEMigration:
logger.error(
'could create package from upload',
upload_path=upload_path, exc_info=e)
continue
finally:
with cv:
self._threads += 1
cv.notify()
for upload_path in upload_paths:
with cv:
cv.wait_for(lambda: self._threads > 0)
self._threads -= 1
thread = threading.Thread(target=lambda: package_upload(upload_path))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
class Report(utils.POPO):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment