# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains functions to read data from the NOMAD CoE repository, from external
sources, or from other/older nomad@FAIRDI instances, and to mass upload it to a new
nomad@FAIRDI instance.

.. autoclass:: NomadCOEMigration
.. autoclass:: SourceCalc
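
A minimal usage sketch (assuming a configured source repository db and a running
nomad@FAIRDI API; the upload path below is hypothetical):

.. code-block:: python

    migration = NomadCOEMigration(migration_version=1, threads=2)
    migration.copy_users()
    for _source_calc, _total in migration.source_calc_index(with_metadata=True):
        pass  # the index is a generator and has to be consumed
    migration.migrate('/data/extracted/example_upload', create_packages=True)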
"""

from typing import Generator, Tuple, List, Iterable, Any, Dict
import multiprocessing
import multiprocessing.pool
import time
import os
import os.path
import zipfile
import tarfile
import math
from mongoengine import Document, IntField, StringField, DictField
import datetime
from bravado.exception import HTTPNotFound, HTTPBadRequest, HTTPGatewayTimeout
import runstats
import io
import threading
from contextlib import contextmanager
import shutil
import json

from nomad import utils, infrastructure, files, config
from nomad.coe_repo import User, Calc, LoginException
from nomad.datamodel import CalcWithMetadata
from nomad.processing import FAILURE


default_pid_prefix = 7000000
""" The default pid prefix for new non migrated calculations """

max_package_size = 32 * 1024 * 1024 * 1024  # 32 GB
""" The maximum size of a package that will be used as an upload on nomad@FAIRDI """
use_stats_for_filestats_threshold = 1024
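""" The number of file sizes to sample per upload before further sizes are estimated with the mean of the sampled sizes (file stats are expensive, e.g. on gpfs). """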

default_comment = 'entry with unknown provenance'
default_uploader = dict(id=1)


def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.
    """
    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = b''  # empty bytes, so the EOF check works even for an empty iterable
            self.iterator = iter(iterable)

        def readable(self):
            return True

        def readinto(self, b):
            requested_len = len(b)  # We're supposed to return at most this much
            while True:
                try:
                    chunk = next(self.iterator)
                except StopIteration:
                    if len(self.leftover) == 0:
                        return 0  # indicate EOF
                    chunk = self.leftover
                output, self.leftover = chunk[:requested_len], chunk[requested_len:]
                len_output = len(output)
                if len_output == 0:
                    continue  # do not prematurely indicate EOF
                b[:len_output] = output
                return len_output

    return io.BufferedReader(IterStream(), buffer_size=buffer_size)
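
# Example (a sketch): wrap a generator of byte chunks into a buffered, read-only
# stream, e.g. for APIs that expect a file-like object:
#
#     stream = iterable_to_stream(iter([b'foo', b'bar']))
#     assert stream.read() == b'foobar'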


Directory = Tuple[List[str], str, int]
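""" A directory entry as yielded by the upload iterators: (upload-relative filepaths, the base path they are relative to, the directory size in bytes). """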


def create_package_zip(
        upload_id: str, upload_path: str, package_id: str, package_path: str, compress: bool,
        package_filepaths: Iterable[str]) -> None:
    logger = utils.get_logger(
        __name__, source_upload_id=upload_id,
        source_upload_path=upload_path, package_id=package_id)

    package_zip = zipfile.ZipFile(
        package_path, 'w',
        compression=zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED)

    try:
        for filepath in package_filepaths:
            try:
                package_zip.write(filepath, filepath[len(upload_path):])
            except Exception as e:
                # log and continue with the remaining files instead of aborting the package
                logger.error('could not write file to zip', filepath=filepath, exc_info=e)
    finally:
        package_zip.close()

    logger.info('package zip created')


class Package(Document):
    """
    A Package represents split origin NOMAD CoE uploads. We use packages as uploads
    in nomad@FAIRDI. Some of the uploads in nomad are very big (alfow lib) and need
    to be split down to yield practical (i.e. for mirrors) upload sizes. Therefore,
    uploads are split over multiple packages if one upload gets to large. A package
    always contains full directories of files to preserve *mainfile* *aux* file relations.
127
    Package have a package entry in mongo and a .zip file with the raw data.
128
129
130
    """

    package_id = StringField(primary_key=True)
    """ A random UUID for the package. Could serve later its target upload id."""
    package_path = StringField(required=True)
    """ The path to the package .zip file. """
    upload_path = StringField(required=True)
    """ The absolute path of the source upload """
    upload_id = StringField(required=True)
    """ The source upload_id. There might be multiple packages per upload (this is the point). """
    restricted = IntField(default=0)
    """ The restricted in month, 0 for unrestricted. """
    size = IntField()
    """ The sum of all file sizes. """
    files = IntField()
    """ The number of files. """
    packages = IntField(default=-1)
    """ The number of packages in the same upload. """

    migration_version = IntField(default=-1)
    """ The version of the last successful migration of this package """
    report = DictField()
    """ The report of the last successful migration of this package """

    meta = dict(indexes=['upload_id', 'migration_version'])

    @classmethod
    def aggregate_reports(cls, migration_version: int = None) -> 'Report':
        """
        Returns an aggregated report over all package reports with at least the given
        migration version, or over all package reports if no version is given.
        """
        if migration_version is not None:
            query = cls.objects(migration_version__gte=migration_version, report__exists=True)
        else:
            query = cls.objects(report__exists=True)

        report = Report()
        for package in query:
            report.add(Report(package.report))

        return report
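
    # Example (a sketch): aggregate everything migrated with version >= 1:
    #
    #     report = Package.aggregate_reports(migration_version=1)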

    @classmethod
    def get_packages(
            cls, upload_path: str, target_dir: str, create: bool = False,
            compress: bool = False, parallel: int = 1) -> Iterable['Package']:
        """
        Gets the packages for the given upload_path. Creates the package zip files and
        package index entries if ``create`` is True, but only if they do not already
        exist. Yields the Package objects.
        """
        upload_id = os.path.basename(upload_path)
        logger = utils.get_logger(__name__, source_upload_path=upload_path, source_upload_id=upload_id)

        if not os.path.isdir(upload_path):
            logger.error('upload path is not a directory')
            return []

        upload_directory = files.DirectoryObject(target_dir, upload_id, create=True, prefix=True)
        restricted = 0

        # The number of packages is only written after all packages of an upload have
        # been created. This allows to abort packaging mid upload and continue later;
        # all already started packages have to be removed first.
        is_packaged = cls.objects(upload_id=upload_id, packages__ne=-1).count() != 0

        if not is_packaged:
            if not create:
                return None

            cls.objects(upload_id=upload_id).delete()

            # the pool is only created when packages actually have to be written
            async_results: List[multiprocessing.pool.AsyncResult] = []
            pool = multiprocessing.Pool(parallel)
            pool.__enter__()

            def create_package_entry():
                package_id = utils.create_uuid()
                return Package(
                    package_id=package_id,
                    upload_path=upload_path,
                    upload_id=upload_id,
                    package_path=upload_directory.join_file('%s.zip' % package_id).os_path)

            def close_package(package_size: int, package_filepaths: List[str]):
                package_entry_to_close = package_entry

                def save_package_entry(*args) -> None:
                    package_entry_to_close.size = package_size
                    package_entry_to_close.files = len(package_filepaths)
                    package_entry_to_close.save()

                    logger.debug(
                        'created package',
                        package_id=package_entry.package_id, size=package_size)

                def handle_package_error(*args) -> None:
                    logger.error(
                        'could not create package zip due to unexpected exception',
                        exc_info=args[0])

                while len(async_results) > parallel:
                    async_results[:] = [
                        async_result for async_result in async_results
                        if not async_result.ready()]
                    time.sleep(0.1)

                async_result = pool.apply_async(
                    create_package_zip,
                    args=(
                        upload_id, upload_path, package_entry.package_id,
                        package_entry.package_path, compress, package_filepaths),
                    callback=save_package_entry, error_callback=handle_package_error)

                async_results.append(async_result)

            package_entry = create_package_entry()
            package_size = 0
            package_filepaths = []
            with cls.upload_iterator(upload_path) as directory:
                for filepaths, parent_directory, size in directory:
                    for filepath in filepaths:
                        basepath = os.path.basename(filepath)
                        if basepath.startswith('RESTRICTED'):
                            restricted = 36
                            try:
                                restricted = min(36, int(basepath[len('RESTRICTED_'):]))
                            except Exception:
                                pass

                        package_filepaths.append(os.path.join(parent_directory, filepath))

                    if size > max_package_size:
                        logger.warn(
                            'directory exceeds max package size',
                            package_id=package_entry.package_id, size=size)

                    package_size += size
                    if package_size > max_package_size:
                        close_package(package_size, package_filepaths)
                        package_size, package_filepaths = 0, []
                        package_entry = create_package_entry()

                if len(package_filepaths) > 0:
                    close_package(package_size, package_filepaths)

            # wait for all zip processes to complete
            while not all(async_result.ready() for async_result in async_results):
                time.sleep(0.1)

            pool.__exit__(None, None, None)

            package_query = cls.objects(upload_id=upload_id)
            package_query.update(restricted=restricted, packages=package_query.count())
            logger.debug(
                'packaged upload', source_upload_id=upload_id, source_upload_path=upload_path,
                restricted=restricted)

            return package_query
        else:
            return cls.objects(upload_id=upload_id)

    @classmethod
    @contextmanager
    def upload_iterator(cls, upload_path: str) -> Generator[Generator[Directory, None, None], None, None]:
        """
        A contextmanager that opens the given upload and provides a generator for
        directories. Each directory is a tuple of upload-relative filepaths, the base
        path they are relative to, and the directory size.
        """
        potential_archive_path = os.path.join(upload_path, 'archive.tar.gz')
        if os.path.isfile(potential_archive_path):
            with cls.extracted_archive(potential_archive_path) as extracted_archive:
                yield cls.iterate_upload_directory(extracted_archive)
        else:
            yield cls.iterate_upload_directory(upload_path)

    @classmethod
    def iterate_upload_directory(cls, upload_path: str) -> Generator[Directory, None, None]:
        """
        Interprets the given upload path as a directory. File paths are given relative
        to the upload path.
        """
        stats = runstats.Statistics()
        for root, _, files in os.walk(upload_path):
            directory_filepaths: List[str] = []
            directory_size = 0

            if len(files) == 0:
                continue

            if len(files) < 20 and any(file.endswith('.tar.gz') for file in files):
                # TODO the OQMD case, files are managed as bunch of .tar.gz files
                for file in files:
                    archive_path = os.path.join(root, file)
                    prefix = os.path.dirname(archive_path)[len(upload_path) + 1:]
                    with cls.extracted_archive(archive_path) as extracted_archive:
                        for paths, _, size in cls.iterate_upload_directory(extracted_archive):
                            yield [os.path.join(prefix, path) for path in paths], upload_path, size
                continue

            for file in files:
                filepath = os.path.join(root, file)
                filename = filepath[len(upload_path) + 1:]
                directory_filepaths.append(filename)
                # getting file stats is pretty expensive with gpfs
                # if an upload has more than 1000 files, it is pretty likely that
                # size patterns repeat ... good enough
                if len(stats) < use_stats_for_filestats_threshold:
                    try:
                        filesize = os.path.getsize(filepath)
                    except Exception:
                        # individual files that cannot be accessed are most likely just
                        # broken links; fully ignore them
                        directory_filepaths.pop()
                        continue

                    stats.push(filesize)
                else:
                    filesize = stats.mean()
                directory_size += filesize

            yield directory_filepaths, upload_path, directory_size

    @classmethod
    @contextmanager
    def extracted_archive(cls, archive_path: str) -> Generator[str, None, None]:
        """
        Temporarily extracts the given archive and yields the directory with the
        extracted data.
        """
        tmp_directory = os.path.join(config.fs.local_tmp, utils.create_uuid())
        os.mkdir(tmp_directory)

        with tarfile.TarFile.open(archive_path) as tar_file:
            tar_file.extractall(tmp_directory)
        # try to fix permissions, do not care if command fails
        os.system('chmod -Rf 0755 %s/*' % tmp_directory)

        try:
            yield tmp_directory
        finally:
            # always clean up the temporary extraction, even if the consumer raises
            shutil.rmtree(tmp_directory)


class SourceCalc(Document):
    """
    Mongo document used as a calculation, upload, and metadata db and index
    built from a given source db. Each :class:`SourceCalc` entry relates
    a pid, mainfile, and upload "id" with each other for a corresponding calculation.
    It might also contain the user metadata. The uploads are "id"ed via the
    specific path segment that identifies an upload on the CoE repo FS(s) without
    any prefixes (e.g. $EXTRACTED, /data/upload, etc.)
    """
    pid = IntField(primary_key=True)
    mainfile = StringField()
    upload = StringField()
    metadata = DictField()

    migration_version = IntField(default=-1)

    extracted_prefix = '$EXTRACTED/'
    sites = ['/data/nomad/extracted/', '/nomad/repository/extracted/']
    prefixes = [extracted_prefix] + sites

    meta = dict(indexes=['upload', 'mainfile', 'migration_version'])

    _dataset_cache: dict = {}

    @staticmethod
    def missing():
        """
        Produces data about non migrated calcs
        """
        tmp_data_path = '/tmp/nomad_migration_missing.json'
        if os.path.exists(tmp_data_path):
            with open(tmp_data_path, 'rt') as f:
                data = utils.POPO(**json.load(f))
        else:
            data = utils.POPO(step=0)

        try:
            # get source_uploads that have non migrated calcs
            if data.step < 1:
                import re
                data.source_uploads = SourceCalc._get_collection() \
                    .find({'migration_version': {'$lt': 0}, 'mainfile': {'$not': re.compile(r'^aflowlib_data.*')}}) \
                    .distinct('upload')
                data.step = 1

            if data.step < 2:
                source_uploads = []
                data.packages = utils.POPO()
                data.uploads_with_no_package = []
                for source_upload in data.source_uploads:
                    package = Package.objects(upload_id=source_upload).first()
                    if package is None:
                        data.uploads_with_no_package.append(source_upload)
                    else:
                        source_uploads.append(source_upload)
                data.source_uploads = source_uploads
                data.step = 2

            if data.step < 3:
                source_uploads = []
                for source_upload in data.source_uploads:
                    count = SourceCalc.objects(upload=source_upload).count()
                    source_uploads.append(utils.POPO(id=source_upload, calcs=count))
                data.source_uploads = sorted(source_uploads, key=lambda k: k['calcs'])
                data.step = 3

            if data.step < 4:
                for source_upload in data.source_uploads:
                    count = Package.objects(upload_id=source_upload.get('id')).count()
                    source_upload['packages'] = count
                data.step = 4
        finally:
            with open(tmp_data_path, 'wt') as f:
                json.dump(data, f)

        return data

    @staticmethod
    def index(source, drop: bool = False, with_metadata: bool = True, per_query: int = 100) \
            -> Generator[Tuple['SourceCalc', int], None, None]:
        """
        Creates a collection of :class:`SourceCalc` documents that represent source repo
        db entries.

        Arguments:
            source: The source db sql alchemy session
            drop: True to drop and create a new collection, update the existing otherwise,
                default is False.
            with_metadata: True to also grab all metadata and store it, default is True.
            per_query: The implementation tries to grab almost all data with a heavily joined
                query on the CoE snowflake/star shaped schema.
                The query cannot ask for the whole db at once: choose how many calculations
                should be read at a time to optimize for your application.

        Returns:
            yields tuples (:class:`SourceCalc`, #calcs_total[incl. datasets])
        """
        logger = utils.get_logger(__name__)
        if drop:
            SourceCalc.drop_collection()

        last_source_calc = SourceCalc.objects().order_by('-pid').first()
        start_pid = last_source_calc.pid if last_source_calc is not None else 0
        source_query = source.query(Calc)
        total = source_query.count() - SourceCalc.objects.count()

        while True:
            query_timer = utils.timer(logger, 'query source db')
            query_timer.__enter__()  # pylint: disable=E1101
            calcs: Iterable[Calc] = source_query \
                .filter(Calc.coe_calc_id > start_pid) \
                .order_by(Calc.coe_calc_id) \
                .limit(per_query)

            source_calcs = []
            for calc in calcs:
                query_timer.__exit__(None, None, None)  # pylint: disable=E1101
                try:
                    filenames = calc.files
                    if filenames is None or len(filenames) == 0:
                        continue  # dataset case

                    filename = filenames[0]
                    if len(filenames) == 1 and (filename.endswith('.tgz') or filename.endswith('.zip')):
                        continue  # also a dataset, some datasets have a downloadable archive

                    for prefix in SourceCalc.prefixes:
                        filename = filename.replace(prefix, '')
                    segments = [file.strip('\\') for file in filename.split('/')]

                    source_calc = SourceCalc(pid=calc.pid)
                    source_calc.upload = segments[0]
                    source_calc.mainfile = os.path.join(*segments[1:])
                    if with_metadata:
                        source_calc.metadata = calc.to_calc_with_metadata().__dict__
                    source_calcs.append(source_calc)
                    start_pid = source_calc.pid

                    yield source_calc, total
                except Exception as e:
                    logger.error('could not index', pid=calc.pid, exc_info=e)

            if len(source_calcs) == 0:
                break
            else:
                with utils.timer(logger, 'write index'):
                    SourceCalc.objects.insert(source_calcs)


NO_PROCESSED_CALCS = 0
FAILED_PROCESSING = 1
FAILED_PUBLISH = 2


class NomadCOEMigration:
    """
    Drives a migration from the NOMAD coe repository db to nomad@FAIRDI.

    Arguments:
        migration_version: The migration version. Only packages/calculations with
            no migration version or a lower migration version are migrated.
        package_directory: The directory that packages are/get stored in.
        compress_packages: True to use compression on package creation.
        threads: Number of threads to run migration in parallel.
        quiet: True to suppress the printing of intermediate stats.
    """

    default_sites = [
        '/nomad/repository/data/uploads',
        '/nomad/repository/data/extracted',
        '/data/nomad/uploaded/',
        '/data/nomad/extracted/']

    default_pid_prefix = int(1e7)

    archive_filename = 'archive.tar.gz'
    """ The standard name for tarred uploads in the CoE repository. """

    def __init__(
            self,
            migration_version: int = 0,
            package_directory: str = None,
            compress_packages: bool = False,
            threads: int = 1, quiet: bool = False) -> None:
        self.logger = utils.get_logger(__name__, migration_version=migration_version)

        self.migration_version = migration_version
        self.package_directory = package_directory if package_directory is not None else config.fs.migration_packages
        self.compress_packages = compress_packages
        self._client = None
        self._threads = threads
        self._quiet = quiet

        self.source = infrastructure.repository_db

    @property
    def client(self):
        if self._client is None:
            from nomad.client import create_client
            self._client = create_client()

        return self._client

    def report(self):
        """ returns an aggregated report over all prior migrated packages """
        return Package.aggregate_reports(migration_version=self.migration_version)

    def copy_users(self):
        """ Copy all users. """
        for source_user in self.source.query(User).all():
            if source_user.user_id <= 2:
                # skip first two users to keep example users
                # they probably are either already the example users, or [root, Evgeny]
                continue

            create_user_payload = dict(
                user_id=source_user.user_id,
                email=source_user.email,
                first_name=source_user.first_name,
                last_name=source_user.last_name,
                password=source_user.password,
                created=source_user.created
            )

            try:
                create_user_payload.update(token=source_user.token)
            except LoginException:
                pass

            if source_user.affiliation is not None:
                create_user_payload.update(affiliation=dict(
                    name=source_user.affiliation.name,
                    address=source_user.affiliation.address))

            try:
                self.client.auth.create_user(payload=create_user_payload).response()
                self.logger.info('copied user', user_id=source_user.user_id)
            except HTTPBadRequest as e:
                self.logger.error('could not create user due to bad data', exc_info=e, user_id=source_user.user_id)

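    # source value -> target value pairs that :func:`validate` accepts as equivalent
    # rather than reporting a mismatch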
    expected_differences = {
        '0d': 'molecule / cluster',
        '3d': 'bulk',
        '2d': '2d / surface',
        '+u': 'gga'
    }

    def validate(self, repo_calc: dict, source_calc: CalcWithMetadata, logger) -> bool:
        """
        Validates the given processed calculation, assuming that the data in the given
        source_calc is correct.

        Returns:
            False, if the calculation differs from the source calc.
        """
        keys_to_validate = [
            'atoms', 'basis_set', 'xc_functional', 'system', 'crystal_system',
            'spacegroup', 'code_name', 'code_version']

        def to_comparable_list(values):
            for item in values:
                if isinstance(item, dict):
                    for key in item.keys():
                        if key.endswith('id'):
                            yield item.get(key)
                else:
                    yield item

        is_valid = True
        for key, target_value in repo_calc.items():
            if key not in keys_to_validate:
                continue

            source_value = getattr(source_calc, key, None)

            def check_mismatch() -> bool:
                # some exceptions
                if isinstance(source_value, str) and \
                        source_value in NomadCOEMigration.expected_differences and \
                        target_value == NomadCOEMigration.expected_differences.get(source_value):
                    return True

                logger.info(
                    'source target mismatch', quantity=key,
                    source_value=source_value, target_value=target_value,
                    value_diff='%s->%s' % (str(source_value), str(target_value)))
                return False

            if source_value is None and target_value is not None:
                continue

            if target_value is None and source_value is not None:
                is_valid &= check_mismatch()
                continue  # avoid reporting the same mismatch twice below

            if isinstance(target_value, list):
                source_list = list(to_comparable_list(source_value))
                target_list = list(to_comparable_list(target_value))
                if len(source_list) != len(target_list):
                    is_valid &= check_mismatch()
                elif any(a != b for a, b in zip(source_list, target_list)):
                    is_valid &= check_mismatch()
                continue

            if isinstance(source_value, str):
                source_value = source_value.lower()
                target_value = str(target_value).lower()

            if source_value != target_value:
                is_valid &= check_mismatch()

        return is_valid

    def surrogate_metadata(self, source: CalcWithMetadata):
        """
        Compute metadata from the given metadata that can be used for new calcs of the
        same upload.
        """
        return CalcWithMetadata(
            uploader=source.uploader,
            with_embargo=source.with_embargo,
            upload_time=source.upload_time,
            coauthors=source.coauthors,
            shared_with=source.shared_with,
            comment=source.comment,
            references=source.references,
            datasets=source.datasets)

    def set_pid_prefix(self, prefix: int = default_pid_prefix):
        """
        Sets the repo db pid counter to the given value. This allows creating new calcs
        without interfering with migrated calcs that already have PIDs.
        """
        self.logger.info('set pid prefix', pid_prefix=prefix)
        self.client.admin.exec_pidprefix_command(payload=dict(prefix=prefix)).response()

    def call_api(self, operation: str, *args, **kwargs) -> Any:
        """
        Calls nomad via the bravado client. It deals with a very busy nomad and catches,
        backsoff, and retries on gateway timouts. It also circumvents bravados/jsonschemas
        thread safety issues using a global lock on client usage.

        Arguments:
            operation: Comma separated string of api, endpoint, operation,
                e.g. 'uploads.get_upload'.
        """
        op_path = operation.split('.')
        op = self.client
        for op_path_segment in op_path:
            op = getattr(op, op_path_segment)

        sleep = utils.SleepTimeBackoff()
        while True:
            try:
                NomadCOEMigration._client_lock.acquire(blocking=True)
                return op(*args, **kwargs).response().result
            except HTTPGatewayTimeout:
                sleep()
            finally:
                NomadCOEMigration._client_lock.release()
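
    # Example (a sketch; some_upload_id is a placeholder): the dot separated operation
    # path is resolved attribute by attribute on the bravado client, so the call below
    # is equivalent to self.client.uploads.get_upload(upload_id=some_upload_id)
    # .response().result, with backoff and retry on gateway timeouts:
    #
    #     upload = self.call_api('uploads.get_upload', upload_id=some_upload_id)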

    def migrate(self, *args, delete_failed: str = '', create_packages: bool = False) -> 'Report':
        """
        Migrate the given uploads.

        It takes paths to extracted uploads as arguments.

        Requires :class:`Package` instances for the given upload paths. Those will
        be created, if they do not already exist. The packages determine the uploads
        for the target infrastructure.

        Requires a built :func:`index` to look up existing data in the source db. This
        will be used to add user (and other, e.g. PID) metadata and validate calculations.

        Uses PIDs of identified old calculations. Will create new PIDs for previously
749
        unknown uploads. See :func:`set_pid_prefix` on how to avoid conflicts.

        Arguments:
            *args: Filepaths to the extracted upload directories.
            delete_failed: String from ``N``, ``U``, ``P`` to determine that uploads with
                no processed calcs (N), failed upload processing (U), or failed publish
                operation (P) should be deleted after the migration attempt.
            create_packages: If True, it will attempt to create upload packages if they
                do not exist.

        Returns: A :class:`Report` with statistics on the migration.
        """

        cv = threading.Condition()
        overall_report = Report()
        threads = []

        def print_report():
            if not self._quiet:
                print(overall_report)

        def migrate_package(package: Package):
            logger = self.logger.bind(
                package_id=package.package_id, source_upload_id=package.upload_id)

            if package.migration_version is not None and package.migration_version >= self.migration_version:
                self.logger.info(
                    'package already migrated, skip it',
                    package_id=package.package_id, source_upload_id=package.upload_id)

                package_report = package.report
                overall_report.skipped_packages += 1
            else:
                try:
                    package_report = self.migrate_package(package, delete_failed=delete_failed)

                except Exception as e:
                    package_report = Report()
                    package_report.failed_packages = 1
                    logger.error(
                        'unexpected exception while migrating packages', exc_info=e)
                finally:
                    package.report = package_report
                    package.migration_version = self.migration_version
                    package.save()

            with cv:
                try:
                    overall_report.add(package_report)

                    migrated_all_packages = all(
                        p.migration_version == self.migration_version
                        for p in Package.objects(upload_id=package.upload_id))

                    if migrated_all_packages:
                        missing_calcs = SourceCalc.objects(
                            upload=package.upload_id, migration_version__ne=self.migration_version).count()
                        total_source_calcs = SourceCalc.objects(upload=package.upload_id).count()

                        overall_report.missing_calcs += missing_calcs
                        overall_report.total_source_calcs += total_source_calcs

                        logger.info('migrated upload')

                    print_report()
                except Exception as e:
                    logger.error('unexpected exception while migrating packages', exc_info=e)

                self._threads += 1
                cv.notify()

        for arg in args:
            packages = Package.get_packages(
                arg, self.package_directory, create=create_packages,
                compress=self.compress_packages)

            if packages is None:
                self.logger.error('there are no packages for upload', upload_source_path=arg)
                continue

            for package in packages:
                with cv:
                    cv.wait_for(lambda: self._threads > 0)
                    self._threads -= 1
                    # pass the package as an argument; a lambda would late-bind the
                    # loop variable and could migrate the wrong package
                    thread = threading.Thread(target=migrate_package, args=(package,))
                    threads.append(thread)
                    thread.start()

        for thread in threads:
            thread.join()

        return overall_report

    _client_lock = threading.Lock()

    def migrate_package(self, package: Package, delete_failed: str = '') -> 'Report':
        """ Migrates the given package. For other params see :func:`migrate`. """

        source_upload_id = package.upload_id
        package_id = package.package_id

        logger = self.logger.bind(package_id=package_id, source_upload_id=source_upload_id)
        logger.debug('start to process package')

        report = Report()
        report.total_packages += 1

        # upload and process the upload file
        with utils.timer(logger, 'upload completed'):
            try:
                upload = self.call_api(
                    'uploads.upload', name=package_id, local_path=package.package_path)
            except Exception as e:
                logger.error('could not upload package', exc_info=e)
                report.failed_packages += 1
                return report

        logger = logger.bind(
            source_upload_id=source_upload_id, upload_id=upload.upload_id)

        def delete_upload(reason: int):
            delete = \
                (reason == NO_PROCESSED_CALCS and 'N' in delete_failed) or \
                (reason == FAILED_PROCESSING and 'U' in delete_failed) or \
                (reason == FAILED_PUBLISH and 'P' in delete_failed)

            upload_to_delete = upload

            if delete:
                upload_to_delete = self.call_api(
                    'uploads.delete_upload', upload_id=upload_to_delete.upload_id)

                sleep = utils.SleepTimeBackoff()
                while upload_to_delete.process_running:
                    try:
                        upload_to_delete = self.call_api(
                            'uploads.get_upload', upload_id=upload_to_delete.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the delete operation
                        break
                logger.info('deleted upload after migration failure')
            else:
                logger.warning(
                    'will keep upload after migration failure for debugging',
                    reason=reason, delete_failed=delete_failed)

        # grab source calcs, while waiting for upload
        source_calcs = dict()
        surrogate_source_calc_with_metadata = None
        with utils.timer(logger, 'loaded source metadata'):
            with zipfile.ZipFile(package.package_path) as zf:
                for filenames_chunk in utils.chunks(zf.namelist(), 1000):
                    for source_calc in SourceCalc.objects(
                            upload=source_upload_id, mainfile__in=filenames_chunk):

                        source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                        source_calc_with_metadata.pid = source_calc.pid
                        source_calc_with_metadata.mainfile = source_calc.mainfile
                        source_calcs[source_calc.mainfile] = (source_calc, source_calc_with_metadata)

                        # establish a surrogate for new calcs
                        if surrogate_source_calc_with_metadata is None:
                            surrogate_source_calc_with_metadata = \
                                self.surrogate_metadata(source_calc_with_metadata)

        # try to find a surrogate outside the package, if necessary
        if surrogate_source_calc_with_metadata is None:
            source_calc = SourceCalc.objects(upload=source_upload_id).first()
            if source_calc is not None:
                source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                surrogate_source_calc_with_metadata = \
                    self.surrogate_metadata(source_calc_with_metadata)

        # wait for complete upload
        with utils.timer(logger, 'upload processing completed'):
            sleep = utils.SleepTimeBackoff()
            while upload.tasks_running:
                upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
                sleep()

        if upload.tasks_status == FAILURE:
            logger.error('failed to process upload', process_errors=upload.errors)
            report.failed_packages += 1
            delete_upload(FAILED_PROCESSING)
            return report
        else:
            report.total_calcs += upload.calcs.pagination.total

        calc_mainfiles = []
        upload_total_calcs = upload.calcs.pagination.total

        # check for processing errors
        with utils.timer(logger, 'checked upload processing'):
            per_page = 500
            for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
                upload = self.call_api(
                    'uploads.get_upload', upload_id=upload.upload_id, per_page=per_page,
                    page=page, order_by='mainfile')

                for calc_proc in upload.calcs.results:
                    calc_logger = logger.bind(
                        calc_id=calc_proc.calc_id,
                        mainfile=calc_proc.mainfile)

                    calc_mainfiles.append(calc_proc.mainfile)

                    if calc_proc.tasks_status == FAILURE:
                        report.failed_calcs += 1
                        calc_logger.info(
                            'could not process a calc', process_errors=calc_proc.errors)
                        continue

        # verify upload against source
        calcs_in_search = 0
        with utils.timer(logger, 'verified upload against source calcs'):
            scroll_id = 'first'
            while scroll_id is not None:
                scroll_args: Dict[str, Any] = dict(scroll=True)
                if scroll_id != 'first':
                    scroll_args['scroll_id'] = scroll_id

                search = self.call_api('repo.search', upload_id=upload.upload_id, **scroll_args)

                scroll_id = search.scroll_id

                for calc in search.results:
                    calcs_in_search += 1
                    source_calc, source_calc_with_metadata = source_calcs.get(
                        calc['mainfile'], (None, None))

                    # bind the calc logger before branching; it is also used in the
                    # else branch for calcs without a source
                    calc_logger = logger.bind(calc_id=calc['calc_id'], mainfile=calc['mainfile'])

                    if source_calc is not None:
                        report.migrated_calcs += 1

                        if calc.get('processed', False):
                            try:
                                if not self.validate(
                                        calc, source_calc_with_metadata, calc_logger):
                                    report.calcs_with_diffs += 1
                            except Exception as e:
                                calc_logger.warning(
                                    'unexpected exception during validation', exc_info=e)
                                report.calcs_with_diffs += 1
                    else:
                        calc_logger.info('processed a calc that has no source')
                        report.new_calcs += 1
                        # guessing the metadata from other calcs in upload/package
                        if surrogate_source_calc_with_metadata is not None:
                            new_calc_with_metadata = CalcWithMetadata(**surrogate_source_calc_with_metadata.to_dict())
                            new_calc_with_metadata.mainfile = calc['mainfile']
                        else:
                            calc_logger.warning('could not determine any metadata for new calc')
                            create_time_epoch = os.path.getctime(package.upload_path)
                            new_calc_with_metadata = CalcWithMetadata(
                                upload_time=datetime.datetime.fromtimestamp(create_time_epoch),
                                with_embargo=package.restricted > 0,
                                comment=default_comment,
                                uploader=default_uploader,
                                mainfile=calc['mainfile'])
                            surrogate_source_calc_with_metadata = new_calc_with_metadata

                        source_calcs[calc['mainfile']] = (None, new_calc_with_metadata)

            if len(calc_mainfiles) != calcs_in_search:
                logger.error('mismatch between processed calcs and calcs found with search')

        # publish upload
        if len(calc_mainfiles) > 0:
            with utils.timer(logger, 'upload published'):
                upload_metadata = dict(with_embargo=(package.restricted > 0))
                upload_metadata['calculations'] = [
                    self._to_api_metadata(source_calc_with_metadata)
                    for _, source_calc_with_metadata in source_calcs.values()]

                upload = self.call_api(
                    'uploads.exec_upload_operation', upload_id=upload.upload_id,
                    payload=dict(operation='publish', metadata=upload_metadata))

                sleep = utils.SleepTimeBackoff()
                while upload.process_running:
                    try:
                        upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the publish operation
                        break

                if upload.tasks_status == FAILURE:
                    logger.error('could not publish upload', process_errors=upload.errors)
                    report.failed_calcs = report.total_calcs
                    report.migrated_calcs = 0
                    report.calcs_with_diffs = 0
                    report.new_calcs = 0
                    report.failed_packages += 1

                    delete_upload(FAILED_PUBLISH)
                    SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
                        .update(migration_version=-1)
                else:
                    SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
                        .update(migration_version=self.migration_version)
        else:
            delete_upload(NO_PROCESSED_CALCS)
            logger.info('no successful calcs, skip publish')

        logger.info('migrated package', **report)
        return report

    def _to_api_metadata(self, calc_with_metadata: CalcWithMetadata) -> dict:
        """ Transforms to a dict that fullfils the API's uploade metadata model. """

        return dict(
            _upload_time=calc_with_metadata.upload_time,
            _uploader=calc_with_metadata.uploader['id'],
            _pid=calc_with_metadata.pid,
            references=[ref['value'] for ref in calc_with_metadata.references],
            datasets=[dict(
                id=ds['id'],
                _doi=ds.get('doi', {'value': None})['value'],
                _name=ds.get('name', None)) for ds in calc_with_metadata.datasets],
            mainfile=calc_with_metadata.mainfile,
            with_embargo=calc_with_metadata.with_embargo,
            comment=calc_with_metadata.comment,
            coauthors=list(int(user['id']) for user in calc_with_metadata.coauthors),
            shared_with=list(int(user['id']) for user in calc_with_metadata.shared_with)
        )

    def source_calc_index(self, *args, **kwargs):
        """ see :func:`SourceCalc.index` """
        return SourceCalc.index(self.source, *args, **kwargs)

    def package_index(self, upload_path: str, **kwargs) -> None:
        """
        Creates Package objects and respective package zip files for the given uploads.
        The given uploads are supposed to be path to the extracted upload directories.
        If the upload is already in the index, it is not recreated.
        """
        logger = utils.get_logger(__name__)

        try:
            for package_entry in Package.get_packages(
                    upload_path, self.package_directory, create=True,
                    compress=self.compress_packages, **kwargs):

                logger.info(
                    'package in index',
                    source_upload_path=upload_path,
                    source_upload_id=package_entry.upload_id,
                    package_id=package_entry.package_id)
        except Exception as e:
            logger.error(
                'could not create package from upload',
                upload_path=upload_path, exc_info=e)