migration.py 41.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an"AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains functions to read data from NOMAD coe, external sources,
other/older nomad@FAIRDI instances to mass upload it to a new nomad@FAIRDI instance.
18
19
20

.. autoclass:: NomadCOEMigration
.. autoclass:: SourceCalc
21
22
"""

Markus Scheidgen's avatar
Markus Scheidgen committed
23
from typing import Generator, Tuple, List, Iterable, IO, Any, Dict
Markus Scheidgen's avatar
Markus Scheidgen committed
24
import os
25
import os.path
26
import zipstream
27
import zipfile
28
import math
Markus Scheidgen's avatar
Markus Scheidgen committed
29
from mongoengine import Document, IntField, StringField, DictField, ListField
30
import time
31
import datetime
32
from bravado.exception import HTTPNotFound, HTTPBadRequest, HTTPGatewayTimeout
33
import glob
34
import os
35
import runstats
Markus Scheidgen's avatar
Markus Scheidgen committed
36
import io
Markus Scheidgen's avatar
Markus Scheidgen committed
37
import threading
38

39
from nomad import utils, infrastructure, config
40
from nomad.coe_repo import User, Calc, LoginException
41
42
from nomad.datamodel import CalcWithMetadata
from nomad.processing import FAILURE, SUCCESS
43
44


45
default_pid_prefix = 7000000
46
47
""" The default pid prefix for new non migrated calculations """

Markus Scheidgen's avatar
Markus Scheidgen committed
48
max_package_size = 16 * 1024 * 1024 * 1024  # 16 GB
49
""" The maximum size of a package that will be used as an upload on nomad@FAIRDI """
50
use_stats_for_filestats_threshold = 1024
51

52
53
54
default_comment = 'entry with unknown provernance'
default_uploader = dict(id=1)

55

Markus Scheidgen's avatar
Markus Scheidgen committed
56
57
58
59
60
61
62
63
64
65
66
67
def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.
    """
    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = None
            self.iterator = iter(iterable)
68

Markus Scheidgen's avatar
Markus Scheidgen committed
69
70
        def readable(self):
            return True
71

Markus Scheidgen's avatar
Markus Scheidgen committed
72
        def readinto(self, b):
73
            requested_len = len(b)  # We're supposed to return at most this much
Markus Scheidgen's avatar
Markus Scheidgen committed
74
75
76
            while True:
                try:
                    chunk = next(self.iterator)
77
                except StopIteration:
Markus Scheidgen's avatar
Markus Scheidgen committed
78
79
80
                    if len(self.leftover) == 0:
                        return 0  # indicate EOF
                    chunk = self.leftover
81
                output, self.leftover = chunk[:requested_len], chunk[requested_len:]
Markus Scheidgen's avatar
Markus Scheidgen committed
82
83
84
85
86
                len_output = len(output)
                if len_output == 0:
                    continue  # do not prematurely indicate EOF
                b[:len_output] = output
                return len_output
87

Markus Scheidgen's avatar
Markus Scheidgen committed
88
89
90
    return io.BufferedReader(IterStream(), buffer_size=buffer_size)


91
92
93
94
95
96
97
98
99
100
101
class Package(Document):
    """
    A Package represents split origin NOMAD CoE uploads. We use packages as uploads
    in nomad@FAIRDI. Some of the uploads in nomad are very big (alfow lib) and need
    to be split down to yield practical (i.e. for mirrors) upload sizes. Therefore,
    uploads are split over multiple packages if one upload gets to large. A package
    always contains full directories of files to preserve *mainfile* *aux* file relations.
    """

    package_id = StringField(primary_key=True)
    """ A random UUID for the package. Could serve later is target upload id."""
Markus Scheidgen's avatar
Markus Scheidgen committed
102
103
104
105
    filenames = ListField(StringField(), default=[])
    """ The files in the package relative to the upload path """
    upload_path = StringField()
    """ The absolute path of the source upload """
106
107
108
109
110
111
112
    upload_id = StringField()
    """ The source upload_id. There might be multiple packages per upload (this is the point). """
    restricted = IntField()
    """ The restricted in month, 0 for unrestricted """
    size = IntField()
    """ The sum of all file sizes """

113
    migration_version = IntField()
Markus Scheidgen's avatar
Markus Scheidgen committed
114
    report = DictField()
115
116

    meta = dict(indexes=['upload_id', 'migration_version'])
117

118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
    @classmethod
    def aggregate_reports(cls, migration_version: str = None) -> 'Report':
        """
        Returns an aggregated report over all package reports of the given migration version,
        or all packages.
        """
        if migration_version is not None:
            query = cls.objects(migration_version__gte=migration_version, report__exists=True)
        else:
            query = cls.objects(report__exists=True)

        report = Report()
        for package in query:
            report.add(Report(package.report))

        return report

135
    def open_package_upload_file(self) -> IO:
136
        """ Creates a streaming zip file from the files of this package. """
137
        zip_file = zipstream.ZipFile(compression=zipstream.ZIP_STORED, allowZip64=True)
138
139
        for filename in self.filenames:
            filepath = os.path.join(self.upload_path, filename)
