# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains functions to read data from NOMAD coe, external sources,
other/older nomad@FAIRDI instances to mass upload it to a new nomad@FAIRDI instance.
18
19
20

.. autoclass:: NomadCOEMigration
.. autoclass:: SourceCalc
21
22
"""

23
from typing import Generator, Tuple, List, Iterable, Any, Dict, BinaryIO, Callable, ContextManager
import os
import os.path
import zipfile
import tarfile
import math
from mongoengine import Document, IntField, StringField, DictField
import datetime
from bravado.exception import HTTPNotFound, HTTPBadRequest, HTTPGatewayTimeout
import runstats
import io
import threading
from contextlib import contextmanager

from nomad import utils, infrastructure, files, config
from nomad.coe_repo import User, Calc, LoginException
from nomad.datamodel import CalcWithMetadata
from nomad.processing import FAILURE, SUCCESS


default_pid_prefix = 7000000
""" The default pid prefix for new, non-migrated calculations """

max_package_size = 16 * 1024 * 1024 * 1024  # 16 GB
""" The maximum size of a package that will be used as an upload on nomad@FAIRDI """
use_stats_for_filestats_threshold = 1024
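""" Number of file sizes to stat exactly per upload before estimating the rest with their mean """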

default_comment = 'entry with unknown provenance'
default_uploader = dict(id=1)


def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.
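
    Example (a minimal sketch)::

        stream = iterable_to_stream(iter([b'hello ', b'world']))
        assert stream.read() == b'hello world'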
    """
    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = b''  # no bytes are left over before the first chunk
            self.iterator = iter(iterable)

        def readable(self):
            return True

        def readinto(self, b):
            requested_len = len(b)  # We're supposed to return at most this much
            while True:
                try:
                    chunk = next(self.iterator)
                except StopIteration:
                    if len(self.leftover) == 0:
                        return 0  # indicate EOF
                    chunk = self.leftover
                output, self.leftover = chunk[:requested_len], chunk[requested_len:]
                len_output = len(output)
                if len_output == 0:
                    continue  # do not prematurely indicate EOF
                b[:len_output] = output
                return len_output

    return io.BufferedReader(IterStream(), buffer_size=buffer_size)


OpenFunction = Callable[[Any], ContextManager[BinaryIO]]
PathFunction = Callable[[Any], str]
Directory = Tuple[List[Any], int]


class Package(Document):
    """
    A Package represents split origin NOMAD CoE uploads. We use packages as uploads
    in nomad@FAIRDI. Some of the uploads in nomad are very big (alfow lib) and need
    to be split down to yield practical (i.e. for mirrors) upload sizes. Therefore,
    uploads are split over multiple packages if one upload gets to large. A package
    always contains full directories of files to preserve *mainfile* *aux* file relations.
102
    Package have a package entry in mongo and a .zip file with the raw data.
103
104
105
    """

    package_id = StringField(primary_key=True)
    """ A random UUID for the package. Could serve later its target upload id."""
    package_path = StringField(required=True)
    """ The path to the package .zip file. """
    upload_path = StringField(required=True)
    """ The absolute path of the source upload """
    upload_id = StringField(required=True)
    """ The source upload_id. There might be multiple packages per upload (this is the point). """
    restricted = IntField(default=0)
    """ The restricted in month, 0 for unrestricted. """
    size = IntField()
    """ The sum of all file sizes. """
    files = IntField()
    """ The number of files. """

    migration_version = IntField(default=-1)
    """ The version of the last successful migration of this package """
    report = DictField()
    """ The report of the last successful migration of this package """

    meta = dict(indexes=['upload_id', 'migration_version'])

    @classmethod
    def aggregate_reports(cls, migration_version: int = None) -> 'Report':
        """
        Returns an aggregated report over all package reports of the given migration version,
        or all packages.
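
        Example (sketch)::

            report = Package.aggregate_reports(migration_version=0)
            print(report.total_packages, report.missing_calcs)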
        """
        if migration_version is not None:
            query = cls.objects(migration_version__gte=migration_version, report__exists=True)
        else:
            query = cls.objects(report__exists=True)

        report = Report()
        for package in query:
            report.add(Report(package.report))

        return report

    @classmethod
    def create_packages(
            cls, upload_path: str, target_dir: str,
            compress: bool = False) -> Iterable['Package']:
        """
        Will create packages for the given upload_path. Creates the package zip files and
        package index entries. Either will only be created if it does not already exist.
        Yields the Package objects.
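
        Example (sketch; the upload path is a placeholder)::

            for package in Package.create_packages(
                    '/data/extracted/upload1', config.fs.migration_packages):
                print(package.package_id, package.files)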
        """
        upload_id = os.path.basename(upload_path)
        logger = utils.get_logger(__name__, source_upload_path=upload_path, source_upload_id=upload_id)

        if not os.path.isdir(upload_path):
            logger.error('upload path is not a directory')
            return []

        package_query = cls.objects(upload_id=upload_id)
        upload_directory = files.DirectoryObject(target_dir, upload_id, create=True, prefix=True)
        restricted = 0

        if package_query.count() == 0:
            def open_package_zip(package_entry: 'Package'):
                return zipfile.ZipFile(
                    package_entry.package_path, 'w',
                    compression=zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED)

            def create_package_entry():
                package_id = utils.create_uuid()
                return Package(
                    package_id=package_id,
                    upload_path=upload_path,
                    upload_id=upload_id,
                    package_path=upload_directory.join_file('%s.zip' % package_id).os_path)

            def close_package(package_size: int, package_files: int):
                package_zip.close()
                package_entry.size = package_size
                package_entry.files = package_files
                package_entry.save()

                logger.debug('created package', package_id=package_entry.package_id, size=package_size)

            package_entry = create_package_entry()
            package_size = 0
            package_files = 0
            package_zip = open_package_zip(package_entry)
            for (fileobjects, size), path, open_func in cls.iterate_directory(upload_path):
                for fileobject in fileobjects:
                    filepath = path(fileobject)
                    basepath = os.path.basename(filepath)
                    if basepath.startswith('RESTRICTED'):
                        restricted = 36
                        try:
                            restricted = min(36, int(basepath[len('RESTRICTED_'):]))
                        except Exception:
                            pass

                    with package_zip.open(filepath, "w") as target:
                        with open_func(fileobject) as source:
                            buffer = source.read(1024 * 1024)
                            while len(buffer) > 0:
                                target.write(buffer)
                                buffer = source.read(1024 * 1024)
                    package_files += 1

                if size > max_package_size:
                    logger.warning(
                        'directory exceeds max package size',
                        package_id=package_entry.package_id, size=size)

                package_size += size
                if package_size > max_package_size:
                    close_package(package_size, package_files)
                    package_size, package_files = 0, 0
                    package_entry = create_package_entry()
                    package_zip = open_package_zip(package_entry)

            if package_files > 0:
                close_package(package_size, package_files)

            package_query = cls.objects(upload_id=upload_id)
            package_query.update(restricted=restricted)
            logger.debug(
                'packaged upload', source_upload_id=upload_id, source_upload_path=upload_path,
                restricted=restricted)

            return package_query
        else:
            return package_query

    @classmethod
    def iterate_directory(
            cls, upload_path: str) \
            -> Generator[Tuple[Directory, PathFunction, OpenFunction], None, None]:
        """
        Traverses the given upload path and provides directory and file information.

        Returns:
            A generator of tuples with the directory, a path function, and an open
            function. The directory is a tuple of an iterable of fileobjects and the
            directory size. The path function returns the path name of a fileobject.
            The open function creates a context manager that yields a readable
            file-like object for a given fileobject.
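
        Example (sketch; ``upload_path`` is a placeholder)::

            for (fileobjects, size), path, open_func in Package.iterate_directory(upload_path):
                for fileobject in fileobjects:
                    with open_func(fileobject) as f:
                        data = f.read()
                    print(path(fileobject), size)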
        """
        potential_archive_path = os.path.join(upload_path, 'archive.tar.gz')
        if os.path.isfile(potential_archive_path):
            with tarfile.TarFile.open(potential_archive_path) as tar_file:
                @contextmanager
                def open_function(info):
                    file = tar_file.extractfile(info)
                    try:
                        yield file
                    finally:
                        # also close the file if the caller raises while reading
                        file.close()

                for directory in cls._iterate_upload_archive(tar_file):
                    yield directory, lambda o: o.name, open_function
        else:
            def open_function(filepath):
                return open(os.path.join(upload_path, filepath), 'rb')

            for directory in cls._iterate_upload_directory(upload_path):
                yield directory, lambda p: p, open_function

    @classmethod
    def _iterate_upload_directory(cls, upload_path) -> Generator[Directory, None, None]:
        """
        Interprets the given upload path as a directory. File paths are given relative
        to the upload path.
        """
        stats = runstats.Statistics()
        for root, _, filenames in os.walk(upload_path):
            directory_filepaths: List[str] = []
            directory_size = 0

            if len(filenames) == 0:
                continue

            for file in filenames:
                filepath = os.path.join(root, file)
                filename = filepath[len(upload_path) + 1:]
                directory_filepaths.append(filename)
                # getting file stats is pretty expensive with gpfs;
                # if an upload has more than `use_stats_for_filestats_threshold` files,
                # it is pretty likely that size patterns repeat ... good enough
                if len(stats) < use_stats_for_filestats_threshold:
                    filesize = os.path.getsize(filepath)
                    stats.push(filesize)
                else:
                    filesize = stats.mean()
                directory_size += filesize

            yield directory_filepaths, directory_size

    @classmethod
    def _iterate_upload_archive(cls, tar_file: tarfile.TarFile) -> Generator[Directory, None, None]:
        """
        Interprets the given upload path as an archive. File paths are as in the archive.
        """
        current_directory = None
        directory_infos: List[tarfile.TarInfo] = []
        directory_size = 0

        f = tar_file.next()
        while f is not None:
            if f.isfile():
                directory = os.path.dirname(f.name)
                if current_directory != directory:
                    # the directory changed: yield the completed directory, if any
                    if len(directory_infos) > 0:
                        yield directory_infos, directory_size
                        directory_infos = []
                        directory_size = 0
                    current_directory = directory

                directory_infos.append(f)
                directory_size += f.size

            f = tar_file.next()

        if len(directory_infos) > 0:
            yield directory_infos, directory_size


class SourceCalc(Document):
    """
    Mongo document used as a calculation, upload, and metadata db and index,
    built from a given source db. Each :class:`SourceCalc` entry relates
    a pid, mainfile, and upload "id" with each other for a corresponding calculation.
    It might also contain the user metadata. The uploads are "id"ed via the
    specific path segment that identifies an upload on the CoE repo FS(s) without
    any prefixes (e.g. $EXTRACTED, /data/upload, etc.)
    """
    pid = IntField(primary_key=True)
    mainfile = StringField()
    upload = StringField()
    metadata = DictField()

    migration_version = IntField(default=-1)

    extracted_prefix = '$EXTRACTED/'
    sites = ['/data/nomad/extracted/', '/nomad/repository/extracted/']
    prefixes = [extracted_prefix] + sites

    meta = dict(indexes=['upload', 'mainfile', 'migration_version'])

    _dataset_cache: dict = {}

    @staticmethod
    def index(source, drop: bool = False, with_metadata: bool = True, per_query: int = 100) \
            -> Generator[Tuple['SourceCalc', int], None, None]:
        """
        Creates a collection of :class:`SourceCalc` documents that represent source repo
        db entries.

        Arguments:
            source: The source db SQLAlchemy session.
            drop: True to drop and create a new collection, update the existing otherwise,
                default is False.
            with_metadata: True to also grab all metadata and store it, default is True.
            per_query: The implementation tries to grab almost all data with a heavily joined
                query on the CoE snowflake/star shaped schema.
                The query cannot ask for the whole db at once: choose how many calculations
                should be read at a time to optimize for your application.

        Returns:
            yields tuples (:class:`SourceCalc`, #calcs_total[incl. datasets])
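
        Example (sketch; ``source_db`` stands for the source db session)::

            for source_calc, total in SourceCalc.index(source_db, per_query=1000):
                pass  # e.g. report progress against total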
        """
        logger = utils.get_logger(__name__)
        if drop:
            SourceCalc.drop_collection()

        last_source_calc = SourceCalc.objects().order_by('-pid').first()
        start_pid = last_source_calc.pid if last_source_calc is not None else 0
        source_query = source.query(Calc)
        total = source_query.count() - SourceCalc.objects.count()

        while True:
            query_timer = utils.timer(logger, 'query source db')
            query_timer.__enter__()  # pylint: disable=E1101
            calcs: Iterable[Calc] = source_query \
                .filter(Calc.coe_calc_id > start_pid) \
                .order_by(Calc.coe_calc_id) \
                .limit(per_query)

            source_calcs = []
            for calc in calcs:
                query_timer.__exit__(None, None, None)  # pylint: disable=E1101
                try:
                    filenames = calc.files
                    if filenames is None or len(filenames) == 0:
                        continue  # dataset case

                    filename = filenames[0]
                    if len(filenames) == 1 and (filename.endswith('.tgz') or filename.endswith('.zip')):
                        continue  # also a dataset, some datasets have a downloadable archive

                    for prefix in SourceCalc.prefixes:
                        filename = filename.replace(prefix, '')
                    segments = [file.strip('\\') for file in filename.split('/')]

                    source_calc = SourceCalc(pid=calc.pid)
                    source_calc.upload = segments[0]
                    source_calc.mainfile = os.path.join(*segments[1:])
                    if with_metadata:
                        source_calc.metadata = calc.to_calc_with_metadata().__dict__
                    source_calcs.append(source_calc)
                    start_pid = source_calc.pid

                    yield source_calc, total
                except Exception as e:
                    logger.error('could not index', pid=calc.pid, exc_info=e)

            if len(source_calcs) == 0:
                break
            else:
                with utils.timer(logger, 'write index'):
                    SourceCalc.objects.insert(source_calcs)


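# reason codes for deleting an upload after a failed migration; see the
# ``delete_failed`` argument of :func:`NomadCOEMigration.migrate`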
NO_PROCESSED_CALCS = 0
FAILED_PROCESSING = 1
FAILED_PUBLISH = 2


class NomadCOEMigration:
    """
    Drives a migration from the NOMAD CoE repository db to nomad@FAIRDI.

    Arguments:
        migration_version: The migration version. Only packages/calculations with
            no migration version or a lower migration version are migrated.
        package_directory: The directory that packages are/get stored in.
        compress_packages: True to use compression on package creation.
        threads: Number of threads to run migration in parallel.
        quiet: True to not print intermediate stats to stdout.
    """

    default_sites = [
        '/nomad/repository/data/uploads',
        '/nomad/repository/data/extracted',
        '/data/nomad/uploaded/',
        '/data/nomad/extracted/']

    default_pid_prefix = int(1e7)

    archive_filename = 'archive.tar.gz'
    """ The standard name for tarred uploads in the CoE repository. """

    def __init__(
            self,
            migration_version: int = 0,
            package_directory: str = None,
            compress_packages: bool = False,
            threads: int = 1, quiet: bool = False) -> None:
        self.logger = utils.get_logger(__name__, migration_version=migration_version)

        self.migration_version = migration_version
        self.package_directory = package_directory if package_directory is not None else config.fs.migration_packages
        self.compress_packages = compress_packages
        self._client = None
        self._threads = threads
        self._quiet = quiet

        self.source = infrastructure.repository_db

    @property
    def client(self):
        if self._client is None:
            from nomad.client import create_client
            self._client = create_client()

        return self._client

    def report(self):
        """ returns an aggregated report over all prior migrated packages """
        return Package.aggregate_reports(migration_version=self.migration_version)

    def copy_users(self):
        """ Copy all users. """
        for source_user in self.source.query(User).all():
            if source_user.user_id <= 2:
                # skip first two users to keep example users
                # they probably are either already the example users, or [root, Evgeny]
                continue

            create_user_payload = dict(
                user_id=source_user.user_id,
                email=source_user.email,
                first_name=source_user.first_name,
                last_name=source_user.last_name,
                password=source_user.password,
                created=source_user.created
            )

            try:
                create_user_payload.update(token=source_user.token)
            except LoginException:
                pass

            if source_user.affiliation is not None:
                create_user_payload.update(affiliation=dict(
                    name=source_user.affiliation.name,
                    address=source_user.affiliation.address))

            try:
                self.client.auth.create_user(payload=create_user_payload).response()
                self.logger.info('copied user', user_id=source_user.user_id)
            except HTTPBadRequest as e:
                self.logger.error('could not create user due to bad data', exc_info=e, user_id=source_user.user_id)

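    # known systematic source/target value differences that should not count as
    # validation mismatches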
    expected_differences = {
        '0d': 'molecule / cluster',
        '3d': 'bulk',
        '2d': '2d / surface',
        '+u': 'gga'
    }

    def validate(self, repo_calc: dict, source_calc: CalcWithMetadata, logger) -> bool:
        """
        Validates the given processed calculation, assuming that the data in the given
        source_calc is correct.

        Returns:
            False, if the calculation differs from the source calc.
        """
        keys_to_validate = [
            'atoms', 'basis_set', 'xc_functional', 'system', 'crystal_system',
            'spacegroup', 'code_name', 'code_version']

        def to_comparable_list(lst):
            for item in lst:
                if isinstance(item, dict):
                    for key in item.keys():
                        if key.endswith('id'):
                            yield item.get(key)
                else:
                    yield item

        is_valid = True
        for key, target_value in repo_calc.items():
            if key not in keys_to_validate:
                continue

            source_value = getattr(source_calc, key, None)

            def check_mismatch() -> bool:
                # some exceptions
                if source_value in NomadCOEMigration.expected_differences and \
                        target_value == NomadCOEMigration.expected_differences.get(source_value):
                    return True

                logger.info(
                    'source target mismatch', quantity=key,
                    source_value=source_value, target_value=target_value,
                    value_diff='%s->%s' % (str(source_value), str(target_value)))
                return False

            if source_value is None and target_value is not None:
                continue

            if target_value is None and source_value is not None:
                is_valid &= check_mismatch()
                continue

            if isinstance(target_value, list):
                source_list = list(to_comparable_list(source_value))
                target_list = list(to_comparable_list(target_value))
                if len(set(source_list).intersection(target_list)) != len(target_list):
                    is_valid &= check_mismatch()
                continue

            if isinstance(source_value, str):
                source_value = source_value.lower()
                target_value = str(target_value).lower()

            if source_value != target_value:
                is_valid &= check_mismatch()

        return is_valid

    def surrogate_metadata(self, source: CalcWithMetadata):
        """
        Compute metadata from the given metadata that can be used for new calcs of the
        same upload.
        """
        return CalcWithMetadata(
            uploader=source.uploader,
            with_embargo=source.with_embargo,
            upload_time=source.upload_time,
            coauthors=source.coauthors,
            shared_with=source.shared_with,
            comment=source.comment,
            references=source.references,
            datasets=source.datasets)

    def set_pid_prefix(self, prefix: int = default_pid_prefix):
        """
        Sets the repo db pid counter to the given value. This allows the creation of
        new calcs without interfering with migrated calcs that already have PIDs.
        """
        self.logger.info('set pid prefix', pid_prefix=prefix)
        self.client.admin.exec_pidprefix_command(payload=dict(prefix=prefix)).response()

    def call_api(self, operation: str, *args, **kwargs) -> Any:
        """
        Calls nomad via the bravado client. It deals with a very busy nomad: it catches
        gateway timeouts, backs off, and retries. It also circumvents bravado's/jsonschema's
        thread safety issues by using a global lock on client usage.

        Arguments:
            operation: Dot-separated string of api, endpoint, and operation,
                e.g. 'uploads.get_upload'.
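
        Example::

            upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)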
        """
        op_path = operation.split('.')
        op = self.client
        for op_path_segment in op_path:
            op = getattr(op, op_path_segment)

        sleep = utils.SleepTimeBackoff()
        while True:
            try:
                NomadCOEMigration._client_lock.acquire(blocking=True)
                return op(*args, **kwargs).response().result
            except HTTPGatewayTimeout:
                sleep()
            finally:
                NomadCOEMigration._client_lock.release()

    def migrate(self, *args, delete_failed: str = '') -> utils.POPO:
        """
        Migrate the given uploads.

        It takes paths to extracted uploads as arguments.

        Requires :class:`Package` instances for the given upload paths. Those will
        be created if they do not already exist. The packages determine the uploads
        for the target infrastructure.

        Requires a built :func:`index` to look for existing data in the source db. This
        will be used to add user (and other, PID, ...) metadata and validate calculations.

        Uses PIDs of identified old calculations. Will create new PIDs for previously
        unknown uploads. See :func:`set_pid_prefix` on how to avoid conflicts.

        Arguments:
            *args: Filepaths to the extracted upload directories.
            delete_failed: String from ``N``, ``U``, ``P`` to determine that uploads with
                no processed calcs (N), failed upload processing (U), or failed publish
                operation (P) should be deleted after the migration attempt.
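
        Example (sketch; the paths are placeholders)::

            migration = NomadCOEMigration(threads=2)
            report = migration.migrate(
                '/data/extracted/upload1', '/data/extracted/upload2', delete_failed='N')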

        Returns: Dictionary with statistics on the migration.
        """

        cv = threading.Condition()
        overall_report = Report()
        threads = []

        def print_report():
            if not self._quiet:
                print(overall_report)

        def migrate_package(package: Package):
            logger = self.logger.bind(
                package_id=package.package_id, source_upload_id=package.upload_id)

            if package.migration_version is not None and package.migration_version >= self.migration_version:
                self.logger.info(
                    'package already migrated, skip it',
                    package_id=package.package_id, source_upload_id=package.upload_id)

                package_report = package.report
                overall_report.skipped_packages += 1
            else:
                try:
                    package_report = self.migrate_package(package, delete_failed=delete_failed)

                except Exception as e:
                    package_report = Report()
                    package_report.failed_packages = 1
                    logger.error(
                        'unexpected exception while migrating packages', exc_info=e)
                finally:
                    package.report = package_report
                    package.migration_version = self.migration_version
                    package.save()

            with cv:
                try:
                    overall_report.add(package_report)

                    migrated_all_packages = all(
                        p.migration_version == self.migration_version
                        for p in Package.objects(upload_id=package.upload_id))

                    if migrated_all_packages:
                        missing_calcs = SourceCalc.objects(
                            upload=package.upload_id, migration_version__ne=self.migration_version).count()
                        total_source_calcs = SourceCalc.objects(upload=package.upload_id).count()

                        overall_report.missing_calcs += missing_calcs
                        overall_report.total_source_calcs += total_source_calcs

                        logger.info('migrated upload')

                    print_report()
                except Exception as e:
                    logger.error('unexpected exception while migrating packages', exc_info=e)

                self._threads += 1
                cv.notify()

        for arg in args:
            for package in Package.create_packages(
                    arg, self.package_directory, compress=self.compress_packages):

                with cv:
                    cv.wait_for(lambda: self._threads > 0)
                    self._threads -= 1
                    # pass the package explicitly to avoid late binding of the loop variable
                    thread = threading.Thread(target=migrate_package, args=(package,))
                    threads.append(thread)
                    thread.start()

        for thread in threads:
            thread.join()

        return overall_report

    _client_lock = threading.Lock()

    def migrate_package(self, package: Package, delete_failed: str = '') -> 'Report':
        """ Migrates the given package. For other params see :func:`migrate`. """

        source_upload_id = package.upload_id
        package_id = package.package_id

        logger = self.logger.bind(package_id=package_id, source_upload_id=source_upload_id)
        logger.debug('start to process package')

        report = Report()
        report.total_packages += 1

        # upload and process the upload file
        with utils.timer(logger, 'upload completed'):
            try:
                upload = self.call_api(
                    'uploads.upload', name=package_id, local_path=package.package_path)
            except Exception as e:
                self.logger.error('could not upload package', exc_info=e)
                report.failed_packages += 1
                return report

        logger = logger.bind(
            source_upload_id=source_upload_id, upload_id=upload.upload_id)

        def delete_upload(reason: int):
            delete = \
                (reason == NO_PROCESSED_CALCS and 'N' in delete_failed) or \
                (reason == FAILED_PROCESSING and 'U' in delete_failed) or \
                (reason == FAILED_PUBLISH and 'P' in delete_failed)

            upload_to_delete = upload

            if delete:
                upload_to_delete = self.call_api(
                    'uploads.delete_upload', upload_id=upload_to_delete.upload_id)

                sleep = utils.SleepTimeBackoff()
                while upload_to_delete.process_running:
                    try:
                        upload_to_delete = self.call_api(
                            'uploads.get_upload', upload_id=upload_to_delete.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the delete operation
                        break
                logger.info('deleted upload after migration failure')
            else:
                logger.warning(
                    'will keep upload after migration failure for debugging',
                    reason=reason, delete_failed=delete_failed)

        # grab source calcs, while waiting for upload
        source_calcs = dict()
        surrogate_source_calc_with_metadata = None
        with utils.timer(logger, 'loaded source metadata'):
            with zipfile.ZipFile(package.package_path) as zf:
                for filenames_chunk in utils.chunks(zf.namelist(), 1000):
                    for source_calc in SourceCalc.objects(
                            upload=source_upload_id, mainfile__in=filenames_chunk):

                        source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                        source_calc_with_metadata.pid = source_calc.pid
                        source_calc_with_metadata.mainfile = source_calc.mainfile
                        source_calcs[source_calc.mainfile] = (source_calc, source_calc_with_metadata)

                        # establish a surrogate for new calcs
                        if surrogate_source_calc_with_metadata is None:
                            surrogate_source_calc_with_metadata = \
                                self.surrogate_metadata(source_calc_with_metadata)

        # try to find a surrogate outside the package, if necessary
        if surrogate_source_calc_with_metadata is None:
            source_calc = SourceCalc.objects(upload=source_upload_id).first()
            if source_calc is not None:
                source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                surrogate_source_calc_with_metadata = \
                    self.surrogate_metadata(source_calc_with_metadata)

        # wait for complete upload
        with utils.timer(logger, 'upload processing completed'):
            sleep = utils.SleepTimeBackoff()
            while upload.tasks_running:
                upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
                sleep()

        if upload.tasks_status == FAILURE:
            logger.error('failed to process upload', process_errors=upload.errors)
            report.failed_packages += 1
            delete_upload(FAILED_PROCESSING)
            return report
        else:
            report.total_calcs += upload.calcs.pagination.total

        calc_mainfiles = []
        upload_total_calcs = upload.calcs.pagination.total

        # check for processing errors
        with utils.timer(logger, 'checked upload processing'):
            per_page = 500
            for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
                upload = self.call_api(
                    'uploads.get_upload', upload_id=upload.upload_id, per_page=per_page,
                    page=page, order_by='mainfile')

                for calc_proc in upload.calcs.results:
                    calc_logger = logger.bind(
                        calc_id=calc_proc.calc_id,
                        mainfile=calc_proc.mainfile)

                    if calc_proc.tasks_status == SUCCESS:
                        calc_mainfiles.append(calc_proc.mainfile)
                    else:
                        report.failed_calcs += 1
                        calc_logger.info(
                            'could not process a calc', process_errors=calc_proc.errors)
                        continue

        # verify upload against source
        calcs_in_search = 0
        with utils.timer(logger, 'verified upload against source calcs'):
            scroll_id = 'first'
            while scroll_id is not None:
                scroll_args: Dict[str, Any] = dict(scroll=True)
                if scroll_id != 'first':
                    scroll_args['scroll_id'] = scroll_id

                search = self.call_api('repo.search', upload_id=upload.upload_id, **scroll_args)

                scroll_id = search.scroll_id

                for calc in search.results:
                    calcs_in_search += 1
                    source_calc, source_calc_with_metadata = source_calcs.get(
                        calc['mainfile'], (None, None))
                    # bind the calc logger up front; the else branch logs with it, too
                    calc_logger = logger.bind(calc_id=calc['calc_id'], mainfile=calc['mainfile'])

                    if source_calc is not None:
                        report.migrated_calcs += 1

                        try:
                            if not self.validate(calc, source_calc_with_metadata, calc_logger):
                                report.calcs_with_diffs += 1
                        except Exception as e:
                            calc_logger.warning('unexpected exception during validation', exc_info=e)
                            report.calcs_with_diffs += 1
                    else:
                        calc_logger.info('processed a calc that has no source')
                        report.new_calcs += 1
                        # guessing the metadata from other calcs in upload/package
                        if surrogate_source_calc_with_metadata is not None:
                            new_calc_with_metadata = CalcWithMetadata(**surrogate_source_calc_with_metadata.to_dict())
                            new_calc_with_metadata.mainfile = calc['mainfile']
                        else:
                            calc_logger.warning('could not determine any metadata for new calc')
                            create_time_epoch = os.path.getctime(package.upload_path)
                            new_calc_with_metadata = CalcWithMetadata(
                                upload_time=datetime.datetime.fromtimestamp(create_time_epoch),
                                with_embargo=package.restricted > 0,
                                comment=default_comment,
                                uploader=default_uploader,
                                mainfile=calc['mainfile'])
                            surrogate_source_calc_with_metadata = new_calc_with_metadata

                        source_calcs[calc['mainfile']] = (None, new_calc_with_metadata)

            if len(calc_mainfiles) != calcs_in_search:
                logger.error('mismatch between processed calcs and calcs found with search')

        # publish upload
        if len(calc_mainfiles) > 0:
            with utils.timer(logger, 'upload published'):
                upload_metadata = dict(with_embargo=(package.restricted > 0))
                upload_metadata['calculations'] = [
                    self._to_api_metadata(source_calc_with_metadata)
                    for _, source_calc_with_metadata in source_calcs.values()]

                upload = self.call_api(
                    'uploads.exec_upload_operation', upload_id=upload.upload_id,
                    payload=dict(operation='publish', metadata=upload_metadata))

                sleep = utils.SleepTimeBackoff()
                while upload.process_running:
                    try:
                        upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the publish operation
                        break

                if upload.tasks_status == FAILURE:
                    logger.error('could not publish upload', process_errors=upload.errors)
                    report.failed_calcs = report.total_calcs
                    report.migrated_calcs = 0
                    report.calcs_with_diffs = 0
                    report.new_calcs = 0
                    report.failed_packages += 1

                    delete_upload(FAILED_PUBLISH)
                    SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
                        .update(migration_version=-1)
                else:
                    SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
                        .update(migration_version=self.migration_version)
        else:
            delete_upload(NO_PROCESSED_CALCS)
            logger.info('no successful calcs, skip publish')

        logger.info('migrated package', **report)
        return report

    def _to_api_metadata(self, calc_with_metadata: CalcWithMetadata) -> dict:
        """ Transforms to a dict that fullfils the API's uploade metadata model. """

        return dict(
            _upload_time=calc_with_metadata.upload_time,
            _uploader=calc_with_metadata.uploader['id'],
            _pid=calc_with_metadata.pid,
            references=[ref['value'] for ref in calc_with_metadata.references],
            datasets=[dict(
                id=ds['id'],
                _doi=ds.get('doi', {'value': None})['value'],
                _name=ds.get('name', None)) for ds in calc_with_metadata.datasets],
            mainfile=calc_with_metadata.mainfile,
            with_embargo=calc_with_metadata.with_embargo,
            comment=calc_with_metadata.comment,
            coauthors=list(int(user['id']) for user in calc_with_metadata.coauthors),
            shared_with=list(int(user['id']) for user in calc_with_metadata.shared_with)
        )

    def source_calc_index(self, *args, **kwargs):
        """ see :func:`SourceCalc.index` """
        return SourceCalc.index(self.source, *args, **kwargs)

    def package_index(self, *upload_paths) -> None:
        """
        Creates Package objects and respective package zip files for the given uploads.
        The given uploads are supposed to be paths to the extracted upload directories.
        If the upload is already in the index, it is not recreated.
        """
        logger = utils.get_logger(__name__)

        for upload_path in upload_paths:
            try:
                for package_entry in Package.create_packages(
                        upload_path, self.package_directory,
                        compress=self.compress_packages):

                    logger.info(
                        'package in index',
                        source_upload_path=upload_path,
                        source_upload_id=package_entry.upload_id,
                        package_id=package_entry.package_id)
            except Exception as e:
                logger.error(
                    'could not create package from upload',
                    upload_path=upload_path, exc_info=e)
                continue


class Report(utils.POPO):
    def __init__(self, *args, **kwargs):
        self.total_packages = 0
        self.failed_packages = 0
        self.skipped_packages = 0
        self.total_calcs = 0  # the calcs that have been found by the target
        self.total_source_calcs = 0  # the calcs in the source index
        self.failed_calcs = 0  # the calcs found by the target that could not be processed/published
        self.migrated_calcs = 0   # the calcs from the source, successfully added to the target
        self.calcs_with_diffs = 0  # the calcs from the source, successfully added to the target with different metadata
        self.new_calcs = 0  # the calcs successfully added to the target that were not found in the source
        self.missing_calcs = 0  # the calcs in the source that could not be added to the target (failed or not found)

        # apply given values over the defaults, e.g. ``Report(package.report)``
        super().__init__(*args, **kwargs)