# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user related
data. These contain information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

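Both classes are persisted via mongoengine and are usually looked up by their primary
ids. A minimal usage sketch (``some_upload_id`` and ``some_calc_id`` are placeholders
for existing ids)::

    upload = Upload.get(some_upload_id)
    calc = Calc.get(some_calc_id)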
'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import (
    PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
    PublicUploadFiles, StagingUploadFiles)
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive
from nomad.archive import query_archive
from nomad.datamodel.encyclopedia import EncyclopediaMetadata


section_metadata = datamodel.EntryArchive.section_metadata.name
section_workflow = datamodel.EntryArchive.section_workflow.name


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


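# Shared structlog processor chain. Calc.get_logger wraps its logger with these
# processors plus one that also records each event into the calc's archive
# processing logs.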
_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

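    # mongoengine configuration: indexes (single and compound) that support the
    # frequent per-upload lookups, e.g. by mainfile, parser, or processing status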
    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_results: EntryArchive = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            # instead of loading the whole archive, it should be enough to load the
            # parts that are referenced by section_metadata/EntryMetadata
            # TODO somehow it should determine which root sections to load from the metainfo
            # or configuration
            calc_archive = archive[self.calc_id]
            entry_archive_dict = {section_metadata: calc_archive[section_metadata].to_dict()}
            if section_workflow in calc_archive:
                entry_archive_dict[section_workflow] = calc_archive[section_workflow].to_dict()
            entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]

        except KeyError:
            # Due to hard processing failures, an entry might not have an archive.
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # Exceptions here will cause indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.meta.version
            self._entry_metadata.nomad_commit = config.meta.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # unload the parser results that were not unloaded due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # unload the parser results that were not unloaded due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            self.apply_entry_metadata(self._entry_metadata)
            self._entry_metadata.apply_domain_metadata(self._parser_results)
            if self._parser_results and self._parser_results.m_resource:
                self._parser_results.m_resource.unload()

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
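            # check_join (defined on Upload) lets the upload conclude, e.g. run its
            # cleanup, once all of its calcs have finished processing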
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_results = datamodel.EntryArchive()
                parser.parse(
                    self.upload_files.raw_file_object(self.mainfile).os_path,
                    self._parser_results, logger=logger)

            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            logger = self.get_logger(parser=self.parser, step=self.parser)

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)
            self._entry_metadata = phonon_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Re-create the parse results
            self._parser_results = phonon_archive

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = self._parser_results.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
                raise ValueError("No method information available in referenced calculation.")
            self._parser_results.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.success
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exc_info=e)
            if self._entry_metadata is None:
                self._setup_fallback_metadata()
                self._entry_metadata.processed = False

            try:
                if self._entry_metadata.encyclopedia is None:
                    self._entry_metadata.encyclopedia = EncyclopediaMetadata()
                self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
            except Exception as e:
                logger.error("Could not set encyclopedia status.", exc_info=e)

        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_results)
                log_data.update(archive_size=archive_size)

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        self._parser_results.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(logger, 'normalizer executed', input_size=self.mainfile_file.size):
                try:
                    normalizer(self._parser_results).normalize(logger=logger)
                    logger.info('processor completed successfully', **context)
                except Exception as e:
                    self.fail('normalizer failed with exception', exc_info=e, error=str(e), **context)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_results)
        self._entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_results)
            log_data.update(archive_size=archive_size)

    def write_archive(self, archive: EntryArchive):
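        # processing logs longer than 100 entries are reduced to non-DEBUG messages
        # to keep the stored archive small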
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if archive is not None:
            archive = archive.m_copy()
        else:
            archive = datamodel.EntryArchive()

        if archive.section_metadata is None:
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        try:
            return self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
        except Exception as e:
            # most likely failed due to domain data, try to write metadata and processing logs
            archive = datamodel.EntryArchive()
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the files storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
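        # publishing requires at least one successfully processed calc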
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

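                    # apply all per-calc metadata updates in one MongoDB bulk write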
                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError as e:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        '''
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        '''
        if os.path.basename(path).startswith('POTCAR'):
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # create stripped POTCAR