140
            zip_file.write(filepath, filename)
141

Markus Scheidgen's avatar
Markus Scheidgen committed
142
        return iterable_to_stream(zip_file)  # type: ignore
143

144
145
146
147
148
    def create_package_upload_file(self) -> Tuple[str, bool]:
        """
        Creates a zip file for the package in tmp and returns its path and whether it
        was created (``True``) or existed before (``False``).
        """
149
        upload_filepath = os.path.join(config.fs.tmp, '%s.zip' % self.package_id)
Markus Scheidgen's avatar
Markus Scheidgen committed
150
151
        if not os.path.exists(os.path.dirname(upload_filepath)):
            os.mkdir(os.path.dirname(upload_filepath))
152
153
154
155
156
157
158
        if not os.path.isfile(upload_filepath):
            with zipfile.ZipFile(
                    upload_filepath, 'w',
                    compression=zipfile.ZIP_STORED, allowZip64=True) as zip_file:
                for filename in self.filenames:
                    filepath = os.path.join(self.upload_path, filename)
                    zip_file.write(filepath, filename)
159
160
161
            created = True
        else:
            created = False
162

163
        return upload_filepath, created
164

165
166
167
168
169
170
171
172
173
174
    @classmethod
    def index(cls, *upload_paths):
        """
        Creates Package objects for the given uploads in nomad. The given uploads are
        supposed to be path to the extracted upload directories. If the upload is already
        in the db, the upload is skipped entirely.
        """
        logger = utils.get_logger(__name__)

        for upload_path in upload_paths:
Markus Scheidgen's avatar
Markus Scheidgen committed
175
176
177
178
179
180
            try:
                stats = runstats.Statistics()
                upload_path = os.path.abspath(upload_path)
                upload_id = os.path.basename(upload_path)
                if cls.objects(upload_id=upload_id).first() is not None:
                    logger.info('upload already exists, skip', upload_id=upload_id)
181
182
                    continue

Markus Scheidgen's avatar
Markus Scheidgen committed
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
                restrict_files = glob.glob(os.path.join(upload_path, 'RESTRICTED_*'))
                month = 0
                for restrict_file in restrict_files:
                    restrict_file = os.path.basename(restrict_file)
                    try:
                        new_month = int(restrict_file[len('RESTRICTED_'):])
                        if new_month > month:
                            month = new_month
                    except Exception:
                        month = 36
                if month > 36:
                    month = 36
                restricted = month

                def create_package():
                    cls.timer = time.time()
                    package = Package(
                        package_id=utils.create_uuid(),
                        upload_id=upload_id,
                        upload_path=upload_path,
                        restricted=restricted,
                        size=0)
                    return package

                def save_package(package):
                    if len(package.filenames) == 0:
                        return

                    if package.size > max_package_size:
                        # a single directory seems to big for a package
                        logger.error(
                            'directory exceeds max package size', directory=upload_path, size=package.size)

                    package.save()
                    logger.info(
                        'created package',
                        size=package.size,
                        files=len(package.filenames),
                        package_id=package.package_id,
                        exec_time=time.time() - cls.timer,
                        upload_id=package.upload_id)

                package = create_package()

                for root, _, files in os.walk(upload_path):
                    directory_filenames: List[str] = []
                    directory_size = 0

                    if len(files) == 0:
                        continue

                    for file in files:
                        filepath = os.path.join(root, file)
                        filename = filepath[len(upload_path) + 1:]
                        directory_filenames.append(filename)
                        # getting file stats is pretty expensive with gpfs
                        # if an upload has more then 1000 files, its pretty likely that
                        # size patterns repeat ... goood enough
                        if len(stats) < use_stats_for_filestats_threshold:
                            filesize = os.path.getsize(filepath)
                            stats.push(filesize)
                        else:
                            filesize = stats.mean()
                        directory_size += filesize

                    if (package.size + directory_size) > max_package_size and package.size > 0:
                        save_package(package)
                        package = create_package()

                    for filename in directory_filenames:
                        package.filenames.append(filename)
                    package.size += directory_size

                    logger.debug('packaged directory', directory=root, size=directory_size)

                save_package(package)

                logger.info('completed upload', directory=upload_path, upload_id=upload_id)
            except Exception as e:
                logger.error(
263
                    'could create package from upload',
Markus Scheidgen's avatar
Markus Scheidgen committed
264
265
                    upload_path=upload_path, upload_id=upload_id, exc_info=e)
                continue
Markus Scheidgen's avatar
Markus Scheidgen committed
266

267

268
class SourceCalc(Document):
269
    """
270
    Mongo document used as a calculation, upload, and metadata db and index
271
272
273
274
275
    build from a given source db. Each :class:`SourceCacl` entry relates
    a pid, mainfile, upload "id" with each other for a corressponding calculation.
    It might alos contain the user metadata. The uploads are "id"ed via the
    specific path segment that identifies an upload on the CoE repo FS(s) without
    any prefixes (e.g. $EXTRACTED, /data/upload, etc.)
276
277
278
279
280
281
    """
    pid = IntField(primary_key=True)
    mainfile = StringField()
    upload = StringField()
    metadata = DictField()

