#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
This module comprises a set of persistent document classes that hold all user related
data. These are information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import yaml
import json
from cachetools import cached, LRUCache
from cachetools.keys import hashkey

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import (
    PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
    PublicUploadFiles, StagingUploadFiles)
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive, EditableUserMetadata
from nomad.archive import query_archive, write_partial_archive_to_mongo, delete_partial_archives_from_mongo
from nomad.datamodel.encyclopedia import EncyclopediaMetadata


section_metadata = datamodel.EntryArchive.section_metadata.name
section_workflow = datamodel.EntryArchive.section_workflow.name


_editable_metadata = {
    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}


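# Loads user metadata from a metadata file next to the given path. Results are
# memoized in an LRU cache keyed on the path alone (the custom `key` lambda drops the
# logger argument), so repeated lookups for entries in the same directory only read
# the file once. A hypothetical call could look like
#     metadata_file_cached(os.path.join(metadata_dir, config.metadata_file_name), logger)
# which tries each extension from config.metadata_file_extensions and returns {} when
# no parsable file is found.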
@cached(cache=LRUCache(maxsize=100), key=lambda path, *args, **kwargs: hashkey(path))
def metadata_file_cached(path, logger):
    for ext in config.metadata_file_extensions:
        full_path = '%s.%s' % (path, ext)
        if os.path.isfile(full_path):
            try:
                with open(full_path) as f:
                    if full_path.endswith('.json'):
                        return json.load(f)
                    elif full_path.endswith('.yaml') or full_path.endswith('.yml'):
                        return yaml.load(f, Loader=getattr(yaml, 'FullLoader'))
                    else:
                        return {}
            except Exception as e:
                logger.warn('could not parse nomad.yaml/json', path=path, exc_info=e)
                # ignore the file contents if the file is not parsable
                pass
    return {}


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]
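# Processor chain used by Calc.get_logger below: _pack_log_event merges the logger's
# bound context (minus fields that are stored elsewhere, e.g. upload_id and calc_id)
# into the event, format_exc_info renders exceptions, and a timestamp is added.
# Calc.get_logger appends its own save_to_calc_log processor so that every log entry
# also ends up in the entry's processing_logs in the archive.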


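# Rough processing flow, as a sketch based on the methods below: a Calc document is
# created for each matched mainfile elsewhere in the processing code (not shown in
# this excerpt); process_calc() (or re_process_calc() for existing entries) runs the
# parsing, normalizing and archiving tasks; on_process_complete() then reloads the
# Upload and calls check_join() so the upload-level process can finish once all of
# its calcs are done.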
class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_results: EntryArchive = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

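    # Three flavors of entry metadata are produced below: create_metadata() builds a
    # fresh EntryMetadata from this processing record only, user_metadata() additionally
    # applies the user metadata stored in this document's mongo `metadata` field, and
    # entry_metadata(upload_files) also pulls the section_metadata stored in the archive.
    # apply_entry_metadata() goes the other way and writes an EntryMetadata back to mongo.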
    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            parser = parser_dict[self.parser]
            if parser.domain:
                entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            # instead of loading the whole archive, it should be enough to load the
            # parts that are referenced by section_metadata/EntryMetadata
            # TODO somehow it should determine which root sections to load from the metainfo
            # or configuration
            calc_archive = archive[self.calc_id]
            entry_archive_dict = {section_metadata: calc_archive[section_metadata].to_dict()}
            if section_workflow in calc_archive:
                entry_archive_dict[section_workflow] = calc_archive[section_workflow].to_dict()
            entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]

        except KeyError:
            # due to hard processing failures, an entry might not have an archive
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata
            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # exceptions here would cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.meta.version
            self._entry_metadata.nomad_commit = config.meta.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)
        self._entry_metadata.parser_name = self.parser

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            try:
                self.apply_entry_metadata(self._entry_metadata)
            except Exception as e:
                self.get_logger().error(
                    'could not apply entry metadata to entry', exc_info=e)

            try:
                self._entry_metadata.apply_domain_metadata(self._parser_results)
            except Exception as e:
                self.get_logger().error(
                    'could not apply domain metadata to entry', exc_info=e)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(self._parser_results)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

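    # Presumably invoked by the Proc base class when one of the @process methods above
    # completes: saving first makes sure the join condition is evaluated against the
    # current database state before the upload's check_join() decides whether all calcs
    # of the upload have finished.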
    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_results = EntryArchive()
                # allow parsers to read/write metadata
                self._parser_results.m_add_sub_section(EntryArchive.section_metadata, self._entry_metadata)
                parser.parse(
                    self.upload_files.raw_file_object(self.mainfile).os_path,
                    self._parser_results, logger=logger)

            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            logger = self.get_logger(parser=self.parser, step=self.parser)

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)
            self._entry_metadata = phonon_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Re-create the parse results
            self._parser_results = phonon_archive

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = self._parser_results.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
                raise ValueError("No method information available in referenced calculation.")
            self._parser_results.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.success
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exc_info=e)
            if self._entry_metadata is None:
                self._setup_fallback_metadata()
                self._entry_metadata.processed = False

            try:
                if self._entry_metadata.encyclopedia is None:
                    self._entry_metadata.encyclopedia = EncyclopediaMetadata()
                self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
            except Exception as e:
                logger.error("Could not set encyclopedia status.", exc_info=e)

        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_results)
                log_data.update(archive_size=archive_size)

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        if self._parser_results.section_metadata is None:
            self._parser_results.m_add_sub_section(
                datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(logger, 'normalizer executed', input_size=self.mainfile_file.size):
                try:
                    normalizer(self._parser_results).normalize(logger=logger)
                    logger.info('processor completed successfully', **context)
                except Exception as e:
                    self.fail('normalizer failed with exception', exc_info=e, error=str(e), **context)

    def _read_metadata_from_file(self, logger):
        # metadata file name defined in nomad.config nomad_metadata.yaml/json
        # which can be placed in the directory containing the mainfile or somewhere up
        # highest priority is directory with mainfile

        metadata_file = config.metadata_file_name
        metadata_dir = os.path.dirname(self.mainfile_file.os_path)

        metadata = {}
        metadata_path = None
        while metadata_dir:
            metadata_part = metadata_file_cached(
                os.path.join(metadata_dir, metadata_file), logger)

            for key, val in metadata_part.items():
                metadata.setdefault(key, val)

            metadata_dir = os.path.dirname(metadata_dir)
            if metadata_path is not None:
                break

        if len(metadata_dir) > 0:
            logger.info('Apply user metadata from nomad.yaml/json file')

        for key, val in metadata.items():
            definition = _editable_metadata.get(key, None)
            if not definition:
                logger.warn('Cannot set metadata %s' % key)
                continue
            self._entry_metadata.m_set(definition, val)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_results)
        self._entry_metadata.processed = True

        self._read_metadata_from_file(logger)

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_results)
            log_data.update(archive_size=archive_size)

    def write_archive(self, archive: EntryArchive):
        # save the archive mongo entry
        try:
            if self._entry_metadata.processed:
                write_partial_archive_to_mongo(archive)
        except Exception as e:
            self.get_logger().error('could not write mongodb archive entry', exc_info=e)

        # add the processing logs to the archive
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if archive is not None:
            archive = archive.m_copy()
        else:
            archive = datamodel.EntryArchive()

        if archive.section_metadata is None:
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        # save the archive msg-pack
        try:
            return self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
        except Exception as e:
            # most likely failed due to domain data, try to write metadata and processing logs
            archive = datamodel.EntryArchive()
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)

class Upload(Proc):
    '''
    Represents uploads in the databases. Provides persistence access to the files storage,
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded files were stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realized the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user, a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del(kwargs['user'])

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

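    # Deletion comes in three layers: delete() above only removes the mongo documents,
    # delete_upload_local() below also removes the search index entries, partial
    # archives and upload files, and delete_upload() wraps the latter in a celery
    # @process so it runs on a worker.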
    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload partial archives', step='files',
                    upload_size=self.upload_files.size):

                calc_ids = [calc.calc_id for calc in Calc.objects(upload_id=self.upload_id)]
                delete_partial_archives_from_mongo(calc_ids)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

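    # Re-processing resets the processing state of all calcs of this upload with a raw
    # pymongo update and then re-runs Calc.re_process_calc on each of them via
    # Calc.process_all; the per-calc on_process_complete/check_join mechanism is what
    # eventually marks the upload as joined again.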
    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/re-normalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError as e:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e: