# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user related
data. These are information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive
from nomad.archive import query_archive
from nomad.datamodel.encyclopedia import (
    EncyclopediaMetadata,
)
import phonopyparser


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.version
        entry_metadata.nomad_commit = config.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(
                archive[self.calc_id][datamodel.EntryArchive.section_metadata.name].to_dict())

        except KeyError:
            # Due to hard processing failures, it is possible that an entry does not
            # have an archive
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata
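
    # Illustrative usage (``some_calc_id`` is a placeholder): a caller that needs the
    # complete metadata of an entry can combine the archive and mongo records with
    #
    #     calc = Calc.get(some_calc_id)
    #     full_metadata = calc.entry_metadata(calc.upload.upload_files)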

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # raising an exception here would cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])
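
    # Every entry logged through the wrapped logger returned above, e.g.
    #
    #     self.get_logger().info('parsing step started')   # illustrative message
    #
    # is also appended to ``self._calc_proc_logs`` by ``save_to_calc_log`` and is later
    # written to the archive as ``processing_logs`` (see :func:`write_archive`).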

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.version
            self._entry_metadata.nomad_commit = config.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def _setup_fallback_metadata(self):
        self._entry_metadata = self.create_metadata()
        self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        self._entry_metadata.last_processing = datetime.utcnow()
        self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._setup_fallback_metadata()

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._setup_fallback_metadata()

            self._entry_metadata.processed = False

            self.apply_entry_metadata(self._entry_metadata)
            if self._parser_backend and self._parser_backend.resource:
                backend = self._parser_backend
            else:
                backend = None
            self._entry_metadata.apply_domain_metadata(backend)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()
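
    # After either ``process_calc`` or ``re_process_calc`` completes, the parent upload
    # is reloaded and ``check_join`` decides whether all calcs of this upload have
    # finished; once they have, the upload-level cleanup is triggered.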

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)

        try:
            # Re-create a backend
            metainfo = phonopyparser.metainfo.m_env
            backend = Backend(metainfo, logger=logger, domain="dft")

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)

            # Save Archive contents, metadata and logs from the old entry
            backend.entry_archive = phonon_archive
            self._parser_backend = backend
            self._entry_metadata = backend.entry_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = backend.entry_archive.section_run[0].section_single_configuration_calculation[0]
            relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_metadata.encyclopedia.method
            backend.entry_archive.section_metadata.encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()
        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exception=e)
            self._entry_metadata.encyclopedia.status = EncyclopediaMetadata.status.type.failure
        finally:
            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_backend)
                log_data.update(archive_size=archive_size)

    @contextmanager
    def use_parser_backend(self, processor_name):
        self._parser_backend.reset_status()
        yield self._parser_backend

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])

            if len(warnings) > 0:
                self.get_logger().warn(
                    'processor completed successfully with warnings',
                    processor=processor_name, warnings=[str(warning) for warning in warnings])

            else:
                self.get_logger().info(
                    'processor completed successfully',
                    processor=processor_name)

        else:
            errors = self._parser_backend.status[1]
            self.get_logger().error(
                'processor completed with failure',
                processor=processor_name, errors=str(errors))
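
    # Illustrative usage (mirrors the normalizing task below; the processor name is a
    # placeholder):
    #
    #     with self.use_parser_backend('SystemNormalizer') as backend:
    #         ...  # run a processor against the backend
    #
    # On exit, the backend status is inspected and success, warnings, or errors are logged.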

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        self._parser_backend.entry_archive.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        logger.error(
                            'normalizer failed with exception', exc_info=e,
                            error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_backend)
        self._entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_backend)
            log_data.update(archive_size=archive_size)

    def write_archive(self, backend: Backend):
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if backend is not None:
            entry_archive = backend.entry_archive.m_copy()
        else:
            entry_archive = datamodel.EntryArchive()

        if entry_archive.section_metadata is None:
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        try:
            return self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
        except Exception as e:
            if backend is None:
                raise e

            # most likely failed due to domain data, try to write metadata and processing logs
            entry_archive = datamodel.EntryArchive()
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the file storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system registered the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data
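
    # Illustrative usage (the dict content is a placeholder):
    #
    #     upload.metadata = dict(comment='test upload')
    #
    # persists the given user metadata via ``PublicUploadFiles.user_metadata``.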

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del(kwargs['user'])

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self
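
    # Illustrative usage (``current_user`` is a placeholder :class:`datamodel.User`):
    #
    #     upload = Upload.create(user=current_user, name='my upload')
    #
    # A missing ``upload_id`` is filled with a fresh uuid and the 'uploading' task is
    # marked as started before the new record is returned.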

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()
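
    # Like the other ``@process`` methods, publishing runs asynchronously through
    # celery. Illustrative trigger (``upload_id`` is a placeholder):
    #
    #     Upload.get(upload_id).publish_upload()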

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        '''
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        '''
        if os.path.basename(path).startswith('POTCAR'):
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # created stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))
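
    # For an uploaded POTCAR file this creates a sibling ``<path>.stripped`` file that
    # contains a sha224 checksum line plus everything outside the region between
    # 'END of PSCTR' and 'End of Dataset'; the copyrighted pseudopotential data itself
    # is not copied.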

    def match_mainfiles(self) -> Iterator[Tuple[str, object]]:
        '''
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of filename and matching parser
        '''
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            self._preprocess_files(filename)
            try:
                parser = match_parser(upload_files.raw_file_object(filename).os_path)