migration.py 8.38 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import click
16
17
import time
import datetime
18
19
20
import os
import os.path
import re
21
import shutil
22
23
import multiprocessing
import queue
24
import json
25

Markus Scheidgen's avatar
Markus Scheidgen committed
26
from nomad import config, infrastructure
27
from nomad.migration import NomadCOEMigration, SourceCalc, Package
28
29
30
31

from .main import cli


32
def _Migration(**kwargs) -> NomadCOEMigration:
    """Create a :class:`NomadCOEMigration`, forwarding all keyword arguments.

    This module-level default is rebound by the ``migration`` group callback,
    which installs a variant that also passes the CLI options.
    """
    return NomadCOEMigration(**kwargs)
34
35


36
37
38
39
def _setup():
    pass


40
41
42
43
44
45
@cli.group(help='Migrate data from NOMAD CoE to nomad@FAIRDI')
@click.option('-h', '--host', default=config.migration_source_db.host, help='The migration repository source db host, default is "%s".' % config.migration_source_db.host)
@click.option('-p', '--port', default=config.migration_source_db.port, help='The migration repository source db port, default is %d.' % config.migration_source_db.port)
@click.option('-u', '--user', default=config.migration_source_db.user, help='The migration repository source db user, default is %s.' % config.migration_source_db.user)
@click.option('-w', '--password', default=config.migration_source_db.password, help='The migration repository source db password.')
@click.option('-db', '--dbname', default=config.migration_source_db.dbname, help='The migration repository source db name, default is %s.' % config.migration_source_db.dbname)
@click.option('--migration-version', default=0, type=int, help='The version number, only packages with lower or no number will be migrated.')
@click.option('--package-directory', default=config.fs.migration_packages, help='The directory used as bucket for upload packages, default is %s.' % config.fs.migration_packages)
@click.option('--compress-packages', is_flag=True, help='Turn on compression for creating migration packages')
def migration(
        host, port, user, password, dbname, migration_version, package_directory, compress_packages):
    """Group callback: capture the group options for the sub-commands.

    Nothing is connected here. Instead the module-level ``_setup`` and
    ``_Migration`` are rebound to closures over the given options, so that
    sub-commands decide themselves when to set up infrastructure and when to
    create a migration object.
    """
    global _setup, _Migration

    def _setup():
        infrastructure.setup_logging()
        # NOTE(review): the keyword is spelled ``readony``. If
        # setup_repository_db expects ``readonly`` this argument does not have
        # the intended effect -- confirm against nomad.infrastructure before
        # renaming it.
        infrastructure.setup_repository_db(
            readony=True, host=host, port=port, user=user, password=password, dbname=dbname)
        infrastructure.setup_mongo()

    def _Migration(**kwargs):
        return NomadCOEMigration(
            migration_version=migration_version, package_directory=package_directory,
            compress_packages=compress_packages, **kwargs)
65
66
67
68


@migration.command(help='Create/update the coe repository db migration index')
@click.option('--drop', help='Drop the existing index, otherwise it will only add new data.', is_flag=True)
@click.option('--with-metadata', help='Extract metadata for each calc and add it to the index.', is_flag=True)
@click.option('--per-query', default=100, help='We index many objects with one query. Default is 100.')
def index(drop, with_metadata, per_query):
    """Build the source calc index and print a single-line progress report.

    Iterates over ``source_calc_index`` of the migration object, which yields
    ``(calc, total)`` pairs. Every yielded pair counts as indexed; pairs with
    a calc that is not ``None`` additionally count as indexed calcs.
    """
    _setup()
    start = time.time()
    indexed_total = 0
    indexed_calcs = 0

    source_index = _Migration().source_calc_index(
        drop=drop, with_metadata=with_metadata, per_query=int(per_query))
    for calc, total in source_index:
        indexed_total += 1
        if calc is not None:
            indexed_calcs += 1

        # ETA is the time still needed for the remaining items, based on the
        # average time per item so far. Rounded to whole seconds to keep the
        # progress line at a stable width.
        seconds_per_item = (time.time() - start) / indexed_total
        remaining_items = max(total - indexed_total, 0)
        eta = datetime.timedelta(seconds=round(remaining_items * seconds_per_item))
        print(
            'indexed: %8d, calcs: %8d, total: %8d, ETA: %s\r' %
            (indexed_total, indexed_calcs, total, eta), end='')

    if indexed_total > 0:
        # The progress line ends with '\r'; move on so 'done' does not
        # overwrite its beginning.
        print()
    print('done')
84
85


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
@migration.command(help='Reset migration version to start a new migration.')
@click.option('--delete-packages', is_flag=True, help='Also remove all packages.')
def reset(delete_packages: bool):
    """Set the migration version of all source calcs (and packages) back to -1.

    Arguments:
        delete_packages: If set, everything below ``config.fs.migration_packages``
            is removed and all ``Package`` documents are deleted, instead of only
            resetting their migration version.
    """
    infrastructure.setup_logging()
    infrastructure.setup_mongo()

    SourceCalc.objects(migration_version__ne=-1).update(migration_version=-1)
    if not delete_packages:
        Package.objects(migration_version__ne=-1).update(migration_version=-1)
        return

    packages_root = config.fs.migration_packages
    # A missing package directory simply means there is nothing to remove.
    if os.path.isdir(packages_root):
        for entry in os.listdir(packages_root):
            entry_path = os.path.join(packages_root, entry)
            # shutil.rmtree only works on real directories; plain files and
            # symlinks must not abort the reset half way.
            if os.path.isdir(entry_path) and not os.path.islink(entry_path):
                shutil.rmtree(entry_path)
            else:
                os.remove(entry_path)
    Package.objects().delete()


