# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user-related
data: information about users, their uploads and datasets, the associated
calculations, and files.

.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive
from nomad.archive import query_archive
import phonopyparser


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]
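
# Illustrative sketch (added comment, not part of the original control flow): these
# processors are combined with an entry-specific processor via structlog's
# `wrap_logger`, as done in `Calc.get_logger` below, e.g.
#
#     logger = wrap_logger(some_structlog_logger, processors=_log_processors)
#     logger.info('event of interest', upload_id='...')
#
# `some_structlog_logger` is a placeholder; the processing code actually wraps the
# logger returned by `Proc.get_logger`.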


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.version
        entry_metadata.nomad_commit = config.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(
                archive[self.calc_id][datamodel.EntryArchive.section_metadata.name].to_dict())

        except KeyError:
            # Due to hard processing failures, it is possible that an entry does not
            # have an archive
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata
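
    # Illustrative sketch of the three metadata accessors above (assumed usage, not a
    # prescribed API):
    #   create_metadata()       -> metadata derived from this processing record only
    #   user_metadata()         -> create_metadata() plus the user metadata stored in mongo
    #   entry_metadata(files)   -> full metadata, preferring the archive's section_metadata
    # For example, indexing code would typically call
    #     calc.entry_metadata(upload_files)
    # with an `UploadFiles` instance for this calc's upload.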

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # exceptions here would cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])
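
    # Usage sketch (illustrative only): the tasks below obtain their logger via this
    # method, e.g.
    #
    #     logger = self.get_logger(step='parsing')
    #     logger.warn('some condition', parser=self.parser)
    #
    # Each such entry is also appended to self._calc_proc_logs and ends up in the
    # archive's processing_logs via write_archive().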

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.version
            self._entry_metadata.nomad_commit = config.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._entry_metadata = self.create_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            self._entry_metadata.processed = False

            self.apply_entry_metadata(self._entry_metadata)
            if self._parser_backend and self._parser_backend.resource:
                backend = self._parser_backend
            else:
                backend = None
            self._entry_metadata.apply_domain_metadata(backend)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    def process_phonon(self):
        """Function that is run for phonon calculation before cleanup.
        This task is run by the celery process that is calling the join for the
        upload.

        This function re-opens the Archive for this calculation to add method
        information from another referenced archive. Updates the method
        information in section_encyclopedia as well as the DFT domain metadata.
        """
        try:
            # Re-create a backend
            context = dict(parser=self.parser, step=self.parser)
            logger = self.get_logger(**context)
            metainfo = phonopyparser.metainfo.m_env
            backend = Backend(metainfo, logger=logger, domain="dft")

            # Open the archive of the phonon calculation.
            upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
            with upload_files.read_archive(self.calc_id) as archive:
                arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
                phonon_archive = EntryArchive.m_from_dict(arch)

            # Save Archive contents, metadata and logs from the old entry
            backend.entry_archive = phonon_archive
            self._parser_backend = backend
            self._entry_metadata = backend.entry_archive.section_metadata
            self._calc_proc_logs = phonon_archive.processing_logs

            # Read in the first referenced calculation. The reference is given as
            # an absolute path which needs to be converted into a path that is
            # relative to upload root.
            scc = backend.entry_archive.section_run[0].section_single_configuration_calculation[0]
            ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
            relative_ref = ref.split("/", 6)[-1]
            ref_id = upload_files.calc_id(relative_ref)
            with upload_files.read_archive(ref_id) as archive:
                arch = query_archive(archive, {ref_id: ref_id})[ref_id]
                ref_archive = EntryArchive.m_from_dict(arch)

            # Get encyclopedia method information directly from the referenced calculation.
            ref_enc_method = ref_archive.section_encyclopedia.method
            backend.entry_archive.section_encyclopedia.method = ref_enc_method

            # Overwrite old entry with new data. The metadata is updated with
            # new timestamp and method details taken from the referenced
            # archive.
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.dft.xc_functional = ref_archive.section_metadata.dft.xc_functional
            self._entry_metadata.dft.basis_set = ref_archive.section_metadata.dft.basis_set
            self._entry_metadata.dft.update_group_hash()

            # persist the calc metadata
            with utils.timer(logger, 'saved calc metadata', step='metadata'):
                self.apply_entry_metadata(self._entry_metadata)

            # index in search
            with utils.timer(logger, 'indexed', step='index'):
                self._entry_metadata.a_elastic.index()

            # persist the archive
            with utils.timer(
                    logger, 'archived', step='archive',
                    input_size=self.mainfile_file.size) as log_data:

                archive_size = self.write_archive(self._parser_backend)
                log_data.update(archive_size=archive_size)

        except Exception as e:
            logger.error("Could not retrieve method information for phonon calculation.", exception=e)

    @contextmanager
    def use_parser_backend(self, processor_name):
        self._parser_backend.reset_status()
        yield self._parser_backend

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])

            if len(warnings) > 0:
                self.get_logger().warn(
                    'processor completed successfully with warnings',
                    processor=processor_name, warnings=[str(warning) for warning in warnings])

            else:
                self.get_logger().info(
                    'processor completed successfully',
                    processor=processor_name)

        else:
            errors = self._parser_backend.status[1]
            self.get_logger().error(
                'processor completed with failure',
                processor=processor_name, errors=str(errors))
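
    # Usage sketch (illustrative only): normalizers run inside this context manager so
    # that backend status and warnings are reported uniformly, e.g.
    #
    #     with self.use_parser_backend(normalizer_name) as backend:
    #         normalizer(backend).normalize(logger=logger)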

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        self._parser_backend.entry_archive.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        logger.error(
                            'normalizer failed with exception', exc_info=e,
                            error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_backend)
        self._entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_backend)
            log_data.update(archive_size=archive_size)

    def write_archive(self, backend: Backend):
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if backend is not None:
            entry_archive = backend.entry_archive.m_copy()
        else:
            entry_archive = datamodel.EntryArchive()

        if entry_archive.section_metadata is None:
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        try:
            return self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
        except Exception as e:
            if backend is None:
                raise e

            # most likely failed due to domain data, try to write metadata and processing logs
            entry_archive = datamodel.EntryArchive()
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
            raise e
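
    # Note (added comment): if writing the full archive fails, a minimal archive with
    # just the entry metadata and the filtered processing logs is written before the
    # original exception is re-raised, so the entry stays inspectable after a partial
    # failure.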

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the files storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean that indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

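    # Illustrative sketch of the user-metadata round trip above (assumed usage, not a
    # prescribed API):
    #
    #     upload.metadata = {'comment': 'my dataset'}  # pickled into the public bucket
    #     upload.metadata                              # read back via PublicUploadFiles
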
    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to the mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload is already saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self
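
    # Hedged usage sketch (argument names are field names of this class, shown for
    # illustration only):
    #
    #     upload = Upload.create(user=current_user, upload_path='/tmp/example.zip', temporary=True)
    #     upload.process_upload()  # dispatched asynchronously via the @process decorator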

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError as e:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

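    # Hedged usage sketch for re_process_upload above (illustrative only):
    #
    #     upload = Upload.get(upload_id)
    #     upload.re_process_upload()  # runs asynchronously via the @process decorator
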
    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        '''
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        '''
        if os.path.basename(path).startswith('POTCAR'):
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # create stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
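            # Note (added comment): the awk program below keeps everything up to and
            # including the 'END of PSCTR' line plus the trailing 'End of Dataset'
            # markers, dropping the licensed pseudopotential data in between.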
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))

    def match_mainfiles(self) -> Iterator[Tuple[str, object]]:
        '''
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of the mainfile path and the parser that matched it
        '''
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            self._preprocess_files(filename)
            try:
                parser = match_parser(upload_files.raw_file_object(filename).os_path)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)