282
    migration_version = IntField()
283

284
285
    extracted_prefix = '$EXTRACTED/'
    sites = ['/data/nomad/extracted/', '/nomad/repository/extracted/']
286
287
    prefixes = [extracted_prefix] + sites

288
    meta = dict(indexes=['upload', 'mainfile', 'migration_version'])
289

290
    _dataset_cache: dict = {}
291
292

    @staticmethod
293
294
295
296
    def index(source, drop: bool = False, with_metadata: bool = True, per_query: int = 100) \
            -> Generator[Tuple['SourceCalc', int], None, None]:
        """
        Creates a collection of :class:`SourceCalc` documents that represent source repo
297
        db entries.
298
299
300

        Arguments:
            source: The source db sql alchemy session
301
302
303
304
305
306
307
            drop: True to drop and create a new collection, update the existing otherwise,
                default is False.
            with_metadata: True to also grab all metadata and store it, default is True.
            per_query: The implementation tries to grab almost all data with a heavely joined
                query on the CoE snoflake/star shaped schema.
                The query cannot ask for the whole db at once: choose how many calculations
                should be read at a time to optimize for your application.
308
309

        Returns:
310
            yields tuples (:class:`SourceCalc`, #calcs_total[incl. datasets])
311
        """
312
        logger = utils.get_logger(__name__)
313
314
315
        if drop:
            SourceCalc.drop_collection()

316
        last_source_calc = SourceCalc.objects().order_by('-pid').first()
317
318
        start_pid = last_source_calc.pid if last_source_calc is not None else 0
        source_query = source.query(Calc)
319
        total = source_query.count() - SourceCalc.objects.count()
320
321

        while True:
322
323
            query_timer = utils.timer(logger, 'query source db')
            query_timer.__enter__()  # pylint: disable=E1101
Markus Scheidgen's avatar
Markus Scheidgen committed
324
            calcs: Iterable[Calc] = source_query \
325
326
327
                .filter(Calc.coe_calc_id > start_pid) \
                .order_by(Calc.coe_calc_id) \
                .limit(per_query)
Markus Scheidgen's avatar
Markus Scheidgen committed
328

329
330
            source_calcs = []
            for calc in calcs:
331
                query_timer.__exit__(None, None, None)  # pylint: disable=E1101
332
                try:
Markus Scheidgen's avatar
Markus Scheidgen committed
333
334
                    filenames = calc.files
                    if filenames is None or len(filenames) == 0:
335
336
337
                        continue  # dataset case

                    filename = filenames[0]
338
339
340
                    if len(filenames) == 1 and (filename.endswith('.tgz') or filename.endswith('.zip')):
                        continue  # also a dataset, some datasets have a downloadable archive

341
342
343
344
345
346
347
348
                    for prefix in SourceCalc.prefixes:
                        filename = filename.replace(prefix, '')
                    segments = [file.strip('\\') for file in filename.split('/')]

                    source_calc = SourceCalc(pid=calc.pid)
                    source_calc.upload = segments[0]
                    source_calc.mainfile = os.path.join(*segments[1:])
                    if with_metadata:
Markus Scheidgen's avatar
Markus Scheidgen committed
349
                        source_calc.metadata = calc.to_calc_with_metadata().__dict__
350
351
352
353
354
355
                    source_calcs.append(source_calc)
                    start_pid = source_calc.pid

                    yield source_calc, total
                except Exception as e:
                    logger.error('could not index', pid=calc.pid, exc_info=e)
356

357
358
359
            if len(source_calcs) == 0:
                break
            else:
360
361
                with utils.timer(logger, 'write index'):
                    SourceCalc.objects.insert(source_calcs)
362
363


364
365
366
367
368
NO_PROCESSED_CALCS = 0
FAILED_PROCESSING = 1
FAILED_PUBLISH = 2


369
370
class NomadCOEMigration:
    """
371
    Drives a migration from the NOMAD coe repository db to nomad@FAIRDI.
372
373

    Arguments:
374
375
        migration_version: The migration version. Only packages/calculations with
            no migration version or a lower migration version are migrated.
Markus Scheidgen's avatar
Markus Scheidgen committed
376
377
        threads: Number of threads to run migration in parallel.
        quiet: Prints stats if not quiet
378
    """
379

380
381
382
383
384
385
386
387
388
389
390
    default_sites = [
        '/nomad/repository/data/uploads',
        '/nomad/repository/data/extracted',
        '/data/nomad/uploaded/',
        '/data/nomad/extracted/']

    default_pid_prefix = int(1e7)

    archive_filename = 'archive.tar.gz'
    """ The standard name for tarred uploads in the CoE repository. """

Markus Scheidgen's avatar
Markus Scheidgen committed
391
    def __init__(self, migration_version: int = 0, threads: int = 1, quiet: bool = False) -> None:
392
393
        self.logger = utils.get_logger(__name__, migration_version=migration_version)
        self.migration_version = migration_version
394
        self._client = None
Markus Scheidgen's avatar
Markus Scheidgen committed
395
396
        self._threads = threads
        self._quiet = quiet
397

