# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user related
data: information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload
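
A rough usage sketch (illustrative only; ``some_user`` and the upload path are
placeholders, not part of this module)::

    upload = Upload.create(user=some_user, upload_path='/path/to/raw.zip')
    upload.process_upload()     # extract files, match mainfiles, process calcs
    # ... after all calcs have been processed ...
    upload.publish_upload()     # pack staging files and publish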

'''
from typing import cast, List, Any, Iterator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
from celery.utils import worker_direct
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, ProcessAlreadyRunning, PENDING, SUCCESS, FAILURE, PROCESS_CALLED, PROCESS_COMPLETED
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers
from nomad.processing.pipelines import run_pipelines, PipelineContext


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            'parser',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.processed',
            'metadata.last_processing',
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def apply_entry_metadata(self, entry_metadata: datamodel.EntryMetadata):
        self.metadata = entry_metadata.m_to_dict(
            include_defaults=True,
            categories=[datamodel.MongoMetadata])  # TODO use embedded doc?

    def create_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object, not necessarily the user metadata nor the metadata from
        the archive.
        '''
        entry_metadata = datamodel.EntryMetadata()
        if self.parser is not None:
            entry_metadata.domain = parser_dict[self.parser].domain
        entry_metadata.upload_id = self.upload_id
        entry_metadata.calc_id = self.calc_id
        entry_metadata.mainfile = self.mainfile
        entry_metadata.nomad_version = config.version
        entry_metadata.nomad_commit = config.commit
        entry_metadata.uploader = self.upload.user_id
        entry_metadata.upload_time = self.upload.upload_time
        entry_metadata.upload_name = self.upload.name

        return entry_metadata

    def entry_metadata(self, upload_files: UploadFiles) -> datamodel.EntryMetadata:
        '''
        Returns a complete set of :class:`nomad.datamodel.EntryMetadata` including
        the user metadata and metadata from the archive.

        Arguments:
            upload_files:
                The :class:`nomad.files.UploadFiles` instance to read the archive from.
        '''
        archive = upload_files.read_archive(self.calc_id)
        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(
                archive[self.calc_id][datamodel.EntryArchive.section_metadata.name].to_dict())

        except KeyError:
            # Due to hard processing failures, it is possible that an entry does not
            # have an archive
            if self._entry_metadata is not None:
                entry_metadata = self._entry_metadata

            else:
                entry_metadata = self.create_metadata()

        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata
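
    # Illustrative sketch (not part of the original source): a complete metadata
    # record for a processed calc can be read back by combining the archive and
    # the mongo user metadata, e.g.
    #
    #     metadata = calc.entry_metadata(calc.upload_files)
    #
    # where `calc` stands for any processed Calc instance.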

    def user_metadata(self) -> datamodel.EntryMetadata:
        '''
        Returns a :class:`nomad.datamodel.EntryMetadata` with values from this
        processing object and the user metadata, not necessarily the metadata from
        the archive.
        '''
        entry_metadata = self.create_metadata()
        entry_metadata.m_update_from_dict(self.metadata)

        return entry_metadata

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

            except Exception:
                # Exceptions here would cause an indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])
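
    # Illustrative sketch (not from the original source): everything logged through
    # the wrapped logger is also collected in self._calc_proc_logs and ends up in
    # the archive as `processing_logs`, e.g.
    #
    #     logger = calc.get_logger(step='example')
    #     logger.warn('something noteworthy')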

    def re_process_calc(self):
        '''
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
        logger = self.get_logger()

        if parser is None and not config.reprocess_unmatched:
            self.errors = ['no parser matches during re-process, will not re-process this calc']

            try:
                upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
                raise e

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        if parser is None:
            self.get_logger().warn('no parser matches during re-process, use the old parser')
            self.warnings = ['no matching parser found during re-processing']

        elif self.parser != parser.name:
            if parser_dict[self.parser].name == parser.name:
                # parser was just renamed
                self.parser = parser.name

            else:
                self.parser = parser.name
                logger.info(
                    'different parser matches during re-process, use new parser',
                    parser=parser.name)

        try:
            self._entry_metadata = self.user_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.nomad_version = config.version
            self._entry_metadata.nomad_commit = config.commit
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            self._entry_metadata = self.create_metadata()
            self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            self._entry_metadata.last_processing = datetime.utcnow()
            self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)

            if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

    def on_fail(self):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            self._entry_metadata.processed = False

            self.apply_entry_metadata(self._entry_metadata)
            if self._parser_backend and self._parser_backend.resource:
                backend = self._parser_backend
            else:
                backend = None
            self._entry_metadata.apply_domain_metadata(backend)

            self._entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error(
                'could not index after processing failure', exc_info=e)

        try:
            self.write_archive(None)
        except Exception as e:
            self.get_logger().error(
                'could not write archive after processing failure', exc_info=e)

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self._entry_metadata.parser_name = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    @contextmanager
    def use_parser_backend(self, processor_name):
        self._parser_backend.reset_status()
        yield self._parser_backend

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])

            if len(warnings) > 0:
                self.get_logger().warn(
                    'processor completed successfully with warnings',
                    processor=processor_name, warnings=[str(warning) for warning in warnings])

            else:
                self.get_logger().info(
                    'processor completed successfully',
                    processor=processor_name)

        else:
            errors = self._parser_backend.status[1]
            self.get_logger().error(
                'processor completed with failure',
                processor=processor_name, errors=str(errors))

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''

        # allow normalizer to access and add data to the entry metadata
        self._parser_backend.entry_archive.m_add_sub_section(
            datamodel.EntryArchive.section_metadata, self._entry_metadata)

        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        logger.error(
                            'normalizer failed with exception', exc_info=e,
                            error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        self._entry_metadata.apply_domain_metadata(self._parser_backend)
        self._entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.apply_entry_metadata(self._entry_metadata)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            self._entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            archive_size = self.write_archive(self._parser_backend)
            log_data.update(archive_size=archive_size)

    def write_archive(self, backend: Backend):
        def filter_processing_logs(logs):
            if len(logs) > 100:
                return [
                    log for log in logs
                    if log.get('level') != 'DEBUG']
            return logs

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        if backend is not None:
            entry_archive = backend.entry_archive.m_copy()
        else:
            entry_archive = datamodel.EntryArchive()

        if entry_archive.section_metadata is None:
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)

        entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)

        try:
            return self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
        except Exception as e:
            if backend is None:
                raise e

            # most likely failed due to domain data, try to write metadata and processing logs
            entry_archive = datamodel.EntryArchive()
            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
            entry_archive.processing_logs = filter_processing_logs(self._calc_proc_logs)
            self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
            raise e

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the files storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ],
        'strict': False  # ignore extra fields to support older entries with join related fields
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data
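
    # Illustrative sketch (not part of the original source): user metadata round-trips
    # through the public upload files, e.g.
    #
    #     upload.metadata = dict(comment='my dataset')  # pickled to the public bucket
    #     current = upload.metadata                     # None while no public files exist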

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)

        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            with self.entries_metadata(self.metadata) as calcs:

                with utils.timer(
                        logger, 'upload metadata updated', step='metadata',
                        upload_size=self.upload_files.size):

                    def create_update(calc):
                        calc.published = True
                        calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                        return UpdateOne(
                            {'_id': calc.calc_id},
                            {'$set': {'metadata': calc.m_to_dict(
                                include_defaults=True, categories=[datamodel.MongoMetadata])}})

                    Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload files packed', step='pack',
                            upload_size=self.upload_files.size):
                        self.upload_files.pack(calcs)

                with utils.timer(
                        logger, 'index updated', step='index',
                        upload_size=self.upload_files.size):
                    search.publish(calcs)

                if isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'staged upload deleted', step='delete staged',
                            upload_size=self.upload_files.size):
                        self.upload_files.delete()
                        self.published = True
                        self.publish_time = datetime.utcnow()
                        self.last_update = datetime.utcnow()
                        self.save()
                else:
                    self.last_update = datetime.utcnow()
                    self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')

        if self.published:
            try:
                staging_upload_files = StagingUploadFiles(self.upload_id)
                # public files exist and there is a staging directory, it is probably old
                # and we delete it first
                staging_upload_files.delete()
                logger.warn('deleted old staging files')

            except KeyError as e:
                logger.info('reprocessing published files')
        else:
            logger.info('reprocessing staging files')

        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # Check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # Reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # Resolve queue and priority
            queue = None
            if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None:
                queue = worker_direct(self.worker_hostname).name
            priority = config.celery.priorities.get('%s.%s' % ("Calc", "re_process"), 1)

            # Re-process all calcs
            def pipeline_generator():
                query = dict(upload_id=self.upload_id)
                running_query = dict(Calc.process_running_mongoengine_query())
                running_query.update(query)
                if Calc.objects(**running_query).first() is not None:
                    raise ProcessAlreadyRunning('Tried to call a processing function on an already processing process.')
                Calc._get_collection().update_many(query, {'$set': dict(
                    current_process="re_process_calc",
                    process_status=PROCESS_CALLED)})
                for calc in Calc.objects(**query):
                    yield PipelineContext(
                        calc.mainfile,
                        calc.parser,
                        calc.calc_id,
                        calc.upload_id,
                        calc.worker_hostname,
                        re_process=True
                    )
            run_pipelines(pipeline_generator(), self.upload_id, queue, priority)

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if self.published:
                if staging_upload_files is not None and staging_upload_files.exists():
                    staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.user_metadata())
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files
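
    # Illustrative note (not from the original source): for published uploads this
    # property resolves to PublicUploadFiles, otherwise to ArchiveBasedStagingUploadFiles
    # built from self.upload_path, e.g.
    #
    #     files = upload.upload_files
    #     total_size = files.size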

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        '''
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        '''
        if os.path.basename(path).startswith('POTCAR'):
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # created stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))

    def match_mainfiles(self) -> Iterator[PipelineContext]:
        """Generator function that iterates over files in an upload and yields
        basic information for each found mainfile.

        Returns:
            PipelineContext
        """
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filepath in upload_files.raw_file_manifest():
            self._preprocess_files(filepath)
            try:
                parser = match_parser(upload_files.raw_file_object(filepath).os_path)
                if parser is not None:
                    directory = os.path.dirname(filepath)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filepath

                    yield PipelineContext(
                        filepath,
                        parser.name,
                        upload_files.calc_id(filepath),
                        self.upload_id,
                        self.worker_hostname
                    )
            except Exception as e:
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filepath, exc_info=e)
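
    # Illustrative sketch (not part of the original source): match_mainfiles() is a
    # generator, so its results can be streamed straight into the processing
    # pipeline, e.g.
    #
    #     run_pipelines(upload.match_mainfiles(), upload.upload_id, queue, priority)
    #
    # with `queue` and `priority` resolved as in re_process_upload above.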

    def processing_started(self):
        """Informs MongoDB that this Upload has started processing.
        """
        # Tell Upload that a process has been started.
        self.current_process = "process"
        self.process_status = PROCESS_CALLED
        self.save()

    def processing_finished(self):
        """Informs MongoDB that this Upload has finished processing.
        """
        # Tell Upload that the process has finished.
        self.process_status = PROCESS_COMPLETED
        self.save()
        self.cleanup()

    @task
    def parse_all(self):
        '''
        The *task* that identifies mainfile/parser combinations among the upload's files,
        creates the respective :class:`Calc` instances, and triggers their processing.
        '''
        logger = self.get_logger()

        with utils.timer(
                logger, 'upload extracted', step='matching',