# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains functions to read data from NOMAD coe, external sources,
other/older nomad@FAIRDI instances to mass upload it to a new nomad@FAIRDI instance.
18
19
20

.. autoclass:: NomadCOEMigration
.. autoclass:: SourceCalc
21
22
"""

from typing import Generator, Tuple, List, Iterable, Any, Dict

import os
import os.path
import zipfile
import tarfile
import math
import datetime
import io
import threading
import shutil
from contextlib import contextmanager

from mongoengine import Document, IntField, StringField, DictField
from bravado.exception import HTTPNotFound, HTTPBadRequest, HTTPGatewayTimeout
import runstats

from nomad import utils, infrastructure, files, config
from nomad.coe_repo import User, Calc, LoginException
from nomad.datamodel import CalcWithMetadata
from nomad.processing import FAILURE


default_pid_prefix = 7000000
""" The default pid prefix for new non migrated calculations """

max_package_size = 32 * 1024 * 1024 * 1024  # 32 GB
""" The maximum size of a package that will be used as an upload on nomad@FAIRDI """

use_stats_for_filestats_threshold = 1024
""" Number of files to measure exactly before file sizes are estimated statistically. """

default_comment = 'entry with unknown provenance'
default_uploader = dict(id=1)


def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.
    """
    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = b''
            self.iterator = iter(iterable)

        def readable(self):
            return True

        def readinto(self, b):
            requested_len = len(b)  # We're supposed to return at most this much
            while True:
                # consume leftover bytes from the previous chunk first; otherwise
                # data would be lost whenever a chunk exceeded the requested length
                if self.leftover:
                    chunk = self.leftover
                    self.leftover = b''
                else:
                    try:
                        chunk = next(self.iterator)
                    except StopIteration:
                        return 0  # indicate EOF
                output, self.leftover = chunk[:requested_len], chunk[requested_len:]
                len_output = len(output)
                if len_output == 0:
                    continue  # do not prematurely indicate EOF
                b[:len_output] = output
                return len_output

    return io.BufferedReader(IterStream(), buffer_size=buffer_size)
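
# A minimal usage sketch (hypothetical data, not part of the original module):
# wrap a generator of bytestrings so it can be consumed like a file object.
#
#     stream = iterable_to_stream(iter([b'hello ', b'world']))
#     assert stream.read() == b'hello world'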


Directory = Tuple[List[str], str, int]
""" Type alias for a directory entry: (relative file paths, parent directory, size in bytes). """


class Package(Document):
    """
    A Package represents a split origin NOMAD CoE upload. We use packages as uploads
    in nomad@FAIRDI. Some of the uploads in nomad are very big (e.g. the aflow lib) and
    need to be split to yield practical upload sizes (i.e. for mirrors). Therefore,
    uploads are split over multiple packages if one upload gets too large. A package
    always contains full directories of files to preserve *mainfile*/*aux* file relations.

    Packages have an entry in mongo and a .zip file with the raw data.
    """

    package_id = StringField(primary_key=True)
    """ A random UUID for the package. Could later serve as its target upload id."""
    package_path = StringField(required=True)
    """ The path to the package .zip file. """
    upload_path = StringField(required=True)
    """ The absolute path of the source upload. """
    upload_id = StringField(required=True)
    """ The source upload_id. There might be multiple packages per upload (this is the point). """
    restricted = IntField(default=0)
    """ The embargo period in months; 0 for unrestricted. """
    size = IntField()
    """ The sum of all file sizes. """
    files = IntField()
    """ The number of files. """
    packages = IntField(default=-1)
    """ The number of packages in the same upload. """

    migration_version = IntField(default=-1)
    """ The version of the last successful migration of this package. """
    report = DictField()
    """ The report of the last successful migration of this package. """

    meta = dict(indexes=['upload_id', 'migration_version'])

    @classmethod
    def aggregate_reports(cls, migration_version: int = None) -> 'Report':
        """
        Returns an aggregated report over all package reports of the given migration version,
        or all packages.
        """
        if migration_version is not None:
            query = cls.objects(migration_version__gte=migration_version, report__exists=True)
        else:
            query = cls.objects(report__exists=True)

        report = Report()
        for package in query:
            report.add(Report(package.report))

        return report
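
    # A usage sketch, assuming packages were migrated with migration version 0:
    #
    #     report = Package.aggregate_reports(migration_version=0)
    #     print(report.total_packages, report.failed_packages)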

    @classmethod
    def get_packages(
            cls, upload_path: str, target_dir: str, create: bool = False,
            compress: bool = False) -> Iterable['Package']:
        """
        Gets the packages for the given upload_path. Creates the package zip files and
        package index entries if ``create`` is True; either is only created if it does
        not already exist. Yields the Package objects.
        """
        upload_id = os.path.basename(upload_path)
        logger = utils.get_logger(__name__, source_upload_path=upload_path, source_upload_id=upload_id)

        if not os.path.isdir(upload_path):
            logger.error('upload path is not a directory')
            return []

        upload_directory = files.DirectoryObject(target_dir, upload_id, create=True, prefix=True)
        restricted = 0

        # The number of packages is written after all packages of an upload have been
        # created. This allows aborting packaging mid-upload and continuing later by
        # first removing all started packages.
        is_packaged = cls.objects(upload_id=upload_id, packages__ne=-1).count() != 0

        if not is_packaged:
            if not create:
                return None

            cls.objects(upload_id=upload_id).delete()

            def open_package_zip(package_entry: 'Package'):
                return zipfile.ZipFile(
                    package_entry.package_path, 'w',
                    compression=zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED)

            def create_package_entry():
                package_id = utils.create_uuid()
                return Package(
                    package_id=package_id,
                    upload_path=upload_path,
                    upload_id=upload_id,
                    package_path=upload_directory.join_file('%s.zip' % package_id).os_path)

            def close_package(package_size: int, package_files: int):
                package_zip.close()
                package_entry.size = package_size
                package_entry.files = package_files
                package_entry.save()

                logger.debug('created package', package_id=package_entry.package_id, size=package_size)

            package_entry = create_package_entry()
            package_size = 0
            package_files = 0
            package_zip = open_package_zip(package_entry)

            with cls.upload_iterator(upload_path) as directory:
                for filepaths, parent_directory, size in directory:
                    for filepath in filepaths:
                        basepath = os.path.basename(filepath)
                        if basepath.startswith('RESTRICTED'):
                            restricted = 36
                            try:
                                restricted = min(36, int(basepath[len('RESTRICTED_'):]))
                            except Exception:
                                pass

                        package_zip.write(os.path.join(parent_directory, filepath), filepath)
                        package_files += 1

                    if size > max_package_size:
                        logger.warn(
                            'directory exceeds max package size',
                            package_id=package_entry.package_id, size=size)

                    package_size += size
                    if package_size > max_package_size:
                        close_package(package_size, package_files)
                        package_size, package_files = 0, 0
                        package_entry = create_package_entry()
                        package_zip = open_package_zip(package_entry)

                if package_files > 0:
                    close_package(package_size, package_files)

            package_query = cls.objects(upload_id=upload_id)
            package_query.update(restricted=restricted, packages=package_query.count())
            logger.debug(
                'packaged upload', source_upload_id=upload_id, source_upload_path=upload_path,
                restricted=restricted)

            return package_query
        else:
            return cls.objects(upload_id=upload_id)

    @classmethod
    @contextmanager
    def upload_iterator(cls, upload_path: str) -> Generator[Generator[Directory, None, None], None, None]:
        """
        A contextmanager that opens the given upload and provides a generator for
        directories. Directories are tuples of upload relative file paths, the parent
        directory, and the directory size.
        """
        potential_archive_path = os.path.join(upload_path, 'archive.tar.gz')
        if os.path.isfile(potential_archive_path):
            with cls.extracted_archive(potential_archive_path) as extracted_archive:
                yield cls.iterate_upload_directory(extracted_archive)
        else:
            yield cls.iterate_upload_directory(upload_path)

    @classmethod
    def iterate_upload_directory(cls, upload_path) -> Generator[Directory, None, None]:
        """
        Interprets the given upload path as a directory. File paths are given relative
        to the upload path.
        """
        stats = runstats.Statistics()
        for root, _, files in os.walk(upload_path):
            directory_filepaths: List[str] = []
            directory_size = 0

            if len(files) == 0:
                continue

            for file in files:
                filepath = os.path.join(root, file)
                filename = filepath[len(upload_path) + 1:]
                directory_filepaths.append(filename)
                # getting file stats is pretty expensive with gpfs
                # if an upload has more than 1000 files, it is pretty likely that
                # size patterns repeat ... good enough
                if len(stats) < use_stats_for_filestats_threshold:
                    filesize = os.path.getsize(filepath)
                    stats.push(filesize)
                else:
                    filesize = stats.mean()
                directory_size += filesize

            yield directory_filepaths, upload_path, directory_size

    @classmethod
    @contextmanager
    def extracted_archive(cls, archive_path: str) -> Generator[str, None, None]:
        """
        Temporarily extracts the given archive and returns the directory with the
        extracted data.
        """
        tmp_directory = os.path.join(config.fs.local_tmp, utils.create_uuid())
        os.mkdir(tmp_directory)

        with tarfile.TarFile.open(archive_path) as tar_file:
            tar_file.extractall(tmp_directory)
        # try to fix permissions, do not care if command fails
        os.system('chmod -Rf 0755 %s/*' % tmp_directory)

        # ensure the temporary data is removed even if the consumer raises
        try:
            yield tmp_directory
        finally:
            shutil.rmtree(tmp_directory)
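
# A usage sketch with hypothetical paths: create (or load) the packages of one
# extracted source upload. ``create=True`` builds the package .zip files and
# index entries if they do not exist yet.
#
#     for package in Package.get_packages(
#             '/data/nomad/extracted/upload1', config.fs.migration_packages,
#             create=True, compress=True):
#         print(package.package_id, package.files, package.size)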

class SourceCalc(Document):
    """
    Mongo document used as a calculation, upload, and metadata db and index
    built from a given source db. Each :class:`SourceCalc` entry relates
    a pid, mainfile, and upload "id" with each other for a corresponding calculation.
    It might also contain the user metadata. The uploads are "id"ed via the
    specific path segment that identifies an upload on the CoE repo FS(s) without
    any prefixes (e.g. $EXTRACTED, /data/upload, etc.).
    """
    pid = IntField(primary_key=True)
    mainfile = StringField()
    upload = StringField()
    metadata = DictField()

    migration_version = IntField(default=-1)

    extracted_prefix = '$EXTRACTED/'
    sites = ['/data/nomad/extracted/', '/nomad/repository/extracted/']
    prefixes = [extracted_prefix] + sites

    meta = dict(indexes=['upload', 'mainfile', 'migration_version'])

    _dataset_cache: dict = {}

    @staticmethod
    def index(source, drop: bool = False, with_metadata: bool = True, per_query: int = 100) \
            -> Generator[Tuple['SourceCalc', int], None, None]:
        """
        Creates a collection of :class:`SourceCalc` documents that represent source repo
        db entries.

        Arguments:
            source: The source db sql alchemy session
            drop: True to drop and create a new collection, update the existing otherwise,
                default is False.
            with_metadata: True to also grab all metadata and store it, default is True.
            per_query: The implementation tries to grab almost all data with a heavily joined
                query on the CoE snowflake/star shaped schema.
                The query cannot ask for the whole db at once: choose how many calculations
                should be read at a time to optimize for your application.

        Returns:
            yields tuples (:class:`SourceCalc`, #calcs_total[incl. datasets])
        """
        logger = utils.get_logger(__name__)
        if drop:
            SourceCalc.drop_collection()

        last_source_calc = SourceCalc.objects().order_by('-pid').first()
        start_pid = last_source_calc.pid if last_source_calc is not None else 0
        source_query = source.query(Calc)
        total = source_query.count() - SourceCalc.objects.count()

        while True:
            query_timer = utils.timer(logger, 'query source db')
            query_timer.__enter__()  # pylint: disable=E1101
            calcs: Iterable[Calc] = source_query \
                .filter(Calc.coe_calc_id > start_pid) \
                .order_by(Calc.coe_calc_id) \
                .limit(per_query)

            source_calcs = []
            for calc in calcs:
                query_timer.__exit__(None, None, None)  # pylint: disable=E1101
                try:
                    filenames = calc.files
                    if filenames is None or len(filenames) == 0:
                        continue  # dataset case

                    filename = filenames[0]
                    if len(filenames) == 1 and (filename.endswith('.tgz') or filename.endswith('.zip')):
                        continue  # also a dataset, some datasets have a downloadable archive

                    for prefix in SourceCalc.prefixes:
                        filename = filename.replace(prefix, '')
                    segments = [file.strip('\\') for file in filename.split('/')]

                    source_calc = SourceCalc(pid=calc.pid)
                    source_calc.upload = segments[0]
                    source_calc.mainfile = os.path.join(*segments[1:])
                    if with_metadata:
                        source_calc.metadata = calc.to_calc_with_metadata().__dict__
                    source_calcs.append(source_calc)
                    start_pid = source_calc.pid

                    yield source_calc, total
                except Exception as e:
                    logger.error('could not index', pid=calc.pid, exc_info=e)

            if len(source_calcs) == 0:
                break
            else:
                with utils.timer(logger, 'write index'):
                    SourceCalc.objects.insert(source_calcs)
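
# A usage sketch; ``source_db`` stands for a CoE repository db session, e.g.
# ``infrastructure.repository_db`` after the infrastructure has been set up:
#
#     for source_calc, total in SourceCalc.index(source_db, drop=True, per_query=1000):
#         print('indexed pid %d of %d total calcs' % (source_calc.pid, total))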


NO_PROCESSED_CALCS = 0
FAILED_PROCESSING = 1
FAILED_PUBLISH = 2


class NomadCOEMigration:
    """
407
    Drives a migration from the NOMAD coe repository db to nomad@FAIRDI.
408
409

    Arguments:
410
411
        migration_version: The migration version. Only packages/calculations with
            no migration version or a lower migration version are migrated.
412
        package_directory: The directory that packages are/get stored in.
413
        compress_packages: True to use compression on package creation.
Markus Scheidgen's avatar
Markus Scheidgen committed
414
415
        threads: Number of threads to run migration in parallel.
        quiet: Prints stats if not quiet
416
    """

    default_sites = [
        '/nomad/repository/data/uploads',
        '/nomad/repository/data/extracted',
        '/data/nomad/uploaded/',
        '/data/nomad/extracted/']

    default_pid_prefix = int(1e7)

    archive_filename = 'archive.tar.gz'
    """ The standard name for tarred uploads in the CoE repository. """

    def __init__(
            self,
            migration_version: int = 0,
            package_directory: str = None,
            compress_packages: bool = False,
            threads: int = 1, quiet: bool = False) -> None:
        self.logger = utils.get_logger(__name__, migration_version=migration_version)

        self.migration_version = migration_version
        self.package_directory = package_directory if package_directory is not None else config.fs.migration_packages
        self.compress_packages = compress_packages
        self._client = None
        self._threads = threads
        self._quiet = quiet

        self.source = infrastructure.repository_db

    @property
    def client(self):
        if self._client is None:
            from nomad.client import create_client
            self._client = create_client()

        return self._client

    def report(self):
        """ Returns an aggregated report over all previously migrated packages. """
        return Package.aggregate_reports(migration_version=self.migration_version)

    def copy_users(self):
        """ Copy all users. """
        for source_user in self.source.query(User).all():
            if source_user.user_id <= 2:
                # skip first two users to keep example users
                # they probably are either already the example users, or [root, Evgeny]
                continue

            create_user_payload = dict(
                user_id=source_user.user_id,
                email=source_user.email,
                first_name=source_user.first_name,
                last_name=source_user.last_name,
                password=source_user.password,
                created=source_user.created
            )

            try:
                create_user_payload.update(token=source_user.token)
            except LoginException:
                pass

            if source_user.affiliation is not None:
                create_user_payload.update(affiliation=dict(
                    name=source_user.affiliation.name,
                    address=source_user.affiliation.address))

            try:
                self.client.auth.create_user(payload=create_user_payload).response()
                self.logger.info('copied user', user_id=source_user.user_id)
            except HTTPBadRequest as e:
                self.logger.error('could not create user due to bad data', exc_info=e, user_id=source_user.user_id)

    expected_differences = {
        '0d': 'molecule / cluster',
        '3d': 'bulk',
        '2d': '2d / surface',
        '+u': 'gga'
    }

    def validate(self, repo_calc: dict, source_calc: CalcWithMetadata, logger) -> bool:
        """
        Validates the given processed calculation, assuming that the data in the given
        source_calc is correct.

        Returns:
            False, if the calculation differs from the source calc.
        """
        keys_to_validate = [
            'atoms', 'basis_set', 'xc_functional', 'system', 'crystal_system',
            'spacegroup', 'code_name', 'code_version']

        def to_comparable_list(values):
            for item in values:
                if isinstance(item, dict):
                    for key in item.keys():
                        if key.endswith('id'):
                            yield item.get(key)
                else:
                    yield item

        is_valid = True
        for key, target_value in repo_calc.items():
            if key not in keys_to_validate:
                continue

            source_value = getattr(source_calc, key, None)

            def check_mismatch() -> bool:
                # some exceptions
                if source_value in NomadCOEMigration.expected_differences and \
                        target_value == NomadCOEMigration.expected_differences.get(source_value):
                    return True

                logger.info(
                    'source target mismatch', quantity=key,
                    source_value=source_value, target_value=target_value,
                    value_diff='%s->%s' % (str(source_value), str(target_value)))
                return False

            if source_value is None and target_value is not None:
                continue

            if target_value is None and source_value is not None:
                is_valid &= check_mismatch()
                continue  # do not also count this as a string/value mismatch below

            if isinstance(target_value, list):
                source_list = list(to_comparable_list(source_value))
                target_list = list(to_comparable_list(target_value))
                if len(set(source_list).intersection(target_list)) != len(target_list):
                    is_valid &= check_mismatch()
                continue

            if isinstance(source_value, str):
                source_value = source_value.lower()
                target_value = str(target_value).lower()

            if source_value != target_value:
                is_valid &= check_mismatch()

        return is_valid
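
    # A validation sketch with made-up values: via ``expected_differences`` a
    # source value of '2d' is accepted against a target value of '2d / surface'.
    #
    #     repo_calc = dict(system='2d / surface', code_name='VASP')
    #     migration.validate(repo_calc, source_calc_with_metadata, logger)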

    def surrogate_metadata(self, source: CalcWithMetadata):
        """
        Computes metadata from the given metadata that can be used for new calcs of the
        same upload.
        """
        return CalcWithMetadata(
            uploader=source.uploader,
            with_embargo=source.with_embargo,
            upload_time=source.upload_time,
            coauthors=source.coauthors,
            shared_with=source.shared_with,
            comment=source.comment,
            references=source.references,
            datasets=source.datasets)

    def set_pid_prefix(self, prefix: int = default_pid_prefix):
        """
        Sets the repo db pid counter to the given value. Allows to create new calcs
        without interfering with migrated calcs that already have PIDs.
        """
        self.logger.info('set pid prefix', pid_prefix=prefix)
        self.client.admin.exec_pidprefix_command(payload=dict(prefix=prefix)).response()

    def call_api(self, operation: str, *args, **kwargs) -> Any:
        """
        Calls nomad via the bravado client. It deals with a very busy nomad and catches,
        backs off, and retries on gateway timeouts. It also circumvents bravado's/jsonschema's
        thread safety issues using a global lock on client usage.

        Arguments:
            operation: Dot separated string of api, endpoint, operation,
                e.g. 'uploads.get_upload'.
        """
        op_path = operation.split('.')
        op = self.client
        for op_path_segment in op_path:
            op = getattr(op, op_path_segment)

        sleep = utils.SleepTimeBackoff()
        while True:
            try:
                NomadCOEMigration._client_lock.acquire(blocking=True)
                return op(*args, **kwargs).response().result
            except HTTPGatewayTimeout:
                sleep()
            except Exception as e:
                raise e
            finally:
                NomadCOEMigration._client_lock.release()
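
    # A call sketch: any bravado operation is addressed by its dotted path,
    # e.g. (with a hypothetical upload id)
    #
    #     upload = self.call_api('uploads.get_upload', upload_id='some_upload_id')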

    def migrate(self, *args, delete_failed: str = '', create_packages: bool = False) -> utils.POPO:
        """
        Migrate the given uploads.

        It takes paths to extracted uploads as arguments.

        Requires :class:`Package` instances for the given upload paths. Those will
        be created, if they do not already exist. The packages determine the uploads
        for the target infrastructure.

        Requires a built :func:`index` to look for existing data in the source db. This
        will be used to add user (and other, PID, ...) metadata and validate calculations.

        Uses PIDs of identified old calculations. Will create new PIDs for previously
        unknown uploads. See :func:`set_pid_prefix` on how to avoid conflicts.

        Arguments:
            *args: Filepaths to the extracted upload directories.
            delete_failed: String from ``N``, ``U``, ``P`` to determine that uploads with
                no processed calcs (N), failed upload processing (U), or failed publish
                operation (P) should be deleted after the migration attempt.
            create_packages: If True, it will attempt to create upload packages if they
                do not exist.

        Returns: Dictionary with statistics on the migration.
        """
        cv = threading.Condition()
        overall_report = Report()
        threads = []

        def print_report():
            if not self._quiet:
                print(overall_report)

        def migrate_package(package: Package):
            logger = self.logger.bind(
                package_id=package.package_id, source_upload_id=package.upload_id)

            if package.migration_version is not None and package.migration_version >= self.migration_version:
                self.logger.info(
                    'package already migrated, skip it',
                    package_id=package.package_id, source_upload_id=package.upload_id)

                package_report = package.report
                overall_report.skipped_packages += 1
            else:
                try:
                    package_report = self.migrate_package(package, delete_failed=delete_failed)
                except Exception as e:
                    package_report = Report()
                    package_report.failed_packages = 1
                    logger.error(
                        'unexpected exception while migrating packages', exc_info=e)
                finally:
                    package.report = package_report
                    package.migration_version = self.migration_version
                    package.save()

            with cv:
                try:
                    overall_report.add(package_report)

                    migrated_all_packages = all(
                        p.migration_version == self.migration_version
                        for p in Package.objects(upload_id=package.upload_id))

                    if migrated_all_packages:
                        missing_calcs = SourceCalc.objects(
                            upload=package.upload_id, migration_version__ne=self.migration_version).count()
                        total_source_calcs = SourceCalc.objects(upload=package.upload_id).count()

                        overall_report.missing_calcs += missing_calcs
                        overall_report.total_source_calcs += total_source_calcs

                        logger.info('migrated upload')

                    print_report()
                except Exception as e:
                    logger.error('unexpected exception while migrating packages', exc_info=e)

                self._threads += 1
                cv.notify()

        for arg in args:
            packages = Package.get_packages(
                arg, self.package_directory, create=create_packages,
                compress=self.compress_packages)

            if packages is None:
                self.logger.error('there are no packages for upload', upload_source_path=arg)
                continue

            for package in packages:
                with cv:
                    cv.wait_for(lambda: self._threads > 0)
                    self._threads -= 1
                    # pass the package as an argument to avoid late binding of
                    # the loop variable in a lambda
                    thread = threading.Thread(target=migrate_package, args=(package,))
                    threads.append(thread)
                    thread.start()

        for thread in threads:
            thread.join()

        return overall_report

    _client_lock = threading.Lock()

    def migrate_package(self, package: Package, delete_failed: str = '') -> 'Report':
        """ Migrates the given package. For other params see :func:`migrate`. """

        source_upload_id = package.upload_id
        package_id = package.package_id

        logger = self.logger.bind(package_id=package_id, source_upload_id=source_upload_id)
        logger.debug('start to process package')

        report = Report()
        report.total_packages += 1

        # upload and process the upload file
        with utils.timer(logger, 'upload completed'):
            try:
                upload = self.call_api(
                    'uploads.upload', name=package_id, local_path=package.package_path)
            except Exception as e:
                self.logger.error('could not upload package', exc_info=e)
                report.failed_packages += 1
                return report

        logger = logger.bind(
            source_upload_id=source_upload_id, upload_id=upload.upload_id)

        def delete_upload(reason: int):
            delete = \
                (reason == NO_PROCESSED_CALCS and 'N' in delete_failed) or \
                (reason == FAILED_PROCESSING and 'U' in delete_failed) or \
                (reason == FAILED_PUBLISH and 'P' in delete_failed)

            upload_to_delete = upload

            if delete:
                upload_to_delete = self.call_api(
                    'uploads.delete_upload', upload_id=upload_to_delete.upload_id)

                sleep = utils.SleepTimeBackoff()
                while upload_to_delete.process_running:
                    try:
                        upload_to_delete = self.call_api(
                            'uploads.get_upload', upload_id=upload_to_delete.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the delete operation
                        break
                logger.info('deleted upload after migration failure')
            else:
                logger.warning(
                    'will keep upload after migration failure for debugging',
                    reason=reason, delete_failed=delete_failed)

        # grab source calcs, while waiting for upload
        source_calcs = dict()
        surrogate_source_calc_with_metadata = None
        with utils.timer(logger, 'loaded source metadata'):
            with zipfile.ZipFile(package.package_path) as zf:
                for filenames_chunk in utils.chunks(zf.namelist(), 1000):
                    for source_calc in SourceCalc.objects(
                            upload=source_upload_id, mainfile__in=filenames_chunk):

                        source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                        source_calc_with_metadata.pid = source_calc.pid
                        source_calc_with_metadata.mainfile = source_calc.mainfile
                        source_calcs[source_calc.mainfile] = (source_calc, source_calc_with_metadata)

                        # establish a surrogate for new calcs
                        if surrogate_source_calc_with_metadata is None:
                            surrogate_source_calc_with_metadata = \
                                self.surrogate_metadata(source_calc_with_metadata)

        # try to find a surrogate outside the package, if necessary
        if surrogate_source_calc_with_metadata is None:
            source_calc = SourceCalc.objects(upload=source_upload_id).first()
            if source_calc is not None:
                source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                surrogate_source_calc_with_metadata = \
                    self.surrogate_metadata(source_calc_with_metadata)

        # wait for complete upload
        with utils.timer(logger, 'upload processing completed'):
            sleep = utils.SleepTimeBackoff()
            while upload.tasks_running:
                upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
                sleep()

        if upload.tasks_status == FAILURE:
            logger.error('failed to process upload', process_errors=upload.errors)
            report.failed_packages += 1
            delete_upload(FAILED_PROCESSING)
            return report
        else:
            report.total_calcs += upload.calcs.pagination.total

        calc_mainfiles = []
        upload_total_calcs = upload.calcs.pagination.total

        # check for processing errors
        with utils.timer(logger, 'checked upload processing'):
            per_page = 500
            for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
                upload = self.call_api(
                    'uploads.get_upload', upload_id=upload.upload_id, per_page=per_page,
                    page=page, order_by='mainfile')

                for calc_proc in upload.calcs.results:
                    calc_logger = logger.bind(
                        calc_id=calc_proc.calc_id,
                        mainfile=calc_proc.mainfile)

                    calc_mainfiles.append(calc_proc.mainfile)

                    if calc_proc.tasks_status == FAILURE:
                        report.failed_calcs += 1
                        calc_logger.info(
                            'could not process a calc', process_errors=calc_proc.errors)
                        continue

        # verify upload against source
        calcs_in_search = 0
        with utils.timer(logger, 'verified upload against source calcs'):
            scroll_id = 'first'
            while scroll_id is not None:
                scroll_args: Dict[str, Any] = dict(scroll=True)
                if scroll_id != 'first':
                    scroll_args['scroll_id'] = scroll_id

                search = self.call_api('repo.search', upload_id=upload.upload_id, **scroll_args)
                scroll_id = search.scroll_id

                for calc in search.results:
                    calcs_in_search += 1
                    source_calc, source_calc_with_metadata = source_calcs.get(
                        calc['mainfile'], (None, None))

                    # bind the calc logger before branching, it is needed in both branches
                    calc_logger = logger.bind(calc_id=calc['calc_id'], mainfile=calc['mainfile'])

                    if source_calc is not None:
                        report.migrated_calcs += 1

                        if calc.get('processed', False):
                            try:
                                if not self.validate(
                                        calc, source_calc_with_metadata, calc_logger):
                                    report.calcs_with_diffs += 1
                            except Exception as e:
                                calc_logger.warning(
                                    'unexpected exception during validation', exc_info=e)
                                report.calcs_with_diffs += 1
                    else:
                        calc_logger.info('processed a calc that has no source')
                        report.new_calcs += 1

                        # guessing the metadata from other calcs in upload/package
                        if surrogate_source_calc_with_metadata is not None:
                            new_calc_with_metadata = CalcWithMetadata(**surrogate_source_calc_with_metadata.to_dict())
                            new_calc_with_metadata.mainfile = calc['mainfile']
                        else:
                            calc_logger.warning('could not determine any metadata for new calc')
                            create_time_epoch = os.path.getctime(package.upload_path)
                            new_calc_with_metadata = CalcWithMetadata(
                                upload_time=datetime.datetime.fromtimestamp(create_time_epoch),
                                with_embargo=package.restricted > 0,
                                comment=default_comment,
                                uploader=default_uploader,
                                mainfile=calc['mainfile'])
                            surrogate_source_calc_with_metadata = new_calc_with_metadata

                        source_calcs[calc['mainfile']] = (None, new_calc_with_metadata)

            if len(calc_mainfiles) != calcs_in_search:
                logger.error('mismatch between processed calcs and calcs found with search')

        # publish upload
        if len(calc_mainfiles) > 0:
            with utils.timer(logger, 'upload published'):
                upload_metadata = dict(with_embargo=(package.restricted > 0))
                upload_metadata['calculations'] = [
                    self._to_api_metadata(source_calc_with_metadata)
                    for _, source_calc_with_metadata in source_calcs.values()]

                upload = self.call_api(
                    'uploads.exec_upload_operation', upload_id=upload.upload_id,
                    payload=dict(operation='publish', metadata=upload_metadata))

                sleep = utils.SleepTimeBackoff()
                while upload.process_running:
                    try:
                        upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the publish operation
                        break

                if upload.tasks_status == FAILURE:
                    logger.error('could not publish upload', process_errors=upload.errors)
                    report.failed_calcs = report.total_calcs
                    report.migrated_calcs = 0
                    report.calcs_with_diffs = 0
                    report.new_calcs = 0
                    report.failed_packages += 1

                    delete_upload(FAILED_PUBLISH)
                    SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
                        .update(migration_version=-1)
                else:
                    SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
                        .update(migration_version=self.migration_version)
        else:
            delete_upload(NO_PROCESSED_CALCS)
            logger.info('no successful calcs, skip publish')

        logger.info('migrated package', **report)
        return report

    def _to_api_metadata(self, calc_with_metadata: CalcWithMetadata) -> dict:
        """ Transforms to a dict that fulfills the API's upload metadata model. """
        return dict(
            _upload_time=calc_with_metadata.upload_time,
            _uploader=calc_with_metadata.uploader['id'],
            _pid=calc_with_metadata.pid,
            references=[ref['value'] for ref in calc_with_metadata.references],
            datasets=[dict(
                id=ds['id'],
                _doi=ds.get('doi', {'value': None})['value'],
                _name=ds.get('name', None)) for ds in calc_with_metadata.datasets],
            mainfile=calc_with_metadata.mainfile,
            with_embargo=calc_with_metadata.with_embargo,
            comment=calc_with_metadata.comment,
            coauthors=list(int(user['id']) for user in calc_with_metadata.coauthors),
            shared_with=list(int(user['id']) for user in calc_with_metadata.shared_with)
        )

    def source_calc_index(self, *args, **kwargs):
        """ See :func:`SourceCalc.index`. """
        return SourceCalc.index(self.source, *args, **kwargs)

    def package_index(self, upload_path) -> None:
        """
        Creates Package objects and the respective package zip files for the given upload.
        The given upload is supposed to be a path to an extracted upload directory.
        If the upload is already in the index, it is not recreated.
        """
        logger = utils.get_logger(__name__)

        try:
            for package_entry in Package.get_packages(
                    upload_path, self.package_directory, create=True,
                    compress=self.compress_packages):

                logger.info(
                    'package in index',
                    source_upload_path=upload_path,
                    source_upload_id=package_entry.upload_id,
                    package_id=package_entry.package_id)
        except Exception as e:
            logger.error(
                'could not create package from upload',
                upload_path=upload_path, exc_info=e)
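
# An overall workflow sketch (paths are illustrative and assume a set up
# infrastructure): copy users, index the source db, then migrate uploads.
#
#     migration = NomadCOEMigration(migration_version=1, threads=4)
#     migration.copy_users()
#     for _ in migration.source_calc_index():  # consume the index generator
#         pass
#     report = migration.migrate('/data/nomad/extracted/upload1', create_packages=True)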


class Report(utils.POPO):
    def __init__(self, *args, **kwargs):
        self.total_packages = 0
        self.failed_packages = 0
        self.skipped_packages = 0
        self.total_calcs = 0  # the calcs that have been found by the target
        self.total_source_calcs = 0  # the calcs in the source index
        self.failed_calcs = 0  # calcs that have been migrated with failed processing
        self.migrated_calcs = 0   # the calcs from the source, successfully added to the target
        self.calcs_with_diffs = 0  # the calcs from the source, successfully added to the target with different metadata
        self.new_calcs = 0  # the calcs successfully added to the target that were not found in the source
        self.missing_calcs = 0  # the calcs in the source that could not be added to the target due to failure or not finding the calc

        super().__init__(*args, **kwargs)

    def add(self, other: 'Report') -> None:
        for key, value in other.items():
            self[key] = self.get(key, 0) + value

    def __str__(self):
        # NOTE: the original body of __str__ is truncated in the source; the
        # formatting below is an assumed reconstruction over the report fields.
        return (
            'packages: %d (%d failed, %d skipped); calcs: %d source, %d migrated, '
            '%d failed, %d with diffs, %d new, %d missing' % (
                self.total_packages, self.failed_packages, self.skipped_packages,
                self.total_source_calcs, self.migrated_calcs, self.failed_calcs,
                self.calcs_with_diffs, self.new_calcs, self.missing_calcs))