# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user related
data: information about users, their uploads and datasets, the associated
calculations, and files.
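
A minimal usage sketch (illustrative only; it assumes a connected MongoDB and an
already processed upload, with ``some_upload_id`` standing in for a real id):

.. code-block:: python

    upload = Upload.get(some_upload_id)
    for calc in Calc.objects(upload_id=upload.upload_id):
        print(calc.mainfile, calc.parser, calc.tasks_status)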


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import yaml
import json
from cachetools import cached, LRUCache
from cachetools.keys import hashkey

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import (
    PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
    PublicUploadFiles, StagingUploadFiles)
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive, EditableUserMetadata
from nomad.archive import query_archive, write_partial_archive_to_mongo
from nomad.datamodel.encyclopedia import EncyclopediaMetadata


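# convenient names for the EntryArchive root sections used throughout this module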
section_metadata = datamodel.EntryArchive.section_metadata.name
section_workflow = datamodel.EntryArchive.section_workflow.name


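# maps the name of each user-editable metadata quantity to its definition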
_editable_metadata = {
    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}


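# reads a nomad metadata file (yaml/json) for the given path; results are LRU-cached
# per path, so entries that share a directory do not re-read the same file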
@cached(cache=LRUCache(maxsize=100), key=lambda path, *args, **kwargs: hashkey(path))
def metadata_file_cached(path, logger):
    for ext in config.metadata_file_extensions:
        full_path = '%s.%s' % (path, ext)
        if os.path.isfile(full_path):
            try:
                with open(full_path) as f:
                    if full_path.endswith('.json'):
                        return json.load(f)
                    elif full_path.endswith('.yaml') or full_path.endswith('.yml'):
                        return yaml.load(f, Loader=getattr(yaml, 'FullLoader'))
                    else:
                        return {}
            except Exception as e:
                logger.warn('could not parse nomad.yaml/json', path=path, exc_info=e)
                # ignore the file contents if the file is not parsable
                pass
    return {}


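# structlog processor that merges the logger's bound context into the event dict,
# omitting fields that are recorded elsewhere in the processing log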
def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculations processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_results: EntryArchive = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            parser = parser_dict[self.parser]
            if parser.domain:
                entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            # instead of loading the whole archive, it should be enough to load the
            # parts that are referenced by section_metadata/EntryMetadata
            # TODO somehow it should determine which root sections to load from the metainfo
            # or configuration
            calc_archive = archive[self.calc_id]
            entry_archive_dict = {section_metadata: calc_archive[section_metadata].to_dict()}
            if section_workflow in calc_archive:
                entry_archive_dict[section_workflow] = calc_archive[section_workflow].to_dict()
            entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]

        except KeyError:
            # Due to hard processing failures, an entry might not have an archive.
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # Exceptions here will cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.meta.version
            self._entry_metadata.nomad_commit = config.meta.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            try:
                self.apply_entry_metadata(self._entry_metadata)
            except Exception as e:
                self.get_logger().error(
                    'could not apply entry metadata to entry', exc_info=e)

            try:
                self._entry_metadata.apply_domain_metadata(self._parser_results)
            except Exception as e:
                self.get_logger().error(
                    'could not apply domain metadata to entry', exc_info=e)

            if self._parser_results and self._parser_results.m_resource:
                self._parser_results.section_metadata = None
                self._parser_results.m_resource.unload()

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_results = EntryArchive()
                # allow parsers to read/write metadata
                self._parser_results.m_add_sub_section(EntryArchive.section_metadata, self._entry_metadata)
                parser.parse(
                    self.upload_files.raw_file_object(self.mainfile).os_path,
                    self._parser_results, logger=logger)

            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            logger = self.get_logger(parser=self.parser, step=self.parser)

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)
            self._entry_metadata = phonon_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Re-create the parse results
            self._parser_results = phonon_archive

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = self._parser_results.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
                raise ValueError("No method information available in referenced calculation.")
            self._parser_results.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.success
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exc_info=e)
            if self._entry_metadata is None:
                self._setup_fallback_metadata()
                self._entry_metadata.processed = False

            try:
                if self._entry_metadata.encyclopedia is None:
                    self._entry_metadata.encyclopedia = EncyclopediaMetadata()
                self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
            except Exception as e:
                logger.error("Could not set encyclopedia status.", exc_info=e)

        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_results)
                log_data.update(archive_size=archive_size)

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        if self._parser_results.section_metadata is None:
            self._parser_results.m_add_sub_section(
                datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(logger, 'normalizer executed', input_size=self.mainfile_file.size):
                try:
                    normalizer(self._parser_results).normalize(logger=logger)
                    logger.info('processor completed successfully', **context)
                except Exception as e:
                    self.fail('normalizer failed with exception', exc_info=e, error=str(e), **context)

    def _read_metadata_from_file(self, logger):
        # metadata file name defined in nomad.config nomad_metadata.yaml/json
        # which can be placed in the directory containing the mainfile or somewhere up
        # highest priority is directory with mainfile

        metadata_file = config.metadata_file_name
        metadata_dir = os.path.dirname(self.mainfile_file.os_path)

        metadata = {}
        metadata_path = None
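        # walk from the mainfile directory upwards; because of setdefault below,
        # values found closer to the mainfile take precedence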
        while metadata_dir:
            metadata_part = metadata_file_cached(
                os.path.join(metadata_dir, metadata_file), logger)

            for key, val in metadata_part.items():
                metadata.setdefault(key, val)

            metadata_dir = os.path.dirname(metadata_dir)
            if metadata_path is not None:
                break

        if len(metadata_dir) > 0:
            logger.info('Apply user metadata from nomad.yaml/json file')

        for key, val in metadata.items():
            definition = _editable_metadata.get(key, None)
            if not definition:
                logger.warn('Cannot set metadata %s' % key)
                continue
            self._entry_metadata.m_set(definition, val)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_results)
        self._entry_metadata.processed = True

        self._read_metadata_from_file(logger)

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_results)
            log_data.update(archive_size=archive_size)

    def write_archive(self, archive: EntryArchive):
        # save the archive mongo entry
        try:
            write_partial_archive_to_mongo(archive)
        except Exception as e:
            self.get_logger().error('could not write mongodb archive entry', exc_info=e)

        # add the processing logs to the archive
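        # if the log grew large, drop DEBUG entries to keep the archived processing log small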
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if archive is not None:
            archive = archive.m_copy()
        else:
            archive = datamodel.EntryArchive()

        if archive.section_metadata is None:
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        # save the archive msg-pack
        try:
            return self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
        except Exception as e:
            # most likely failed due to domain data, try to write metadata and processing logs
            archive = datamodel.EntryArchive()
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents an upload in the database. Provides persistent access to the upload's
    file storage and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realized the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change, when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed url and a pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files in to public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

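                    # build one pymongo UpdateOne per entry: mark it as published and
                    # persist its Mongo-relevant metadata in a single bulk write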
                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/re-normalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''