# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user-related
data: information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import yaml
import json
from cachetools import cached, LRUCache

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import (
    PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
    PublicUploadFiles, StagingUploadFiles)
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive, EditableUserMetadata
from nomad.archive import query_archive
from nomad.datamodel.encyclopedia import EncyclopediaMetadata


section_metadata = datamodel.EntryArchive.section_metadata.name
section_workflow = datamodel.EntryArchive.section_workflow.name


_editable_metadata = {
    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}


@cached(cache=LRUCache(maxsize=100))
def metadata_cached(path):
    for ext in config.aux_metadata_exts:
        full_path = '%s.%s' % (path, ext)
        if os.path.isfile(full_path):
            with open(full_path) as f:
                if full_path.endswith('json'):
                    return json.load(f)
                elif full_path.endswith('yaml'):
                    return yaml.load(f, Loader=getattr(yaml, 'FullLoader'))
                else:
                    return {}
    return {}
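
# Illustrative usage sketch (not part of the original code; the path is hypothetical):
#
#     aux_metadata = metadata_cached('/uploads/example/dft/nomad_metadata')
#
# This tries the extensions configured in config.aux_metadata_exts in order, returns the
# parsed JSON/YAML content of the first file that exists, and {} otherwise. The LRU cache
# avoids re-reading the same file when many mainfiles share a directory.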


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]
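
# Illustrative sketch (hypothetical values): with these processors, a bound logger call
# such as logger.info('parser executed', input_size=123) is packed into a plain dict like
#
#     {'event': 'parser executed', 'input_size': 123, 'logger': '<logger name>',
#      'timestamp': '2020-01-01 12:00.00'}
#
# which Calc.get_logger (below) additionally appends to the calculation's processing logs.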


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_results: EntryArchive = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            # instead of loading the whole archive, it should be enough to load the
            # parts that are referenced by section_metadata/EntryMetadata
            # TODO somehow it should determine which root sections to load from the metainfo
            # or configuration
            calc_archive = archive[self.calc_id]
            entry_archive_dict = {section_metadata: calc_archive[section_metadata].to_dict()}
            if section_workflow in calc_archive:
                entry_archive_dict[section_workflow] = calc_archive[section_workflow].to_dict()
            entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]

        except KeyError:
            # Due to hard processing failures, it is possible that an entry does not
            # have an archive.
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata
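
    # Illustrative usage sketch (not part of the original code; the id is hypothetical):
    #
    #     calc = Calc.get('some_calc_id')
    #     upload_files = StagingUploadFiles(calc.upload_id, is_authorized=lambda: True)
    #     metadata = calc.entry_metadata(upload_files)
    #
    # This reads section_metadata (and, if present, section_workflow) from the stored
    # archive and overlays the mongo-stored user metadata on top of it.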

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # raising an exception here would cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])
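
    # Illustrative note (not part of the original code): entries logged through this
    # logger are also captured as sanitized dicts in self._calc_proc_logs, which
    # write_archive() later stores as the archive's processing_logs. E.g. a call like
    #
    #     self.get_logger().info('normalizer executed', normalizer='SystemNormalizer')
    #
    # (hypothetical normalizer name) ends up both in the service logs and in the archive.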

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.meta.version
            self._entry_metadata.nomad_commit = config.meta.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_results and self._parser_results.m_resource:
                    self._parser_results.m_resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            self.apply_entry_metadata(self._entry_metadata)
            self._entry_metadata.apply_domain_metadata(self._parser_results)
            if self._parser_results and self._parser_results.m_resource:
                self._parser_results.m_resource.unload()

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_results = datamodel.EntryArchive()
                parser.parse(
                    self.upload_files.raw_file_object(self.mainfile).os_path,
                    self._parser_results, logger=logger)

            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            logger = self.get_logger(parser=self.parser, step=self.parser)

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)
            self._entry_metadata = phonon_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Re-create the parse results
            self._parser_results = phonon_archive

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = self._parser_results.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
                raise ValueError("No method information available in referenced calculation.")
            self._parser_results.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.success
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exc_info=e)
            if self._entry_metadata is None:
                self._setup_fallback_metadata()
                self._entry_metadata.processed = False

            try:
                if self._entry_metadata.encyclopedia is None:
                    self._entry_metadata.encyclopedia = EncyclopediaMetadata()
                self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
            except Exception as e:
                logger.error("Could not set encyclopedia status.", exc_info=e)

        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_results)
                log_data.update(archive_size=archive_size)

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        self._parser_results.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(logger, 'normalizer executed', input_size=self.mainfile_file.size):
                try:
                    normalizer(self._parser_results).normalize(logger=logger)
                    logger.info('processor completed successfully', **context)
                except Exception as e:
                    self.fail('normalizer failed with exception', exc_info=e, error=str(e), **context)

    def _read_metadata_from_file(self, logger):
        # The auxiliary metadata file (name defined in nomad.config, e.g. nomad_metadata.yaml/json)
        # can be placed in the directory containing the mainfile or in any directory above it.
        # The directory closest to the mainfile has the highest priority.

        metadata_file = config.aux_metadata_file
        metadata_dir = os.path.dirname(self.mainfile_file.os_path)

        metadata = {}
        while metadata_dir:
            metadata_part = metadata_cached(os.path.join(metadata_dir, metadata_file))

            for key, val in metadata_part.items():
                metadata.setdefault(key, val)

            parent_dir = os.path.dirname(metadata_dir)
            if parent_dir == metadata_dir:
                # reached the filesystem root; stop to avoid looping forever
                break
            metadata_dir = parent_dir

        for key, val in metadata.items():
            definition = _editable_metadata.get(key, None)
            if not definition:
                logger.warn('Cannot set metadata %s' % key)
                continue
            self._entry_metadata.m_set(definition, val)
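
    # Illustrative sketch (not part of the original code; file name and values are
    # hypothetical): an auxiliary metadata file next to a mainfile, e.g.
    # 'nomad_metadata.yaml', could look like
    #
    #     comment: geometry optimizations for the example study
    #     references:
    #       - https://example.org/my-paper
    #
    # Only keys that match EditableUserMetadata quantities are applied; all other keys
    # are logged with a warning and skipped.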

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_results)
        self._entry_metadata.processed = True

        # read metadata from file
        self._read_metadata_from_file(logger)

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_results)
            log_data.update(archive_size=archive_size)

    def write_archive(self, archive: EntryArchive):
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if archive is not None:
            archive = archive.m_copy()
        else:
            archive = datamodel.EntryArchive()

        if archive.section_metadata is None:
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        try:
            return self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
        except Exception as e:
            # most likely failed due to domain data, try to write metadata and processing logs
            archive = datamodel.EntryArchive()
            archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the uploaded
    files and the processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system registered the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change, when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self
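
    # Illustrative usage sketch (not part of the original code; names and paths are
    # hypothetical), assuming an existing datamodel.User instance:
    #
    #     upload = Upload.create(
    #         user=some_user, name='example upload', upload_path='/tmp/example.zip')
    #     upload.process_upload()
    #
    # create() generates an upload_id if none is given, saves the record, and marks the
    # 'uploading' task as done; process_upload() then runs extraction and parsing.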

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()
    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
849
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removal of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove