# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user-related
data. This includes information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import Backend
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive
from nomad.archive import query_archive
from nomad.datamodel.encyclopedia import (
    EncyclopediaMetadata,
)
from nomad.metainfo import MSection
import phonopyparser.metainfo


def _pack_log_event(logger, method_name, event_dict):
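    ''' structlog processor that merges the logger name and the logger's bound context (minus some standard fields) into the event dict. '''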
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
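        ''' Stores the given entry metadata in this calc's mongo ``metadata`` field (only the ``MongoMetadata`` category is kept). '''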
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(
                archive[self.calc_id][datamodel.EntryArchive.section_metadata.name].to_dict())

        except KeyError:
            # Due to hard processing failures, an entry might not have an archive
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # Exceptions here will cause indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. Metadata already exists; instead of creating
        it from scratch, the existing records are updated.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.meta.version
            self._entry_metadata.nomad_commit = config.meta.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
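        ''' Creates preliminary minimum entry metadata from this processing object, used when no parsed metadata is available. '''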
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
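        ''' Hook called when processing failed: indexes a minimum set of metadata and tries to write a partial archive. '''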
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            self.apply_entry_metadata(self._entry_metadata)
            if self._parser_backend and self._parser_backend.resource:
                backend = self._parser_backend
            else:
                backend = None
            self._entry_metadata.apply_domain_metadata(backend)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
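        ''' Hook called when a process completed; triggers the upload join check. '''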
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)

                if isinstance(self._parser_backend, MSection):
                    backend = Backend(parser._metainfo_env, parser.domain)
                    root_section = self._parser_backend.m_def.name
                    section_def = getattr(datamodel.EntryArchive, root_section)
                    backend.entry_archive.m_add_sub_section(section_def, self._parser_backend)
                    backend.resource.add(self._parser_backend)
                    self._parser_backend = backend

            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            logger = self.get_logger(parser=self.parser, step=self.parser)

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)
            self._entry_metadata = phonon_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Re-create a backend
            metainfo = phonopyparser.metainfo.m_env
            self._parser_backend = Backend(metainfo, logger=logger, domain="dft")
            self._parser_backend.entry_archive = phonon_archive

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = self._parser_backend.entry_archive.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
                raise ValueError("No method information available in referenced calculation.")
            self._parser_backend.entry_archive.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.success
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exc_info=e)
            if self._entry_metadata is None:
                self._setup_fallback_metadata()
                self._entry_metadata.processed = False

            try:
                if self._entry_metadata.encyclopedia is None:
                    self._entry_metadata.encyclopedia = EncyclopediaMetadata()
                self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
            except Exception as e:
                logger.error("Could set encyclopedia status.", exc_info=e)
469

        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_backend)
                log_data.update(archive_size=archive_size)

    @contextmanager
    def use_parser_backend(self, processor_name):
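        ''' Context manager that yields the parser backend and logs the processor outcome based on the backend status. '''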
        self._parser_backend.reset_status()
        yield self._parser_backend

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])

            if len(warnings) > 0:
                self.get_logger().warn(
                    'processor completed successfully with warnings',
                    processor=processor_name, warnings=[str(warning) for warning in warnings])

            else:
                self.get_logger().info(
                    'processor completed successfully',
                    processor=processor_name)

        else:
            errors = self._parser_backend.status[1]
            self.get_logger().error(
                'processor completed with failure',
                processor=processor_name, errors=str(errors))

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        self._parser_backend.entry_archive.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        logger.error(
                            'normalizer failed with exception', exc_info=e,
                            error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_backend)
        self._entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_backend)
            log_data.update(archive_size=archive_size)

    def write_archive(self, backend: Backend):
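        '''
        Writes the archive for this calc from the given parser backend (or an empty
        archive if there is none), adding the entry metadata and the (filtered)
        processing logs. Returns the result of ``upload_files.write_archive``, which
        is used as the archive size by the callers.
        '''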
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if backend is not None:
            entry_archive = backend.entry_archive.m_copy()
        else:
            entry_archive = datamodel.EntryArchive()

        if entry_archive.section_metadata is None:
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        try:
            return self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
        except Exception as e:
            if backend is None:
                raise e

            # most likely failed due to domain data, try to write metadata and processing logs
            entry_archive = datamodel.EntryArchive()
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


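# A rough usage sketch for the classes in this module (an assumption-based example, not
# part of the API below: `some_user` is a datamodel.User and the path points to a local
# upload file; the exact keyword arguments are handled by Proc.create):
#
#     upload = Upload.create(user=some_user, upload_path='/path/to/upload.zip')
#     upload.process_upload()  # runs the 'extracting' and 'parse_all' tasks
#
# Each matched mainfile then becomes a Calc that runs parsing, normalizing, and archiving.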
class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the file storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean that indicates whether the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter and setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user-defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality,
        and metadata will then be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None