# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user-related
data. This includes information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Generator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers


def _pack_log_event(logger, method_name, event_dict):
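    '''
    structlog processor that merges the logger's bound context (except a few
    standard keys) and the logger name into the event dict.
    '''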
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


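# structlog processors applied to every wrapped calc logger; ``Calc.get_logger``
# appends one more processor that also saves each entry to the calc's processing logs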
_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False)]


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
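        # transient, per-process state that is not persisted to mongo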
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

    @classmethod
    def from_entry_metadata(cls, entry_metadata):
        calc = Calc.create(
            calc_id=entry_metadata.calc_id,
            upload_id=entry_metadata.upload_id,
            mainfile=entry_metadata.mainfile,
            metadata=entry_metadata.m_to_dict(include_defaults=True))

        return calc

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                dump_dict = dict(event_dict)
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)
            except Exception:
                # exceptions here would cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means that metadata already exists and,
        instead of creating it anew, we just update the existing records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        logger = self.get_logger()
        if parser is None:
            logger.error('no parser matches during re-process, use the old parser')
            self.errors = ['no matching parser found during re-processing']
        elif self.parser != parser.name:
            self.parser = parser.name
            logger.info(
                'different parser matches during re-process, use new parser',
                parser=parser.name)

        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(self.metadata)
            entry_metadata.upload_id = self.upload_id
            entry_metadata.calc_id = self.calc_id
            entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            entry_metadata.mainfile = self.mainfile
            entry_metadata.nomad_version = config.version
            entry_metadata.nomad_commit = config.commit
            entry_metadata.last_processing = datetime.utcnow()
            entry_metadata.files = self.upload_files.calc_files(self.mainfile)
            self.metadata = entry_metadata.m_to_dict(include_defaults=True)
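
            # note: only the processing-related fields above are renewed; any
            # user-provided metadata in the existing record is kept as it is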

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # make sure the parser backend resource is unloaded, also after failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            calc_metadata = datamodel.EntryMetadata()
            calc_metadata.domain = parser_dict[self.parser].domain
            calc_metadata.upload_id = self.upload_id
            calc_metadata.calc_id = self.calc_id
            calc_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            calc_metadata.mainfile = self.mainfile
            calc_metadata.nomad_version = config.version
            calc_metadata.nomad_commit = config.commit
            calc_metadata.last_processing = datetime.utcnow()
            calc_metadata.files = self.upload_files.calc_files(self.mainfile)
            calc_metadata.uploader = self.upload.user_id
            calc_metadata.upload_time = self.upload.upload_time
            calc_metadata.upload_name = self.upload.name
            self.metadata = calc_metadata.m_to_dict(include_defaults=True)  # TODO use embedded doc?

            if len(calc_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # make sure the parser backend resource is unloaded, also after failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(self.metadata)
            if self.parser is not None:
                parser = parser_dict[self.parser]
                if hasattr(parser, 'code_name'):
                    entry_metadata.code_name = parser.code_name

            entry_metadata.processed = False
            self.metadata = entry_metadata.m_to_dict(include_defaults=True)
            entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error('could not index after processing failure', exc_info=e)

        try:
            self.upload_files.write_archive(self.calc_id, {'processing_logs': self._calc_proc_logs})
        except Exception as e:
            self.get_logger().error('could not write archive (logs) after processing failure', exc_info=e)

        super().fail(*errors, log_level=log_level, **kwargs)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self.metadata['parser_name'] = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        # add the non code specific calc metadata to the backend
        # all other quantities have been determined by parsers/normalizers
        self._parser_backend.openNonOverlappingSection('section_entry_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.metadata['calc_hash'])
        self._parser_backend.addValue('mainfile', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)
        filepaths = self.metadata['files']
        self._parser_backend.addValue('number_of_files', len(filepaths))
        self._parser_backend.addValue('filepaths', filepaths)
        uploader = self.upload.uploader
        self._parser_backend.addValue(
            'entry_uploader_name', '%s, %s' % (uploader.first_name, uploader.last_name))
        self._parser_backend.addValue(
            'entry_uploader_id', str(uploader.user_id))
        self._parser_backend.addValue('entry_upload_time', int(self.upload.upload_time.timestamp()))
        self._parser_backend.closeNonOverlappingSection('section_entry_info')

        self.add_processor_info(self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    @contextmanager
    def use_parser_backend(self, processor_name):
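        '''
        Context manager that wraps the parser backend for a single processing step:
        it resets the backend status before the step and records the step's outcome
        via :func:`add_processor_info` afterwards.
        '''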
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        self._parser_backend.openContext('/section_entry_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('number_of_archive_processor_warnings', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_entry_info/0')

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''
        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        self.fail(
                            'normalizer failed with exception', exc_info=e, error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        entry_metadata = datamodel.EntryMetadata.m_from_dict(self.metadata)
        entry_metadata.apply_domain_metadata(self._parser_backend)
        entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.metadata = entry_metadata.m_to_dict(include_defaults=True)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_data = self._parser_backend.resource.m_to_dict(
                lambda section: section.m_def.name in datamodel.root_sections)

            archive_data['processing_logs'] = self._calc_proc_logs
            self._calc_proc_logs = None

            archive_size = self.upload_files.write_archive(self.calc_id, archive_data)
            log_data.update(archive_size=archive_size)

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the file
    storage and the processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system registered the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean that indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # we do not bind 'user_id' because logstash seems to filter those entries
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed url and a pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'staged upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            calcs = self.entries_metadata(self.metadata)

            with utils.timer(
                    logger, 'upload metadata updated', step='metadata',
                    upload_size=self.upload_files.size):

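                # build one pymongo UpdateOne per calc so that all published
                # metadata is written back to mongo in a single bulk_write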
                def create_update(calc):
                    calc.published = True
                    calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                    return UpdateOne(
                        {'_id': calc.calc_id},
                        {'$set': {'metadata': calc.m_to_dict(include_defaults=True)}})

                Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

            if isinstance(self.upload_files, StagingUploadFiles):
                with utils.timer(
                        logger, 'staged upload files packed', step='pack',
                        upload_size=self.upload_files.size):
                    self.upload_files.pack(calcs)

            with utils.timer(
                    logger, 'index updated', step='index',
                    upload_size=self.upload_files.size):
                search.publish(calcs)

            if isinstance(self.upload_files, StagingUploadFiles):
                with utils.timer(
                        logger, 'staged upload deleted', step='delete staged',
                        upload_size=self.upload_files.size):
                    self.upload_files.delete()
                    self.published = True
                    self.publish_time = datetime.utcnow()
                    self.last_update = datetime.utcnow()
                    self.save()
            else:
                self.last_update = datetime.utcnow()
                self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        assert self.published

        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')
        public_upload_files = cast(PublicUploadFiles, self.upload_files)
        staging_upload_files = public_upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calcs, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('triggered re-processing of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-processing of all calcs', exc_info=e)

            if staging_upload_files is not None and staging_upload_files.exists():
                staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.entries_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        '''
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        '''
        if os.path.basename(path).startswith('POTCAR'):
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # create the stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
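            # the awk program below keeps everything up to and including each
            # 'END of PSCTR' marker, skips the pseudopotential data, and resumes
            # printing at the next 'End of Dataset' line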
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))

    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        '''
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Yields:
            Tuples of mainfile (the file's path within the upload) and the matched parser.
        '''
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            self._preprocess_files(filename)
            try:
                parser = match_parser(upload_files.raw_file_object(filename).os_path)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                self.get_logger().error(
                    'exception while matching potential mainfile',
                    mainfile=filename, exc_info=e)

    @task
    def parse_all(self):
        '''
        The *task* that identifies mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        '''
        logger = self.get_logger()

        with utils.timer(
                logger, 'upload matched', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

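                # the @process decorator dispatches this to run asynchronously as a celery task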
                calc.process_calc()

    def on_process_complete(self, process_name):
        if process_name == 'process_upload' or process_name == 're_process_upload':
            self.check_join()

    def check_join(self):
        '''
        Performs an evaluation of the join condition and triggers the :func:`cleanup`
        task if necessary. The join condition allows the ``cleanup`` to run after
        all calculations have been processed. The upload processing stops after all
        calculation processes have been triggered (:func:`parse_all` or
        :func:`re_process_upload`). The cleanup task is then run within the last
        calculation process (the one that triggered the join by calling this method).
        '''
        total_calcs = self.total_calcs
        processed_calcs = self.processed_calcs

        self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
        # check that the process is not running anymore, i.e. not still spawning new processes to join
        # check the join condition, i.e. all calcs have been processed
        if not self.process_running and processed_calcs >= total_calcs:
            # this can easily be called multiple times, e.g. upload finished after all calcs finished
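            # find_one_and_update is atomic; of several concurrent callers only one
            # will see joined != True and hence run the cleanup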
            modified_upload = self._get_collection().find_one_and_update(
                {'_id': self.upload_id, 'joined': {'$ne': True}},
                {'$set': {'joined': True}})
            if modified_upload is not None:
                self.get_logger().debug('join')
                self.cleanup()
            else:
                # the join was already done due to a prior call
                pass

    def reset(self):
        self.joined = False
        super().reset()

    @classmethod
    def reset_pymongo_update(cls, worker_hostname: str = None):
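        ''' Returns a pymongo update dict that resets the upload, including the join flag. '''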
        update = super().reset_pymongo_update()
        update.update(joined=False)
        return update

    def _cleanup_after_processing(self):
        # send email about process finish
        user = self.uploader
        name = '%s %s' % (user.first_name, user.last_name)
        message = '\n'.join([
            'Dear %s,' % name,
            '',
            'your data %suploaded at %s has completed processing.' % (
                '"%s" ' % self.name if self.name else '', self.upload_time.isoformat()),  # pylint: disable=no-member
            'You can review your data on your upload page: %s' % config.gui_url(),
            '',
            'If you encounter any issues with your upload, please let us know and reply to this email.',
            '',
            'The nomad team'
        ])
        try:
            infrastructure.send_mail(
                name=name, email=user.email, message=message, subject='Processing completed')
        except Exception as e:
            # probably due to email configuration problems
            # don't fail or present this error to clients
            self.get_logger().error('could not send after processing email', exc_info=e)