398
399
400
401
402
403
404
405
406
        self.source = infrastructure.repository_db

    @property
    def client(self):
        if self._client is None:
            from nomad.client import create_client
            self._client = create_client()

        return self._client
407

408
409
410
411
    def report(self):
        """ returns an aggregated report over all prior migrated packages """
        return Package.aggregate_reports(migration_version=self.migration_version)

412
413
    def copy_users(self):
        """ Copy all users. """
414
        for source_user in self.source.query(User).all():
415
416
417
418
419
420
421
422
423
424
            if source_user.user_id <= 2:
                # skip first two users to keep example users
                # they probably are either already the example users, or [root, Evgeny]
                continue

            create_user_payload = dict(
                user_id=source_user.user_id,
                email=source_user.email,
                first_name=source_user.first_name,
                last_name=source_user.last_name,
425
426
                password=source_user.password,
                created=source_user.created
427
428
429
430
431
432
433
434
435
436
437
438
439
440
            )

            try:
                create_user_payload.update(token=source_user.token)
            except LoginException:
                pass

            if source_user.affiliation is not None:
                create_user_payload.update(affiliation=dict(
                    name=source_user.affiliation.name,
                    address=source_user.affiliation.address))

            try:
                self.client.auth.create_user(payload=create_user_payload).response()
441
                self.logger.info('copied user', user_id=source_user.user_id)
442
443
            except HTTPBadRequest as e:
                self.logger.error('could not create user due to bad data', exc_info=e, user_id=source_user.user_id)
444

Markus Scheidgen's avatar
Markus Scheidgen committed
445
446
447
448
449
450
451
452
453
    def _to_comparable_list(self, list):
        for item in list:
            if isinstance(item, dict):
                for key in item.keys():
                    if key.endswith('id'):
                        yield item.get(key)
            else:
                yield item

Markus Scheidgen's avatar
Markus Scheidgen committed
454
455
456
457
458
459
460
    expected_differences = {
        '0d': 'molecule / cluster',
        '3d': 'bulk',
        '2d': '2d / surface',
        '+u': 'gga'
    }

461
    def _validate(self, repo_calc: dict, source_calc: CalcWithMetadata, logger) -> bool:
462
463
464
465
466
467
468
        """
        Validates the given processed calculation, assuming that the data in the given
        source_calc is correct.

        Returns:
            False, if the calculation differs from the source calc.
        """
469
470
471
        keys_to_validate = [
            'atoms', 'basis_set', 'xc_functional', 'system', 'crystal_system',
            'spacegroup', 'code_name', 'code_version']
472
473

        is_valid = True
Markus Scheidgen's avatar
Markus Scheidgen committed
474
        for key, target_value in repo_calc.items():
475
            if key not in keys_to_validate:
476
477
                continue

Markus Scheidgen's avatar
Markus Scheidgen committed
478
            source_value = getattr(source_calc, key, None)
479

480
481
            def check_mismatch() -> bool:
                # some exceptions
Markus Scheidgen's avatar
Markus Scheidgen committed
482
483
                if source_value in NomadCOEMigration.expected_differences and \
                        target_value == NomadCOEMigration.expected_differences.get(source_value):
484
485
                    return True

486
487
                logger.info(
                    'source target missmatch', quantity=key,
Markus Scheidgen's avatar
Markus Scheidgen committed
488
489
                    source_value=source_value, target_value=target_value,
                    value_diff='%s->%s' % (str(source_value), str(target_value)))
490
                return False
491

492
            if source_value is None and target_value is not None:
493
494
                continue

495
496
497
            if target_value is None and source_value is not None:
                is_valid &= check_mismatch()

498
            if isinstance(target_value, list):
Markus Scheidgen's avatar
Markus Scheidgen committed
499
500
501
                source_list = list(self._to_comparable_list(source_value))
                target_list = list(self._to_comparable_list(target_value))
                if len(set(source_list).intersection(target_list)) != len(target_list):
502
                    is_valid &= check_mismatch()
503
504
505
506
507
508
509
                continue

            if isinstance(source_value, str):
                source_value = source_value.lower()
                target_value = str(target_value).lower()

            if source_value != target_value:
510
                is_valid &= check_mismatch()
511
512
513

        return is_valid

514
    def _packages(
515
            self, source_upload_path: str,
Markus Scheidgen's avatar
Markus Scheidgen committed
516
            create: bool = False) -> Tuple[Any, str]:
517
518
519
520
521
522
523
524
525
        """
        Creates a iterator over packages for the given upload path. Packages are
        taken from the :class:`Package` index.

        Arguments:
            source_upload_path: The path to the extracted upload.
            create: If True, will index packages if they not exist.

        Returns:
Markus Scheidgen's avatar
Markus Scheidgen committed
526
            A tuple with the package query and the source_upload_id (last path segment)
527
        """
528

529
530
531
        source_upload_id = os.path.basename(source_upload_path)
        logger = self.logger.bind(
            source_upload_path=source_upload_path, source_upload_id=source_upload_id)
532
533
534

        if os.path.isfile(source_upload_path):
            # assume its a path to an archive files
Markus Scheidgen's avatar
Markus Scheidgen committed
535
            raise ValueError('currently no support for migrating archive files')
