# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user-related
data: information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]
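
# Illustrative sketch only (not used by the classes below), assuming a plain stdlib
# logger: how the processor chain above is typically combined with structlog. Bound
# context keys that are not excluded in _pack_log_event end up inside each event dict.
def _example_wrapped_logger(name: str = 'nomad.processing'):
    wrapped = wrap_logger(logging.getLogger(name), processors=_log_processors)
    return wrapped.bind(parser='parsers/vasp', step='parsing')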


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata or the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.version
        entry_metadata.nomad_commit = config.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        entry_metadata = datamodel.EntryMetadata.m_from_dict(
            archive[self.calc_id][datamodel.EntryArchive.section_metadata.name].to_dict())

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata
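
    # Illustrative sketch only, not called by the processing code: how the three
    # metadata flavours above relate to each other.
    def _example_metadata_flavours(self, upload_files: UploadFiles):
        minimal = self.create_metadata()              # values from this Proc object only
        with_user = self.user_metadata()              # minimal plus the mongo (user) metadata
        complete = self.entry_metadata(upload_files)  # archive metadata plus the mongo (user) metadata
        return minimal, with_user, complete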

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                dump_dict = dict(event_dict)
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # exceptions here would cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means that metadata already exists and,
        instead of creating it from scratch, we only update the existing records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().error('no parser matches during re-process, use the old parser')
            self.errors = ['no matching parser found during re-processing']
        elif self.parser != parser.name:
            self.parser = parser.name
            logger.info(
                'different parser matches during re-process, use new parser',
                parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.version
            self._entry_metadata.nomad_commit = config.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._entry_metadata = self.create_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            if self.parser is not None:
                parser = parser_dict[self.parser]
                if hasattr(parser, 'code_name'):
                    self._entry_metadata.code_name = parser.code_name

            self._entry_metadata.processed = False
            self.apply_entry_metadata(self._entry_metadata)
            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            archive = datamodel.EntryArchive()
            archive.m_add_sub_section(
                datamodel.EntryArchive.section_metadata, self._entry_metadata)
            archive.processing_logs = self._calc_proc_logs

            self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

        super().fail(*errors, log_level=log_level, **kwargs)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        self._parser_backend.entry_archive.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    @contextmanager
    def use_parser_backend(self, processor_name):
        self._parser_backend.reset_status()
        yield self._parser_backend

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])

            if len(warnings) > 0:
                self.get_logger().warn(
                    'processor completed successfully with warnings',
                    processor=processor_name, warnings=[str(warning) for warning in warnings])

            else:
                self.get_logger().info(
                    'processor completed successfully',
                    processor=processor_name)

        else:
            errors = self._parser_backend.status[1]
            self.get_logger().error(
                'processor completed with failure',
                processor=processor_name, errors=str(errors))

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''
        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        self.fail(
                            'normalizer failed with exception', exc_info=e, error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_backend)
        self._entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            self._parser_backend.entry_archive.processing_logs = self._calc_proc_logs
            self._calc_proc_logs = None

            archive_data = self._parser_backend.entry_archive.m_to_dict()
            archive_size = self.upload_files.write_archive(self.calc_id, archive_data)
            log_data.update(archive_size=archive_size)

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)

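
# Illustrative sketch only: counting the processing states of an upload's calculations
# with a plain mongoengine query. The tasks_status field is provided by the Proc base
# class; the upload id is a placeholder.
def _example_calc_state_counts(upload_id: str) -> Dict[str, int]:
    counts: Dict[str, int] = {}
    for calc in Calc.objects(upload_id=upload_id):
        counts[calc.tasks_status] = counts.get(calc.tasks_status, 0) + 1
    return counts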

class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the file storage
    and processing state.

    Attributes:
        name: optional user-provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system registered the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean that indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter and setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user-defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality,
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

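
    # Illustrative sketch only: reading and writing user metadata through the property
    # above. The getter returns None when no public upload files exist yet; the key used
    # here is a placeholder.
    def _example_annotate_metadata(self, value: str) -> None:
        data = self.metadata or {}
        data['example_key'] = value
        self.metadata = data
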
    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed URL and a pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del(kwargs['user'])

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'staged upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        assert self.published

        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')
        public_upload_files = cast(PublicUploadFiles, self.upload_files)
        staging_upload_files = public_upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if staging_upload_files is not None and staging_upload_files.exists():
                staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        '''
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        '''
        if os.path.basename(path).startswith('POTCAR'):
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # created stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))
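
    # Illustrative sketch only of what the awk call above does, as pure Python: keep the
    # POTCAR header up to 'END of PSCTR' and the 'End of Dataset' markers, and drop the
    # potential data in between.
    @staticmethod
    def _example_strip_potcar_lines(lines: Iterable[str]) -> Iterator[str]:
        dump = True
        for line in lines:
            if 'End of Dataset' in line:
                dump = True
            if dump:
                yield line
            if 'END of PSCTR' in line:
                dump = False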

    def match_mainfiles(self) -> Iterator[Tuple[str, object]]:
        '''
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of the mainfile's filename and the matched parser.
        '''
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            self._preprocess_files(filename)
            try:
                parser = match_parser(upload_files.raw_file_object(filename).os_path)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)

    @task
    def parse_all(self):
        '''
        The *task* that identifies mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        '''
        logger = self.get_logger()

        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

                calc.process_calc()

    def on_process_complete(self, process_name):
        if process_name == 'process_upload' or process_name == 're_process_upload':
            self.check_join()

    def check_join(self):
        '''
        Performs an evaluation of the join condition and triggers the :func:`cleanup`
        task if necessary. The join condition allows running the ``cleanup`` after
        all calculations have been processed. The upload processing stops after all
        calculation processes have been triggered (:func:`parse_all` or
        :func:`re_process_upload`). The cleanup task is then run within the last
        calculation process (the one that triggered the join by calling this method).
        '''
        total_calcs = self.total_calcs
        processed_calcs = self.processed_calcs

        self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
        # check if the process is not running anymore, i.e. not still spawning new processes to join
        # check the join condition, i.e. all calcs have been processed
        if not self.process_running and processed_calcs >= total_calcs:
            # this can easily be called multiple times, e.g. upload finished after all calcs finished
            modified_upload = self._get_collection().find_one_and_update(
                {'_id': self.upload_id, 'joined': {'$ne': True}},
                {'$set': {'joined': True}})
            if modified_upload is not None:
                self.get_logger().debug('join')
                self.cleanup()
            else:
                # the join was already done due to a prior call
                pass

    def reset(self):
        self.joined = False
        super().reset()

    @classmethod
    def reset_pymongo_update(cls, worker_hostname: str = None):
        update = super().reset_pymongo_update()
        update.update(joined=False)
        return update

    def _cleanup_after_processing(self):
        # send email about process finish
        user = self.uploader
        name = '%s %s' % (user.first_name, user.last_name)
        message = '\n'.join([
            'Dear %s,' % name,
            '',
            'your data %suploaded at %s has completed processing.' % (
                '"%s" ' % self.name if self.name else '', self.upload_time.isoformat()),  # pylint: disable=no-member
            'You can review your data on your upload page: %s' % config.gui_url(),
            '',
            'If you encounter any issues with your upload, please let us know and reply to this email.',
            '',
            'The nomad team'
        ])
        try:
            infrastructure.send_mail(
                name=name, email=user.email, message=message, subject='Processing completed')
        except Exception as e:
            # probably due to email configuration problems
            # don't fail or present this error to clients
            self.logger.error('could not send after processing email', exc_info=e)

    def _cleanup_after_re_processing(self):
        logger = self.get_logger()
        logger.info('started to repack re-processed upload')

        staging_upload_files = self.upload_files.to_staging_upload_files()

        with utils.timer(
                logger, 'reprocessed staged upload packed', step='delete staged',
                upload_size=self.upload_files.size):

            staging_upload_files.pack(self.user_metadata(), skip_raw=True)

        with utils.timer(
                logger, 'reprocessed staged upload deleted', step='delete staged',
                upload_size=self.upload_files.size):

            staging_upload_files.delete()
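

# Illustrative sketch only: the typical lifecycle of an upload as driven by the
# processes above. The user object, file path, and name are placeholders; in the real
# system these calls are typically made by the API layer.
def _example_upload_lifecycle(user: datamodel.User, upload_path: str) -> 'Upload':
    upload = Upload.create(user=user, upload_path=upload_path, name='example upload')
    upload.process_upload()    # extracting and parse_all; calcs then join via check_join
    # once all calculations are processed (and metadata is curated), the upload can be
    # published, which packs the staging files into the public area:
    # upload.publish_upload()
    return upload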