# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user related
data. These include information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import Backend
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive
from nomad.archive import query_archive
from nomad.datamodel.encyclopedia import (
    EncyclopediaMetadata,
)
import phonopyparser.metainfo


section_metadata = datamodel.EntryArchive.section_metadata.name
section_workflow = datamodel.EntryArchive.section_workflow.name


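# structlog processor that merges the logger's bound context (except a few standard
# fields stored elsewhere) and the logger name into each event, so events can be
# persisted with an entry's processing logs; on errors the event is returned
# unchanged to avoid a recursive logging loop.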
def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


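# processors applied to every logger wrapped by Calc.get_logger below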
_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
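        '''
        Stores the given entry metadata on this calc's mongo ``metadata`` field,
        restricted to the quantities in the ``MongoMetadata`` category.
        '''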
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            # instead of loading the whole archive, it should be enough to load the
            # parts that are referenced by section_metadata/EntryMetadata
            # TODO somehow it should determine which root sections to load from the metainfo
            # or configuration
            calc_archive = archive[self.calc_id]
            entry_archive_dict = {section_metadata: calc_archive[section_metadata].to_dict()}
            if section_workflow in calc_archive:
                entry_archive_dict[section_workflow] = calc_archive[section_workflow].to_dict()
            entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]

        except KeyError:
            # Due to hard processing failures, an entry might not have an archive
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # Exceptions here will cause indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.meta.version
            self._entry_metadata.nomad_commit = config.meta.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
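        '''
        Creates minimal entry metadata from the processing state alone, e.g. as a
        fallback when parsing results are not (yet) available.
        '''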
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            self.apply_entry_metadata(self._entry_metadata)
            if self._parser_backend and self._parser_backend.resource:
                backend = self._parser_backend
            else:
                backend = None
            self._entry_metadata.apply_domain_metadata(backend)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)

            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            logger = self.get_logger(parser=self.parser, step=self.parser)

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)
            self._entry_metadata = phonon_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Re-create a backend
            metainfo = phonopyparser.metainfo.m_env
            self._parser_backend = Backend(metainfo, logger=logger, domain="dft")
            self._parser_backend.entry_archive = phonon_archive

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = self._parser_backend.entry_archive.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
                raise ValueError("No method information available in referenced calculation.")
            self._parser_backend.entry_archive.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.success
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exc_info=e)
            if self._entry_metadata is None:
                self._setup_fallback_metadata()
                self._entry_metadata.processed = False

            try:
                if self._entry_metadata.encyclopedia is None:
                    self._entry_metadata.encyclopedia = EncyclopediaMetadata()
                self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
            except Exception as e:
                logger.error("Could not set encyclopedia status.", exc_info=e)

        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_backend)
                log_data.update(archive_size=archive_size)

    @contextmanager
    def use_parser_backend(self, processor_name):
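        '''
        Context manager that resets the backend status before a processor (e.g. a
        normalizer) runs and logs the outcome (success, warnings, or failure) afterwards.
        '''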
        self._parser_backend.reset_status()
        yield self._parser_backend

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])

            if len(warnings) > 0:
                self.get_logger().warn(
                    'processor completed successfully with warnings',
                    processor=processor_name, warnings=[str(warning) for warning in warnings])

            else:
                self.get_logger().info(
                    'processor completed successfully',
                    processor=processor_name)

        else:
            errors = self._parser_backend.status[1]
            self.get_logger().error(
                'processor completed with failure',
                processor=processor_name, errors=str(errors))

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        self._parser_backend.entry_archive.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend.entry_archive).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        logger.error(
                            'normalizer failed with exception', exc_info=e,
                            error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_backend)
        self._entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_backend)
            log_data.update(archive_size=archive_size)

    def write_archive(self, backend: Backend):
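        '''
        Writes the archive (entry metadata, parsed data, and processing logs) via the
        upload files. If that fails and parsed data is present, it retries with only
        metadata and processing logs before re-raising the original error.
        '''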
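        # drop DEBUG entries if the processing log grew large, to keep the archive small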
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if backend is not None:
            entry_archive = backend.entry_archive.m_copy()
        else:
            entry_archive = datamodel.EntryArchive()

        if entry_archive.section_metadata is None:
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        try:
            return self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
        except Exception as e:
            if backend is None:
                raise e

            # most likely failed due to domain data, try to write metadata and processing logs
            entry_archive = datamodel.EntryArchive()
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the databases. Provides persistence access to the files storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system registered the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean that indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user given name is optional.
        It will populate the record with a signed url and a pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del(kwargs['user'])

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

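                    # mark each calc as published and write its mongo metadata in one bulk update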
                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError as e:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
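        ''' The upload files: staging files while the upload is unpublished, public files once published. '''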
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
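        ''' The upload files as staging files; only available while the upload is not published. '''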
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()