#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
This module comprises a set of persistent document classes that hold all user related
data. These are information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField, ListField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import yaml
import json
from functools import lru_cache
import urllib.parse
import requests

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import (
    PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
    PublicUploadFiles, StagingUploadFiles)
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive, EditableUserMetadata, OasisMetadata
from nomad.archive import query_archive, write_partial_archive_to_mongo, delete_partial_archives_from_mongo
from nomad.datamodel.encyclopedia import EncyclopediaMetadata


section_metadata = datamodel.EntryArchive.section_metadata.name
section_workflow = datamodel.EntryArchive.section_workflow.name


_editable_metadata = {
    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}
_oasis_metadata = {
    quantity.name: quantity for quantity in OasisMetadata.m_def.definitions}


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]
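
# Illustrative note: the processors above are applied to every event that goes through
# the wrapped logger returned by Calc.get_logger(). Each event is a plain dict, roughly
# (hypothetical values): {'event': 'parser executed', 'logger': 'nomad.processing',
# 'timestamp': '2021-01-01 12:00.00'}.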


def _normalize_oasis_upload_metadata(upload_id, metadata):
    # This is overwritten by the tests to do necessary id manipulations
    return upload_id, metadata


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculations processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'strict': False,
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_results: EntryArchive = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?
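        # Only quantities in the datamodel.MongoMetadata category (the user and system
        # metadata kept in mongodb, e.g. user-provided fields such as ``comment``) are
        # serialized here; full parser/normalizer results are stored in the archive files.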

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            parser = parser_dict[self.parser]
            if parser.domain:
                entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
            cache:
                A boolean that indicates if the archive file should be left unclosed,
                e.g. if this method is called for many entries of the same upload.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            # instead of loading the whole archive, it should be enough to load the
            # parts that are referenced by section_metadata/EntryMetadata
            # TODO somehow it should determine which root sections to load from the metainfo
            # or configuration
            calc_archive = archive[self.calc_id]
            entry_archive_dict = {section_metadata: calc_archive[section_metadata].to_dict()}
            if section_workflow in calc_archive:
                entry_archive_dict[section_workflow] = calc_archive[section_workflow].to_dict()
            entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]

        except KeyError:
            # Due to hard processing failures, it is possible that an entry does not
            # have an archive
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

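        # save_to_calc_log below copies every event logged through this logger into
        # self._calc_proc_logs; write_archive() later stores these entries as
        # archive.processing_logs, so processing issues remain visible with the entry.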
        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # Exceptions here will cause indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.meta.version
            self._entry_metadata.nomad_commit = config.meta.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)
        self._entry_metadata.parser_name = self.parser

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            try:
                self.apply_entry_metadata(self._entry_metadata)
            except Exception as e:
                self.get_logger().error(
                    'could not apply entry metadata to entry', exc_info=e)

            try:
                self._entry_metadata.apply_domain_metadata(self._parser_results)
            except Exception as e:
                self.get_logger().error(
                    'could not apply domain metadata to entry', exc_info=e)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(self._parser_results)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_results = EntryArchive()
                # allow parsers to read/write metadata
                self._parser_results.m_add_sub_section(EntryArchive.section_metadata, self._entry_metadata)
                parser.parse(
                    self.upload_files.raw_file_object(self.mainfile).os_path,
                    self._parser_results, logger=logger)

            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            logger = self.get_logger(parser=self.parser, step=self.parser)

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)
            self._entry_metadata = phonon_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Re-create the parse results
            self._parser_results = phonon_archive

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = self._parser_results.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
                raise ValueError("No method information available in referenced calculation.")
            self._parser_results.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.success
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exc_info=e)
            if self._entry_metadata is None:
                self._setup_fallback_metadata()
                self._entry_metadata.processed = False

            try:
                if self._entry_metadata.encyclopedia is None:
                    self._entry_metadata.encyclopedia = EncyclopediaMetadata()
                self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
            except Exception as e:
                logger.error("Could not set encyclopedia status.", exc_info=e)

        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_results)
                log_data.update(archive_size=archive_size)

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        if self._parser_results.section_metadata is None:
            self._parser_results.m_add_sub_section(
                datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(logger, 'normalizer executed', input_size=self.mainfile_file.size):
                try:
                    normalizer(self._parser_results).normalize(logger=logger)
                    logger.info('processor completed successfully', **context)
                except Exception as e:
                    self.fail('normalizer failed with exception', exc_info=e, error=str(e), **context)

    def _read_metadata_from_file(self, logger):
        # metadata file name defined in nomad.config nomad_metadata.yaml/json
        # which can be placed in the directory containing the mainfile or somewhere up
        # highest priority is directory with mainfile
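        # Illustration (hypothetical paths): for a mainfile at <upload>/calcs/run1/vasprun.xml,
        # a metadata file in calcs/run1/ takes precedence over one in calcs/ or at the upload
        # root, because values are merged with setdefault() while walking up the directory tree.
        # The top-level file may additionally carry per-mainfile metadata under its 'entries' key.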

        metadata_file = config.metadata_file_name
        metadata_dir = os.path.dirname(self.mainfile_file.os_path)

        metadata = {}
        while True:
            # top-level nomad file can also contain an entries dict with entry
            # metadata per mainfile as key
            if metadata_dir == self.upload_files.os_path:
                entries = metadata_part.get('entries', {})
                metadata_part = entries.get(self.mainfile, {})
                for key, val in metadata_part.items():
                    metadata.setdefault(key, val)

            # consider the nomad file of the current directory
            metadata_part = self.upload.metadata_file_cached(
                os.path.join(metadata_dir, metadata_file))
            for key, val in metadata_part.items():
                if key in ['entries', 'oasis_datasets']:
                    continue
                metadata.setdefault(key, val)

            if metadata_dir == self.upload_files.os_path:
                break

            metadata_dir = os.path.dirname(metadata_dir)

        if len(metadata) > 0:
            logger.info('Apply user metadata from nomad.yaml/json file')

        for key, val in metadata.items():
            if key == 'entries':
                continue

            definition = _editable_metadata.get(key, None)
            if definition is None and self.upload.from_oasis:
                definition = _oasis_metadata.get(key, None)

            if definition is None:
                logger.warn('Users cannot set metadata', quantity=key)
                continue

            try:
                self._entry_metadata.m_set(definition, val)
                if definition == datamodel.EntryMetadata.calc_id:
                    self.calc_id = val
            except Exception as e:
                logger.error(
                    'Could not apply user metadata from nomad.yaml/json file',
                    quantity=definition.name, exc_info=e)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_results)
        self._entry_metadata.processed = True

        self._read_metadata_from_file(logger)

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_results)
            log_data.update(archive_size=archive_size)

    def write_archive(self, archive: EntryArchive):
        # save the archive mongo entry
        try:
            if self._entry_metadata.processed:
                write_partial_archive_to_mongo(archive)
        except Exception as e:
            self.get_logger().error('could not write mongodb archive entry', exc_info=e)

        # add the processing logs to the archive
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if archive is not None:
            archive = archive.m_copy()
        else:
            archive = datamodel.EntryArchive()

        if archive.section_metadata is None:
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        # save the archive msg-pack
        try:
            return self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
        except Exception as e:
            # most likely failed due to domain data, try to write metadata and processing logs
            archive = datamodel.EntryArchive()
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the databases. Provides persistence access to the files storage,
    and processing state.

    Attributes:
        name: Optional user provided upload name.
        upload_path: The fs path where the uploaded files were stored during upload.
        temporary: True if the uploaded file should be removed after extraction.

        upload_id: The upload id generated by the database or the uploaded NOMAD deployment.
        upload_time: Datetime of the original upload independent of the NOMAD deployment
            it was first uploaded to.
        user_id: The id of the user that created this upload.
        published: Boolean that indicates that the upload is published on this NOMAD deployment.
        publish_time: Datetime when the upload was initially published on this NOMAD deployment.
        last_update: Datetime of the last modifying process run (publish, re-processing, upload).

        from_oasis: Boolean indicating that this upload is coming from another NOMAD deployment.
        oasis_id: The deployment id of the NOMAD that uploaded the upload.
        published_to: A list of deployment ids where this upload has been successfully uploaded to.

        joined: Boolean indicates if the running processing has joined (:func:`check_join`).
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    from_oasis = BooleanField(default=False)
    oasis_deployment_id = StringField(default=None)
    published_to = ListField(StringField())

    joined = BooleanField(default=False)

    meta: Any = {
        'strict': False,
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

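    # metadata_file_cached() below parses an optional per-directory user metadata file
    # (nomad.yaml/json with a configurable name). A hypothetical example, for
    # illustration only:
    #   comment: 'batch of relaxation runs'
    #   references: ['https://doi.org/10.0000/example']
    #   entries:
    #     run1/vasprun.xml:
    #       comment: 'entry specific comment'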
    @lru_cache()
    def metadata_file_cached(self, path):
        for ext in config.metadata_file_extensions:
            full_path = '%s.%s' % (path, ext)
            if os.path.isfile(full_path):
                try:
                    with open(full_path) as f:
                        if full_path.endswith('.json'):
                            return json.load(f)
                        elif full_path.endswith('.yaml') or full_path.endswith('.yml'):
                            return yaml.load(f, Loader=getattr(yaml, 'FullLoader'))
                        else:
                            return {}
                except Exception as e:
                    self.get_logger().warn('could not parse nomad.yaml/json', path=path, exc_info=e)
                    # ignore the file contents if the file is not parsable
                    pass
        return {}

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del(kwargs['user'])

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload partial archives', step='files',
                    upload_size=self.upload_files.size):

                calc_ids = [calc.calc_id for calc in Calc.objects(upload_id=self.upload_id)]
                delete_partial_archives_from_mongo(calc_ids)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

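                    # The published entries are updated in mongo with a single bulk write:
                    # create_update() below builds one UpdateOne per entry that replaces its
                    # stored metadata with the published (MongoMetadata category) metadata.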
                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def publish_from_oasis(self):
        '''
        Uploads the already published upload to a different NOMAD deployment. This allows
        uploads to be pushed from an OASIS to the central NOMAD.
        '''
        assert self.published, \
            'Only published uploads can be published to the central NOMAD.'
        assert config.oasis.central_nomad_deployment_id not in self.published_to, \
            'Upload is already published to the central NOMAD.'

        from nomad.cli.client.client import _create_client as create_client
        central_nomad_client = create_client(
            user=config.keycloak.username,
            password=config.keycloak.password,
            api_base_url=config.oasis.central_nomad_api_url,
            use_token=False)

        # compile oasis metadata for the upload
        upload_metadata = dict(upload_time=str(self.upload_time))
        upload_metadata_entries = {}
        upload_metadata_datasets = {}