Markus Scheidgen's avatar
Markus Scheidgen committed
536
        if not os.path.exists(source_upload_path):
Markus Scheidgen's avatar
Markus Scheidgen committed
537
            raise ValueError('directory %s does not exist' % source_upload_path)
538
539
540
541
542
543
544
545
546

        package_query = Package.objects(upload_id=source_upload_id)

        if package_query.count() == 0:
            if create:
                Package.index(source_upload_path)
                package_query = Package.objects(upload_id=source_upload_id)
                if package_query.count() == 0:
                    logger.error('no package exists, even after indexing')
Markus Scheidgen's avatar
Markus Scheidgen committed
547
                    return package_query, source_upload_id
548
549
            else:
                logger.error('no package exists for upload')
Markus Scheidgen's avatar
Markus Scheidgen committed
550
                return package_query, source_upload_id
551
552

        logger.debug('identified packages for source upload', n_packages=package_query.count())
553
        return package_query, source_upload_id
554

555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
    def _surrogate_metadata(self, source: CalcWithMetadata):
        """
        Compute metadata from the given metadata that can be used for new calcs of the
        same upload.
        """
        return CalcWithMetadata(
            uploader=source.uploader,
            with_embargo=source.with_embargo,
            upload_time=source.upload_time,
            coauthors=source.coauthors,
            shared_with=source.shared_with,
            comment=source.comment,
            references=source.references,
            datasets=source.datasets)

570
571
572
573
574
575
576
577
    def set_pid_prefix(self, prefix: int = default_pid_prefix):
        """
        Sets the repo db pid counter to the given values. Allows to create new calcs
        without interfering with migration calcs with already existing PIDs.
        """
        self.logger.info('set pid prefix', pid_prefix=prefix)
        self.client.admin.exec_pidprefix_command(payload=dict(prefix=prefix)).response()

Markus Scheidgen's avatar
Markus Scheidgen committed
578
    def migrate(
579
            self, *args, create_packages: bool = True, local: bool = False,
580
            delete_local: bool = False, delete_failed: str = '') -> utils.POPO:
581
582
583
        """
        Migrate the given uploads.

584
585
586
587
588
        It takes paths to extracted uploads as arguments.

        Requires :class:`Package` instances for the given upload paths. Those will
        be created, if they do not already exists. The packages determine the uploads
        for the target infrastructure.
589
590
591
592
593

        Requires a build :func:`index` to look for existing data in the source db. This
        will be used to add user (and other, PID, ...) metadata and validate calculations.

        Uses PIDs of identified old calculations. Will create new PIDs for previously
594
        unknown uploads. See :func:`set_pid_prefix` on how to avoid conflicts.
595

596
        Arguments:
597
598
599
            upload_path: A filepath to the upload directory.
            create_packages: If True, will create non existing packages.
                Will skip with errors otherwise.
600
601
602
            local: Instead of streaming an upload, create a local file and use
                local_path on the upload.
            delete_local: Delete created local file upload files
603
604
605
            delete_failed: String from ``N``, ``U``, ``P`` to determine that uploads with
                no processed calcs (N), failed upload processing (U), or failed publish
                operation (P) should be deleted after the migration attempt.
606

Markus Scheidgen's avatar
Markus Scheidgen committed
607
        Returns: Dictionary with statistics on the migration.
608
        """
609

Markus Scheidgen's avatar
Markus Scheidgen committed
610
611
612
613
614
        cv = threading.Condition()
        overall_report = Report()
        upload_reports: Dict[str, Report] = {}
        threads = []

615
        def print_report():
Markus Scheidgen's avatar
Markus Scheidgen committed
616
            if not self._quiet:
617
                print(overall_report)
618

Markus Scheidgen's avatar
Markus Scheidgen committed
619
        def migrate_package(package: Package, of_packages: int):
620
621
622
            logger = self.logger.bind(
                package_id=package.package_id, source_upload_id=package.upload_id)

Markus Scheidgen's avatar
Markus Scheidgen committed
623
            try:
624
625
626
627
                package_report = self.migrate_package(
                    package, local=local, delete_local=delete_local,
                    delete_failed=delete_failed)

Markus Scheidgen's avatar
Markus Scheidgen committed
628
            except Exception as e:
629
                package_report = Report()
Markus Scheidgen's avatar
Markus Scheidgen committed
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
                logger.error(
                    'unexpected exception while migrating packages', exc_info=e)

            with cv:
                try:
                    overall_report.add(package_report)

                    upload_report = upload_reports[package.upload_id]
                    upload_report.add(package_report)

                    if upload_report.total_packages == of_packages:
                        missing_calcs = SourceCalc.objects(
                            upload=package.upload_id, migration_version__ne=self.migration_version).count()
                        total_source_calcs = SourceCalc.objects(upload=package.upload_id).count()

                        overall_report.missing_calcs -= upload_report.missing_calcs
                        overall_report.missing_calcs += missing_calcs
                        overall_report.total_source_calcs -= upload_report.total_source_calcs
                        overall_report.total_source_calcs += total_source_calcs

                        upload_report.missing_calcs = missing_calcs
                        upload_report.total_source_calcs = total_source_calcs

                        logger.info('migrated upload', **upload_report)

