# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains functions to read data from the NOMAD CoE, external sources, and
other/older nomad@FAIRDI instances, in order to mass upload it to a new nomad@FAIRDI instance.

.. autoclass:: NomadCOEMigration
.. autoclass:: SourceCalc
"""

from typing import Generator, Tuple, List, Iterable, Any, Dict, BinaryIO, Callable, ContextManager
import os
import os.path
import zipfile
import math
from mongoengine import Document, IntField, StringField, DictField
import datetime
from bravado.exception import HTTPNotFound, HTTPBadRequest, HTTPGatewayTimeout
import runstats
import io
import threading
from tarfile import TarFile
from contextlib import contextmanager

from nomad import utils, infrastructure, files, config
from nomad.coe_repo import User, Calc, LoginException
from nomad.datamodel import CalcWithMetadata
from nomad.processing import FAILURE, SUCCESS


default_pid_prefix = 7000000
""" The default pid prefix for new non migrated calculations """

max_package_size = 16 * 1024 * 1024 * 1024  # 16 GB
""" The maximum size of a package that will be used as an upload on nomad@FAIRDI """
use_stats_for_filestats_threshold = 1024

default_comment = 'entry with unknown provenance'
default_uploader = dict(id=1)


def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.
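
    A minimal usage sketch (with made-up example data)::

        stream = iterable_to_stream(iter([b'hello ', b'world']))
        assert stream.read() == b'hello world'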
    """
    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = b''
            self.iterator = iter(iterable)

        def readable(self):
            return True

        def readinto(self, b):
            requested_len = len(b)  # We're supposed to return at most this much
            while True:
                if len(self.leftover) > 0:
                    # consume leftover bytes of the previous chunk first,
                    # otherwise they would be silently dropped
                    chunk = self.leftover
                else:
                    try:
                        chunk = next(self.iterator)
                    except StopIteration:
                        return 0  # indicate EOF
                output, self.leftover = chunk[:requested_len], chunk[requested_len:]
                len_output = len(output)
                if len_output == 0:
                    continue  # do not prematurely indicate EOF
                b[:len_output] = output
                return len_output

    return io.BufferedReader(IterStream(), buffer_size=buffer_size)


OpenFunction = Callable[[str], ContextManager[BinaryIO]]
Directory = Tuple[List[str], int]
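
# For illustration: an OpenFunction over plain files could be as simple as
# `lambda path: open(path, 'rb')`, since an opened file object is itself a
# context manager that yields a BinaryIO.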


class Package(Document):
    """
    A Package represents split origin NOMAD CoE uploads. We use packages as uploads
    in nomad@FAIRDI. Some of the uploads in nomad are very big (alfow lib) and need
    to be split down to yield practical (i.e. for mirrors) upload sizes. Therefore,
    uploads are split over multiple packages if one upload gets to large. A package
    always contains full directories of files to preserve *mainfile* *aux* file relations.
101
    Package have a package entry in mongo and a .zip file with the raw data.
102
103
104
    """

    package_id = StringField(primary_key=True)
    """ A random UUID for the package. Could serve later its target upload id."""
    package_path = StringField(required=True)
    """ The path to the package .zip file. """
    upload_path = StringField(required=True)
    """ The absolute path of the source upload """
    upload_id = StringField(required=True)
    """ The source upload_id. There might be multiple packages per upload (this is the point). """
    restricted = IntField(default=0)
    """ The restricted in month, 0 for unrestricted. """
    size = IntField()
    """ The sum of all file sizes. """
    files = IntField()
    """ The number of files. """

    migration_version = IntField(default=-1)
    """ The version of the last successful migration of this package """
    report = DictField()
    """ The report of the last successful migration of this package """

    meta = dict(indexes=['upload_id', 'migration_version'])

    @classmethod
    def aggregate_reports(cls, migration_version: int = None) -> 'Report':
        """
        Returns an aggregated report over all package reports of the given migration version,
        or all packages.
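
        For example, to print an aggregate over all packages of migration version 0
        or higher (a sketch)::

            print(Package.aggregate_reports(migration_version=0))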
        """
        if migration_version is not None:
            query = cls.objects(migration_version__gte=migration_version, report__exists=True)
        else:
            query = cls.objects(report__exists=True)

        report = Report()
        for package in query:
            report.add(Report(package.report))

        return report

    @classmethod
    def create_packages(cls, upload_path: str, target_dir: str) -> Iterable['Package']:
        """
        Will create packages for the given upload_path. Creates the package zip files and
        package index entries. Both are only created if they do not already exist.
        Yields the Package objects.
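
        A usage sketch (paths are hypothetical)::

            for package in Package.create_packages(
                    '/data/extracted/upload-1', '/data/migration/packages'):
                print(package.package_id, package.files, package.size)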
        """
        upload_id = os.path.basename(upload_path)
        logger = utils.get_logger(__name__, source_upload_path=upload_path, source_upload_id=upload_id)

        if not os.path.isdir(upload_path):
            logger.error('upload path is not a directory')
            return []

        package_query = cls.objects(upload_id=upload_id)
        upload_directory = files.DirectoryObject(target_dir, upload_id, create=True, prefix=True)
        restricted = 0

        if package_query.count() == 0:
            def open_package_zip(package_entry: 'Package'):
                return zipfile.ZipFile(package_entry.package_path, 'w')

            def create_package_entry():
                package_id = utils.create_uuid()
                return Package(
                    package_id=package_id,
                    upload_path=upload_path,
                    upload_id=upload_id,
                    package_path=upload_directory.join_file('%s.zip' % package_id).os_path)

            def close_package(package_size: int, package_files: int):
                package_zip.close()
                package_entry.size = package_size
                package_entry.files = package_files
                package_entry.save()

                logger.debug('created package', package_id=package_entry.package_id, size=package_size)

            package_entry = create_package_entry()
            package_size = 0
            package_files = 0
            package_zip = open_package_zip(package_entry)
            for (filepaths, size), open_function in cls.iterate_directory(upload_path):
                for filepath in filepaths:
                    basepath = os.path.basename(filepath)
                    if basepath.startswith('RESTRICTED'):
                        restricted = 36
                        try:
                            restricted = min(36, int(basepath[len('RESTRICTED_'):]))
                        except Exception:
                            pass

                    with package_zip.open(filepath, "w") as target:
                        with open_function(filepath) as source:
                            buffer = source.read(1024 * 1024)
                            while len(buffer) > 0:
                                target.write(buffer)
                                buffer = source.read(1024 * 1024)
                    package_files += 1

                if size > max_package_size:
                    logger.warning(
                        'directory exceeds max package size',
                        package_id=package_entry.package_id, size=size)

                package_size += size
                if package_size > max_package_size:
                    close_package(package_size, package_files)
                    package_size, package_files = 0, 0
                    package_entry = create_package_entry()
                    package_zip = open_package_zip(package_entry)

            if package_files > 0:
                close_package(package_size, package_files)

            package_query = cls.objects(upload_id=upload_id)
            package_query.update(restricted=restricted)
            logger.debug(
                'packaged upload', source_upload_id=upload_id, source_upload_path=upload_path,
                restricted=restricted)

            return package_query
        else:
            return package_query

    @classmethod
    def iterate_directory(cls, upload_path: str) -> Generator[Tuple[Directory, OpenFunction], None, None]:
        """
        Traverses the given upload path and provides directory and file information.

        Yields:
            Tuples of a directory and an open function. A directory is a tuple of a
            file path list and an approximate directory size in bytes. The open
            function takes one of the yielded file paths as argument and creates a
            context manager that provides a BinaryIO file-like.
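
            A consumption sketch::

                for (filepaths, size), open_function in Package.iterate_directory(upload_path):
                    for filepath in filepaths:
                        with open_function(filepath) as f:
                            data = f.read()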
        """
        potential_archive_path = os.path.join(upload_path, 'archive.tar.gz')
        if os.path.isfile(potential_archive_path):
            with TarFile.open(potential_archive_path) as tar_file:
                @contextmanager
                def open_function(filepath):
                    file = tar_file.extractfile(filepath)
                    yield file
                    file.close()

                for dir in cls._iterate_upload_archive(tar_file):
                    yield dir, open_function
        else:
            def open_function(filepath):
                return open(os.path.join(upload_path, filepath), 'rb')

            for dir in cls._iterate_upload_directory(upload_path):
                yield dir, open_function

    @classmethod
    def _iterate_upload_directory(cls, upload_path) -> Generator[Directory, None, None]:
        """
        Interprets the given upload path as a directory. File paths are given relative
        to the upload path.
        """
        stats = runstats.Statistics()
        for root, _, files in os.walk(upload_path):
            directory_filepaths: List[str] = []
            directory_size = 0

            if len(files) == 0:
                continue

            for file in files:
                filepath = os.path.join(root, file)
                filename = filepath[len(upload_path) + 1:]
                directory_filepaths.append(filename)
                # getting file stats is pretty expensive with gpfs
                # if an upload has more than 1000 files, it is pretty likely that
                # size patterns repeat ... good enough
                if len(stats) < use_stats_for_filestats_threshold:
                    filesize = os.path.getsize(filepath)
                    stats.push(filesize)
                else:
                    filesize = stats.mean()
                directory_size += filesize

            yield directory_filepaths, directory_size

    @classmethod
    def _iterate_upload_archive(cls, tar_file: TarFile) -> Generator[Directory, None, None]:
        """
        Interprets the given upload path as an archive. File paths are as in the archive.
        """
        current_directory = None
        directory_filepaths: List[str] = []
        directory_size = 0

        for f in tar_file.getmembers():
            if f.isfile():
                directory = os.path.dirname(f.name)
                if current_directory != directory and len(directory_filepaths) > 0:
                    yield directory_filepaths, directory_size
                    directory_filepaths = []
                    directory_size = 0

                current_directory = directory
                directory_filepaths.append(f.name)
                directory_size += f.size

        yield directory_filepaths, directory_size


class SourceCalc(Document):
    """
316
    Mongo document used as a calculation, upload, and metadata db and index
317
318
319
320
321
    build from a given source db. Each :class:`SourceCacl` entry relates
    a pid, mainfile, upload "id" with each other for a corressponding calculation.
    It might alos contain the user metadata. The uploads are "id"ed via the
    specific path segment that identifies an upload on the CoE repo FS(s) without
    any prefixes (e.g. $EXTRACTED, /data/upload, etc.)
322
323
324
325
326
327
    """
    pid = IntField(primary_key=True)
    mainfile = StringField()
    upload = StringField()
    metadata = DictField()

    migration_version = IntField(default=-1)

    extracted_prefix = '$EXTRACTED/'
    sites = ['/data/nomad/extracted/', '/nomad/repository/extracted/']
    prefixes = [extracted_prefix] + sites

    meta = dict(indexes=['upload', 'mainfile', 'migration_version'])

    _dataset_cache: dict = {}

    @staticmethod
    def index(source, drop: bool = False, with_metadata: bool = True, per_query: int = 100) \
            -> Generator[Tuple['SourceCalc', int], None, None]:
        """
        Creates a collection of :class:`SourceCalc` documents that represent source repo
        db entries.

        Arguments:
            source: The source db sql alchemy session
            drop: True to drop and create a new collection, update the existing otherwise,
                default is False.
            with_metadata: True to also grab all metadata and store it, default is True.
            per_query: The implementation tries to grab almost all data with a heavily joined
                query on the CoE snowflake/star shaped schema.
                The query cannot ask for the whole db at once: choose how many calculations
                should be read at a time to optimize for your application.

        Returns:
            yields tuples (:class:`SourceCalc`, #calcs_total[incl. datasets])
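
            A consumption sketch, where ``source_db`` is the source db sql alchemy session::

                for source_calc, total in SourceCalc.index(source_db, with_metadata=True):
                    print('indexed pid %d of %d calcs' % (source_calc.pid, total))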
        """
        logger = utils.get_logger(__name__)
        if drop:
            SourceCalc.drop_collection()

        last_source_calc = SourceCalc.objects().order_by('-pid').first()
        start_pid = last_source_calc.pid if last_source_calc is not None else 0
        source_query = source.query(Calc)
        total = source_query.count() - SourceCalc.objects.count()

        while True:
            query_timer = utils.timer(logger, 'query source db')
            query_timer.__enter__()  # pylint: disable=E1101
            calcs: Iterable[Calc] = source_query \
                .filter(Calc.coe_calc_id > start_pid) \
                .order_by(Calc.coe_calc_id) \
                .limit(per_query)

            source_calcs = []
            for calc in calcs:
                query_timer.__exit__(None, None, None)  # pylint: disable=E1101
                try:
                    filenames = calc.files
                    if filenames is None or len(filenames) == 0:
                        continue  # dataset case

                    filename = filenames[0]
                    if len(filenames) == 1 and (filename.endswith('.tgz') or filename.endswith('.zip')):
                        continue  # also a dataset, some datasets have a downloadable archive

                    for prefix in SourceCalc.prefixes:
                        filename = filename.replace(prefix, '')
                    segments = [file.strip('\\') for file in filename.split('/')]

                    source_calc = SourceCalc(pid=calc.pid)
                    source_calc.upload = segments[0]
                    source_calc.mainfile = os.path.join(*segments[1:])
                    if with_metadata:
                        source_calc.metadata = calc.to_calc_with_metadata().__dict__
                    source_calcs.append(source_calc)
                    start_pid = source_calc.pid

                    yield source_calc, total
                except Exception as e:
                    logger.error('could not index', pid=calc.pid, exc_info=e)

            if len(source_calcs) == 0:
                break
            else:
                with utils.timer(logger, 'write index'):
                    SourceCalc.objects.insert(source_calcs)


NO_PROCESSED_CALCS = 0
FAILED_PROCESSING = 1
FAILED_PUBLISH = 2


class NomadCOEMigration:
    """
    Drives a migration from the NOMAD CoE repository db to nomad@FAIRDI.

    Arguments:
        migration_version: The migration version. Only packages/calculations with
            no migration version or a lower migration version are migrated.
        package_directory: The directory that packages are/get stored in.
        threads: Number of threads to run migration in parallel.
        quiet: Do not print intermediate status reports if True.
    """

    default_sites = [
        '/nomad/repository/data/uploads',
        '/nomad/repository/data/extracted',
        '/data/nomad/uploaded/',
        '/data/nomad/extracted/']

    default_pid_prefix = int(1e7)

    archive_filename = 'archive.tar.gz'
    """ The standard name for tarred uploads in the CoE repository. """

    def __init__(
            self,
            migration_version: int = 0,
            package_directory: str = None,
            threads: int = 1, quiet: bool = False) -> None:
        self.logger = utils.get_logger(__name__, migration_version=migration_version)

        self.migration_version = migration_version
        self.package_directory = package_directory if package_directory is not None else config.fs.migration_packages
        self._client = None
        self._threads = threads
        self._quiet = quiet

        self.source = infrastructure.repository_db

    @property
    def client(self):
        if self._client is None:
            from nomad.client import create_client
            self._client = create_client()

        return self._client

    def report(self):
        """ returns an aggregated report over all prior migrated packages """
        return Package.aggregate_reports(migration_version=self.migration_version)

    def copy_users(self):
        """ Copy all users. """
        for source_user in self.source.query(User).all():
            if source_user.user_id <= 2:
                # skip first two users to keep example users
                # they probably are either already the example users, or [root, Evgeny]
                continue

            create_user_payload = dict(
                user_id=source_user.user_id,
                email=source_user.email,
                first_name=source_user.first_name,
                last_name=source_user.last_name,
                password=source_user.password,
                created=source_user.created
            )

            try:
                create_user_payload.update(token=source_user.token)
            except LoginException:
                pass

            if source_user.affiliation is not None:
                create_user_payload.update(affiliation=dict(
                    name=source_user.affiliation.name,
                    address=source_user.affiliation.address))

            try:
                self.client.auth.create_user(payload=create_user_payload).response()
                self.logger.info('copied user', user_id=source_user.user_id)
            except HTTPBadRequest as e:
                self.logger.error('could not create user due to bad data', exc_info=e, user_id=source_user.user_id)

    expected_differences = {
        '0d': 'molecule / cluster',
        '3d': 'bulk',
        '2d': '2d / surface',
        '+u': 'gga'
    }

    def validate(self, repo_calc: dict, source_calc: CalcWithMetadata, logger) -> bool:
        """
        Validates the given processed calculation, assuming that the data in the given
        source_calc is correct.

        Returns:
            False, if the calculation differs from the source calc.
        """
        keys_to_validate = [
            'atoms', 'basis_set', 'xc_functional', 'system', 'crystal_system',
            'spacegroup', 'code_name', 'code_version']

        def to_comparable_list(values):
            for item in values:
                if isinstance(item, dict):
                    for key in item.keys():
                        if key.endswith('id'):
                            yield item.get(key)
                else:
                    yield item

        is_valid = True
        for key, target_value in repo_calc.items():
            if key not in keys_to_validate:
                continue

            source_value = getattr(source_calc, key, None)

            def check_mismatch() -> bool:
                # some exceptions
                if source_value in NomadCOEMigration.expected_differences and \
                        target_value == NomadCOEMigration.expected_differences.get(source_value):
                    return True

                logger.info(
                    'source target mismatch', quantity=key,
                    source_value=source_value, target_value=target_value,
                    value_diff='%s->%s' % (str(source_value), str(target_value)))
                return False

            if source_value is None and target_value is not None:
                continue

            if target_value is None and source_value is not None:
                is_valid &= check_mismatch()
                continue

            if isinstance(target_value, list):
                source_list = list(to_comparable_list(source_value))
                target_list = list(to_comparable_list(target_value))
                if len(set(source_list).intersection(target_list)) != len(target_list):
                    is_valid &= check_mismatch()
                continue

            if isinstance(source_value, str):
                source_value = source_value.lower()
                target_value = str(target_value).lower()

            if source_value != target_value:
                is_valid &= check_mismatch()

        return is_valid

    def surrogate_metadata(self, source: CalcWithMetadata):
        """
        Compute metadata from the given metadata that can be used for new calcs of the
        same upload.
        """
        return CalcWithMetadata(
            uploader=source.uploader,
            with_embargo=source.with_embargo,
            upload_time=source.upload_time,
            coauthors=source.coauthors,
            shared_with=source.shared_with,
            comment=source.comment,
            references=source.references,
            datasets=source.datasets)

    def set_pid_prefix(self, prefix: int = default_pid_prefix):
        """
        Sets the repo db pid counter to the given value. Allows the creation of new calcs
        without interfering with migrated calcs that already have PIDs.
        """
        self.logger.info('set pid prefix', pid_prefix=prefix)
        self.client.admin.exec_pidprefix_command(payload=dict(prefix=prefix)).response()

    def call_api(self, operation: str, *args, **kwargs) -> Any:
        """
        Calls nomad via the bravado client. It deals with a very busy nomad and catches,
        backs off, and retries on gateway timeouts. It also circumvents bravado's/jsonschema's
        thread safety issues using a global lock on client usage.

        Arguments:
            operation: Dot separated string of api, endpoint, operation,
                e.g. 'uploads.get_upload'.
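
        For example (a sketch)::

            upload = migration.call_api('uploads.get_upload', upload_id=upload_id)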
        """
        op_path = operation.split('.')
        op = self.client
        for op_path_segment in op_path:
            op = getattr(op, op_path_segment)

        sleep = utils.SleepTimeBackoff()
        while True:
            try:
                NomadCOEMigration._client_lock.acquire(blocking=True)
                return op(*args, **kwargs).response().result
            except HTTPGatewayTimeout:
                sleep()
            finally:
                NomadCOEMigration._client_lock.release()

    def migrate(self, *args, delete_failed: str = '') -> utils.POPO:
        """
        Migrate the given uploads.

        It takes paths to extracted uploads as arguments.

        Requires :class:`Package` instances for the given upload paths. Those will
        be created, if they do not already exist. The packages determine the uploads
        for the target infrastructure.

        Requires a built :func:`index` to look for existing data in the source db. This
        will be used to add user (and other, PID, ...) metadata and validate calculations.

        Uses PIDs of identified old calculations. Will create new PIDs for previously
        unknown uploads. See :func:`set_pid_prefix` on how to avoid conflicts.

        Arguments:
            upload_path: A filepath to the upload directory.
            delete_failed: String from ``N``, ``U``, ``P`` to determine that uploads with
                no processed calcs (N), failed upload processing (U), or failed publish
                operation (P) should be deleted after the migration attempt.

        Returns: Dictionary with statistics on the migration.
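
        A usage sketch (paths are hypothetical)::

            migration = NomadCOEMigration(migration_version=1, threads=4)
            report = migration.migrate(
                '/data/extracted/upload-1', '/data/extracted/upload-2',
                delete_failed='NU')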
        """

        cv = threading.Condition()
        overall_report = Report()
        threads = []

        def print_report():
            if not self._quiet:
                print(overall_report)

        def migrate_package(package: Package):
            logger = self.logger.bind(
                package_id=package.package_id, source_upload_id=package.upload_id)

            if package.migration_version is not None and package.migration_version >= self.migration_version:
                self.logger.info(
                    'package already migrated, skip it',
                    package_id=package.package_id, source_upload_id=package.upload_id)

                package_report = package.report
                overall_report.skipped_packages += 1
            else:
                try:
                    package_report = self.migrate_package(package, delete_failed=delete_failed)

                except Exception as e:
                    package_report = Report()
                    package_report.failed_packages = 1
                    logger.error(
                        'unexpected exception while migrating packages', exc_info=e)
                finally:
                    package.report = package_report
                    package.migration_version = self.migration_version
                    package.save()

            with cv:
                try:
                    overall_report.add(package_report)

                    migrated_all_packages = all(
                        p.migration_version == self.migration_version
                        for p in Package.objects(upload_id=package.upload_id))

                    if migrated_all_packages:
                        missing_calcs = SourceCalc.objects(
                            upload=package.upload_id, migration_version__ne=self.migration_version).count()
                        total_source_calcs = SourceCalc.objects(upload=package.upload_id).count()

                        overall_report.missing_calcs += missing_calcs
                        overall_report.total_source_calcs += total_source_calcs

                        logger.info('migrated upload')

                    print_report()
                except Exception as e:
                    logger.error('unexpected exception while migrating packages', exc_info=e)

                self._threads += 1
                cv.notify()

        for arg in args:
            for package in Package.create_packages(arg, self.package_directory):
                with cv:
                    cv.wait_for(lambda: self._threads > 0)
                    self._threads -= 1
                    # pass the package as an argument to avoid late binding on the loop variable
                    thread = threading.Thread(target=migrate_package, args=(package,))
                    threads.append(thread)
                    thread.start()

        for thread in threads:
            thread.join()

        return overall_report

    _client_lock = threading.Lock()

    def migrate_package(self, package: Package, delete_failed: str = '') -> 'Report':
        """ Migrates the given package. For other params see :func:`migrate`. """

        source_upload_id = package.upload_id
        package_id = package.package_id

        logger = self.logger.bind(package_id=package_id, source_upload_id=source_upload_id)
        logger.debug('start to process package')

        report = Report()
        report.total_packages += 1

        # upload and process the upload file
        with utils.timer(logger, 'upload completed'):
            try:
                upload = self.call_api(
                    'uploads.upload', name=package_id, local_path=package.package_path)
            except Exception as e:
                self.logger.error('could not upload package', exc_info=e)
                report.failed_packages += 1
                return report

        logger = logger.bind(
            source_upload_id=source_upload_id, upload_id=upload.upload_id)

        def delete_upload(reason: int):
            delete = \
                (reason == NO_PROCESSED_CALCS and 'N' in delete_failed) or \
                (reason == FAILED_PROCESSING and 'U' in delete_failed) or \
                (reason == FAILED_PUBLISH and 'P' in delete_failed)

            upload_to_delete = upload

            if delete:
                upload_to_delete = self.call_api(
                    'uploads.delete_upload', upload_id=upload_to_delete.upload_id)

                sleep = utils.SleepTimeBackoff()
                while upload_to_delete.process_running:
                    try:
                        upload_to_delete = self.call_api(
                            'uploads.get_upload', upload_id=upload_to_delete.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the delete operation
                        break
                logger.info('deleted upload after migration failure')
            else:
                logger.warning(
                    'will keep upload after migration failure for debugging',
                    reason=reason, delete_failed=delete_failed)

        # grab source calcs, while waiting for upload
        source_calcs = dict()
        surrogate_source_calc_with_metadata = None
        with utils.timer(logger, 'loaded source metadata'):
            with zipfile.ZipFile(package.package_path) as zf:
                for filenames_chunk in utils.chunks(zf.namelist(), 1000):
                    for source_calc in SourceCalc.objects(
                            upload=source_upload_id, mainfile__in=filenames_chunk):

                        source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                        source_calc_with_metadata.pid = source_calc.pid
                        source_calc_with_metadata.mainfile = source_calc.mainfile
                        source_calcs[source_calc.mainfile] = (source_calc, source_calc_with_metadata)

                        # establish a surrogate for new calcs
                        if surrogate_source_calc_with_metadata is None:
                            surrogate_source_calc_with_metadata = \
                                self.surrogate_metadata(source_calc_with_metadata)

        # try to find a surrogate outside the package, if necessary
        if surrogate_source_calc_with_metadata is None:
            source_calc = SourceCalc.objects(upload=source_upload_id).first()
            if source_calc is not None:
                source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                surrogate_source_calc_with_metadata = \
                    self.surrogate_metadata(source_calc_with_metadata)

        # wait for complete upload
        with utils.timer(logger, 'upload processing completed'):
            sleep = utils.SleepTimeBackoff()
            while upload.tasks_running:
                upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
                sleep()

        if upload.tasks_status == FAILURE:
            logger.error('failed to process upload', process_errors=upload.errors)
            report.failed_packages += 1
            delete_upload(FAILED_PROCESSING)
            return report
        else:
            report.total_calcs += upload.calcs.pagination.total

        calc_mainfiles = []
        upload_total_calcs = upload.calcs.pagination.total

        # check for processing errors
        with utils.timer(logger, 'checked upload processing'):
            per_page = 500
            for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
                upload = self.call_api(
                    'uploads.get_upload', upload_id=upload.upload_id, per_page=per_page,
                    page=page, order_by='mainfile')

                for calc_proc in upload.calcs.results:
                    calc_logger = logger.bind(
                        calc_id=calc_proc.calc_id,
                        mainfile=calc_proc.mainfile)

                    if calc_proc.tasks_status == SUCCESS:
                        calc_mainfiles.append(calc_proc.mainfile)
                    else:
                        report.failed_calcs += 1
                        calc_logger.info(
                            'could not process a calc', process_errors=calc_proc.errors)
                        continue

        # verify upload against source
        calcs_in_search = 0
        with utils.timer(logger, 'verified upload against source calcs'):
            scroll_id = 'first'
            while scroll_id is not None:
                scroll_args: Dict[str, Any] = dict(scroll=True)
                if scroll_id != 'first':
                    scroll_args['scroll_id'] = scroll_id

                search = self.call_api('repo.search', upload_id=upload.upload_id, **scroll_args)

                scroll_id = search.scroll_id

                for calc in search.results:
                    calcs_in_search += 1
                    source_calc, source_calc_with_metadata = source_calcs.get(
                        calc['mainfile'], (None, None))

                    # bind the logger before branching; the else branch needs it as well
                    calc_logger = logger.bind(calc_id=calc['calc_id'], mainfile=calc['mainfile'])

                    if source_calc is not None:
                        report.migrated_calcs += 1

                        try:
                            if not self.validate(calc, source_calc_with_metadata, calc_logger):
                                report.calcs_with_diffs += 1
                        except Exception as e:
                            calc_logger.warning('unexpected exception during validation', exc_info=e)
                            report.calcs_with_diffs += 1
                    else:
                        calc_logger.info('processed a calc that has no source')
                        report.new_calcs += 1
                        # guessing the metadata from other calcs in upload/package
                        if surrogate_source_calc_with_metadata is not None:
                            new_calc_with_metadata = CalcWithMetadata(**surrogate_source_calc_with_metadata.to_dict())
                            new_calc_with_metadata.mainfile = calc['mainfile']
                        else:
                            calc_logger.warning('could not determine any metadata for new calc')
                            create_time_epoch = os.path.getctime(package.upload_path)
                            new_calc_with_metadata = CalcWithMetadata(
                                upload_time=datetime.datetime.fromtimestamp(create_time_epoch),
                                with_embargo=package.restricted > 0,
                                comment=default_comment,
                                uploader=default_uploader,
                                mainfile=calc['mainfile'])
                            surrogate_source_calc_with_metadata = new_calc_with_metadata

                        source_calcs[calc['mainfile']] = (None, new_calc_with_metadata)

            if len(calc_mainfiles) != calcs_in_search:
                logger.error('mismatch between processed calcs and calcs found with search')

        # publish upload
        if len(calc_mainfiles) > 0:
            with utils.timer(logger, 'upload published'):
                upload_metadata = dict(with_embargo=(package.restricted > 0))
                upload_metadata['calculations'] = [
                    self._to_api_metadata(source_calc_with_metadata)
                    for _, source_calc_with_metadata in source_calcs.values()]

                upload = self.call_api(
                    'uploads.exec_upload_operation', upload_id=upload.upload_id,
                    payload=dict(operation='publish', metadata=upload_metadata))

                sleep = utils.SleepTimeBackoff()
                while upload.process_running:
                    try:
                        upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the publish operation
                        break

                if upload.tasks_status == FAILURE:
                    logger.error('could not publish upload', process_errors=upload.errors)
                    report.failed_calcs = report.total_calcs
                    report.migrated_calcs = 0
                    report.calcs_with_diffs = 0
                    report.new_calcs = 0
                    report.failed_packages += 1

                    delete_upload(FAILED_PUBLISH)
                    SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
                        .update(migration_version=-1)
                else:
                    SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
                        .update(migration_version=self.migration_version)
        else:
            delete_upload(NO_PROCESSED_CALCS)
            logger.info('no successful calcs, skip publish')

        logger.info('migrated package', **report)
        return report

    def _to_api_metadata(self, calc_with_metadata: CalcWithMetadata) -> dict:
        """ Transforms to a dict that fullfils the API's uploade metadata model. """

        return dict(
            _upload_time=calc_with_metadata.upload_time,
            _uploader=calc_with_metadata.uploader['id'],
            _pid=calc_with_metadata.pid,
            references=[ref['value'] for ref in calc_with_metadata.references],
            datasets=[dict(
                id=ds['id'],
                _doi=ds.get('doi', {'value': None})['value'],
                _name=ds.get('name', None)) for ds in calc_with_metadata.datasets],
            mainfile=calc_with_metadata.mainfile,
            with_embargo=calc_with_metadata.with_embargo,
            comment=calc_with_metadata.comment,
            coauthors=list(int(user['id']) for user in calc_with_metadata.coauthors),
            shared_with=list(int(user['id']) for user in calc_with_metadata.shared_with)
        )

    def source_calc_index(self, *args, **kwargs):
        """ see :func:`SourceCalc.index` """
        return SourceCalc.index(self.source, *args, **kwargs)

    def package_index(self, *upload_paths) -> None:
        """
        Creates Package objects and respective package zip files for the given uploads.
        The given uploads are supposed to be paths to the extracted upload directories.
        If the upload is already in the index, it is not recreated.
        """
        logger = utils.get_logger(__name__)

        for upload_path in upload_paths:
            try:
                for package_entry in Package.create_packages(upload_path, self.package_directory):
                    logger.info(
                        'package in index',
                        source_upload_path=upload_path,
                        source_upload_id=package_entry.upload_id,
                        package_id=package_entry.package_id)
            except Exception as e:
                logger.error(
                    'could not create package from upload',
                    upload_path=upload_path, exc_info=e)
                continue


class Report(utils.POPO):
    def __init__(self, *args, **kwargs):
        self.total_packages = 0
        self.failed_packages = 0
        self.skipped_packages = 0
        self.total_calcs = 0  # the calcs that have been found by the target
        self.total_source_calcs = 0  # the calcs in the source index
        self.failed_calcs = 0  # the calcs found by the target that could not be processed/published
        self.migrated_calcs = 0  # the calcs from the source that were successfully added to the target
        self.calcs_with_diffs = 0  # the calcs from the source that were added to the target with different metadata
        self.new_calcs = 0  # the calcs successfully added to the target that were not found in the source
        self.missing_calcs = 0  # the calcs in the source that could not be added to the target due to failures or not finding the calc

        super().__init__(*args, **kwargs)

    def add(self, other: 'Report') -> None:
        for key, value in other.items():
            self[key] = self.get(key, 0) + value

    def __str__(self):
        return (
            'packages: {:,}, skipped: {:,}, source calcs: {:,}, migrated: {:,}, '
            'failed: {:,}, missing: {:,}, new: {:,}'.format(
                self.total_packages, self.skipped_packages,
                self.total_source_calcs, self.migrated_calcs,
                self.failed_calcs, self.missing_calcs,
                self.new_calcs))