migration.py 7.37 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import click
16
17
import time
import datetime
18
19
20
import os
import os.path
import re
21
import shutil
22
23
import multiprocessing
import queue
24

Markus Scheidgen's avatar
Markus Scheidgen committed
25
from nomad import config, infrastructure
26
from nomad.migration import NomadCOEMigration, SourceCalc, Package
27
28
29
30

from .main import cli


31
def _Migration(**kwargs) -> NomadCOEMigration:
    """
    Default factory for the migration object used by the commands in this module.

    All keyword arguments are forwarded unchanged to ``NomadCOEMigration``. The
    ``migration`` group callback rebinds this module-level name to a factory that
    additionally passes the group's command line options.
    """
    migration_object = NomadCOEMigration(**kwargs)
    return migration_object
33
34


35
36
37
38
def _setup():
    pass


39
40
41
42
43
44
@cli.group(help='Migrate data from NOMAD CoE to nomad@FAIRDI')
@click.option(
    '-h', '--host', default=config.migration_source_db.host,
    help='The migration repository source db host, default is "%s".' % config.migration_source_db.host)
@click.option(
    '-p', '--port', default=config.migration_source_db.port,
    help='The migration repository source db port, default is %d.' % config.migration_source_db.port)
@click.option(
    '-u', '--user', default=config.migration_source_db.user,
    help='The migration repository source db user, default is %s.' % config.migration_source_db.user)
@click.option(
    '-w', '--password', default=config.migration_source_db.password,
    help='The migration repository source db password.')
@click.option(
    '-db', '--dbname', default=config.migration_source_db.dbname,
    help='The migration repository source db name, default is %s.' % config.migration_source_db.dbname)
@click.option(
    '--migration-version', default=0, type=int,
    help='The version number, only packages with lower or no number will be migrated.')
@click.option(
    '--package-directory', default=config.fs.migration_packages,
    help='The directory used as bucket for upload packages, default is %s.' % config.fs.migration_packages)
@click.option(
    '--compress-packages', is_flag=True,
    help='Turn on compression for creating migration packages')
def migration(
        host, port, user, password, dbname, migration_version, package_directory, compress_packages):
    """
    Group callback that captures the group options for the sub commands.

    It does not connect to anything itself. Instead it rebinds the module-level
    names ``_setup`` and ``_Migration`` to closures over the given options, which
    the sub commands call when they run.
    """
    global _setup, _Migration

    source_db_options = dict(host=host, port=port, user=user, password=password, dbname=dbname)

    def _setup():
        # Sets up logging, the source repository db connection, and mongo, in that order.
        infrastructure.setup_logging()
        # NOTE(review): the keyword is spelled `readony`; this looks like a typo for
        # `readonly`. Confirm against infrastructure.setup_repository_db before changing.
        infrastructure.setup_repository_db(readony=True, **source_db_options)
        infrastructure.setup_mongo()

    def _Migration(**kwargs):
        # Adds the group options to whatever the calling command passes in.
        return NomadCOEMigration(
            migration_version=migration_version,
            package_directory=package_directory,
            compress_packages=compress_packages,
            **kwargs)
64
65
66
67


@migration.command(help='Create/update the coe repository db migration index')
@click.option(
    '--drop', is_flag=True,
    help='Drop the existing index, otherwise it will only add new data.')
@click.option(
    '--with-metadata', is_flag=True,
    help='Extract metadata for each calc and add it to the index.')
@click.option(
    '--per-query', default=100,
    help='We index many objects with one query. Default is 100.')
def index(drop, with_metadata, per_query):
    """
    Run the source calc indexing and print a progress line for every yielded item.

    Each item yielded by ``source_calc_index`` is a ``(calc, total)`` pair. Items
    with a ``calc`` of ``None`` count towards the indexed total but not towards
    the calc counter.
    """
    _setup()
    started_at = time.time()

    calc_count = 0
    entry_count = 0
    progress = _Migration().source_calc_index(
        drop=drop, with_metadata=with_metadata, per_query=int(per_query))
    for entry_count, (calc, total) in enumerate(progress, start=1):
        if calc is not None:
            calc_count += 1

        # NOTE(review): this is `total` times the average time per item so far, i.e.
        # a projected overall duration rather than the remaining time. Confirm intent.
        eta = total * ((time.time() - started_at) / entry_count)
        status = 'indexed: %8d, calcs: %8d, total: %8d, ETA: %s\r' % (
            entry_count, calc_count, total, datetime.timedelta(seconds=eta))
        # The carriage return and empty end keep the progress on a single line.
        print(status, end='')

    print('done')
83
84


85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@migration.command(help='Reset migration version to start a new migration.')
@click.option('--delete-packages', is_flag=True, help='Also remove all packages.')
def reset(delete_packages: bool):
    """
    Set the migration version of all source calcs back to -1.

    Without ``--delete-packages`` the packages' migration version is reset to -1
    as well. With it, every entry below ``config.fs.migration_packages`` is removed
    from disk and all ``Package`` objects are deleted.
    """
    infrastructure.setup_logging()
    infrastructure.setup_mongo()

    SourceCalc.objects(migration_version__ne=-1).update(migration_version=-1)

    if not delete_packages:
        Package.objects(migration_version__ne=-1).update(migration_version=-1)
        return

    # NOTE(review): this uses the configured directory, not the group's
    # --package-directory option, and rmtree expects every entry to be a directory.
    packages_root = config.fs.migration_packages
    for entry in os.listdir(packages_root):
        shutil.rmtree(os.path.join(packages_root, entry))
    Package.objects().delete()


