# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user related
data. This includes information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''
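
# A minimal usage sketch (illustrative only, not part of the module): it assumes a
# configured MongoDB/Elasticsearch infrastructure and an existing upload id.
#
#     upload = Upload.get('some_upload_id')   # hypothetical id
#     upload.process_upload()                 # extract, match mainfiles, parse all calcs
#     for calc in Calc.objects(upload_id=upload.upload_id):
#         print(calc.mainfile, calc.tasks_status)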

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]
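
# Sketch of how these processors are meant to be used (cf. Calc.get_logger below);
# `bound_logger` stands for any structlog-bound logger and is purely illustrative:
#
#     wrapped = wrap_logger(bound_logger, processors=_log_processors)
#     wrapped.info('parser executed', mainfile='some/path')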


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'parser',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.version
        entry_metadata.nomad_commit = config.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(
                archive[self.calc_id][datamodel.EntryArchive.section_metadata.name].to_dict())

        except KeyError:
            # Due to hard processing failures, it is possible that an entry does not
            # have an archive.
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

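    # Rough guide to the three metadata helpers above (a sketch, not an API contract):
    # create_metadata() builds a fresh record from this processing object only,
    # user_metadata() additionally applies the mongo-stored user metadata, and
    # entry_metadata(upload_files) also merges what the parsed archive contains, e.g.:
    #
    #     calc = Calc.get('some_calc_id')                  # hypothetical id
    #     upload_files = UploadFiles.get(calc.upload_id)   # assumes such a factory exists
    #     complete = calc.entry_metadata(upload_files)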
    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # Exceptions here will cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.version
            self._entry_metadata.nomad_commit = config.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._entry_metadata = self.create_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            self._entry_metadata.processed = False

            self.apply_entry_metadata(self._entry_metadata)
            if self._parser_backend and self._parser_backend.resource:
                backend = self._parser_backend
            else:
                backend = None
            self._entry_metadata.apply_domain_metadata(backend)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    @contextmanager
    def use_parser_backend(self, processor_name):
        self._parser_backend.reset_status()
        yield self._parser_backend

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])

            if len(warnings) > 0:
                self.get_logger().warn(
                    'processor completed successfully with warnings',
                    processor=processor_name, warnings=[str(warning) for warning in warnings])

            else:
                self.get_logger().info(
                    'processor completed successfully',
                    processor=processor_name)

        else:
            errors = self._parser_backend.status[1]
            self.get_logger().error(
                'processor completed with failure',
                processor=processor_name, errors=str(errors))

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        self._parser_backend.entry_archive.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        logger.error(
                            'normalizer failed with exception', exc_info=e,
                            error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_backend)
        self._entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_backend)
            log_data.update(archive_size=archive_size)

    def write_archive(self, backend: Backend):
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if backend is not None:
            entry_archive = backend.entry_archive.m_copy()
        else:
            entry_archive = datamodel.EntryArchive()

        if entry_archive.section_metadata is None:
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        try:
            return self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
        except Exception as e:
            if backend is None:
                raise e

            # most likely failed due to domain data, try to write metadata and processing logs
            entry_archive = datamodel.EntryArchive()
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the file storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

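    # Usage sketch for the metadata property (illustrative; the upload must already
    # have public upload files for the getter to return anything):
    #
    #     upload.metadata = {'comment': 'user provided comment'}   # hypothetical key, pickled to the public bucket
    #     print(upload.metadata)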
    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-provided name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del(kwargs['user'])

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

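    # Sketch: creating and processing an upload for a user (names and paths are
    # illustrative; `some_user` stands for a datamodel.User instance):
    #
    #     upload = Upload.create(user=some_user, name='my upload', upload_path='/tmp/example.zip')
    #     upload.process_upload()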
    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'staged upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError as e:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        '''
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        '''
        if os.path.basename(path).startswith('POTCAR'):
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # created stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))

    def match_mainfiles(self) -> Iterator[Tuple[str, object]]:
        '''
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of mainfile (the matched filename) and parser.
        '''
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            self._preprocess_files(filename)
            try:
                parser = match_parser(upload_files.raw_file_object(filename).os_path)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
889
890
891
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)

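    # Sketch: listing prospective mainfiles and their parsers for a staging upload
    # (illustrative only):
    #
    #     for mainfile, parser in upload.match_mainfiles():
    #         print(mainfile, parser.name)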
    @task
    def parse_all(self):
        '''
        The *task* that identifies mainfile/parser combinations among the upload's files,
        creates the respective :class:`Calc` instances, and triggers their processing.
        '''
        logger = self.get_logger()

        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

                calc.process_calc()

    def on_process_complete(self, process_name):
        if process_name == 'process_upload' or process_name == 're_process_upload':
            self.check_join()

    def check_join(self):
        '''
        Performs an evaluation of the join condition and triggers the :func:`cleanup`
        task if necessary. The join condition allows running the ``cleanup`` after
        all calculations have been processed. The upload processing stops after all
        calculation processings have been triggered (:func:`parse_all` or
        :func:`re_process_upload`). The cleanup task is then run within the last
        calculation process (the one that triggered the join by calling this method).
        '''
        total_calcs = self.total_calcs
        processed_calcs = self.processed_calcs

        self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
        # check if the process is not running anymore, i.e. not still spawning new processes to join
        # check the join condition, i.e. all calcs have been processed
        if not self.process_running and processed_calcs >= total_calcs:
            # this can easily be called multiple times, e.g. upload finished after all calcs finished
            modified_upload = self._get_collection().find_one_and_update(
                {'_id': self.upload_id, 'joined': {'$ne': True}},
                {'$set': {'joined': True}})
            if modified_upload is not None:
                self.get_logger().debug('join')
                self.cleanup()
            else:
                # the join was already done due to a prior call
                pass
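    # Join sketch (illustrative): every finished calc process calls check_join(); the last
    # one to observe processed_calcs >= total_calcs atomically flips `joined` via
    # find_one_and_update and runs cleanup(). A manual nudge would look like:
    #
    #     Upload.get('some_upload_id').check_join()   # no-op unless all calcs are processed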