# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user-related
data. These are information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive
from nomad.archive import query_archive
from nomad.datamodel.encyclopedia import (
    EncyclopediaMetadata,
)
from nomad.metainfo import MSection
import phonopyparser


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


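# Processors applied to each log entry of a calc's wrapped logger before it is
# stored with the calc's processing logs (see Calc.get_logger below).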
_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
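        ''' Stores the mongo-relevant part of the given entry metadata on this calc. '''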
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata or the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(
                archive[self.calc_id][datamodel.EntryArchive.section_metadata.name].to_dict())

        except KeyError:
            # Due to hard processing failures, an entry might not have an archive
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # Exceptions here will cause indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.meta.version
            self._entry_metadata.nomad_commit = config.meta.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
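        ''' Creates minimal entry metadata from the processing state, used before parsing and as a fallback on failure. '''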
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
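        ''' Indexes a minimal set of metadata and writes a minimal archive after a processing failure. '''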
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            self.apply_entry_metadata(self._entry_metadata)
            if self._parser_backend and self._parser_backend.resource:
                backend = self._parser_backend
            else:
                backend = None
            self._entry_metadata.apply_domain_metadata(backend)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
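        ''' Saves the calc and triggers the upload's join check after a calc level process has finished. '''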
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)

                if isinstance(self._parser_backend, MSection):
                    backend = Backend(parser._metainfo_env, parser.domain)
                    root_section = datamodel.domains[parser.domain]['root_section']
                    setattr(backend.entry_archive, root_section, self._parser_backend)
                    backend.resource.add(self._parser_backend)
                    self._parser_backend = backend

            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            # Re-create a backend
            context = dict(parser=self.parser, step=self.parser)
            logger = self.get_logger(**context)
            metainfo = phonopyparser.metainfo.m_env
            backend = Backend(metainfo, logger=logger, domain="dft")

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)

            # Save Archive contents, metadata and logs from the old entry
            backend.entry_archive = phonon_archive
            self._parser_backend = backend
            self._entry_metadata = backend.entry_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = backend.entry_archive.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            if ref_enc_method is None or len(ref_enc_method) == 0:
                raise ValueError("No method information available in referenced calculation.")
            backend.entry_archive.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exception=e)
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_backend)
                log_data.update(archive_size=archive_size)

    @contextmanager
    def use_parser_backend(self, processor_name):
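        ''' Context manager that yields the parser backend to a processor and logs the outcome afterwards. '''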
        self._parser_backend.reset_status()
        yield self._parser_backend

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])

            if len(warnings) > 0:
                self.get_logger().warn(
                    'processor completed successfully with warnings',
                    processor=processor_name, warnings=[str(warning) for warning in warnings])

            else:
                self.get_logger().info(
                    'processor completed successfully',
                    processor=processor_name)

        else:
            errors = self._parser_backend.status[1]
            self.get_logger().error(
                'processor completed with failure',
                processor=processor_name, errors=str(errors))

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        self._parser_backend.entry_archive.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        logger.error(
                            'normalizer failed with exception', exc_info=e,
                            error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_backend)
        self._entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_backend)
            log_data.update(archive_size=archive_size)

    def write_archive(self, backend: Backend):
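        ''' Writes the archive for this calc from the given backend (or just the metadata if the backend is None) and returns its size. '''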
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if backend is not None:
            entry_archive = backend.entry_archive.m_copy()
        else:
            entry_archive = datamodel.EntryArchive()

        if entry_archive.section_metadata is None:
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        try:
            return self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
        except Exception as e:
            if backend is None:
                raise e

            # most likely failed due to domain data, try to write metadata and processing logs
            entry_archive = datamodel.EntryArchive()
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the files storage
    and the processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change, when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError as e:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removal of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        '''
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        '''
        if os.path.basename(path).startswith('POTCAR'):