# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user related
data. This includes information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload
'''
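
# Informal usage sketch (comments only, not executed; ids are placeholders): both
# documented classes are mongoengine documents that are usually retrieved via their
# primary keys, e.g.
#
#     upload = Upload.get('<upload_id>')
#     calc = Calc.get('<calc_id>')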

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]
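
# Illustrative sketch (an assumption about typical use, mirroring Calc.get_logger below):
# the processors above are meant to be combined with structlog's wrap_logger, e.g.
#
#     logger = wrap_logger(logging.getLogger(__name__), processors=_log_processors)
#     logger.info('some event', upload_id='<upload_id>')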


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.version
        entry_metadata.nomad_commit = config.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(
                archive[self.calc_id][datamodel.EntryArchive.section_metadata.name].to_dict())

        except KeyError:
            # Due to hard processing failures, it is possible that an entry does not
            # have an archive.
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata
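
    # Informal sketch (comments only; the calc id is a placeholder) of how the metadata
    # accessors relate:
    #
    #     calc = Calc.get('<calc_id>')
    #     partial = calc.user_metadata()                      # processing + user metadata
    #     complete = calc.entry_metadata(calc.upload_files)   # also includes archive metadata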

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # Exceptions here will cause indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])
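
    # Note (added for illustration): entries logged through the logger returned above are
    # also collected in self._calc_proc_logs and stored as ``processing_logs`` in the
    # archive by write_archive(), e.g.
    #
    #     logger = calc.get_logger()
    #     logger.warning('some issue', step='parsing')   # also recorded in the archive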

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            self.parser = parser.name
            logger.info(
                'different parser matches during re-process, use new parser',
                parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.version
            self._entry_metadata.nomad_commit = config.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._entry_metadata = self.create_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self.parser is not None:
                try:
                    parser = parser_dict[self.parser]
                    if hasattr(parser, 'code_name'):
                        self._entry_metadata.code_name = parser.code_name
                except KeyError:
                    # This only happens in re-processing. The parser was removed.
                    # The old parser was probably only used to keep this entry matching
                    # and in the system (retain its PID). With the current nomad this is
                    # not parsable anyhow.
                    self._entry_metadata.code_name = config.services.unavailable_value

            self._entry_metadata.processed = False
            self.apply_entry_metadata(self._entry_metadata)
            if self._parser_backend and self._parser_backend.resource:
                backend = self._parser_backend
            else:
                backend = None
            self._entry_metadata.apply_domain_metadata(backend)
            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

        super().fail(*errors, log_level=log_level, **kwargs)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    @contextmanager
    def use_parser_backend(self, processor_name):
        self._parser_backend.reset_status()
        yield self._parser_backend

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])

            if len(warnings) > 0:
                self.get_logger().warn(
                    'processor completed successfully with warnings',
                    processor=processor_name, warnings=[str(warning) for warning in warnings])

            else:
                self.get_logger().info(
                    'processor completed successfully',
                    processor=processor_name)

        else:
            errors = self._parser_backend.status[1]
            self.get_logger().error(
                'processor completed with failure',
                processor=processor_name, errors=str(errors))

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        self._parser_backend.entry_archive.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        logger.error(
                            'normalizer failed with exception', exc_info=e,
                            error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
452
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_backend)
        self._entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_backend)
            log_data.update(archive_size=archive_size)

    def write_archive(self, backend: Backend):
        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if backend is not None:
            entry_archive = backend.entry_archive.m_copy()
        else:
            entry_archive = datamodel.EntryArchive()

        if entry_archive.section_metadata is None:
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        entry_archive.processing_logs = self._calc_proc_logs

        try:
            return self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
        except Exception as e:
            if backend is None:
                raise e

            # most likely failed due to domain data, try to write metadata and processing logs
            entry_archive = datamodel.EntryArchive()
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            entry_archive.processing_logs = self._calc_proc_logs
            self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the file storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data
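
    # Illustrative round trip for the property above (comments only; the upload id is a
    # placeholder and public upload files must already exist):
    #
    #     upload = Upload.get('<upload_id>')
    #     upload.metadata = {'comment': 'user provided metadata'}
    #     assert upload.metadata['comment'] == 'user provided metadata'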

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user provided name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self
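
    # Informal usage sketch (comments only; user id, name, and path are placeholders):
    #
    #     user = datamodel.User.get('<user_id>')
    #     upload = Upload.create(user=user, name='<upload name>', upload_path='<path to uploaded file>')
    #     upload.process_upload()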

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'staged upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files in to public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError as e:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        '''
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        '''
        if os.path.basename(path).startswith('POTCAR'):
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # created stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))

    def match_mainfiles(self) -> Iterator[Tuple[str, object]]:
        '''
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of mainfile (filename) and the parser that matched it
        '''
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            self._preprocess_files(filename)
            try:
                parser = match_parser(upload_files.raw_file_object(filename).os_path)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)
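
    # Informal sketch of how this generator is consumed (comments only, see parse_all below):
    #
    #     for filename, parser in upload.match_mainfiles():
    #         print(filename, parser.name)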

    @task
    def parse_all(self):
        '''
        The *task* used to identify mainfile/parser combinations among the upload's files,
        create respective :class:`Calc` instances, and trigger their processing.
        '''
        logger = self.get_logger()

        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

                calc.process_calc()

    def on_process_complete(self, process_name):
        if process_name == 'process_upload' or process_name == 're_process_upload':
            self.check_join()

    def check_join(self):
        '''
        Performs an evaluation of the join condition and triggers the :func:`cleanup`
        task if necessary. The join condition allows running the ``cleanup`` after
        all calculations have been processed. The upload processing stops after all
        calculation processings have been triggered (:func:`parse_all` or
        :func:`re_process_upload`). The cleanup task is then run within the last
        calculation process (the one that triggered the join by calling this method).
        '''
        total_calcs = self.total_calcs
        processed_calcs = self.processed_calcs

        self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
        # check if process is not running anymore, i.e. not still spawning new processes to join
        # check the join condition, i.e. all calcs have been processed
        if not self.process_running and processed_calcs >= total_calcs:
            # this can easily be called multiple times, e.g. upload finished after all calcs finished
            modified_upload = self._get_collection().find_one_and_update(
                {'_id': self.upload_id, 'joined': {'$ne': True}},
                {'$set': {'joined': True}})
            if modified_upload is not None:
                self.get_logger().debug('join')
                self.cleanup()
            else:
                # the join was already done due to a prior call
                pass
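
        # Worked example (added for illustration): with total_calcs == 10, the last of the
        # 10 calc processes to call this method sees processed_calcs == 10 while the upload
        # process itself is no longer running; the find_one_and_update on 'joined' then
        # ensures cleanup() is triggered exactly once.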

    def reset(self):
        self.joined = False
        super().reset()

    @classmethod
    def reset_pymongo_update(cls, worker_hostname: str = None):
        update = super().reset_pymongo_update()
        update.update(joined=False)
        return update

    def _cleanup_after_processing(self):
        # send email about process finish
        user = self.uploader
        name = '%s %s' % (user.first_name, user.last_name)
        message = '\n'.join([
            'Dear %s,' % name,
            '',