# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains functions to read data from NOMAD coe, external sources,
other/older nomad@FAIRDI instances to mass upload it to a new nomad@FAIRDI instance.
18
19
20

.. autoclass:: NomadCOEMigration
.. autoclass:: SourceCalc
21
22
"""

from typing import Generator, Tuple, List, Iterable, Any, Dict
import os
import os.path
import zipfile
import tarfile
import math
from mongoengine import Document, IntField, StringField, DictField
import datetime
from bravado.exception import HTTPNotFound, HTTPBadRequest, HTTPGatewayTimeout
import runstats
import io
import threading
from contextlib import contextmanager
import shutil

from nomad import utils, infrastructure, files, config
from nomad.coe_repo import User, Calc, LoginException
from nomad.datamodel import CalcWithMetadata
from nomad.processing import FAILURE, SUCCESS


default_pid_prefix = 7000000
""" The default pid prefix for new, non-migrated calculations """

max_package_size = 32 * 1024 * 1024 * 1024  # 32 GB
""" The maximum size of a package that will be used as an upload on nomad@FAIRDI """

use_stats_for_filestats_threshold = 1024
""" The number of sampled file sizes after which sizes are estimated from statistics. """

default_comment = 'entry with unknown provenance'
default_uploader = dict(id=1)

def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.
    """
    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = b''  # start with empty bytes so len() works on immediate EOF
            self.iterator = iter(iterable)

        def readable(self):
            return True

        def readinto(self, b):
            requested_len = len(b)  # We're supposed to return at most this much
            while True:
                try:
                    chunk = next(self.iterator)
                except StopIteration:
                    if len(self.leftover) == 0:
                        return 0  # indicate EOF
                    chunk = self.leftover
                output, self.leftover = chunk[:requested_len], chunk[requested_len:]
                len_output = len(output)
                if len_output == 0:
                    continue  # do not prematurely indicate EOF
                b[:len_output] = output
                return len_output

    return io.BufferedReader(IterStream(), buffer_size=buffer_size)
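# A minimal usage sketch (produce_chunks is hypothetical, not part of this module):
# read from an iterable of bytestrings as if it were a file.
#
#     def produce_chunks():
#         for _ in range(3):
#             yield b'some bytes'
#
#     stream = iterable_to_stream(produce_chunks())
#     assert stream.read() == b'some bytes' * 3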


Directory = Tuple[List[str], str, int]
""" A directory entry: upload-relative file paths, the parent directory, and the total size. """


class Package(Document):
    """
    A Package represents a split of an original NOMAD CoE upload. We use packages as
    uploads in nomad@FAIRDI. Some of the uploads in nomad are very big (e.g. the aflow
    lib) and need to be split to yield practical (i.e. mirrorable) upload sizes.
    Therefore, uploads are split over multiple packages if one upload gets too large.
    A package always contains full directories of files to preserve *mainfile*/*aux*
    file relations.

    Packages have an entry in mongo and a .zip file with the raw data.
    """

    package_id = StringField(primary_key=True)
    """ A random UUID for the package. Could later serve as its target upload id. """
    package_path = StringField(required=True)
    """ The path to the package .zip file. """
    upload_path = StringField(required=True)
    """ The absolute path of the source upload. """
    upload_id = StringField(required=True)
    """ The source upload_id. There might be multiple packages per upload (this is the point). """
    restricted = IntField(default=0)
    """ The restriction (embargo) period in months, 0 for unrestricted. """
    size = IntField()
    """ The sum of all file sizes. """
    files = IntField()
    """ The number of files. """
    packages = IntField(default=-1)
    """ The number of packages in the same upload. """
    migration_version = IntField(default=-1)
    """ The version of the last successful migration of this package. """
    report = DictField()
    """ The report of the last successful migration of this package. """

    meta = dict(indexes=['upload_id', 'migration_version'])
    @classmethod
    def aggregate_reports(cls, migration_version: int = None) -> 'Report':
        """
        Returns an aggregated report over all package reports of the given migration version,
        or all packages.
        """
        if migration_version is not None:
            query = cls.objects(migration_version__gte=migration_version, report__exists=True)
        else:
            query = cls.objects(report__exists=True)

        report = Report()
        for package in query:
            report.add(Report(package.report))

        return report
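    # A usage sketch (assuming a populated package index): aggregate the reports of
    # all packages migrated with at least the given version.
    #
    #     report = Package.aggregate_reports(migration_version=0)
    #     print(report.total_calcs, report.missing_calcs)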

    @classmethod
    def create_packages(
            cls, upload_path: str, target_dir: str,
            compress: bool = False) -> Iterable['Package']:
        """
        Creates packages for the given upload_path: the package zip files and the
        package index entries. Both are only created if they do not already exist.
        Yields the Package objects.
        """
        upload_id = os.path.basename(upload_path)
        logger = utils.get_logger(__name__, source_upload_path=upload_path, source_upload_id=upload_id)

        if not os.path.isdir(upload_path):
            logger.error('upload path is not a directory')
            return []

        upload_directory = files.DirectoryObject(target_dir, upload_id, create=True, prefix=True)
        restricted = 0

        # The packages field is only written after all packages of an upload have been
        # created. This allows to abort packaging mid-upload and continue later; all
        # packages of incomplete uploads are removed first.
        complete = cls.objects(upload_id=upload_id, packages__ne=-1).count() != 0

        if not complete:
            cls.objects(upload_id=upload_id).delete()

            def open_package_zip(package_entry: 'Package'):
                return zipfile.ZipFile(
                    package_entry.package_path, 'w',
                    compression=zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED)

            def create_package_entry():
                package_id = utils.create_uuid()
                return Package(
                    package_id=package_id,
                    upload_path=upload_path,
                    upload_id=upload_id,
                    package_path=upload_directory.join_file('%s.zip' % package_id).os_path)

            def close_package(package_size: int, package_files: int):
                package_zip.close()
                package_entry.size = package_size
                package_entry.files = package_files
                package_entry.save()

                logger.debug('created package', package_id=package_entry.package_id, size=package_size)

            package_entry = create_package_entry()
            package_size = 0
            package_files = 0
            package_zip = open_package_zip(package_entry)
            with cls.upload_iterator(upload_path) as directory:
                for filepaths, parent_directory, size in directory:
                    for filepath in filepaths:
                        basepath = os.path.basename(filepath)
                        if basepath.startswith('RESTRICTED'):
                            restricted = 36
                            try:
                                restricted = min(36, int(basepath[len('RESTRICTED_'):]))
                            except Exception:
                                pass

                        package_zip.write(os.path.join(parent_directory, filepath), filepath)
                        package_files += 1

                    if size > max_package_size:
                        logger.warn(
                            'directory exceeds max package size',
                            package_id=package_entry.package_id, size=size)

                    package_size += size
                    if package_size > max_package_size:
                        close_package(package_size, package_files)
                        package_size, package_files = 0, 0
                        package_entry = create_package_entry()
                        package_zip = open_package_zip(package_entry)

                if package_files > 0:
                    close_package(package_size, package_files)

            package_query = cls.objects(upload_id=upload_id)
            package_query.update(restricted=restricted, packages=package_query.count())

            logger.debug(
                'packaged upload', source_upload_id=upload_id, source_upload_path=upload_path,
                restricted=restricted)

            return package_query
        else:
            return cls.objects(upload_id=upload_id)
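    # A usage sketch (the path is hypothetical): package one extracted CoE upload
    # into one or more nomad@FAIRDI-sized uploads.
    #
    #     for package in Package.create_packages(
    #             '/data/extracted/upload-1', config.fs.migration_packages):
    #         print(package.package_id, package.size)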

    @classmethod
    @contextmanager
    def upload_iterator(cls, upload_path: str) -> Generator[Generator[Directory, None, None], None, None]:
        """
        A context manager that opens the given upload and provides a generator of
        directories. Each directory is a tuple of a list of upload-relative filepaths,
        the parent directory, and the directory size.
        """
        potential_archive_path = os.path.join(upload_path, 'archive.tar.gz')
        if os.path.isfile(potential_archive_path):
            with cls.extracted_archive(potential_archive_path) as extracted_archive:
                yield cls.iterate_upload_directory(extracted_archive)
        else:
            yield cls.iterate_upload_directory(upload_path)

    @classmethod
    def iterate_upload_directory(cls, upload_path: str) -> Generator[Directory, None, None]:
        """
        Interprets the given upload path as a directory. File paths are yielded
        relative to the upload path.
        """
        stats = runstats.Statistics()
        for root, _, files in os.walk(upload_path):
            directory_filepaths: List[str] = []
            directory_size = 0
            if len(files) == 0:
                continue

            for file in files:
                filepath = os.path.join(root, file)
                filename = filepath[len(upload_path) + 1:]
                directory_filepaths.append(filename)
                # getting file stats is pretty expensive with gpfs
                # if an upload has more than 1000 files, it is pretty likely that
                # size patterns repeat ... good enough
                if len(stats) < use_stats_for_filestats_threshold:
                    filesize = os.path.getsize(filepath)
                    stats.push(filesize)
                else:
                    filesize = stats.mean()
                directory_size += filesize
            yield directory_filepaths, upload_path, directory_size

    @classmethod
    @contextmanager
    def extracted_archive(cls, archive_path: str) -> Generator[str, None, None]:
        """
        Temporarily extracts the given archive and yields the directory with the
        extracted data.
        """
        tmp_directory = os.path.join(config.fs.local_tmp, utils.create_uuid())
        os.mkdir(tmp_directory)

        try:
            with tarfile.TarFile.open(archive_path) as tar_file:
                tar_file.extractall(tmp_directory)

            yield tmp_directory
        finally:
            # remove the extracted data, even if the consumer of the context raises
            shutil.rmtree(tmp_directory)
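    # A usage sketch (the path is hypothetical): iterate the directories of an
    # upload; an archive.tar.gz is transparently extracted, if present.
    #
    #     with Package.upload_iterator('/data/extracted/upload-1') as directories:
    #         for filepaths, parent_directory, size in directories:
    #             print(len(filepaths), parent_directory, size)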
class SourceCalc(Document):
300
    """
301
    Mongo document used as a calculation, upload, and metadata db and index
302
303
304
305
306
    build from a given source db. Each :class:`SourceCacl` entry relates
    a pid, mainfile, upload "id" with each other for a corressponding calculation.
    It might alos contain the user metadata. The uploads are "id"ed via the
    specific path segment that identifies an upload on the CoE repo FS(s) without
    any prefixes (e.g. $EXTRACTED, /data/upload, etc.)
307
308
309
310
311
312
    """
    pid = IntField(primary_key=True)
    mainfile = StringField()
    upload = StringField()
    metadata = DictField()

    migration_version = IntField(default=-1)

    extracted_prefix = '$EXTRACTED/'
    sites = ['/data/nomad/extracted/', '/nomad/repository/extracted/']
    prefixes = [extracted_prefix] + sites

    meta = dict(indexes=['upload', 'mainfile', 'migration_version'])

    _dataset_cache: dict = {}

    @staticmethod
    def index(source, drop: bool = False, with_metadata: bool = True, per_query: int = 100) \
            -> Generator[Tuple['SourceCalc', int], None, None]:
        """
        Creates a collection of :class:`SourceCalc` documents that represent source repo
328
        db entries.
329
330
331

        Arguments:
            source: The source db sql alchemy session
332
333
334
335
336
337
338
            drop: True to drop and create a new collection, update the existing otherwise,
                default is False.
            with_metadata: True to also grab all metadata and store it, default is True.
            per_query: The implementation tries to grab almost all data with a heavely joined
                query on the CoE snoflake/star shaped schema.
                The query cannot ask for the whole db at once: choose how many calculations
                should be read at a time to optimize for your application.
339
340

        Returns:
341
            yields tuples (:class:`SourceCalc`, #calcs_total[incl. datasets])
342
        """
        logger = utils.get_logger(__name__)

        if drop:
            SourceCalc.drop_collection()

        last_source_calc = SourceCalc.objects().order_by('-pid').first()
        start_pid = last_source_calc.pid if last_source_calc is not None else 0
        source_query = source.query(Calc)
        total = source_query.count() - SourceCalc.objects.count()

        while True:
            query_timer = utils.timer(logger, 'query source db')
            query_timer.__enter__()  # pylint: disable=E1101
            calcs: Iterable[Calc] = source_query \
                .filter(Calc.coe_calc_id > start_pid) \
                .order_by(Calc.coe_calc_id) \
                .limit(per_query)

            source_calcs = []
            for calc in calcs:
                query_timer.__exit__(None, None, None)  # pylint: disable=E1101
                try:
                    filenames = calc.files
                    if filenames is None or len(filenames) == 0:
                        continue  # dataset case

                    filename = filenames[0]
                    if len(filenames) == 1 and (filename.endswith('.tgz') or filename.endswith('.zip')):
                        continue  # also a dataset, some datasets have a downloadable archive

                    for prefix in SourceCalc.prefixes:
                        filename = filename.replace(prefix, '')
                    segments = [file.strip('\\') for file in filename.split('/')]

                    source_calc = SourceCalc(pid=calc.pid)
                    source_calc.upload = segments[0]
                    source_calc.mainfile = os.path.join(*segments[1:])
                    if with_metadata:
                        source_calc.metadata = calc.to_calc_with_metadata().__dict__
                    source_calcs.append(source_calc)
                    start_pid = source_calc.pid

                    yield source_calc, total
                except Exception as e:
                    logger.error('could not index', pid=calc.pid, exc_info=e)
            if len(source_calcs) == 0:
                break
            else:
                with utils.timer(logger, 'write index'):
                    SourceCalc.objects.insert(source_calcs)
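    # A usage sketch (source_db is a hypothetical open CoE repository db session):
    # build the index and report progress.
    #
    #     for source_calc, total in SourceCalc.index(source_db, with_metadata=True):
    #         print(source_calc.pid, 'of', total)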

NO_PROCESSED_CALCS = 0
FAILED_PROCESSING = 1
FAILED_PUBLISH = 2


class NomadCOEMigration:
    """
    Drives a migration from the NOMAD CoE repository db to nomad@FAIRDI.

    Arguments:
        migration_version: The migration version. Only packages/calculations with
            no migration version or a lower migration version are migrated.
        package_directory: The directory that packages are/get stored in.
        compress_packages: True to use compression on package creation.
        threads: Number of threads to run the migration in parallel.
        quiet: Do not print progress stats, if True.
    """
    default_sites = [
        '/nomad/repository/data/uploads',
        '/nomad/repository/data/extracted',
        '/data/nomad/uploaded/',
        '/data/nomad/extracted/']

    default_pid_prefix = int(1e7)

    archive_filename = 'archive.tar.gz'
    """ The standard name for tarred uploads in the CoE repository. """

    def __init__(
            self,
            migration_version: int = 0,
            package_directory: str = None,
            compress_packages: bool = False,
            threads: int = 1, quiet: bool = False) -> None:
        self.logger = utils.get_logger(__name__, migration_version=migration_version)

        self.migration_version = migration_version
        self.package_directory = package_directory if package_directory is not None else config.fs.migration_packages
        self.compress_packages = compress_packages
        self._client = None
        self._threads = threads
        self._quiet = quiet

        self.source = infrastructure.repository_db

    @property
    def client(self):
        if self._client is None:
            from nomad.client import create_client
            self._client = create_client()

        return self._client
    def report(self):
        """ Returns an aggregated report over all previously migrated packages. """
        return Package.aggregate_reports(migration_version=self.migration_version)

    def copy_users(self):
        """ Copy all users. """
        for source_user in self.source.query(User).all():
            if source_user.user_id <= 2:
                # skip first two users to keep example users
                # they probably are either already the example users, or [root, Evgeny]
                continue

            create_user_payload = dict(
                user_id=source_user.user_id,
                email=source_user.email,
                first_name=source_user.first_name,
                last_name=source_user.last_name,
466
467
                password=source_user.password,
                created=source_user.created
468
469
470
471
472
473
474
475
476
477
478
479
480
481
            )

            try:
                create_user_payload.update(token=source_user.token)
            except LoginException:
                pass

            if source_user.affiliation is not None:
                create_user_payload.update(affiliation=dict(
                    name=source_user.affiliation.name,
                    address=source_user.affiliation.address))

            try:
                self.client.auth.create_user(payload=create_user_payload).response()
                self.logger.info('copied user', user_id=source_user.user_id)
            except HTTPBadRequest as e:
                self.logger.error('could not create user due to bad data', exc_info=e, user_id=source_user.user_id)
    expected_differences = {
        '0d': 'molecule / cluster',
        '3d': 'bulk',
        '2d': '2d / surface',
        '+u': 'gga'
    }

    def validate(self, repo_calc: dict, source_calc: CalcWithMetadata, logger) -> bool:
        """
        Validates the given processed calculation, assuming that the data in the given
        source_calc is correct.

        Returns:
            False, if the calculation differs from the source calc.
        """
        keys_to_validate = [
            'atoms', 'basis_set', 'xc_functional', 'system', 'crystal_system',
            'spacegroup', 'code_name', 'code_version']
504

        def to_comparable_list(values):
            for item in values:
                if isinstance(item, dict):
                    for key in item.keys():
                        if key.endswith('id'):
                            yield item.get(key)
                else:
                    yield item

        is_valid = True
        for key, target_value in repo_calc.items():
            if key not in keys_to_validate:
                continue

            source_value = getattr(source_calc, key, None)

            def check_mismatch() -> bool:
                # some exceptions
                if source_value in NomadCOEMigration.expected_differences and \
                        target_value == NomadCOEMigration.expected_differences.get(source_value):
                    return True

                logger.info(
                    'source target mismatch', quantity=key,
                    source_value=source_value, target_value=target_value,
                    value_diff='%s->%s' % (str(source_value), str(target_value)))
                return False

            if source_value is None and target_value is not None:
                continue

            if target_value is None and source_value is not None:
                is_valid &= check_mismatch()
                continue

            if isinstance(target_value, list):
                source_list = list(to_comparable_list(source_value))
                target_list = list(to_comparable_list(target_value))
                if len(set(source_list).intersection(target_list)) != len(target_list):
                    is_valid &= check_mismatch()
                continue

            if isinstance(source_value, str):
                source_value = source_value.lower()
                target_value = str(target_value).lower()

            if source_value != target_value:
                is_valid &= check_mismatch()

        return is_valid

    def surrogate_metadata(self, source: CalcWithMetadata):
        """
        Compute metadata from the given metadata that can be used for new calcs of the
        same upload.
        """
        return CalcWithMetadata(
            uploader=source.uploader,
            with_embargo=source.with_embargo,
            upload_time=source.upload_time,
            coauthors=source.coauthors,
            shared_with=source.shared_with,
            comment=source.comment,
            references=source.references,
            datasets=source.datasets)

    def set_pid_prefix(self, prefix: int = default_pid_prefix):
        """
        Sets the repo db pid counter to the given values. Allows to create new calcs
        without interfering with migration calcs with already existing PIDs.
        """
        self.logger.info('set pid prefix', pid_prefix=prefix)
        self.client.admin.exec_pidprefix_command(payload=dict(prefix=prefix)).response()
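    # A usage sketch (migration is a hypothetical NomadCOEMigration instance):
    # reserve all PIDs below the default prefix for migrated calcs, so newly
    # created calcs cannot clash with them.
    #
    #     migration.set_pid_prefix(default_pid_prefix)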

    def call_api(self, operation: str, *args, **kwargs) -> Any:
        """
        Calls nomad via the bravado client. It deals with a very busy nomad and catches,
        backs off, and retries on gateway timeouts. It also circumvents bravado's/jsonschema's
        thread safety issues using a global lock on client usage.

        Arguments:
            operation: Dot-separated string of api, endpoint, operation,
                e.g. 'uploads.get_upload'.
        """
        op_path = operation.split('.')
        op = self.client
        for op_path_segment in op_path:
            op = getattr(op, op_path_segment)

        sleep = utils.SleepTimeBackoff()
        while True:
            try:
                NomadCOEMigration._client_lock.acquire(blocking=True)
                return op(*args, **kwargs).response().result
            except HTTPGatewayTimeout:
                sleep()
            finally:
                NomadCOEMigration._client_lock.release()
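    # A usage sketch: any bravado operation can be addressed by its dotted path,
    # e.g. (from within this class) fetching an upload while the API is busy:
    #
    #     upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)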

    def migrate(self, *args, delete_failed: str = '') -> utils.POPO:
        """
        Migrate the given uploads.

        It takes paths to extracted uploads as arguments.

        Requires :class:`Package` instances for the given upload paths. Those will
        be created, if they do not already exist. The packages determine the uploads
        for the target infrastructure.
        Requires a built :func:`index` to look up existing data in the source db. This
        will be used to add user (and other, e.g. PID) metadata and to validate calculations.

        Uses PIDs of identified old calculations. Will create new PIDs for previously
        unknown uploads. See :func:`set_pid_prefix` on how to avoid conflicts.

        Arguments:
            *args: Paths to the extracted upload directories.
            delete_failed: String from ``N``, ``U``, ``P`` to determine that uploads with
                no processed calcs (N), failed upload processing (U), or failed publish
                operation (P) should be deleted after the migration attempt.
        Returns: Dictionary with statistics on the migration.
        """
        cv = threading.Condition()
        overall_report = Report()
        threads = []

        def print_report():
            if not self._quiet:
                print(overall_report)

        def migrate_package(package: Package):
            logger = self.logger.bind(
                package_id=package.package_id, source_upload_id=package.upload_id)

            if package.migration_version is not None and package.migration_version >= self.migration_version:
                self.logger.info(
                    'package already migrated, skip it',
                    package_id=package.package_id, source_upload_id=package.upload_id)
                package_report = package.report
                overall_report.skipped_packages += 1
            else:
                try:
                    package_report = self.migrate_package(package, delete_failed=delete_failed)

                except Exception as e:
                    package_report = Report()
                    package_report.failed_packages = 1
                    logger.error(
                        'unexpected exception while migrating packages', exc_info=e)
                finally:
                    package.report = package_report
                    package.migration_version = self.migration_version
                    package.save()

            with cv:
                try:
                    overall_report.add(package_report)

                    migrated_all_packages = all(
                        p.migration_version == self.migration_version
                        for p in Package.objects(upload_id=package.upload_id))
                    if migrated_all_packages:
                        missing_calcs = SourceCalc.objects(
                            upload=package.upload_id, migration_version__ne=self.migration_version).count()
                        total_source_calcs = SourceCalc.objects(upload=package.upload_id).count()

                        overall_report.missing_calcs += missing_calcs
                        overall_report.total_source_calcs += total_source_calcs

                        logger.info('migrated upload')

                    print_report()
                except Exception as e:
                    logger.error('unexpected exception while migrating packages', exc_info=e)

                self._threads += 1
                cv.notify()

        for arg in args:
            for package in Package.create_packages(
                    arg, self.package_directory, compress=self.compress_packages):

                with cv:
                    cv.wait_for(lambda: self._threads > 0)
                    self._threads -= 1
                    # pass the package explicitly to avoid the lambda late-binding pitfall
                    thread = threading.Thread(target=migrate_package, args=(package,))
                    threads.append(thread)
                    thread.start()
        for thread in threads:
            thread.join()

        return overall_report

    _client_lock = threading.Lock()

    def migrate_package(self, package: Package, delete_failed: str = '') -> 'Report':
        """ Migrates the given package. For other params see :func:`migrate`. """

        source_upload_id = package.upload_id
        package_id = package.package_id

        logger = self.logger.bind(package_id=package_id, source_upload_id=source_upload_id)
        logger.debug('start to process package')

        report = Report()
        report.total_packages += 1

        # upload and process the upload file
        with utils.timer(logger, 'upload completed'):
            try:
                upload = self.call_api(
                    'uploads.upload', name=package_id, local_path=package.package_path)
            except Exception as e:
                logger.error('could not upload package', exc_info=e)
                report.failed_packages += 1
                return report

        logger = logger.bind(
            source_upload_id=source_upload_id, upload_id=upload.upload_id)

        def delete_upload(reason: int):
            delete = \
                (reason == NO_PROCESSED_CALCS and 'N' in delete_failed) or \
                (reason == FAILED_PROCESSING and 'U' in delete_failed) or \
                (reason == FAILED_PUBLISH and 'P' in delete_failed)

            upload_to_delete = upload

            if delete:
                upload_to_delete = self.call_api(
                    'uploads.delete_upload', upload_id=upload_to_delete.upload_id)

                sleep = utils.SleepTimeBackoff()
                while upload_to_delete.process_running:
                    try:
                        upload_to_delete = self.call_api(
                            'uploads.get_upload', upload_id=upload_to_delete.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the delete operation
                        break
                logger.info('deleted upload after migration failure')
            else:
                logger.warning(
                    'will keep upload after migration failure for debugging',
                    reason=reason, delete_failed=delete_failed)
        # grab source calcs, while waiting for upload
        source_calcs = dict()
        surrogate_source_calc_with_metadata = None
        with utils.timer(logger, 'loaded source metadata'):
            with zipfile.ZipFile(package.package_path) as zf:
                for filenames_chunk in utils.chunks(zf.namelist(), 1000):
                    for source_calc in SourceCalc.objects(
                            upload=source_upload_id, mainfile__in=filenames_chunk):

                        source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                        source_calc_with_metadata.pid = source_calc.pid
                        source_calc_with_metadata.mainfile = source_calc.mainfile
                        source_calcs[source_calc.mainfile] = (source_calc, source_calc_with_metadata)

                        # establish a surrogate for new calcs
                        if surrogate_source_calc_with_metadata is None:
                            surrogate_source_calc_with_metadata = \
                                self.surrogate_metadata(source_calc_with_metadata)

        # try to find a surrogate outside the package, if necessary
        if surrogate_source_calc_with_metadata is None:
            source_calc = SourceCalc.objects(upload=source_upload_id).first()
            if source_calc is not None:
                source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                surrogate_source_calc_with_metadata = \
                    self.surrogate_metadata(source_calc_with_metadata)

        # wait for complete upload
        with utils.timer(logger, 'upload processing completed'):
            sleep = utils.SleepTimeBackoff()
            while upload.tasks_running:
                upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
                sleep()

        if upload.tasks_status == FAILURE:
            logger.error('failed to process upload', process_errors=upload.errors)
            report.failed_packages += 1
            delete_upload(FAILED_PROCESSING)
            return report
        else:
            report.total_calcs += upload.calcs.pagination.total

        calc_mainfiles = []
        upload_total_calcs = upload.calcs.pagination.total

        # check for processing errors
        with utils.timer(logger, 'checked upload processing'):
            per_page = 500
            for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
                upload = self.call_api(
                    'uploads.get_upload', upload_id=upload.upload_id, per_page=per_page,
                    page=page, order_by='mainfile')

                for calc_proc in upload.calcs.results:
                    calc_logger = logger.bind(
                        calc_id=calc_proc.calc_id,
                        mainfile=calc_proc.mainfile)

                    if calc_proc.tasks_status == SUCCESS:
                        calc_mainfiles.append(calc_proc.mainfile)
                    else:
                        report.failed_calcs += 1
                        calc_logger.info(
                            'could not process a calc', process_errors=calc_proc.errors)
                        continue

        # verify upload against source
        calcs_in_search = 0
        with utils.timer(logger, 'verified upload against source calcs'):
            scroll_id = 'first'
            while scroll_id is not None:
                scroll_args: Dict[str, Any] = dict(scroll=True)
                if scroll_id != 'first':
                    scroll_args['scroll_id'] = scroll_id

                search = self.call_api('repo.search', upload_id=upload.upload_id, **scroll_args)

                scroll_id = search.scroll_id

                for calc in search.results:
                    calcs_in_search += 1
                    source_calc, source_calc_with_metadata = source_calcs.get(
                        calc['mainfile'], (None, None))

                    # bind the calc logger up front; it is also needed in the
                    # no-source branch below
                    calc_logger = logger.bind(calc_id=calc['calc_id'], mainfile=calc['mainfile'])

                    if source_calc is not None:
                        report.migrated_calcs += 1

                        try:
                            if not self.validate(calc, source_calc_with_metadata, calc_logger):
                                report.calcs_with_diffs += 1
                        except Exception as e:
                            calc_logger.warning('unexpected exception during validation', exc_info=e)
                            report.calcs_with_diffs += 1
                    else:
                        calc_logger.info('processed a calc that has no source')
                        report.new_calcs += 1
                        # guessing the metadata from other calcs in upload/package
                        if surrogate_source_calc_with_metadata is not None:
                            new_calc_with_metadata = CalcWithMetadata(**surrogate_source_calc_with_metadata.to_dict())
                            new_calc_with_metadata.mainfile = calc['mainfile']
                        else:
                            calc_logger.warning('could not determine any metadata for new calc')
                            create_time_epoch = os.path.getctime(package.upload_path)
                            new_calc_with_metadata = CalcWithMetadata(
                                upload_time=datetime.datetime.fromtimestamp(create_time_epoch),
                                with_embargo=package.restricted > 0,
                                comment=default_comment,
                                uploader=default_uploader,
                                mainfile=calc['mainfile'])
                            surrogate_source_calc_with_metadata = new_calc_with_metadata

                        source_calcs[calc['mainfile']] = (None, new_calc_with_metadata)
            if len(calc_mainfiles) != calcs_in_search:
                logger.error('mismatch between processed calcs and calcs found with search')
        # publish upload
        if len(calc_mainfiles) > 0:
            with utils.timer(logger, 'upload published'):
                upload_metadata = dict(with_embargo=(package.restricted > 0))
                upload_metadata['calculations'] = [
                    self._to_api_metadata(source_calc_with_metadata)
                    for _, source_calc_with_metadata in source_calcs.values()]
                upload = self.call_api(
                    'uploads.exec_upload_operation', upload_id=upload.upload_id,
                    payload=dict(operation='publish', metadata=upload_metadata))

                sleep = utils.SleepTimeBackoff()
                while upload.process_running:
                    try:
                        upload = self.call_api('uploads.get_upload', upload_id=upload.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the publish operation
                        break

                if upload.tasks_status == FAILURE:
                    logger.error('could not publish upload', process_errors=upload.errors)
                    report.failed_calcs = report.total_calcs
                    report.migrated_calcs = 0
                    report.calcs_with_diffs = 0
                    report.new_calcs = 0
                    report.failed_packages += 1

                    delete_upload(FAILED_PUBLISH)
                    SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
                        .update(migration_version=-1)
                else:
                    SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
                        .update(migration_version=self.migration_version)
        else:
            delete_upload(NO_PROCESSED_CALCS)
            logger.info('no successful calcs, skip publish')

        logger.info('migrated package', **report)
        return report
    def _to_api_metadata(self, calc_with_metadata: CalcWithMetadata) -> dict:
        """ Transforms to a dict that fulfills the API's upload metadata model. """

        return dict(
            _upload_time=calc_with_metadata.upload_time,
            _uploader=calc_with_metadata.uploader['id'],
            _pid=calc_with_metadata.pid,
            references=[ref['value'] for ref in calc_with_metadata.references],
            datasets=[dict(
                id=ds['id'],
                _doi=ds.get('doi', {'value': None})['value'],
                _name=ds.get('name', None)) for ds in calc_with_metadata.datasets],
            mainfile=calc_with_metadata.mainfile,
            with_embargo=calc_with_metadata.with_embargo,
            comment=calc_with_metadata.comment,
            coauthors=list(int(user['id']) for user in calc_with_metadata.coauthors),
            shared_with=list(int(user['id']) for user in calc_with_metadata.shared_with)
        )

    def source_calc_index(self, *args, **kwargs):
        """ see :func:`SourceCalc.index` """
        return SourceCalc.index(self.source, *args, **kwargs)

    def package_index(self, *upload_paths) -> None:
        """
        Creates Package objects and respective package zip files for the given uploads.
        The given uploads are supposed to be paths to the extracted upload directories.
        If the upload is already in the index, it is not recreated.
        """
        logger = utils.get_logger(__name__)

        cv = threading.Condition()
        threads = []

        def package_upload(upload_path):
            try:
                for package_entry in Package.create_packages(
                        upload_path, self.package_directory,
                        compress=self.compress_packages):

                    logger.info(
                        'package in index',
                        source_upload_path=upload_path,
                        source_upload_id=package_entry.upload_id,
                        package_id=package_entry.package_id)
            except Exception as e:
                logger.error(
                    'could not create package from upload',
                    upload_path=upload_path, exc_info=e)
            finally:
                with cv:
                    self._threads += 1
                    cv.notify()

        for upload_path in upload_paths:
            with cv:
                cv.wait_for(lambda: self._threads > 0)
                self._threads -= 1
            # pass the path explicitly to avoid the lambda late-binding pitfall
            thread = threading.Thread(target=package_upload, args=(upload_path,))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()


class Report(utils.POPO):
    def __init__(self, *args, **kwargs):
        self.total_packages = 0
        self.failed_packages = 0
        self.skipped_packages = 0
        self.total_calcs = 0  # the calcs that have been found by the target
        self.total_source_calcs = 0  # the calcs in the source index
        self.failed_calcs = 0  # the calcs found by the target that could not be processed/published
        self.migrated_calcs = 0   # the calcs from the source, successfully added to the target
        self.calcs_with_diffs = 0  # the calcs from the source, successfully added to the target with different metadata
        self.new_calcs = 0  # the calcs successfully added to the target that were not found in the source
        self.missing_calcs = 0  # the calcs in the source that could not be added to the target due to failure or not finding the calc

        super().__init__(*args, **kwargs)

    def add(self, other: 'Report') -> None:
        for key, value in other.items():
            self[key] = self.get(key, 0) + value
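# A usage sketch: reports add up field-wise; this is how package reports are
# aggregated into upload- and migration-wide reports above.
#
#     total = Report()
#     total.add(Report(dict(total_packages=1)))
#     total.add(Report(dict(total_packages=2, failed_packages=1)))
#     assert total.total_packages == 3 and total.failed_packages == 1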