# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains functions to read data from NOMAD CoE, external sources, and
other/older nomad@FAIRDI instances in order to mass upload it to a new nomad@FAIRDI instance.

.. autoclass:: NomadCOEMigration
.. autoclass:: SourceCalc
"""

from typing import Generator, Tuple, List, Iterable, Any, Dict
import multiprocessing
import multiprocessing.pool
import time
import os
import os.path
import zipfile
import tarfile
import math
from mongoengine import Document, IntField, StringField, DictField
import datetime
from bravado.exception import HTTPNotFound, HTTPBadRequest, HTTPGatewayTimeout
import runstats
import io
import threading
from contextlib import contextmanager
import shutil
import json

from nomad import utils, infrastructure, files, config
from nomad.coe_repo import User, Calc, LoginException
from nomad.datamodel import CalcWithMetadata
from nomad.processing import FAILURE


default_pid_prefix = 7000000
""" The default pid prefix for new non migrated calculations """

max_package_size = 32 * 1024 * 1024 * 1024  # 32 GB
""" The maximum size of a package that will be used as an upload on nomad@FAIRDI """
use_stats_for_filestats_threshold = 1024

default_comment = 'entry with unknown provenance'
default_uploader = dict(id=1)


def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.
    """
    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = b''
            self.iterator = iter(iterable)

        def readable(self):
            return True

        def readinto(self, b):
            requested_len = len(b)  # We're supposed to return at most this much
            while True:
                try:
                    chunk = next(self.iterator)
                except StopIteration:
                    if len(self.leftover) == 0:
                        return 0  # indicate EOF
                    chunk = self.leftover
                output, self.leftover = chunk[:requested_len], chunk[requested_len:]
                len_output = len(output)
                if len_output == 0:
                    continue  # do not prematurely indicate EOF
                b[:len_output] = output
                return len_output

    return io.BufferedReader(IterStream(), buffer_size=buffer_size)
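
# A minimal usage sketch for iterable_to_stream (not part of the original module); the
# list of bytestrings below is a made-up example:
#
#     stream = iterable_to_stream([b'hello ', b'world'])
#     assert stream.read() == b'hello world'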


Directory = Tuple[List[str], str, int]


def create_package_zip(
        upload_id: str, upload_path: str, package_id: str, package_path: str, compress: bool,
        package_filepaths: Iterable[str]) -> None:
    logger = utils.get_logger(
        __name__, source_upload_id=upload_id,
        source_upload_path=upload_path, package_id=package_id)

    package_zip = zipfile.ZipFile(
        package_path, 'w',
        compression=zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED)

    try:
        for filepath in package_filepaths:
            package_zip.write(filepath, filepath[len(upload_path):])
    except Exception as e:
        logger.error('could not write file to zip', filepath=filepath, exc_info=e)
    finally:
        package_zip.close()

    logger.info('package zip created')


class Package(Document):
    """
    A Package represents a split part of an original NOMAD CoE upload. We use packages
    as uploads in nomad@FAIRDI. Some of the uploads in nomad are very big (e.g. the aflow
    lib) and need to be split to yield practical (i.e. mirror friendly) upload sizes.
    Therefore, uploads are split over multiple packages if one upload gets too large. A
    package always contains full directories of files to preserve *mainfile* *aux* file
    relations.

    Packages have a package entry in mongo and a .zip file with the raw data.
    """

    package_id = StringField(primary_key=True)
    """ A random UUID for the package. Could serve later its target upload id."""
    package_path = StringField(required=True)
    """ The path to the package .zip file. """
    upload_path = StringField(required=True)
    """ The absolute path of the source upload """
    upload_id = StringField(required=True)
    """ The source upload_id. There might be multiple packages per upload (this is the point). """
    restricted = IntField(default=0)
    """ The restricted in month, 0 for unrestricted. """
    size = IntField()
    """ The sum of all file sizes. """
    files = IntField()
    """ The number of files. """
    packages = IntField(default=-1)
    """ The number of packages in the same upload. """

    migration_version = IntField(default=-1)
    """ The version of the last successful migration of this package """
    report = DictField()
    """ The report of the last successful migration of this package """

    migration_failure = StringField()
    """ String that describe the cause for last failed migration attempt """

    meta = dict(indexes=['upload_id', 'migration_version'])

    @classmethod
    def aggregate_reports(cls, migration_version: int = None) -> 'Report':
        """
        Returns an aggregated report over all package reports of the given migration version,
        or all packages.
        """
        if migration_version is not None:
            query = cls.objects(migration_version__gte=migration_version, report__exists=True)
        else:
            query = cls.objects(report__exists=True)

        report = Report()
        for package in query:
            report.add(Report(package.report))

        return report
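
    # A hedged usage sketch (not part of the original code): aggregate the reports of all
    # packages migrated with version 1 or later and print the summary.
    #
    #     report = Package.aggregate_reports(migration_version=1)
    #     print(report)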

    @classmethod
    def get_packages(
            cls, upload_path: str, target_dir: str, create: bool = False,
            compress: bool = False, parallel: int = 1) -> Iterable['Package']:
        """
        Gets the packages for the given upload_path. Creates the package zip files and
        package index entries if ``create`` is True, but only if they do not already
        exist. Yields the Package objects.
        """
        upload_id = os.path.basename(upload_path)
        logger = utils.get_logger(__name__, source_upload_path=upload_path, source_upload_id=upload_id)

        if not os.path.isdir(upload_path):
            logger.error('upload path is not a directory')
            return []

        upload_directory = files.DirectoryObject(target_dir, upload_id, create=True, prefix=True)
        restricted = 0

        # The packages number is written after all packages of an upload have been created.
        # This allows to abort packaging mid upload and to continue later by first removing
        # all packages that were already started.
        is_packaged = cls.objects(upload_id=upload_id, packages__ne=-1).count() != 0

        async_results: List[multiprocessing.pool.AsyncResult] = []
        pool = multiprocessing.Pool(parallel)
        pool.__enter__()

        if not is_packaged:
            if not create:
                return None

            cls.objects(upload_id=upload_id).delete()

            def create_package_entry():
                package_id = utils.create_uuid()
                return Package(
                    package_id=package_id,
                    upload_path=upload_path,
                    upload_id=upload_id,
                    package_path=upload_directory.join_file('%s.zip' % package_id).os_path)

            def close_package(package_size: int, package_filepaths: List[str]):
                package_entry_to_close = package_entry

                def save_package_entry(*args) -> None:
                    package_entry_to_close.size = package_size
                    package_entry_to_close.files = len(package_filepaths)
                    package_entry_to_close.save()

                    logger.debug(
                        'created package',
                        package_id=package_entry.package_id, size=package_size)

                def handle_package_error(*args) -> None:
                    logger.error(
                        'could not create package zip due to unexpected exception',
                        exc_info=args[0])

                while len(async_results) > parallel:
                    async_results[:] = [
                        async_result for async_result in async_results
                        if not async_result.ready()]
                    time.sleep(0.1)

                async_result = pool.apply_async(
                    create_package_zip,
                    args=(
                        upload_id, upload_path, package_entry.package_id,
                        package_entry.package_path, compress, package_filepaths),
                    callback=save_package_entry, error_callback=handle_package_error)

                async_results.append(async_result)

            package_entry = create_package_entry()
            package_size = 0
            package_filepaths = []
            with cls.upload_iterator(upload_path) as directory:
                for filepaths, parent_directory, size in directory:
                    for filepath in filepaths:
                        basepath = os.path.basename(filepath)
                        if basepath.startswith('RESTRICTED'):
                            restricted = 36
                            try:
                                restricted = min(36, int(basepath[len('RESTRICTED_'):]))
                            except Exception:
                                pass

                        package_filepaths.append(os.path.join(parent_directory, filepath))

                    if size > max_package_size:
                        logger.warn(
                            'directory exceeds max package size',
                            package_id=package_entry.package_id, size=package_size)

                    package_size += size
                    if package_size > max_package_size:
                        close_package(package_size, package_filepaths)
                        package_size, package_filepaths = 0, []
                        package_entry = create_package_entry()

                if len(package_filepaths) > 0:
                    close_package(package_size, package_filepaths)

            # wait for all zip processes to complete
            while not all(async_result.ready() for async_result in async_results):
                time.sleep(0.1)

            pool.__exit__(None, None, None)

            package_query = cls.objects(upload_id=upload_id)
            package_query.update(restricted=restricted, packages=package_query.count())
            logger.debug(
                'packaged upload', source_upload_id=upload_id, source_upload_path=upload_path,
                restricted=restricted)

            return package_query
        else:
            return cls.objects(upload_id=upload_id)
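
    # A hedged usage sketch (not part of the original code); both paths are placeholders:
    #
    #     for package in Package.get_packages(
    #             '/nomad/repository/extracted/some-upload', '/data/migration/packages',
    #             create=True, compress=True, parallel=4):
    #         print(package.package_id, package.files, package.size)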

    @classmethod
    @contextmanager
    def upload_iterator(cls, upload_path: str) -> Generator[Generator[Directory, None, None], None, None]:
        """
        A contextmanager that opens the given upload and provides a generator for
        directories. Each directory is a tuple of upload relative filepaths, the
        parent directory, and the directory size.
        """
        potential_archive_path = os.path.join(upload_path, 'archive.tar.gz')
        if os.path.isfile(potential_archive_path):
            with cls.extracted_archive(potential_archive_path) as extracted_archive:
                yield cls.iterate_upload_directory(extracted_archive)
        else:
            yield cls.iterate_upload_directory(upload_path)

    @classmethod
    def iterate_upload_directory(cls, upload_path) -> Generator[Directory, None, None]:
        """
        Interprets the given upload path as a directory. File paths are given relative
        to the upload path.
        """
        stats = runstats.Statistics()
        for root, _, files in os.walk(upload_path):
            directory_filepaths: List[str] = []
            directory_size = 0

            if len(files) == 0:
                continue

            if len(files) < 20 and any(file.endswith('.tar.gz') for file in files):
                # TODO the OQMD case, files are managed as bunch of .tar.gz files
                for file in files:
                    archive_path = os.path.join(root, file)
                    prefix = os.path.dirname(archive_path)[len(upload_path) + 1:]
                    with cls.extracted_archive(archive_path) as extracted_archive:
                        for paths, _, size in cls.iterate_upload_directory(extracted_archive):
                            yield [os.path.join(prefix, path) for path in paths], upload_path, size
                continue

            for file in files:
                filepath = os.path.join(root, file)
                filename = filepath[len(upload_path) + 1:]
                directory_filepaths.append(filename)
                # getting file stats is pretty expensive with gpfs
                # if an upload has more than 1000 files, it is pretty likely that
                # size patterns repeat ... good enough
                if len(stats) < use_stats_for_filestats_threshold:
                    try:
                        filesize = os.path.getsize(filepath)
                    except Exception:
                        # if there are individual files that cannot be accessed, we fully ignore them
                        # they are most likely just broken links
                        continue

                    stats.push(filesize)
                else:
                    filesize = stats.mean()
                directory_size += filesize

            yield directory_filepaths, upload_path, directory_size

    @classmethod
    @contextmanager
    def extracted_archive(cls, archive_path: str) -> Generator[str, None, None]:
        """
        Temporarily extracts the given archive and yields the directory with the
        extracted data.
        """
        tmp_directory = os.path.join(config.fs.local_tmp, utils.create_uuid())
        os.mkdir(tmp_directory)

        with tarfile.TarFile.open(archive_path) as tar_file:
            tar_file.extractall(tmp_directory)
        # try to fix permissions, do not care if command fails
        os.system('chmod -Rf 0755 %s/*' % tmp_directory)

        try:
            yield tmp_directory
        finally:
            shutil.rmtree(tmp_directory)
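
    # A hedged usage sketch (not part of the original code); the archive path is a placeholder:
    #
    #     with Package.extracted_archive('/path/to/archive.tar.gz') as extracted:
    #         for filepaths, parent_directory, size in Package.iterate_upload_directory(extracted):
    #             print(parent_directory, len(filepaths), size)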


class SourceCalc(Document):
    """
    Mongo document used as a calculation, upload, and metadata db and index
    built from a given source db. Each :class:`SourceCalc` entry relates
    a pid, mainfile, and upload "id" with each other for a corresponding calculation.
    It might also contain the user metadata. The uploads are "id"ed via the
    specific path segment that identifies an upload on the CoE repo FS(s) without
    any prefixes (e.g. $EXTRACTED, /data/upload, etc.)
    """
    pid = IntField(primary_key=True)
    mainfile = StringField()
    upload = StringField()
    metadata = DictField()

    migration_version = IntField(default=-1)

    extracted_prefix = '$EXTRACTED/'
    sites = ['/data/nomad/extracted/', '/nomad/repository/extracted/']
    prefixes = [extracted_prefix] + sites

    meta = dict(indexes=['upload', 'mainfile', 'migration_version'])

    _dataset_cache: dict = {}

    @staticmethod
    def missing(use_cache=False):
        """
        Produces data about non migrated calcs
        """
        tmp_data_path = '/tmp/nomad_migration_missing.json'
        if os.path.exists(tmp_data_path) and use_cache:
            with open(tmp_data_path, 'rt') as f:
                data = utils.POPO(**json.load(f))
        else:
            data = utils.POPO(step=0)

        try:
            # get source_uploads that have non migrated calcs
            if data.step < 1 or not use_cache:
                import re
                data.source_uploads = SourceCalc._get_collection() \
                    .find({'migration_version': {'$lt': 0}, 'mainfile': {'$not': re.compile(r'^aflowlib_data.*')}}) \
                    .distinct('upload')
                data.step = 1

            if data.step < 2 or not use_cache:
                source_uploads = []
                data.packages = utils.POPO()
                data.uploads_with_no_package = []
                for source_upload in data.source_uploads:
                    package = Package.objects(upload_id=source_upload).first()
                    if package is None:
                        data.uploads_with_no_package.append(source_upload)
                    else:
                        calcs = SourceCalc.objects(upload=source_upload).count()
                        packages = Package.objects(upload_id=source_upload).count()
                        source_uploads.append(dict(
                            id=source_upload, package_count=packages,
                            packages=package.packages, calcs=calcs,
                            path=package.upload_path))
                        source_uploads = sorted(source_uploads, key=lambda k: k['calcs'], reverse=True)
                data.source_uploads = source_uploads
                data.step = 2
        finally:
            with open(tmp_data_path, 'wt') as f:
                json.dump(data, f)

        return data

    @staticmethod
    def index(source, drop: bool = False, with_metadata: bool = True, per_query: int = 100) \
            -> Generator[Tuple['SourceCalc', int], None, None]:
        """
        Creates a collection of :class:`SourceCalc` documents that represent source repo
        db entries.

        Arguments:
            source: The source db sql alchemy session
            drop: True to drop and create a new collection, update the existing otherwise,
                default is False.
            with_metadata: True to also grab all metadata and store it, default is True.
            per_query: The implementation tries to grab almost all data with a heavily joined
                query on the CoE snowflake/star shaped schema.
                The query cannot ask for the whole db at once: choose how many calculations
                should be read at a time to optimize for your application.

        Returns:
            yields tuples (:class:`SourceCalc`, #calcs_total[incl. datasets])
        """
        logger = utils.get_logger(__name__)
        if drop:
            SourceCalc.drop_collection()

        last_source_calc = SourceCalc.objects().order_by('-pid').first()
        start_pid = last_source_calc.pid if last_source_calc is not None else 0
        source_query = source.query(Calc)
        total = source_query.count() - SourceCalc.objects.count()

        while True:
            query_timer = utils.timer(logger, 'query source db')
            query_timer.__enter__()  # pylint: disable=E1101
            calcs: Iterable[Calc] = source_query \
                .filter(Calc.coe_calc_id > start_pid) \
                .order_by(Calc.coe_calc_id) \
                .limit(per_query)

            source_calcs = []
            for calc in calcs:
                query_timer.__exit__(None, None, None)  # pylint: disable=E1101
                try:
                    filenames = calc.files
                    if filenames is None or len(filenames) == 0:
                        continue  # dataset case

                    filename = filenames[0]
                    if len(filenames) == 1 and (filename.endswith('.tgz') or filename.endswith('.zip')):
                        continue  # also a dataset, some datasets have a downloadable archive

                    for prefix in SourceCalc.prefixes:
                        filename = filename.replace(prefix, '')
                    segments = [file.strip('\\') for file in filename.split('/')]

                    source_calc = SourceCalc(pid=calc.pid)
                    source_calc.upload = segments[0]
                    source_calc.mainfile = os.path.join(*segments[1:])
                    if with_metadata:
                        source_calc.metadata = calc.to_calc_with_metadata().__dict__
                    source_calcs.append(source_calc)
                    start_pid = source_calc.pid

                    yield source_calc, total
                except Exception as e:
                    logger.error('could not index', pid=calc.pid, exc_info=e)

            if len(source_calcs) == 0:
                break
            else:
                with utils.timer(logger, 'write index'):
                    SourceCalc.objects.insert(source_calcs)
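
    # A hedged usage sketch (not part of the original code): build the index from an open
    # source-db session; how that SQLAlchemy session is set up is outside this sketch.
    #
    #     source_db = infrastructure.repository_db
    #     for source_calc, total in SourceCalc.index(source_db, drop=False, with_metadata=True):
    #         pass  # e.g. advance a progress bar towards `total`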


NO_PROCESSED_CALCS = 0
FAILED_PROCESSING = 1
FAILED_PUBLISH = 2


class NomadCOEMigration:
    """
    Drives a migration from the NOMAD coe repository db to nomad@FAIRDI.

    Arguments:
        migration_version: The migration version. Only packages/calculations with
            no migration version or a lower migration version are migrated.
        package_directory: The directory that packages are/get stored in.
        compress_packages: True to use compression on package creation.
        threads: Number of threads to run migration in parallel.
        quiet: If True, do not print progress stats.
    """

    default_sites = [
        '/nomad/repository/data/uploads',
        '/nomad/repository/data/extracted',
        '/data/nomad/uploaded/',
        '/data/nomad/extracted/']

    default_pid_prefix = int(1e7)

    archive_filename = 'archive.tar.gz'
    """ The standard name for tarred uploads in the CoE repository. """

    def __init__(
            self,
            migration_version: int = 0,
            package_directory: str = None,
            compress_packages: bool = False,
            threads: int = 1, quiet: bool = False) -> None:
        self.logger = utils.get_logger(__name__, migration_version=migration_version)

        self.migration_version = migration_version
        self.package_directory = package_directory if package_directory is not None else config.fs.migration_packages
        self.compress_packages = compress_packages
        self._client = None
        self._threads = threads
        self._quiet = quiet

        self.source = infrastructure.repository_db

    @property
    def client(self):
        if self._client is None:
            from nomad.client import create_client
            self._client = create_client()

        return self._client

    def report(self):
        """ returns an aggregated report over all prior migrated packages """
        return Package.aggregate_reports(migration_version=self.migration_version)

    def copy_users(self):
        """ Copy all users. """
        for source_user in self.source.query(User).all():
            if source_user.user_id <= 2:
                # skip first two users to keep example users
                # they probably are either already the example users, or [root, Evgeny]
                continue

            create_user_payload = dict(
                user_id=source_user.user_id,
                email=source_user.email,
                first_name=source_user.first_name,
                last_name=source_user.last_name,
                password=source_user.password,
                created=source_user.created
            )

            try:
                create_user_payload.update(token=source_user.token)
            except LoginException:
                pass

            if source_user.affiliation is not None:
                create_user_payload.update(affiliation=dict(
                    name=source_user.affiliation.name,
                    address=source_user.affiliation.address))

            try:
                self.client.auth.create_user(payload=create_user_payload).response()
                self.logger.info('copied user', user_id=source_user.user_id)
            except HTTPBadRequest as e:
                self.logger.error('could not create user due to bad data', exc_info=e, user_id=source_user.user_id)

    expected_differences = {
        '0d': 'molecule / cluster',
        '3d': 'bulk',
        '2d': '2d / surface',
        '+u': 'gga'
    }

    def validate(self, repo_calc: dict, source_calc: CalcWithMetadata, logger) -> bool:
        """
        Validates the given processed calculation, assuming that the data in the given
        source_calc is correct.

        Returns:
            False, if the calculation differs from the source calc.
        """
        keys_to_validate = [
            'atoms', 'basis_set', 'xc_functional', 'system', 'crystal_system',
            'spacegroup', 'code_name', 'code_version']

        def to_comparable_list(list):
            for item in list:
                if isinstance(item, dict):
                    for key in item.keys():
                        if key.endswith('id'):
                            yield item.get(key)
                else:
                    yield item

        is_valid = True
        for key, target_value in repo_calc.items():
            if key not in keys_to_validate:
                continue

            source_value = getattr(source_calc, key, None)

            def check_mismatch() -> bool:
                # some exceptions
                if isinstance(source_value, str) and \
                        source_value in NomadCOEMigration.expected_differences and \
                        target_value == NomadCOEMigration.expected_differences.get(source_value):
                    return True

                logger.info(
                    'source target mismatch', quantity=key,
                    source_value=source_value, target_value=target_value,
                    value_diff='%s->%s' % (str(source_value), str(target_value)))
                return False

            if source_value is None and target_value is not None:
                continue

            if target_value is None and source_value is not None:
                is_valid &= check_mismatch()

            if isinstance(target_value, list):
                source_list = list(to_comparable_list(source_value))
                target_list = list(to_comparable_list(target_value))
                if len(source_list) != len(target_list):
                    is_valid &= check_mismatch()
                elif any(a != b for a, b in zip(source_list, target_list)):
                    is_valid &= check_mismatch()
                continue

            if isinstance(source_value, str):
                source_value = source_value.lower()
                target_value = str(target_value).lower()

            if source_value != target_value:
                is_valid &= check_mismatch()

        return is_valid

    def surrogate_metadata(self, source: CalcWithMetadata):
        """
        Compute metadata from the given metadata that can be used for new calcs of the
        same upload.
        """
        return CalcWithMetadata(
            uploader=source.uploader,
            with_embargo=source.with_embargo,
            upload_time=source.upload_time,
            coauthors=source.coauthors,
            shared_with=source.shared_with,
            comment=source.comment,
            references=source.references,
            datasets=source.datasets)

    def set_pid_prefix(self, prefix: int = default_pid_prefix):
        """
        Sets the repo db pid counter to the given values. Allows to create new calcs
        without interfering with migration calcs with already existing PIDs.
        """
        self.logger.info('set pid prefix', pid_prefix=prefix)
        self.client.admin.exec_pidprefix_command(payload=dict(prefix=prefix)).response()

    def call_api(self, operation: str, *args, **kwargs) -> Any:
        """
        Calls nomad via the bravado client. It deals with a very busy nomad and catches,
        backs off, and retries on gateway timeouts. It also circumvents bravado's/jsonschema's
        thread safety issues using a global lock on client usage.

        Arguments:
            operation: Comma separated string of api, endpoint, operation,
                e.g. 'uploads.get_upload'.
        """
        op_path = operation.split('.')
        op = self.client
        for op_path_segment in op_path:
            op = getattr(op, op_path_segment)

        sleep = utils.SleepTimeBackoff()
        while True:
            try:
                NomadCOEMigration._client_lock.acquire(blocking=True)
                return op(*args, **kwargs).response().result
            except HTTPGatewayTimeout:
                sleep()
            except Exception as e:
                raise e
            finally:
                NomadCOEMigration._client_lock.release()
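
    # A hedged usage sketch (not part of the original code); the upload id is a placeholder:
    #
    #     migration = NomadCOEMigration()
    #     upload = migration.call_api('uploads.get_upload', upload_id='some-upload-id')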

    def migrate(self, *args, delete_failed: str = '', create_packages: bool = False) -> utils.POPO:
        """
        Migrate the given uploads.

        It takes paths to extracted uploads as arguments.

        Requires :class:`Package` instances for the given upload paths. Those will
        be created, if they do not already exist. The packages determine the uploads
        for the target infrastructure.

        Requires a built :func:`index` to look for existing data in the source db. This
        will be used to add user (and other, PID, ...) metadata and validate calculations.

        Uses PIDs of identified old calculations. Will create new PIDs for previously
        unknown uploads. See :func:`set_pid_prefix` on how to avoid conflicts.

        Arguments:
            upload_path: A filepath to the upload directory.
            delete_failed: String from ``N``, ``U``, ``P`` to determine that uploads with
                no processed calcs (N), failed upload processing (U), or failed publish
                operation (P) should be deleted after the migration attempt.
            create_packages: If True, it will attempt to create upload packages if they
                do not exist.

        Returns: Dictionary with statistics on the migration.
        """

        cv = threading.Condition()
        overall_report = Report()
        threads = []

        def print_report():
            if not self._quiet:
                print(overall_report)

        def migrate_package(package: Package):
            logger = self.logger.bind(
                package_id=package.package_id, source_upload_id=package.upload_id)

            if package.migration_version is not None and package.migration_version >= self.migration_version:
                self.logger.info(
                    'package already migrated, skip it',
                    package_id=package.package_id, source_upload_id=package.upload_id)

                package_report = package.report
                overall_report.skipped_packages += 1
            else:
                try:
                    package_report = self.migrate_package(package, delete_failed=delete_failed)

                except Exception as e:
                    package_report = Report()
                    package_report.failed_packages = 1
                    event = 'unexpected exception while migrating packages'
                    package.migration_failure = event + ': ' + str(e)
                    logger.error(event, exc_info=e)
                finally:
                    package.report = package_report
                    package.migration_version = self.migration_version
                    package.save()

            with cv:
                try:
                    overall_report.add(package_report)

                    migrated_all_packages = all(
                        p.migration_version == self.migration_version
                        for p in Package.objects(upload_id=package.upload_id))

                    if migrated_all_packages:
                        missing_calcs = SourceCalc.objects(
                            upload=package.upload_id, migration_version__ne=self.migration_version).count()
                        total_source_calcs = SourceCalc.objects(upload=package.upload_id).count()

                        overall_report.missing_calcs += missing_calcs
                        overall_report.total_source_calcs += total_source_calcs

                        logger.info('migrated upload')

                    print_report()
                except Exception as e:
                    logger.error('unexpected exception while migrating packages', exc_info=e)

                self._threads += 1
                cv.notify()

        for arg in args:
            packages = Package.get_packages(
                arg, self.package_directory, create=create_packages,
                compress=self.compress_packages)

            if packages is None:
                self.logger.error('there are no packages for upload', upload_source_path=arg)
                continue

            for package in packages:
                with cv:
                    cv.wait_for(lambda: self._threads > 0)
                    self._threads -= 1
                    thread = threading.Thread(target=lambda: migrate_package(package))
                    threads.append(thread)
                    thread.start()

        for thread in threads:
            thread.join()

        return overall_report

    _client_lock = threading.Lock()

    def migrate_package(self, package: Package, delete_failed: str = '') -> 'Report':
        """ Migrates the given package. For other params see :func:`migrate`. """

        source_upload_id = package.upload_id
        package_id = package.package_id

        logger = self.logger.bind(package_id=package_id, source_upload_id=source_upload_id)
        logger.debug('start to process package')

        report = Report()
        report.total_packages += 1

        # check if the package is already uploaded
        upload = None
        try:
            uploads = self.call_api('uploads.get_uploads')
            for a_upload in uploads:
                if a_upload.name == package_id and len(a_upload.errors) == 0:
                    assert upload is None, 'duplicate upload name'
                    upload = a_upload
        except Exception as e:
            event = 'could not verify if upload already exists'
            self.logger.error(event, exc_info=e)
            package.migration_failure = event + ': ' + str(e)
            report.failed_packages += 1
            return report

        # upload and process the upload file
        if upload is None:
            with utils.timer(logger, 'upload completed'):
                try:
                    upload = self.call_api(
                        'uploads.upload', name=package_id, local_path=package.package_path)
                except Exception as e:
                    event = 'could not upload package'
                    self.logger.error(event, exc_info=e)
                    package.migration_failure = event + ': ' + str(e)
                    report.failed_packages += 1
                    return report
        else:
            self.logger.info('package was already uploaded')

        logger = logger.bind(
            source_upload_id=source_upload_id, upload_id=upload.upload_id)

        def delete_upload(reason: int):
            delete = \
                (reason == NO_PROCESSED_CALCS and 'N' in delete_failed) or \
                (reason == FAILED_PROCESSING and 'U' in delete_failed) or \
                (reason == FAILED_PUBLISH and 'P' in delete_failed)

            upload_to_delete = upload

            if delete:
                upload_to_delete = self.call_api(
                    'uploads.delete_upload', upload_id=upload_to_delete.upload_id)

                sleep = utils.SleepTimeBackoff()
                while upload_to_delete.process_running:
                    try:
                        upload_to_delete = self.call_api(
                            'uploads.get_upload', upload_id=upload_to_delete.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the delete operation
                        break
                logger.info('deleted upload after migration failure')
            else:
                logger.warning(
                    'will keep upload after migration failure for debugging',
                    reason=reason, delete_failed=delete_failed)

        # grab source calcs, while waiting for upload
        source_calcs = dict()
        surrogate_source_calc_with_metadata = None
        with utils.timer(logger, 'loaded source metadata'):
            with zipfile.ZipFile(package.package_path) as zf:
                for filenames_chunk in utils.chunks(zf.namelist(), 1000):
                    for source_calc in SourceCalc.objects(
                            upload=source_upload_id, mainfile__in=filenames_chunk):

                        source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                        source_calc_with_metadata.pid = source_calc.pid
                        source_calc_with_metadata.mainfile = source_calc.mainfile
                        source_calcs[source_calc.mainfile] = (source_calc, source_calc_with_metadata)

                        # establish a surrogate for new calcs
                        if surrogate_source_calc_with_metadata is None:
                            surrogate_source_calc_with_metadata = \
                                self.surrogate_metadata(source_calc_with_metadata)

        # try to find a surrogate outside the package, if necessary
        if surrogate_source_calc_with_metadata is None:
            source_calc = SourceCalc.objects(upload=source_upload_id).first()
            if source_calc is not None:
                source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                surrogate_source_calc_with_metadata = \
                    self.surrogate_metadata(source_calc_with_metadata)

        # wait for complete upload
        with utils.timer(logger, 'upload processing completed'):
            sleep = utils.SleepTimeBackoff()
            while upload.tasks_running:
                upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
                sleep()

        if upload.tasks_status == FAILURE:
            event = 'failed to process upload'
            logger.error(event, process_errors=upload.errors)
            package.migration_failure = event + ': ' + str(upload.errors)
            report.failed_packages += 1
            delete_upload(FAILED_PROCESSING)
            return report
        else:
            report.total_calcs += upload.calcs.pagination.total

        calc_mainfiles = []
        upload_total_calcs = upload.calcs.pagination.total

        # check for processing errors
        with utils.timer(logger, 'checked upload processing'):
            per_page = 10000
            for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
                upload = self.call_api(
                    'uploads.get_upload', upload_id=upload.upload_id, per_page=per_page,
                    page=page, order_by='mainfile')

                for calc_proc in upload.calcs.results:
                    calc_logger = logger.bind(
                        calc_id=calc_proc.calc_id,
                        mainfile=calc_proc.mainfile)

                    calc_mainfiles.append(calc_proc.mainfile)

                    if calc_proc.tasks_status == FAILURE:
                        report.failed_calcs += 1
                        calc_logger.info(
                            'could not process a calc', process_errors=calc_proc.errors)
                        continue

        # verify upload against source
        calcs_in_search = 0
        with utils.timer(logger, 'verified upload against source calcs'):
            scroll_id = 'first'
            while scroll_id is not None:
                scroll_args: Dict[str, Any] = dict(scroll=True)
                if scroll_id != 'first':
                    scroll_args['scroll_id'] = scroll_id

                search = self.call_api('repo.search', upload_id=upload.upload_id, **scroll_args)

                scroll_id = search.scroll_id

                for calc in search.results:
                    calcs_in_search += 1
                    source_calc, source_calc_with_metadata = source_calcs.get(
                        calc['mainfile'], (None, None))

                    if source_calc is not None:
                        report.migrated_calcs += 1

                        calc_logger = logger.bind(calc_id=calc['calc_id'], mainfile=calc['mainfile'])