# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user-related
data: information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, List, Any, Tuple, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers


def _pack_log_event(logger, method_name, event_dict):
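    '''
    Structlog processor that merges the logger's bound context (except the standard
    processing keys) and the logger name into the event dict.
    '''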
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
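
    A minimal usage sketch (mirroring what :class:`Upload` does in :func:`Upload.parse_all`);
    the concrete values are only illustrative:

    .. code-block:: python

        calc = Calc.create(
            calc_id=upload.upload_files.calc_id('path/to/mainfile'),
            mainfile='path/to/mainfile', parser='parsers/vasp',
            worker_hostname=upload.worker_hostname,
            upload_id=upload.upload_id)
        calc.process_calc()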
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
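        # transient, per-process state; these are not persisted as mongo fields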
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
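        ''' Stores the given entry metadata in this calc's mongo ``metadata`` field. '''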
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.version
        entry_metadata.nomad_commit = config.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(
                archive[self.calc_id][datamodel.EntryArchive.section_metadata.name].to_dict())

        except KeyError:
            # Due to hard processing failures, an entry might not have an archive.
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # Exceptions here will cause indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.version
            self._entry_metadata.nomad_commit = config.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._entry_metadata = self.create_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            self._entry_metadata.processed = False

            self.apply_entry_metadata(self._entry_metadata)
            if self._parser_backend and self._parser_backend.resource:
                backend = self._parser_backend
            else:
                backend = None
            self._entry_metadata.apply_domain_metadata(backend)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    @contextmanager
    def use_parser_backend(self, processor_name):
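        '''
        Context manager that resets the backend status for a processing step and logs
        the step's outcome (success, warnings, or failure) after the step has run.
        '''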
        self._parser_backend.reset_status()
        yield self._parser_backend

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])

            if len(warnings) > 0:
                self.get_logger().warn(
                    'processor completed successfully with warnings',
                    processor=processor_name, warnings=[str(warning) for warning in warnings])

            else:
                self.get_logger().info(
                    'processor completed successfully',
                    processor=processor_name)

        else:
            errors = self._parser_backend.status[1]
            self.get_logger().error(
                'processor completed with failure',
                processor=processor_name, errors=str(errors))

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        self._parser_backend.entry_archive.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        logger.error(
                            'normalizer failed with exception', exc_info=e,
                            error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_backend)
        self._entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_backend)
            log_data.update(archive_size=archive_size)

    def write_archive(self, backend: Backend):
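        '''
        Writes the archive for this calc from the given backend and returns the size of
        the written archive. If writing the full archive fails, it writes an archive that
        only contains the entry metadata and the processing logs before re-raising.
        '''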
        def filter_processing_logs(logs):
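            # drop DEBUG entries once there are more than 100 log entries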
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if backend is not None:
            entry_archive = backend.entry_archive.m_copy()
        else:
            entry_archive = datamodel.EntryArchive()

        if entry_archive.section_metadata is None:
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        try:
            return self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
        except Exception as e:
            if backend is None:
                raise e

            # most likely failed due to domain data, try to write metadata and processing logs
            entry_archive = datamodel.EntryArchive()
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents an upload in the database. Provides persistent access to the upload's
    file storage and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system registered the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean that indicates if the running processing has joined (:func:`check_join`)
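
    A minimal usage sketch (following :func:`create` and :func:`process_upload`);
    the user object and the name are only illustrative:

    .. code-block:: python

        upload = Upload.create(user=some_user, name='example upload')
        upload.process_upload()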
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user-defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality,
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' as the key because logstash appears to filter those entries.
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed url and a pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the deleted upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError as e:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
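        ''' The upload's files: staging files while unpublished, public files once published. '''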
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        '''
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        '''
        if os.path.basename(path).startswith('POTCAR'):
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # create stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
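            # copy only the POTCAR header and section markers; the awk script below drops
            # the copyrighted pseudopotential data between 'END of PSCTR' and 'End of Dataset'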
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))

    def match_mainfiles(self) -> Iterator[Tuple[str, object]]:
        '''
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of mainfile (filename) and the matching parser
        '''
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            self._preprocess_files(filename)
            try:
                parser = match_parser(upload_files.raw_file_object(filename).os_path)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)

    @task
    def parse_all(self):
        '''
        The *task* used to identify mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        '''
        logger = self.get_logger()

        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

                calc.process_calc()

    def on_process_complete(self, process_name):
        if process_name == 'process_upload' or process_name == 're_process_upload':
            self.check_join()

    def check_join(self):
        '''