# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains functions to read data from NOMAD coe, external sources,
other/older nomad@FAIRDI instances to mass upload it to a new nomad@FAIRDI instance.
18
19
20

.. autoclass:: NomadCOEMigration
.. autoclass:: SourceCalc
21
22
"""

Markus Scheidgen's avatar
Markus Scheidgen committed
23
from typing import Generator, Tuple, List, Iterable
24
import os.path
25
import zipstream
26
import zipfile
27
import math
28
from mongoengine import Document, IntField, StringField, DictField
29
30
from werkzeug.contrib.iterio import IterIO
import time
31
from bravado.exception import HTTPNotFound, HTTPBadRequest
32

33
34
from nomad import utils, infrastructure
from nomad.coe_repo import User, Calc, LoginException
35
36
from nomad.datamodel import CalcWithMetadata
from nomad.processing import FAILURE, SUCCESS
37
38


39
40
41
42
default_pid_prefix = 7000000
""" The default pid prefix for new non migrated calcualtions """


43
class SourceCalc(Document):
44
    """
45
    Mongo document used as a calculation, upload, and metadata db and index
46
47
48
49
50
    build from a given source db. Each :class:`SourceCacl` entry relates
    a pid, mainfile, upload "id" with each other for a corressponding calculation.
    It might alos contain the user metadata. The uploads are "id"ed via the
    specific path segment that identifies an upload on the CoE repo FS(s) without
    any prefixes (e.g. $EXTRACTED, /data/upload, etc.)
51
52
53
54
55
56
57
58
    """
    pid = IntField(primary_key=True)
    mainfile = StringField()
    upload = StringField()
    metadata = DictField()

    extracted_prefix = '$EXTRACTED/'
    sites = ['/data/nomad/extracted/', '/nomad/repository/extracted/']
59
60
    prefixes = [extracted_prefix] + sites

61
    meta = dict(indexes=['upload'])
62

63
    _dataset_cache: dict = {}
64
65

    @staticmethod
66
67
68
69
    def index(source, drop: bool = False, with_metadata: bool = True, per_query: int = 100) \
            -> Generator[Tuple['SourceCalc', int], None, None]:
        """
        Creates a collection of :class:`SourceCalc` documents that represent source repo
70
        db entries.
71
72
73

        Arguments:
            source: The source db sql alchemy session
74
75
76
77
78
79
80
            drop: True to drop and create a new collection, update the existing otherwise,
                default is False.
            with_metadata: True to also grab all metadata and store it, default is True.
            per_query: The implementation tries to grab almost all data with a heavely joined
                query on the CoE snoflake/star shaped schema.
                The query cannot ask for the whole db at once: choose how many calculations
                should be read at a time to optimize for your application.
81
82

        Returns:
83
            yields tuples (:class:`SourceCalc`, #calcs_total[incl. datasets])
84
        """
85
        logger = utils.get_logger(__name__)
86
87
88
        if drop:
            SourceCalc.drop_collection()

89
        last_source_calc = SourceCalc.objects().order_by('-pid').first()
90
91
        start_pid = last_source_calc.pid if last_source_calc is not None else 0
        source_query = source.query(Calc)
92
        total = source_query.count() - SourceCalc.objects.count()
93
94

        while True:
95
96
            query_timer = utils.timer(logger, 'query source db')
            query_timer.__enter__()  # pylint: disable=E1101
Markus Scheidgen's avatar
Markus Scheidgen committed
97
            calcs: Iterable[Calc] = source_query \
98
99
100
                .filter(Calc.coe_calc_id > start_pid) \
                .order_by(Calc.coe_calc_id) \
                .limit(per_query)
Markus Scheidgen's avatar
Markus Scheidgen committed
101

102
103
            source_calcs = []
            for calc in calcs:
104
                query_timer.__exit__(None, None, None)  # pylint: disable=E1101
105
                try:
Markus Scheidgen's avatar
Markus Scheidgen committed
106
107
                    filenames = calc.files
                    if filenames is None or len(filenames) == 0:
108
109
110
                        continue  # dataset case

                    filename = filenames[0]
111
112
113
                    if len(filenames) == 1 and (filename.endswith('.tgz') or filename.endswith('.zip')):
                        continue  # also a dataset, some datasets have a downloadable archive

114
115
116
117
118
119
120
121
                    for prefix in SourceCalc.prefixes:
                        filename = filename.replace(prefix, '')
                    segments = [file.strip('\\') for file in filename.split('/')]

                    source_calc = SourceCalc(pid=calc.pid)
                    source_calc.upload = segments[0]
                    source_calc.mainfile = os.path.join(*segments[1:])
                    if with_metadata:
Markus Scheidgen's avatar
Markus Scheidgen committed
122
                        source_calc.metadata = calc.to_calc_with_metadata().__dict__
123
124
125
126
127
128
                    source_calcs.append(source_calc)
                    start_pid = source_calc.pid

                    yield source_calc, total
                except Exception as e:
                    logger.error('could not index', pid=calc.pid, exc_info=e)
129

130
131
132
            if len(source_calcs) == 0:
                break
            else:
133
134
                with utils.timer(logger, 'write index'):
                    SourceCalc.objects.insert(source_calcs)
135
136
137
138
139
140
141


class NomadCOEMigration:
    """
    Drives a migration from the NOMAD coe repository db to nomad@FAIRDI. It is assumed
    that this class is never used on the worker or api service. It assumes the
    default coe repo connection as a connection to the source repository db.
142
143

    Attributes:
144
        source: SQLAlchemy session for the source NOMAD coe repository db.
145
146
147
148
149

    Arguments:
        sites: Directories that might contain uploads to migrate. Use to override defaults.
        pid_prefix: All PIDs for previously unknown calculations will get a PID higher
            than that. Use to override default.
150
    """
151

152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
    default_sites = [
        '/nomad/repository/data/uploads',
        '/nomad/repository/data/extracted',
        '/data/nomad/uploaded/',
        '/data/nomad/extracted/']

    default_pid_prefix = int(1e7)

    archive_filename = 'archive.tar.gz'
    """ The standard name for tarred uploads in the CoE repository. """

    def __init__(
            self,
            sites: List[str] = default_sites,
            pid_prefix: int = default_pid_prefix) -> None:

        self.sites, self.pid_prefix = sites, pid_prefix
169
        self.logger = utils.get_logger(__name__)
170
171
172
173
174
175
176
177
178
179
        self._client = None
        self.source = infrastructure.repository_db

    @property
    def client(self):
        if self._client is None:
            from nomad.client import create_client
            self._client = create_client()

        return self._client
180

181
182
    def copy_users(self):
        """ Copy all users. """
183
        for source_user in self.source.query(User).all():
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
            if source_user.user_id <= 2:
                # skip first two users to keep example users
                # they probably are either already the example users, or [root, Evgeny]
                continue

            create_user_payload = dict(
                user_id=source_user.user_id,
                email=source_user.email,
                first_name=source_user.first_name,
                last_name=source_user.last_name,
                password=source_user.password
            )

            try:
                create_user_payload.update(token=source_user.token)
            except LoginException:
                pass

            if source_user.affiliation is not None:
                create_user_payload.update(affiliation=dict(
                    name=source_user.affiliation.name,
                    address=source_user.affiliation.address))

            try:
                self.client.auth.create_user(payload=create_user_payload).response()
            except HTTPBadRequest as e:
                self.logger.error('could not create user due to bad data', exc_info=e, user_id=source_user.user_id)
211

Markus Scheidgen's avatar
Markus Scheidgen committed
212
213
214
215
216
217
218
219
220
221
    def _to_comparable_list(self, list):
        for item in list:
            if isinstance(item, dict):
                for key in item.keys():
                    if key.endswith('id'):
                        yield item.get(key)
            else:
                yield item

    def _validate(self, upload_id: str, calc_id: str, source_calc: CalcWithMetadata, logger) -> bool:
222
223
224
225
226
227
228
229
230
231
232
        """
        Validates the given processed calculation, assuming that the data in the given
        source_calc is correct.

        Returns:
            False, if the calculation differs from the source calc.
        """
        repo_calc = self.client.repo.get_repo_calc(
            upload_id=upload_id, calc_id=calc_id).response().result

        is_valid = True
Markus Scheidgen's avatar
Markus Scheidgen committed
233
        for key, target_value in repo_calc.items():
234
            if key in ['calc_id', 'upload_id', 'files', 'calc_hash']:
235
236
                continue

Markus Scheidgen's avatar
Markus Scheidgen committed
237
            source_value = getattr(source_calc, key, None)
238
239
240
241
242
243
244
245
246
247
248
249

            def report_mismatch():
                logger.info(
                    'source target missmatch', quantity=key,
                    source_value=source_value, target_value=target_value)

            if (source_value is None or target_value is None) and source_value != target_value:
                report_mismatch()
                is_valid = False
                continue

            if isinstance(target_value, list):
Markus Scheidgen's avatar
Markus Scheidgen committed
250
251
252
                source_list = list(self._to_comparable_list(source_value))
                target_list = list(self._to_comparable_list(target_value))
                if len(set(source_list).intersection(target_list)) != len(target_list):
253
254
255
256
257
258
259
260
261
262
263
264
265
266
                    report_mismatch()
                    is_valid = False
                continue

            if isinstance(source_value, str):
                source_value = source_value.lower()
                target_value = str(target_value).lower()

            if source_value != target_value:
                report_mismatch()
                is_valid = False

        return is_valid

267
    def migrate(self, *args, prefix: int = default_pid_prefix):
268
269
270
271
272
273
274
275
276
277
278
279
280
        """
        Migrate the given uploads.

        It takes upload 'id's as args. Alternatively takes absolute paths to uploads.
        It tries to be as flexible as possible with those 'id's: looking at all
        configured sites, dealing with extracted and tarred/zipped uploads, dealing
        with paths to files and directories.

        Requires a build :func:`index` to look for existing data in the source db. This
        will be used to add user (and other, PID, ...) metadata and validate calculations.

        Uses PIDs of identified old calculations. Will create new PIDs for previously
        unknown uploads. New PIDs will be choosed from a `prefix++` range of ints.
281

282
283
284
        Arguments:
            prefix: The PID prefix that should be used for new non migrated calcualtions.

285
        Returns: Yields a dictionary with status and statistics for each given upload.
286
        """
287
288
289
        if prefix is not None:
            self.logger.info('set pid prefix', pid_prefix=prefix)
            self.client.admin.exec_pidprefix_command(payload=dict(prefix=prefix)).response()
290
291
292

        upload_specs = args
        for upload_spec in upload_specs:
293
294
295
296
297
            # identify upload
            upload_path = None
            abs_upload_path = os.path.abspath(upload_spec)
            if os.path.exists(abs_upload_path):
                upload_path = upload_spec
298
299
300
301
302
303
304
305
            else:
                for site in self.sites:
                    potential_upload_path = os.path.join(site, upload_spec)
                    if os.path.exists(potential_upload_path):
                        upload_path = potential_upload_path
                        break

            if upload_path is None:
306
307
308
                error = 'upload does not exist'
                self.logger.error(error, upload_spec=upload_spec)
                yield dict(status=FAILURE, error=error)
309
310
                continue

311
            # prepare the upload by determining/creating an upload file, name, source upload id
312
313
            if os.path.isfile(upload_path):
                upload_archive_f = open(upload_path, 'rb')
314
                source_upload_id = os.path.split(os.path.split(upload_path)[0])[1]
315
316
                upload_name = os.path.basename(upload_path)
            else:
Markus Scheidgen's avatar
Markus Scheidgen committed
317
318
                potential_upload_archive = os.path.join(
                    upload_path, NomadCOEMigration.archive_filename)
319
320
                if os.path.isfile(potential_upload_archive):
                    upload_archive_f = open(potential_upload_archive, 'rb')
321
                    source_upload_id = os.path.split(os.path.split(potential_upload_archive)[0])[1]
Markus Scheidgen's avatar
Markus Scheidgen committed
322
                    upload_name = os.path.basename(potential_upload_archive)
323
                else:
324
325
326
327
328
329
330
331
332
333
334
335
336
337
                    source_upload_id = os.path.split(upload_path)[1]
                    zip_file = zipstream.ZipFile()
                    path_prefix = len(upload_path) + 1
                    for root, _, files in os.walk(upload_path):
                        for file in files:
                            zip_file.write(
                                os.path.join(root, file),
                                os.path.join(root[path_prefix:], file),
                                zipfile.ZIP_DEFLATED)
                    zip_file.write(upload_path)
                    upload_archive_f = IterIO(zip_file)
                    upload_name = '%s.zip' % source_upload_id

            # upload and process the upload file
Markus Scheidgen's avatar
Markus Scheidgen committed
338
339
            upload = self.client.uploads.upload(
                file=upload_archive_f, name=upload_name).response().result
340
            upload_archive_f.close()
341
342

            upload_logger = self.logger.bind(
343
                source_upload_id=source_upload_id, upload_id=upload.upload_id)
344

345
            # grab source metadata
346
347
348
            upload_metadata_calcs = list()
            metadata_dict = dict()
            upload_metadata = dict(calculations=upload_metadata_calcs)
349
            for source_calc in SourceCalc.objects(upload=source_upload_id):
350
                source_metadata = CalcWithMetadata(**source_calc.metadata)
Markus Scheidgen's avatar
Markus Scheidgen committed
351
                source_metadata.upload_id = upload.upload_id
352
                source_metadata.mainfile = source_calc.mainfile
353
                source_metadata.pid = source_calc.pid
Markus Scheidgen's avatar
Markus Scheidgen committed
354
                source_metadata.__migrated = False
355
                upload_metadata_calcs.append(source_metadata)
356
357
                metadata_dict[source_calc.mainfile] = source_metadata

358
359
            report = utils.POPO()
            report.total_source_calcs = len(metadata_dict)
Markus Scheidgen's avatar
Markus Scheidgen committed
360
            report.failed_calcs = 0
361
362
363
364
365
            report.migrated_calcs = 0
            report.calcs_with_diffs = 0
            report.new_calcs = 0
            report.missing_calcs = 0

Markus Scheidgen's avatar
Markus Scheidgen committed
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
            # wait for complete upload
            while upload.tasks_running:
                upload = self.client.uploads.get_upload(upload_id=upload.upload_id).response().result
                time.sleep(0.1)

            if upload.tasks_status == FAILURE:
                error = 'failed to process upload'
                report.missing_calcs = report.total_source_calcs
                report.total_calcs = 0
                upload_logger.error(error, process_errors=upload.errors)
                yield report
                continue
            else:
                report.total_calcs = upload.calcs.pagination.total

            # verify upload
382
383
            for page in range(1, math.ceil(report.total_calcs / 100) + 1):
                upload = self.client.uploads.get_upload(
384
                    upload_id=upload.upload_id, per_page=100, page=page,
385
                    order_by='mainfile').response().result
386
387
388
389
390
391
392
393

                for calc_proc in upload.calcs.results:
                    calc_logger = upload_logger.bind(
                        calc_id=calc_proc.calc_id,
                        mainfile=calc_proc.mainfile)

                    source_calc = metadata_dict.get(calc_proc.mainfile, None)
                    if calc_proc.tasks_status == SUCCESS:
394
395
396
397
398
399
400
                        if source_calc is None:
                            calc_logger.info('processed a calc that has no source')
                            report.new_calcs += 1
                            continue
                        else:
                            source_calc.__migrated = True
                            report.migrated_calcs += 1
401

402
403
404
                        if not self._validate(
                                upload.upload_id, calc_proc.calc_id, source_calc, calc_logger):
                            report.calcs_with_diffs += 1
405
                    else:
Markus Scheidgen's avatar
Markus Scheidgen committed
406
407
408
                        report.failed_calcs += 1
                        calc_logger.error(
                            'could not process a calc', process_errors=calc_proc.errors)
409
410
411
                        continue

            for source_calc in upload_metadata_calcs:
Markus Scheidgen's avatar
Markus Scheidgen committed
412
                if source_calc.__migrated is False:
413
                    report.missing_calcs += 1
Markus Scheidgen's avatar
Markus Scheidgen committed
414
415
416
                    upload_logger.info(
                        'no match or processed calc for source calc',
                        mainfile=source_calc.mainfile)
417

418
            # publish upload
419
            upload_metadata['calculations'] = [
Markus Scheidgen's avatar
Markus Scheidgen committed
420
421
                self._to_api_metadata(calc)
                for calc in upload_metadata['calculations'] if calc.__migrated]
422

Markus Scheidgen's avatar
Markus Scheidgen committed
423
            if report.total_calcs > report.failed_calcs:
424
                upload = self.client.uploads.exec_upload_operation(
425
                    upload_id=upload.upload_id,
426
                    payload=dict(operation='publish', metadata=upload_metadata)
427
                ).response().result
428
429

                while upload.process_running:
430
                    try:
431
432
                        upload = self.client.uploads.get_upload(
                            upload_id=upload.upload_id).response().result
433
434
                        time.sleep(0.1)
                    except HTTPNotFound:
435
                        # the proc upload will be deleted by the publish operation
436
                        break
437
438

            # report
439
440
            upload_logger.info('migrated upload', **report)
            yield report
441

Markus Scheidgen's avatar
Markus Scheidgen committed
442
443
444
445
    def _to_api_metadata(self, source: CalcWithMetadata) -> dict:
        """ Transforms to a dict that fullfils the API's uploade metadata model. """
        return dict(
            _upload_time=source.upload_time,
446
            _uploader=source.uploader['id'],
Markus Scheidgen's avatar
Markus Scheidgen committed
447
448
449
450
451
452
453
454
455
            _pid=source.pid,
            references=[ref['value'] for ref in source.references],
            datasets=[dict(
                id=ds['id'],
                _doi=ds.get('doi', {'value': None})['value'],
                _name=ds.get('name', None)) for ds in source.datasets],
            mainfile=source.mainfile,
            with_embargo=source.with_embargo,
            comment=source.comment,
456
457
            coauthors=list(user['id'] for user in source.coauthors),
            shared_with=list(user['id'] for user in source.shared_with)
Markus Scheidgen's avatar
Markus Scheidgen committed
458
459
        )

460
    def index(self, *args, **kwargs):
461
        """ see :func:`SourceCalc.index` """
462
        return SourceCalc.index(self.source, *args, **kwargs)