#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
This module comprises a set of persistent document classes that hold all user-related
data: information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import (
    StringField, DateTimeField, DictField, BooleanField, IntField, ListField)
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import yaml
import json
from functools import lru_cache
import urllib.parse
import requests

from nomad import utils, config, infrastructure, search, datamodel, metainfo
from nomad.files import (
    PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
    PublicUploadFiles, StagingUploadFiles)
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import (
    EntryArchive, EditableUserMetadata, OasisMetadata, UserProvidableMetadata)
from nomad.archive import (
    query_archive, write_partial_archive_to_mongo, delete_partial_archives_from_mongo)
from nomad.datamodel.encyclopedia import EncyclopediaMetadata


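# Serialized names of the EntryArchive sub-sections; used as keys when reading and
# writing (partial) archive dicts.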
section_metadata = datamodel.EntryArchive.section_metadata.name
section_workflow = datamodel.EntryArchive.section_workflow.name


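# Metadata quantities that users may provide, e.g. via a nomad metadata file;
# compiled from the UserProvidableMetadata and EditableUserMetadata categories.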
_editable_metadata: Dict[str, metainfo.Definition] = {}
_editable_metadata.update(**{
    quantity.name: quantity for quantity in UserProvidableMetadata.m_def.definitions})
_editable_metadata.update(**{
    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions})

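# Additional quantities that may only be set for uploads coming from another
# NOMAD (OASIS) deployment.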
_oasis_metadata = {
    quantity.name: quantity for quantity in OasisMetadata.m_def.definitions}


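# structlog processor that merges the logger's bound context (except a few standard
# keys) into the event dict, so that it ends up in the stored processing logs.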
def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


def _normalize_oasis_upload_metadata(upload_id, metadata):
    # This is overwritten by the tests to do necessary id manipulations
    return upload_id, metadata


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'strict': False,
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_results: EntryArchive = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            parser = parser_dict[self.parser]
            if parser.domain:
                entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
            cache:
                A boolean that indicates if the archive file should be left unclosed,
                e.g. if this method is called for many entries of the same upload.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            # instead of loading the whole archive, it should be enough to load the
            # parts that are referenced by section_metadata/EntryMetadata
            # TODO somehow it should determine which root sections to load from the metainfo
            # or configuration
            calc_archive = archive[self.calc_id]
            entry_archive_dict = {section_metadata: calc_archive[section_metadata].to_dict()}
            if section_workflow in calc_archive:
                entry_archive_dict[section_workflow] = calc_archive[section_workflow].to_dict()
            entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]

        except KeyError:
            # Due to hard processing failures, it is possible that an entry does not
            # have an archive
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # exceptions here would cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

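            # Keep the existing archive: copy it from the published files into the
            # staging files instead of re-processing the entry.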
            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.meta.version
            self._entry_metadata.nomad_commit = config.meta.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close the log handler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)
        self._entry_metadata.parser_name = self.parser

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close the log handler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            try:
                self.apply_entry_metadata(self._entry_metadata)
            except Exception as e:
                self.get_logger().error(
                    'could not apply entry metadata to entry', exc_info=e)

            try:
                self._entry_metadata.apply_domain_metadata(self._parser_results)
            except Exception as e:
                self.get_logger().error(
                    'could not apply domain metadata to entry', exc_info=e)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(self._parser_results)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_results = EntryArchive()
                # allow parsers to read/write metadata
                self._parser_results.m_add_sub_section(EntryArchive.section_metadata, self._entry_metadata)
                parser.parse(
                    self.upload_files.raw_file_object(self.mainfile).os_path,
                    self._parser_results, logger=logger)

            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            logger = self.get_logger(parser=self.parser, step=self.parser)

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)
            self._entry_metadata = phonon_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Re-create the parse results
            self._parser_results = phonon_archive

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = self._parser_results.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
                raise ValueError("No method information available in referenced calculation.")
            self._parser_results.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.success
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exc_info=e)
            if self._entry_metadata is None:
                self._setup_fallback_metadata()
                self._entry_metadata.processed = False

            try:
                if self._entry_metadata.encyclopedia is None:
                    self._entry_metadata.encyclopedia = EncyclopediaMetadata()
                self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
            except Exception as e:
                logger.error("Could set encyclopedia status.", exc_info=e)

        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_results)
                log_data.update(archive_size=archive_size)

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        if self._parser_results.section_metadata is None:
            self._parser_results.m_add_sub_section(
                datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(logger, 'normalizer executed', input_size=self.mainfile_file.size):
                try:
                    normalizer(self._parser_results).normalize(logger=logger)
                    logger.info('processor completed successfully', **context)
                except Exception as e:
                    self.fail('normalizer failed with exception', exc_info=e, error=str(e), **context)

    def _read_metadata_from_file(self, logger):
        # The metadata file name (nomad_metadata.yaml/json) is defined in nomad.config. The file
        # can be placed in the directory containing the mainfile or in any directory above it;
        # the directory containing the mainfile has the highest priority.

        metadata_file = config.metadata_file_name
        metadata_dir = os.path.dirname(self.mainfile_file.os_path)

        metadata = {}
        while True:
            # top-level nomad file can also contain an entries dict with entry
            # metadata per mainfile as key
            if metadata_dir == self.upload_files.os_path:
                entries = metadata_part.get('entries', {})
                metadata_part = entries.get(self.mainfile, {})
                for key, val in metadata_part.items():
                    metadata.setdefault(key, val)

            # consider the nomad file of the current directory
            metadata_part = self.upload.metadata_file_cached(
                os.path.join(metadata_dir, metadata_file))
            for key, val in metadata_part.items():
                if key in ['entries', 'oasis_datasets']:
                    continue
                metadata.setdefault(key, val)

            if metadata_dir == self.upload_files.os_path:
                break

            metadata_dir = os.path.dirname(metadata_dir)

        if len(metadata) > 0:
            logger.info('Apply user metadata from nomad.yaml/json file')

        for key, val in metadata.items():
            if key == 'entries':
                continue

            definition = _editable_metadata.get(key, None)
            if definition is None and self.upload.from_oasis:
                definition = _oasis_metadata.get(key, None)

            if definition is None:
                logger.warn('Users cannot set metadata', quantity=key)
                continue

            try:
                self._entry_metadata.m_set(definition, val)
                if definition == datamodel.EntryMetadata.calc_id:
                    self.calc_id = val
            except Exception as e:
                logger.error(
                    'Could not apply user metadata from nomad.yaml/json file',
                    quantity=definition.name, exc_info=e)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_results)
        self._entry_metadata.processed = True

        if self.upload.publish_directly:
            self._entry_metadata.published |= True

        self._read_metadata_from_file(logger)

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_results)
            log_data.update(archive_size=archive_size)

    def write_archive(self, archive: EntryArchive):
        # save the archive mongo entry
        try:
            if self._entry_metadata.processed:
                write_partial_archive_to_mongo(archive)
        except Exception as e:
            self.get_logger().error('could not write mongodb archive entry', exc_info=e)

        # add the processing logs to the archive
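        # Truncate very long processing logs by dropping DEBUG entries.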
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

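        # Work on a copy so that metadata and processing logs can be added without
        # modifying the parser results.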
        if archive is not None:
            archive = archive.m_copy()
        else:
            archive = datamodel.EntryArchive()

        if archive.section_metadata is None:
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        # save the archive msg-pack
        try:
            return self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
        except Exception as e:
            # most likely failed due to domain data, try to write metadata and processing logs
            archive = datamodel.EntryArchive()
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the databases. Provides persistent access to the files storage
    and processing state.

    Attributes:
        name: Optional user provided upload name.
        upload_path: The fs path where the uploaded files were stored during upload.
        temporary: True if the uploaded file should be removed after extraction.

        upload_id: The upload id generated by the database or the uploaded NOMAD deployment.
        upload_time: Datetime of the original upload independent of the NOMAD deployment
            it was first uploaded to.
        user_id: The id of the user that created this upload.
        published: Boolean that indicates that the upload is published on this NOMAD deployment.
        publish_time: Datetime when the upload was initially published on this NOMAD deployment.
        last_update: Datetime of the last modifying process run (publish, re-processing, upload).

        publish_directly: Boolean indicating that this upload should be published after initial processing.
        from_oasis: Boolean indicating that this upload is coming from another NOMAD deployment.
        oasis_deployment_id: The deployment id of the NOMAD deployment that uploaded the upload.
        published_to: A list of deployment ids where this upload has been successfully uploaded to.

        joined: Boolean indicates if the running processing has joined (:func:`check_join`).
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    publish_directly = BooleanField(default=False)
    from_oasis = BooleanField(default=False)
    oasis_deployment_id = StringField(default=None)
    published_to = ListField(StringField())

    joined = BooleanField(default=False)

    meta: Any = {
        'strict': False,
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
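        # uploads pushed from an OASIS are published directly after processing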
        self.publish_directly = self.publish_directly or self.from_oasis
        self._upload_files: ArchiveBasedStagingUploadFiles = None

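    # Cached reader for a nomad metadata file: tries the configured extensions and
    # returns the parsed content, or an empty dict if the file is missing or unparsable.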
    @lru_cache()
    def metadata_file_cached(self, path):
        for ext in config.metadata_file_extensions:
            full_path = '%s.%s' % (path, ext)
            if os.path.isfile(full_path):
                try:
                    with open(full_path) as f:
                        if full_path.endswith('.json'):
                            return json.load(f)
                        elif full_path.endswith('.yaml') or full_path.endswith('.yml'):
                            return yaml.load(f, Loader=getattr(yaml, 'FullLoader'))
                        else:
                            return {}
                except Exception as e:
                    self.get_logger().warn('could not parse nomad.yaml/json', path=path, exc_info=e)
                    # ignore the file contents if the file is not parsable
                    pass
        return {}

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload partial archives', step='files',
                    upload_size=self.upload_files.size):

                calc_ids = [calc.calc_id for calc in Calc.objects(upload_id=self.upload_id)]
                delete_partial_archives_from_mongo(calc_ids)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

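                    # Build one bulk mongo update per calc that marks it as published
                    # and persists its Mongo metadata.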
                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def publish_from_oasis(self):
        '''
        Uploads the already published upload to a different NOMAD deployment. This allows
        pushing uploads from an OASIS to the central NOMAD.
        '''
        assert self.published, \
            'Only published uploads can be published to the central NOMAD.'
        assert config.oasis.central_nomad_deployment_id not in self.published_to, \
            'Upload is already published to the central NOMAD.'