655
                    print_report()
Markus Scheidgen's avatar
Markus Scheidgen committed
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
                except Exception as e:
                    logger.error('unexpected exception while migrating packages', exc_info=e)

                self._threads += 1
                cv.notify()

        for arg in args:
            package_query, source_upload_id = self._packages(arg, create=create_packages)
            number_of_packages = package_query.count()
            for package in package_query:
                with cv:
                    if source_upload_id not in upload_reports:
                        upload_reports[source_upload_id] = Report()

                    if package.migration_version is not None and package.migration_version >= self.migration_version:
                        self.logger.info(
                            'package already migrated, skip it',
                            package_id=package.package_id, source_upload_id=package.upload_id)
                        upload_report = upload_reports[package.upload_id]
                        upload_report.add(package.report)
                        upload_report.skipped_packages += 1
                        overall_report.add(package.report)
                        overall_report.skipped_packages += 1

680
                        print_report()
Markus Scheidgen's avatar
Markus Scheidgen committed
681
                        continue
682

Markus Scheidgen's avatar
Markus Scheidgen committed
683
684
685
686
687
                    cv.wait_for(lambda: self._threads > 0)
                    self._threads -= 1
                    thread = threading.Thread(target=lambda: migrate_package(package, number_of_packages))
                    threads.append(thread)
                    thread.start()
688

Markus Scheidgen's avatar
Markus Scheidgen committed
689
690
        for thread in threads:
            thread.join()
691

Markus Scheidgen's avatar
Markus Scheidgen committed
692
        return overall_report
693

Markus Scheidgen's avatar
Markus Scheidgen committed
694
695
    _client_lock = threading.Lock()

696
697
698
    def nomad(self, operation: str, *args, **kwargs) -> Any:
        """
        Calls nomad via the bravado client. It deals with a very busy nomad and catches,
Markus Scheidgen's avatar
Markus Scheidgen committed
699
700
        backsoff, and retries on gateway timouts. It also circumvents bravados/jsonschemas
        thread safety issues using a global lock on client usage.
701
702
703
704
705
706
707
708
709
710
711
712
713

        Arguments:
            operation: Comma separated string of api, endpoint, operation,
                e.g. 'uploads.get_upload'.
        """
        op_path = operation.split('.')
        op = self.client
        for op_path_segment in op_path:
            op = getattr(op, op_path_segment)

        sleep = utils.SleepTimeBackoff()
        while True:
            try:
Markus Scheidgen's avatar
Markus Scheidgen committed
714
                NomadCOEMigration._client_lock.acquire(blocking=True)
715
716
717
718
719
                return op(*args, **kwargs).response().result
            except HTTPGatewayTimeout:
                sleep()
            except Exception as e:
                raise e
Markus Scheidgen's avatar
Markus Scheidgen committed
720
721
            finally:
                NomadCOEMigration._client_lock.release()
722

723
724
    def migrate_package(
            self, package: Package, local: bool = False,
725
            delete_local: bool = False, delete_failed: str = '') -> 'Report':
726
727
        """ Migrates the given package. For other params see :func:`migrate`. """

728
729
730
731
732
733
        source_upload_id = package.upload_id
        package_id = package.package_id

        logger = self.logger.bind(package_id=package_id, source_upload_id=source_upload_id)
        logger.debug('start to process package')

734
735
        report = Report()
        report.total_packages += 1
736
737
738

        # upload and process the upload file
        from nomad.client import stream_upload_with_client
739
        created_tmp_package_upload_file = None
740
741
742
        with utils.timer(logger, 'upload completed'):
            try:
                if local:
743
744
745
                    upload_filepath, created = package.create_package_upload_file()
                    if created:
                        created_tmp_package_upload_file = upload_filepath
746
                    self.logger.debug('created package upload file')
747
748
                    upload = self.nomad(
                        'uploads.upload', name=package_id, local_path=upload_filepath)
749
750
751
752
753
754
                else:
                    upload_f = package.open_package_upload_file()
                    self.logger.debug('opened package upload file')
                    upload = stream_upload_with_client(self.client, upload_f, name=package_id)
            except Exception as e:
                self.logger.error('could not upload package', exc_info=e)
755
756
                report.failed_packages += 1
                return report
757
758
759
760

        logger = logger.bind(
            source_upload_id=source_upload_id, upload_id=upload.upload_id)

761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
        def delete_upload(reason: int):
            delete = \
                (reason == NO_PROCESSED_CALCS and 'N' in delete_failed) or \
                (reason == FAILED_PROCESSING and 'U' in delete_failed) or \
                (reason == FAILED_PUBLISH and 'P' in delete_failed)

            upload_to_delete = upload

            if delete:
                sleep = utils.SleepTimeBackoff()
                while upload.process_running:
                    try:
                        upload_to_delete = self.nomad(
                            'uploads.delete_upload', upload_id=upload_to_delete.upload_id)
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the delete operation
                        break

                logger.info('deleted upload after migration failure')