101
102
def determine_upload_paths(paths, pattern=None):
    """Resolve the upload paths given on the command line.

    Arguments:
        paths: The paths as given by the user.
        pattern: ``None`` to use ``paths`` as they are. Otherwise ``paths`` must
            contain exactly one directory, and its entries are selected:
            ``"ALL"`` selects every entry, any other value is a regular
            expression that has to match the full entry name.

    Returns:
        ``paths`` itself if no pattern is given, otherwise a list with the
        selected entries joined onto the directory, sorted by entry name.

    Raises:
        AssertionError: If a pattern is given with more or less than one path.
    """
    if pattern is None:
        return paths

    # Raised explicitly (instead of an assert statement) so the check also
    # runs with python -O; the exception type is unchanged.
    if len(paths) != 1:
        raise AssertionError("Can only apply pattern on a single directory.")

    path = paths[0]
    compiled_pattern = None if pattern == "ALL" else re.compile(pattern)

    # Sort in both cases to get a reproducible order; os.listdir order is
    # arbitrary.
    entries = sorted(os.listdir(path))
    if compiled_pattern is not None:
        entries = [entry for entry in entries if compiled_pattern.fullmatch(entry)]

    return [os.path.join(path, entry) for entry in entries]


119
120
@migration.command(help='Add an upload folder to the package index.')
@click.argument('upload-paths', nargs=-1)
@click.option('--pattern', default=None, type=str, help='Interpret the paths as directory and migrate those subdirectory that match the given regexp')
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes to process uploads. Default is 1.')
@click.option('--parallel-zip', default=1, type=int, help='Use the given amount of parallel processes to pack packages. Default is 1.')
def package(upload_paths, pattern, parallel, parallel_zip):
    """Create packages for the given uploads using ``parallel`` worker processes.

    All upload paths are put on a shared queue. Each worker process takes
    paths from the queue and calls ``package_index`` on its own migration
    object until it reads the end marker.
    """
    upload_paths = determine_upload_paths(upload_paths, pattern)
    upload_path_queue = multiprocessing.Queue()

    print('Package %d uploads with %d/%d processes.' % (len(upload_paths), parallel, parallel_zip))

    for upload_path in upload_paths:
        upload_path_queue.put(upload_path)

    # One end marker (None) per worker. A blocking Queue.get() never raises
    # queue.Empty, so without a marker the workers would wait forever on the
    # drained queue and the join below would never return.
    for _ in range(parallel):
        upload_path_queue.put(None)

    # NOTE(review): a nested function can only be used as a process target
    # with the 'fork' start method, since it cannot be pickled -- confirm the
    # platforms this command has to run on.
    def package_paths():
        infrastructure.setup_logging()
        infrastructure.setup_mongo()

        packager = _Migration()

        # iter(callable, sentinel) keeps calling get() until it returns None.
        for upload_path in iter(upload_path_queue.get, None):
            packager.package_index(upload_path, parallel=parallel_zip)

    processes = []
    for _ in range(parallel):
        process = multiprocessing.Process(target=package_paths)
        process.start()
        processes.append(process)

    for process in processes:
        process.join()
    upload_path_queue.close()
155
156
157
158
159
160
161
162
163


@migration.command(help='Get an report over all migrated packages.')
def report():
    """Print the report produced by the migration object's ``report`` method."""
    infrastructure.setup_logging()
    infrastructure.setup_mongo()

    print(_Migration().report())
164
165


166
167
@migration.command(help='Copy users from source into empty target db')
def copy_users(**kwargs):
    """Run the full ``_setup`` and call ``copy_users`` on a migration object.

    ``**kwargs`` is accepted but not used.
    """
    _setup()
    _Migration().copy_users()
170
171


172
173
174
175
@migration.command(help='Set the repo db PID calc counter.')
@click.argument('prefix', nargs=1, type=int, default=7000000)
def pid_prefix(prefix: int):
    """Pass ``prefix`` to ``set_pid_prefix`` of a migration object.

    Only logging is set up before the call.
    """
    infrastructure.setup_logging()
    _Migration().set_pid_prefix(prefix=prefix)
177
178


179
@migration.command(help='Upload the given upload locations. Uses the existing index to provide user metadata')
@click.argument('upload-paths', nargs=-1)
@click.option('--pattern', default=None, type=str, help='Interpret the paths as directory and migrate those subdirectory that match the given regexp')
@click.option('--delete-failed', default='', type=str, help='String from N, U, P to determine if empty (N), failed (U), or failed to publish (P) uploads should be deleted or kept for debugging.')
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--create-packages', is_flag=True, help='Indicate that packages should be created, if they do not already exist.')
def upload(
        upload_paths: list, pattern: str, parallel: int, delete_failed: str,
        create_packages: bool):
    """Migrate the given uploads.

    The upload paths are resolved with ``determine_upload_paths`` and handed
    to ``migrate`` of a migration object created with ``threads=parallel``.
    ``delete_failed`` and ``create_packages`` are passed through unchanged.
    """
    infrastructure.setup_logging()
    infrastructure.setup_mongo()

    resolved_paths = determine_upload_paths(upload_paths, pattern)
    _Migration(threads=parallel).migrate(
        *resolved_paths, delete_failed=delete_failed, create_packages=create_packages)
195
196
197
198
199
200
201
202
203


@migration.command(help='Get an report about not migrated calcs.')
def missing():
    """Print the result of ``SourceCalc.missing()`` as indented JSON."""
    infrastructure.setup_logging()
    infrastructure.setup_mongo()

    missing_calcs = SourceCalc.missing()
    formatted = json.dumps(missing_calcs, indent=2)
    print(formatted)