100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def determine_upload_paths(paths, pattern=None):
    """
    Resolve the upload paths given on the command line.

    Without a pattern, ``paths`` is returned unchanged. With a pattern, ``paths``
    must contain exactly one directory; its entries are listed, sorted by name,
    and those whose name fully matches the regular expression are returned,
    each joined with the directory path.

    Arguments:
        paths: A sequence of path strings.
        pattern: An optional regular expression that entry names have to match
            completely (``re.fullmatch`` semantics).

    Returns:
        ``paths`` itself if no pattern is given, otherwise a new list of the
        matching paths.

    Raises:
        AssertionError: If a pattern is given and ``paths`` does not contain
            exactly one path.
    """
    if pattern is None:
        return paths

    if len(paths) != 1:
        # Raised explicitly instead of via an assert statement, so the check also
        # holds under `python -O`, where asserts are stripped and additional paths
        # would otherwise be ignored silently.
        raise AssertionError(
            'a pattern requires exactly one path, got %d' % len(paths))

    directory = paths[0]
    matches = re.compile(pattern).fullmatch
    return [
        os.path.join(directory, name)
        for name in sorted(os.listdir(directory))
        if matches(name)]


115
116
@migration.command(help='Add an upload folder to the package index.')
@click.argument('upload-paths', nargs=-1)
@click.option('--pattern', default=None, type=str, help='Interpret the paths as directory and migrate those subdirectory that match the given regexp')
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
def package(upload_paths, pattern, parallel):
    """
    Add the given upload paths to the package index using worker processes.

    The resolved paths are put on a shared queue. ``parallel`` worker processes
    each set up logging and mongo, create a migration object, and call
    ``package_index`` for the paths they take from the queue. The command returns
    once all workers have been joined.
    """
    upload_path_queue = multiprocessing.Queue()
    for upload_path in determine_upload_paths(upload_paths, pattern):
        upload_path_queue.put(upload_path)

    # One end-of-work marker per worker. Workers block on get() until they see it.
    # Polling with get_nowait() is not reliable here: a multiprocessing.Queue can
    # still raise queue.Empty shortly after items were put, so workers could exit
    # early and leave paths unprocessed.
    for _ in range(parallel):
        upload_path_queue.put(None)

    def package_paths():
        infrastructure.setup_logging()
        infrastructure.setup_mongo()

        migration_instance = _Migration()

        # iter() with a sentinel stops at the first None taken from the queue.
        for upload_path in iter(upload_path_queue.get, None):
            migration_instance.package_index(upload_path)

    processes = []
    for _ in range(parallel):
        process = multiprocessing.Process(target=package_paths)
        process.start()
        processes.append(process)

    for process in processes:
        process.join()
145
146
147
148
149
150
151
152
153


@migration.command(help='Get an report over all migrated packages.')
def report():
    """
    Print the report produced by the migration object.

    Sets up logging and mongo first; the source repository db is not set up here.
    """
    infrastructure.setup_logging()
    infrastructure.setup_mongo()

    print(_Migration().report())
154
155


156
157
@migration.command(help='Copy users from source into empty target db')
def copy_users(**kwargs):
    """
    Run the full infrastructure setup and let the migration object copy the users.

    ``kwargs`` is accepted but not used.
    """
    _setup()
    migration_instance = _Migration()
    migration_instance.copy_users()
160
161


162
163
164
165
@migration.command(help='Set the repo db PID calc counter.')
@click.argument('prefix', nargs=1, type=int, default=7000000)
def pid_prefix(prefix: int):
    """
    Pass the given prefix to the migration object's ``set_pid_prefix``.

    Only logging is set up here.
    """
    # NOTE(review): no db setup happens in this command; presumably
    # set_pid_prefix takes care of its own connection. Verify.
    infrastructure.setup_logging()
    migration_instance = _Migration()
    migration_instance.set_pid_prefix(prefix=prefix)
167
168


169
@migration.command(help='Upload the given upload locations. Uses the existing index to provide user metadata')
@click.argument('upload-paths', nargs=-1)
@click.option(
    '--pattern', default=None, type=str,
    help='Interpret the paths as directory and migrate those subdirectory that match the given regexp')
@click.option(
    '--delete-failed', default='', type=str,
    help='String from N, U, P to determine if empty (N), failed (U), or failed to publish (P) uploads should be deleted or kept for debugging.')
@click.option(
    '--parallel', default=1, type=int,
    help='Use the given amount of parallel processes. Default is 1.')
def upload(upload_paths: list, pattern: str, parallel: int, delete_failed: str):
    """
    Migrate the resolved upload paths with a migration object.

    The migration object is created with ``threads=parallel``. The paths are
    resolved via ``determine_upload_paths`` and passed as positional arguments
    to ``migrate``, together with ``delete_failed``.
    """
    infrastructure.setup_logging()
    infrastructure.setup_mongo()

    # The migration object is created before the paths are resolved, as in the
    # original single-expression call.
    migrate = _Migration(threads=parallel).migrate
    resolved_paths = determine_upload_paths(upload_paths, pattern)
    migrate(*resolved_paths, delete_failed=delete_failed)