781
782
783
784
            else:
                logger.warning(
                    'will keep upload after migration failure for debugging',
                    reason=reason, delete_failed=delete_failed)
785

786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
        # grab source calcs, while waiting for upload
        source_calcs = dict()
        surrogate_source_calc_with_metadata = None
        with utils.timer(logger, 'loaded source metadata'):
            for filenames_chunk in utils.chunks(package.filenames, 1000):
                for source_calc in SourceCalc.objects(
                        upload=source_upload_id, mainfile__in=filenames_chunk):

                    source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                    source_calc_with_metadata.pid = source_calc.pid
                    source_calc_with_metadata.mainfile = source_calc.mainfile
                    source_calcs[source_calc.mainfile] = (source_calc, source_calc_with_metadata)

                    # establish a surrogate for new calcs
                    if surrogate_source_calc_with_metadata is None:
                        surrogate_source_calc_with_metadata = \
                            self._surrogate_metadata(source_calc_with_metadata)
            report.total_source_calcs = len(source_calcs)
        # try to find a surrogate outside the package, if necessary
        if surrogate_source_calc_with_metadata is None:
            source_calc = SourceCalc.objects(upload=source_upload_id).first()
            if source_calc is not None:
                source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
                surrogate_source_calc_with_metadata = \
                    self._surrogate_metadata(source_calc_with_metadata)

812
813
814
815
        # wait for complete upload
        with utils.timer(logger, 'upload processing completed'):
            sleep = utils.SleepTimeBackoff()
            while upload.tasks_running:
816
                upload = self.nomad('uploads.get_upload', upload_id=upload.upload_id)
817
818
819
820
                sleep()

        if upload.tasks_status == FAILURE:
            logger.error('failed to process upload', process_errors=upload.errors)
821
822
            report.failed_packages += 1
            report.missing_calcs += report.total_source_calcs
823
            delete_upload(FAILED_PROCESSING)
824
            return report
825
826
827
828
829
830
831
832
        else:
            report.total_calcs += upload.calcs.pagination.total

        calc_mainfiles = []
        upload_total_calcs = upload.calcs.pagination.total

        # check for processing errors
        with utils.timer(logger, 'checked upload processing'):
833
            per_page = 500
834
            for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
835
836
837
                upload = self.nomad(
                    'uploads.get_upload', upload_id=upload.upload_id, per_page=per_page,
                    page=page, order_by='mainfile')
838
839
840
841
842
843
844
845

                for calc_proc in upload.calcs.results:
                    calc_logger = logger.bind(
                        calc_id=calc_proc.calc_id,
                        mainfile=calc_proc.mainfile)

                    if calc_proc.tasks_status == SUCCESS:
                        calc_mainfiles.append(calc_proc.mainfile)
846
                    else:
847
                        report.failed_calcs += 1
848
                        calc_logger.info(
849
850
                            'could not process a calc', process_errors=calc_proc.errors)
                        continue
851

852
853
854
        # verify upload against source
        calcs_in_search = 0
        with utils.timer(logger, 'varyfied upload against source calcs'):
855
856
857
858
859
860
861
862
863
            scroll_id = 'first'
            while scroll_id is not None:
                scroll_args: Dict[str, Any] = dict(scroll=True)
                if scroll_id != 'first':
                    scroll_args['scroll_id'] = scroll_id

                search = self.nomad('repo.search', upload_id=upload.upload_id, **scroll_args)

                scroll_id = search.scroll_id
864

865
866
867
868
869
870
871
872
873
                for calc in search.results:
                    calcs_in_search += 1
                    source_calc, source_calc_with_metadata = source_calcs.get(
                        calc['mainfile'], (None, None))

                    if source_calc is not None:
                        report.migrated_calcs += 1

                        calc_logger = logger.bind(calc_id=calc['calc_id'], mainfile=calc['mainfile'])
874
875
876
877
                        try:
                            if not self._validate(calc, source_calc_with_metadata, calc_logger):
                                report.calcs_with_diffs += 1
                        except Exception as e:
Markus Scheidgen's avatar
Markus Scheidgen committed
878
                            calc_logger.warning('unexpected exception during validation', exc_info=e)
879
880
881
882
                            report.calcs_with_diffs += 1
                    else:
                        calc_logger.info('processed a calc that has no source')
                        report.new_calcs += 1
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
                        # guessing the metadata from other calcs in upload/package
                        if surrogate_source_calc_with_metadata is not None:
                            new_calc_with_metadata = CalcWithMetadata(**surrogate_source_calc_with_metadata.to_dict())
                            new_calc_with_metadata.mainfile = calc['mainfile']
                        else:
                            calc_logger.warning('could not determine any metadata for new calc')
                            create_time_epoch = os.path.getctime(package.upload_path)
                            new_calc_with_metadata = CalcWithMetadata(
                                upload_time=datetime.datetime.fromtimestamp(create_time_epoch),
                                with_embargo=package.restricted > 0,
                                comment=default_comment,
                                uploader=default_uploader,
                                mainfile=calc['mainfile'])
                            surrogate_source_calc_with_metadata = new_calc_with_metadata

                        source_calcs[calc['mainfile']] = (None, new_calc_with_metadata)
