#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
This module comprises a set of persistent document classes that hold all user related
data. These are information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import yaml
import json
from functools import lru_cache
import urllib.parse
import requests

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import (
    PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
    PublicUploadFiles, StagingUploadFiles)
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive, EditableUserMetadata, OasisMetadata
from nomad.archive import query_archive, write_partial_archive_to_mongo, delete_partial_archives_from_mongo
from nomad.datamodel.encyclopedia import EncyclopediaMetadata


section_metadata = datamodel.EntryArchive.section_metadata.name
section_workflow = datamodel.EntryArchive.section_workflow.name


_editable_metadata = {
    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}
_oasis_metadata = {
    quantity.name: quantity for quantity in OasisMetadata.m_def.definitions}


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]
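

# A minimal sketch (not part of the original module) of how the processors above are
# combined with structlog. `Calc.get_logger` below does the same thing and additionally
# appends a processor that records every event in the entry's `processing_logs`.
def _example_wrap_with_processors(raw_logger):
    # illustrative helper only; `raw_logger` is assumed to be any structlog-wrappable
    # logger, e.g. one obtained from the standard `logging` module
    return wrap_logger(raw_logger, processors=_log_processors)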


def _normalize_oasis_upload_metadata(upload_id, metadata):
    # This is overwritten by the tests to do necessary id manipulations
    return upload_id, metadata


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    during parsing, such as ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_results: EntryArchive = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            parser = parser_dict[self.parser]
            if parser.domain:
                entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            # instead of loading the whole archive, it should be enough to load the
            # parts that are referenced by section_metadata/EntryMetadata
            # TODO somehow it should determine which root sections to load from the metainfo
            # or configuration
            calc_archive = archive[self.calc_id]
            entry_archive_dict = {section_metadata: calc_archive[section_metadata].to_dict()}
            if section_workflow in calc_archive:
                entry_archive_dict[section_workflow] = calc_archive[section_workflow].to_dict()
            entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]

        except KeyError:
            # Due to hard processing failures, an entry might not have an archive
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata
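
    # Illustrative sketch (not part of the original module): reading the complete
    # metadata for all entries of an upload. The names `upload` and `upload_id` are
    # assumptions for illustration; `upload.upload_files` is the corresponding
    # :class:`nomad.files.UploadFiles` instance (see :class:`Upload` below).
    #
    #     upload = Upload.get(upload_id)
    #     for calc in Calc.objects(upload_id=upload_id):
    #         entry_metadata = calc.entry_metadata(upload.upload_files)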

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # exceptions here would cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])
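
    # Illustrative note (not part of the original module): everything logged through
    # the logger returned above is captured by `save_to_calc_log` and later written
    # to `archive.processing_logs` in `write_archive`, e.g.:
    #
    #     logger = self.get_logger(step='example')  # hypothetical step name
    #     logger.warn('something unusual happened')  # ends up in processing_logs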

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.meta.version
            self._entry_metadata.nomad_commit = config.meta.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)
        self._entry_metadata.parser_name = self.parser

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.section_metadata = None
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            try:
                self.apply_entry_metadata(self._entry_metadata)
            except Exception as e:
                self.get_logger().error(
                    'could not apply entry metadata to entry', exc_info=e)

            try:
                self._entry_metadata.apply_domain_metadata(self._parser_results)
            except Exception as e:
                self.get_logger().error(
                    'could not apply domain metadata to entry', exc_info=e)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(self._parser_results)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_results = EntryArchive()
                # allow parsers to read/write metadata
                self._parser_results.m_add_sub_section(EntryArchive.section_metadata, self._entry_metadata)
                parser.parse(
                    self.upload_files.raw_file_object(self.mainfile).os_path,
                    self._parser_results, logger=logger)

            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            logger = self.get_logger(parser=self.parser, step=self.parser)

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)
            self._entry_metadata = phonon_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Re-create the parse results
            self._parser_results = phonon_archive

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = self._parser_results.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
                raise ValueError("No method information available in referenced calculation.")
            self._parser_results.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.success
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exc_info=e)
            if self._entry_metadata is None:
                self._setup_fallback_metadata()
                self._entry_metadata.processed = False

            try:
                if self._entry_metadata.encyclopedia is None:
                    self._entry_metadata.encyclopedia = EncyclopediaMetadata()
                self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
            except Exception as e:
                logger.error("Could not set encyclopedia status.", exc_info=e)

        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_results)
                log_data.update(archive_size=archive_size)

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        if self._parser_results.section_metadata is None:
            self._parser_results.m_add_sub_section(
                datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(logger, 'normalizer executed', input_size=self.mainfile_file.size):
                try:
                    normalizer(self._parser_results).normalize(logger=logger)
                    logger.info('processor completed successfully', **context)
                except Exception as e:
                    self.fail('normalizer failed with exception', exc_info=e, error=str(e), **context)

    def _read_metadata_from_file(self, logger):
        # The metadata file name is defined in nomad.config (nomad.yaml/json). The file
        # can be placed in the directory containing the mainfile or in any directory
        # above it; the directory containing the mainfile has the highest priority.

        metadata_file = config.metadata_file_name
        metadata_dir = os.path.dirname(self.mainfile_file.os_path)

        metadata = {}
        metadata_part = {}  # avoids a NameError on the first iteration if the mainfile sits in the upload root
        while True:
            # top-level nomad file can also contain an entries dict with entry
            # metadata per mainfile as key
            if metadata_dir == self.upload_files.os_path:
                entries = metadata_part.get('entries', {})
                metadata_part = entries.get(self.mainfile, {})
                for key, val in metadata_part.items():
                    metadata.setdefault(key, val)

            # consider the nomad file of the current directory
            metadata_part = self.upload.metadata_file_cached(
                os.path.join(metadata_dir, metadata_file))
            for key, val in metadata_part.items():
                metadata.setdefault(key, val)

            if metadata_dir == self.upload_files.os_path:
                break

            metadata_dir = os.path.dirname(metadata_dir)

        if len(metadata) > 0:
            logger.info('Apply user metadata from nomad.yaml/json file')

        for key, val in metadata.items():
            if key == 'entries':
                continue

            definition = _editable_metadata.get(key, None)
            if definition is None and self.upload.from_oasis:
                definition = _oasis_metadata.get(key, None)

            if definition is None:
                logger.warn('Users cannot set metadata', quantity=key)
                continue

            try:
                self._entry_metadata.m_set(definition, val)
                if definition == datamodel.EntryMetadata.calc_id:
                    self.calc_id = val
            except Exception as e:
                logger.error(
                    'Could not apply user metadata from nomad.yaml/json file',
                    quantity=definition.name, exc_info=e)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_results)
        self._entry_metadata.processed = True

        self._read_metadata_from_file(logger)

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_results)
            log_data.update(archive_size=archive_size)

    def write_archive(self, archive: EntryArchive):
        # save the archive mongo entry
        try:
            if self._entry_metadata.processed:
                write_partial_archive_to_mongo(archive)
        except Exception as e:
            self.get_logger().error('could not write mongodb archive entry', exc_info=e)

        # add the processing logs to the archive
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if archive is not None:
            archive = archive.m_copy()
        else:
            archive = datamodel.EntryArchive()

        if archive.section_metadata is None:
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        # save the archive msg-pack
        try:
            return self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
        except Exception as e:
            # most likely failed due to domain data, try to write metadata and processing logs
            archive = datamodel.EntryArchive()
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)
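
# Illustrative sketch (not part of the original module): the typical lifecycle of a
# Calc document. The keyword values are assumptions for illustration; in production
# the containing Upload creates its Calc documents while matching parsers to raw files.
#
#     calc = Calc.create(
#         calc_id='<calc-id>', mainfile='path/in/upload/mainfile',
#         parser='<parser-name>', upload_id='<upload-id>')
#     calc.process_calc()     # asynchronous, dispatched through celery by @process
#     calc.re_process_calc()  # later, e.g. after a NOMAD version change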


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the file
    storage and the processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realized the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    from_oasis = BooleanField(default=False)
    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @lru_cache()
    def metadata_file_cached(self, path):
        for ext in config.metadata_file_extensions:
            full_path = '%s.%s' % (path, ext)
            if os.path.isfile(full_path):
                try:
                    with open(full_path) as f:
                        if full_path.endswith('.json'):
                            return json.load(f)
                        elif full_path.endswith('.yaml') or full_path.endswith('.yml'):
                            return yaml.load(f, Loader=getattr(yaml, 'FullLoader'))
                        else:
                            return {}
                except Exception as e:
                    self.get_logger().warn('could not parse nomad.yaml/json', path=path, exc_info=e)
                    # ignore the file contents if the file is not parsable
                    pass
        return {}
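
    # Illustrative sketch (not part of the original module): a user metadata file that
    # `metadata_file_cached` would parse and `Calc._read_metadata_from_file` would apply.
    # The file name stem and allowed extensions come from `config.metadata_file_name`
    # and `config.metadata_file_extensions`; the keys below are examples of editable
    # user metadata and are assumptions for illustration:
    #
    #     comment: 'An example upload'
    #     references:
    #       - 'https://doi.org/10.0000/example'
    #     entries:
    #       path/to/first/mainfile:
    #         comment: 'Overrides the upload-wide comment for this entry'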

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We do not bind 'user_id' here because logstash appears to filter such entries.
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del(kwargs['user'])

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self
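
    # Illustrative sketch (not part of the original module), assuming `user` is an
    # authenticated :class:`nomad.datamodel.User` and `local_path` points to the
    # uploaded file on the worker:
    #
    #     upload = Upload.create(user=user, name='my first upload', upload_path=local_path)
    #     ...  # processing of the upload's contents happens asynchronously
    #     upload.publish_upload()  # later: move the processed staging upload to the public area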

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload partial archives', step='files',
                    upload_size=self.upload_files.size):

                calc_ids = [calc.calc_id for calc in Calc.objects(upload_id=self.upload_id)]
                delete_partial_archives_from_mongo(calc_ids)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def publish_from_oasis(self):
        '''
        Uploads the already published upload to a different NOMAD deployment. This allows
        pushing uploads from an OASIS to the central NOMAD.
        '''
        assert self.published, 'Only published uploads can be published to the central NOMAD.'

        # TODO check if it might be there

        # create a nomad.json with all necessary metadata that is not determined by
        # processing the raw data
        metadata = dict(
            upload_time=str(self.upload_time))

        entries = {}
        for calc in self.calcs:
            entry_metadata = dict(**{
                key: str(value) if isinstance(value, datetime) else value
                for key, value in calc.metadata.items()
                if key in _editable_metadata or key in _oasis_metadata})
            entry_metadata['calc_id'] = calc.calc_id
            if entry_metadata.get('with_embargo'):
                continue
            entries[calc.mainfile] = entry_metadata
        metadata['entries'] = entries

        upload_id, metadata = _normalize_oasis_upload_metadata(self.upload_id, metadata)

        assert len(entries) > 0, 'Only uploads with public contents can be published to the central NOMAD.'

        public_upload_files = cast(PublicUploadFiles, self.upload_files)
        public_upload_files.add_metadata_file(metadata)

        # upload the file
        from nomad.cli.client.client import _create_client as create_client

        try:
            client = create_client(
                user=config.keycloak.username,
                password=config.keycloak.password,
                api_base_url=config.services.central_nomad_api_url)

            oasis_admin = client.auth.get_auth().response().result
            oasis_admin_token = oasis_admin.access_token
            upload_url = '%s/uploads/?%s' % (
                config.services.central_nomad_api_url,
                urllib.parse.urlencode(dict(oasis_upload_id=upload_id, oasis_uploader=self.user_id)))
            with open(public_upload_files.public_raw_data_file, 'rb') as f:
                response = requests.put(
                    upload_url, headers={'Authorization': 'Bearer %s' % oasis_admin_token},
                    data=f)
            if response.status_code != 200:
                self.get_logger().error('Could not upload to central NOMAD', status_code=response.status_code)
        except Exception as e:
            self.get_logger().error('Could not upload to central NOMAD', exc_info=e)
            raise e

        # TODO record the publication at the other NOMAD deployment
        pass
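
    # Illustrative sketch (not part of the original module): for `publish_from_oasis`
    # to work, the OASIS deployment needs credentials for the central NOMAD and its API
    # url configured. The keys mirror the config values referenced above; the values
    # are placeholders:
    #
    #     keycloak:
    #       username: '<central-nomad-user>'
    #       password: '<password>'
    #     services:
    #       central_nomad_api_url: '<central nomad api url>'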

    @process
    def