#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
This module comprises a set of persistent document classes that hold all user related
data. These hold information about users, their uploads and datasets, the associated
calculations, and files.

.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import yaml
import json
from functools import lru_cache

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import (
    PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
    PublicUploadFiles, StagingUploadFiles)
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive, EditableUserMetadata, OasisMetadata
from nomad.archive import query_archive, write_partial_archive_to_mongo, delete_partial_archives_from_mongo
from nomad.datamodel.encyclopedia import EncyclopediaMetadata


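# names of the archive root sections that are accessed below when reading entry
# metadata from an archive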
section_metadata = datamodel.EntryArchive.section_metadata.name
section_workflow = datamodel.EntryArchive.section_workflow.name


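# quantities that users may set via a nomad.yaml/json metadata file; uploads coming
# from an OASIS may additionally set the OasisMetadata quantities
# (see Calc._read_metadata_from_file)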
_editable_metadata = {
    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}
_oasis_metadata = {
    quantity.name: quantity for quantity in OasisMetadata.m_def.definitions}


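# structlog processors for calc processing logs: _pack_log_event merges the logger's
# bound context into each event dict; Calc.get_logger() appends a further processor
# that also stores every entry in the calc's processing logs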
def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_results: EntryArchive = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            parser = parser_dict[self.parser]
            if parser.domain:
                entry_metadata.domain = parser_dict[self.parser].domain

        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            # instead of loading the whole archive, it should be enough to load the
            # parts that are referenced by section_metadata/EntryMetadata
            # TODO somehow it should determine which root sections to load from the metainfo
            # or configuration
            calc_archive = archive[self.calc_id]
            entry_archive_dict = {section_metadata: calc_archive[section_metadata].to_dict()}
            if section_workflow in calc_archive:
                entry_archive_dict[section_workflow] = calc_archive[section_workflow].to_dict()
            entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]

        except KeyError:
            # due to hard processing failures, an entry might not have an archive
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata
            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # exceptions here would cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

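            # this calc will not be re-parsed: copy its existing archive from the
            # published upload files into the staging upload files so it is preserved
            # (the processing steps below are only mocked)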
            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.meta.version
            self._entry_metadata.nomad_commit = config.meta.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)
        self._entry_metadata.parser_name = self.parser

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            try:
                self.apply_entry_metadata(self._entry_metadata)
            except Exception as e:
                self.get_logger().error(
                    'could not apply entry metadata to entry', exc_info=e)

            try:
                self._entry_metadata.apply_domain_metadata(self._parser_results)
            except Exception as e:
                self.get_logger().error(
                    'could not apply domain metadata to entry', exc_info=e)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(self._parser_results)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

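    # presumably invoked by the processing base class when one of the @process methods
    # above completes; reloads the upload and triggers its join check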
    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_results = EntryArchive()
                # allow parsers to read/write metadata
                self._parser_results.m_add_sub_section(EntryArchive.section_metadata, self._entry_metadata)

                parser.parse(
                    self.upload_files.raw_file_object(self.mainfile).os_path,
                    self._parser_results, logger=logger)

            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

    def process_phonon(self):
        """Function that is run for phonon calculations before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            logger = self.get_logger(parser=self.parser, step=self.parser)

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)
            self._entry_metadata = phonon_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Re-create the parse results
            self._parser_results = phonon_archive

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = self._parser_results.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
                raise ValueError("No method information available in referenced calculation.")
            self._parser_results.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.success
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exc_info=e)
            if self._entry_metadata is None:
                self._setup_fallback_metadata()
                self._entry_metadata.processed = False

            try:
                if self._entry_metadata.encyclopedia is None:
                    self._entry_metadata.encyclopedia = EncyclopediaMetadata()
                self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
            except Exception as e:
                logger.error("Could not set encyclopedia status.", exc_info=e)

        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_results)
                log_data.update(archive_size=archive_size)

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        if self._parser_results.section_metadata is None:
            self._parser_results.m_add_sub_section(
                datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(logger, 'normalizer executed', input_size=self.mainfile_file.size):
                try:
                    normalizer(self._parser_results).normalize(logger=logger)
                    logger.info('processor completed successfully', **context)
                except Exception as e:
                    self.fail('normalizer failed with exception', exc_info=e, error=str(e), **context)

    def _read_metadata_from_file(self, logger):
        # metadata file name defined in nomad.config nomad_metadata.yaml/json
        # which can be placed in the directory containing the mainfile or somewhere up
        # highest priority is directory with mainfile

        metadata_file = config.metadata_file_name
        metadata_dir = os.path.dirname(self.mainfile_file.os_path)

        metadata = {}
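        # walk from the mainfile directory up to the upload root, merging metadata files;
        # because of setdefault, values defined closer to the mainfile take precedence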
        while True:
            # top-level nomad file can also contain an entries dict with entry
            # metadata per mainfile as key
            if metadata_dir == self.upload_files.os_path:
                entries = metadata_part.get('entries', {})
                metadata_part = entries.get(self.mainfile, {})
                for key, val in metadata_part.items():
                    metadata.setdefault(key, val)

            # consider the nomad file of the current directory
            metadata_part = self.upload.metadata_file_cached(
                os.path.join(metadata_dir, metadata_file))
            for key, val in metadata_part.items():
                metadata.setdefault(key, val)

            if metadata_dir == self.upload_files.os_path:
                break

            metadata_dir = os.path.dirname(metadata_dir)

        if len(metadata) > 0:
            logger.info('Apply user metadata from nomad.yaml/json file')

        for key, val in metadata.items():
            if key == 'entries':
                continue

            definition = _editable_metadata.get(key, None)
            if definition is None and self.upload.from_oasis:
                definition = _oasis_metadata.get(key, None)

            if definition is None:
                logger.warn('Users cannot set metadata', quantity=key)
                continue

            try:
                self._entry_metadata.m_set(definition, val)
                if definition == datamodel.EntryMetadata.calc_id:
                    self.calc_id = val
            except Exception as e:
                logger.error(
                    'Could not apply user metadata from nomad.yaml/json file',
                    quantity=definition.name, exc_info=e)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_results)
        self._entry_metadata.processed = True

        self._read_metadata_from_file(logger)

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_results)
            log_data.update(archive_size=archive_size)

    def write_archive(self, archive: EntryArchive):
        # save the archive mongo entry
        try:
            if self._entry_metadata.processed:
                write_partial_archive_to_mongo(archive)
        except Exception as e:
            self.get_logger().error('could not write mongodb archive entry', exc_info=e)

        # add the processing logs to the archive
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if archive is not None:
            archive = archive.m_copy()
        else:
            archive = datamodel.EntryArchive()

        if archive.section_metadata is None:
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        # save the archive msg-pack
        try:
            return self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
        except Exception as e:
            # most likely failed due to domain data, try to write metadata and processing logs
            archive = datamodel.EntryArchive()
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the file storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded files were stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realized the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    from_oasis = BooleanField(default=False)
    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

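    # lru_cache: the same nomad.yaml/json metadata file is only read and parsed once,
    # even if many entries of the upload reference it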
    @lru_cache()
    def metadata_file_cached(self, path):
        for ext in config.metadata_file_extensions:
            full_path = '%s.%s' % (path, ext)
            if os.path.isfile(full_path):
                try:
                    with open(full_path) as f:
                        if full_path.endswith('.json'):
                            return json.load(f)
                        elif full_path.endswith('.yaml') or full_path.endswith('.yml'):
                            return yaml.load(f, Loader=getattr(yaml, 'FullLoader'))
                        else:
                            return {}
                except Exception as e:
                    self.get_logger().warn('could not parse nomad.yaml/json', path=path, exc_info=e)
                    # ignore the file contents if the file is not parsable
                    pass
        return {}

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)

        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload partial archives', step='files',
                    upload_size=self.upload_files.size):

                calc_ids = [calc.calc_id for calc in Calc.objects(upload_id=self.upload_id)]
                delete_partial_archives_from_mongo(calc_ids)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

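                    # build one pymongo UpdateOne per calc so the published calc metadata
                    # is written back to the Calc mongo documents in a single bulk_write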
                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/re-normalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')