899

900
901
            if len(calc_mainfiles) != calcs_in_search:
                logger.error('missmatch between processed calcs and calcs found with search')
Markus Scheidgen's avatar
Markus Scheidgen committed
902

903
904
905
906
907
908
909
        # publish upload
        if len(calc_mainfiles) > 0:
            with utils.timer(logger, 'upload published'):
                upload_metadata = dict(with_embargo=(package.restricted > 0))
                upload_metadata['calculations'] = [
                    self._to_api_metadata(source_calc_with_metadata)
                    for _, source_calc_with_metadata in source_calcs.values()]
910

911
912
913
                upload = self.nomad(
                    'uploads.exec_upload_operation', upload_id=upload.upload_id,
                    payload=dict(operation='publish', metadata=upload_metadata))
914

915
916
917
                sleep = utils.SleepTimeBackoff()
                while upload.process_running:
                    try:
918
                        upload = self.nomad('uploads.get_upload', upload_id=upload.upload_id)
919
920
921
922
923
924
925
926
927
928
929
                        sleep()
                    except HTTPNotFound:
                        # the proc upload will be deleted by the publish operation
                        break

                if upload.tasks_status == FAILURE:
                    logger.error('could not publish upload', process_errors=upload.errors)
                    report.failed_calcs = report.total_calcs
                    report.migrated_calcs = 0
                    report.calcs_with_diffs = 0
                    report.new_calcs = 0
930
                    report.failed_packages += 1
931
932

                    delete_upload(FAILED_PUBLISH)
933
934
935
936
937
                else:
                    SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
                        .update(migration_version=self.migration_version)
                    package.migration_version = self.migration_version
        else:
938
            delete_upload(NO_PROCESSED_CALCS)
939
            logger.info('no successful calcs, skip publish')
940

941
        report.missing_calcs = report.total_source_calcs - report.migrated_calcs
Markus Scheidgen's avatar
Markus Scheidgen committed
942
943
944
        package.report = report
        package.save()

945
        logger.info('migrated package', **report)
946

947
948
949
950
951
952
        if created_tmp_package_upload_file is not None and delete_local:
            try:
                os.remove(created_tmp_package_upload_file)
            except Exception as e:
                logger.error('could not remove tmp package upload file', exc_info=e)

953
        return report
954
955

    def _to_api_metadata(self, calc_with_metadata: CalcWithMetadata) -> dict:
Markus Scheidgen's avatar
Markus Scheidgen committed
956
        """ Transforms to a dict that fullfils the API's uploade metadata model. """
957

Markus Scheidgen's avatar
Markus Scheidgen committed
958
        return dict(
959
960
961
962
            _upload_time=calc_with_metadata.upload_time,
            _uploader=calc_with_metadata.uploader['id'],
            _pid=calc_with_metadata.pid,
            references=[ref['value'] for ref in calc_with_metadata.references],
Markus Scheidgen's avatar
Markus Scheidgen committed
963
964
965
            datasets=[dict(
                id=ds['id'],
                _doi=ds.get('doi', {'value': None})['value'],
966
967
968
969
970
971
                _name=ds.get('name', None)) for ds in calc_with_metadata.datasets],
            mainfile=calc_with_metadata.mainfile,
            with_embargo=calc_with_metadata.with_embargo,
            comment=calc_with_metadata.comment,
            coauthors=list(int(user['id']) for user in calc_with_metadata.coauthors),
            shared_with=list(int(user['id']) for user in calc_with_metadata.shared_with)
Markus Scheidgen's avatar
Markus Scheidgen committed
972
973
        )

974
    def index(self, *args, **kwargs):
975
        """ see :func:`SourceCalc.index` """
976
        return SourceCalc.index(self.source, *args, **kwargs)
977
978
979
980

    def package(self, *args, **kwargs):
        """ see :func:`Package.add` """
        return Package.index(*args, **kwargs)
981
982
983


class Report(utils.POPO):
Markus Scheidgen's avatar
Markus Scheidgen committed
984
    def __init__(self, *args, **kwargs):
985
986
        self.total_packages = 0
        self.failed_packages = 0
Markus Scheidgen's avatar
Markus Scheidgen committed
987
        self.skipped_packages = 0
988
989
990
991
992
993
994
995
        self.total_calcs = 0  # the calcs that have been found by the target
        self.total_source_calcs = 0  # the calcs in the source index
        self.failed_calcs = 0  # the calcs found b the target that could not be processed/published
        self.migrated_calcs = 0   # the calcs from the source, successfully added to the target
        self.calcs_with_diffs = 0  # the calcs from the source, successfully added to the target with different metadata
        self.new_calcs = 0  # the calcs successfully added to the target that were not found in the source
        self.missing_calcs = 0  # the calcs in the source, that could not be added to the target due to failure or not founding the calc

Markus Scheidgen's avatar
Markus Scheidgen committed
996
997
        super().__init__(*args, **kwargs)

998
999
1000
    def add(self, other: 'Report') -> None:
        for key, value in other.items():
            self[key] = self.get(key, 0) + value
For faster browsing, not all history is shown. View entire blame