# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module comprises a set of persistent document classes that hold all user related
data. These include information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

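A rough usage sketch (simplified; it assumes a configured NOMAD infrastructure and an
existing :class:`datamodel.User` instance called ``user``):

.. code-block:: python

    upload = Upload.create(user=user, upload_path='/path/to/raw.zip')
    upload.process_upload()    # extract files, match parsers, create and process Calc entries
    # ... once all calcs are processed ...
    upload.publish_upload()    # pack the staging files and move the upload to the public area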
'''

from typing import cast, List, Any, ContextManager, Tuple, Generator, Dict, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import json

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


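# base structlog processor chain for processing log entries; Calc.get_logger() appends a
# per-calc processor (save_to_calc_log) that also writes entries to the calc's archive log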
_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.EntryMetadata`
    '''
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: Backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx: ContextManager = None

    @classmethod
    def from_entry_metadata(cls, entry_metadata):
        calc = Calc.create(
            calc_id=entry_metadata.calc_id,
            upload_id=entry_metadata.upload_id,
            mainfile=entry_metadata.mainfile,
            metadata=entry_metadata.m_to_dict(include_defaults=True))

        return calc

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logwriter_ctx is None:
            try:
                self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
                self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101
            except KeyError:
                # cannot open log file
                pass

        if self._calc_proc_logwriter_ctx is None:
            return logger
        else:
            def save_to_calc_log(logger, method_name, event_dict):
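                # write each log entry as a single JSON line to the calc's archive log,
                # in addition to the regular logging output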
                if self._calc_proc_logwriter is not None:
                    try:
                        dump_dict = dict(event_dict)
                        dump_dict.update(level=method_name.upper())
                        json.dump(dump_dict, self._calc_proc_logwriter, sort_keys=True)
                        self._calc_proc_logwriter.write('\n')

                    except Exception:
                        # raising an exception here would cause an indefinite loop
                        pass

                return event_dict

            return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        '''
        Processes a calculation again. This means metadata already exists and,
        instead of creating it from scratch, we only update the existing
        records.
        '''
        parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)

        if parser is None and not config.reprocess_unmatched:
            # Remove the existing log file and set a fake logwriter to avoid creating a new one,
            # because we will probably remove this calc and don't want ghost log files.
            if self._calc_proc_logwriter_ctx is not None:
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)
                self.upload_files.archive_log_file_object(self.calc_id).delete()

            self._calc_proc_logwriter_ctx = open('/dev/null', 'wt')
            self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()
            self.get_logger().error(
                'no parser matches during re-process, will not re-process this calc')

            self.errors = ['no parser matches during re-process, will not re-process this calc']

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        logger = self.get_logger()
        if parser is None:
            self.get_logger().error('no parser matches during re-process, use the old parser')
            self.errors = ['no matching parser found during re-processing']
        elif self.parser != parser.name:
            self.parser = parser.name
            logger.info(
                'different parser matches during re-process, use new parser',
                parser=parser.name)

        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(self.metadata)
            entry_metadata.upload_id = self.upload_id
            entry_metadata.calc_id = self.calc_id
            entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            entry_metadata.mainfile = self.mainfile
            entry_metadata.nomad_version = config.version
            entry_metadata.nomad_commit = config.commit
            entry_metadata.last_processing = datetime.utcnow()
            entry_metadata.files = self.upload_files.calc_files(self.mainfile)
            self.metadata = entry_metadata.m_to_dict(include_defaults=True)

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

    @process
    def process_calc(self):
        '''
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            calc_metadata = datamodel.EntryMetadata()
            calc_metadata.domain = parser_dict[self.parser].domain
            calc_metadata.upload_id = self.upload_id
            calc_metadata.calc_id = self.calc_id
            calc_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            calc_metadata.mainfile = self.mainfile
            calc_metadata.nomad_version = config.version
            calc_metadata.nomad_commit = config.commit
            calc_metadata.last_processing = datetime.utcnow()
            calc_metadata.files = self.upload_files.calc_files(self.mainfile)
            calc_metadata.uploader = self.upload.user_id
            calc_metadata.upload_time = self.upload.upload_time
            calc_metadata.upload_name = self.upload.name
            self.metadata = calc_metadata.m_to_dict(include_defaults=True)  # TODO use embedded doc?

            if len(calc_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._parser_backend and self._parser_backend.resource:
                    self._parser_backend.resource.unload()
            except Exception as e:
                logger.error('could not unload processing results', exc_info=e)

            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            entry_metadata = datamodel.EntryMetadata.m_from_dict(self.metadata)
            if self.parser is not None:
                parser = parser_dict[self.parser]
                if hasattr(parser, 'code_name'):
                    entry_metadata.code_name = parser.code_name

            entry_metadata.processed = False
            self.metadata = entry_metadata.m_to_dict(include_defaults=True)
            entry_metadata.a_elastic.index()
        except Exception as e:
            self.get_logger().error('could not index after processing failure', exc_info=e)

        super().fail(*errors, log_level=log_level, **kwargs)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        ''' The *task* that encapsulates all parsing related actions. '''
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self.metadata['parser_name'] = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        # add the non-code-specific calc metadata to the backend
        # all other quantities have been determined by parsers/normalizers
        self._parser_backend.openNonOverlappingSection('section_entry_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.metadata['calc_hash'])
        self._parser_backend.addValue('mainfile', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)
        filepaths = self.metadata['files']
        self._parser_backend.addValue('number_of_files', len(filepaths))
        self._parser_backend.addValue('filepaths', filepaths)
        uploader = self.upload.uploader
        self._parser_backend.addValue(
            'entry_uploader_name', '%s, %s' % (uploader.first_name, uploader.last_name))
        self._parser_backend.addValue(
            'entry_uploader_id', str(uploader.user_id))
        self._parser_backend.addValue('entry_upload_time', int(self.upload.upload_time.timestamp()))
        self._parser_backend.closeNonOverlappingSection('section_entry_info')

        self.add_processor_info(self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    @contextmanager
    def use_parser_backend(self, processor_name):
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        self._parser_backend.openContext('/section_entry_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('number_of_archive_processor_warnings', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_entry_info/0')

    @task
    def normalizing(self):
        ''' The *task* that encapsulates all normalizing related actions. '''
        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        self.fail(
                            'normalizer failed with exception', exc_info=e, error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        ''' The *task* that encapsulates all archival related actions. '''
        logger = self.get_logger()

        entry_metadata = datamodel.EntryMetadata.m_from_dict(self.metadata)
        entry_metadata.apply_domain_metadata(self._parser_backend)
        entry_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.metadata = entry_metadata.m_to_dict(include_defaults=True)

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            entry_metadata.a_elastic.index()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                json.dump(self._parser_backend.resource.m_to_dict(
                    lambda section: section.m_def.name in datamodel.root_sections), out, indent=2)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='logs',
                    input_size=self.mainfile_file.size) as log_data:
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    '''
    Represents uploads in the database. Provides persistent access to the file storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean that indicates if the running processing has joined (:func:`check_join`)
    '''
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        '''
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change when we introduce editing functionality
        and metadata will be provided through different means.
        '''
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        ''' Returns all uploads for the given user. Kwargs are passed to mongo query. '''
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash seems to filter these entries
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        '''
        Creates a new upload for the given user; a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user: The user that created the upload.
        '''
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        ''' Deletes this upload process state entry and its calcs. '''
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        '''
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        '''
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'staged upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        '''
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        '''
        self.delete_upload_local()

        return True  # do not save the process status on the deleted upload

    @process
    def publish_upload(self):
        '''
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        '''
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            calcs = self.entries_metadata(self.metadata)

            with utils.timer(
                    logger, 'upload metadata updated', step='metadata',
                    upload_size=self.upload_files.size):

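                # build one bulk mongo update per calc: mark it as published, default the
                # embargo flag, and store the updated metadata record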
                def create_update(calc):
                    calc.published = True
                    calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                    return UpdateOne(
                        {'_id': calc.calc_id},
                        {'$set': {'metadata': calc.m_to_dict(include_defaults=True)}})

                Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

            if isinstance(self.upload_files, StagingUploadFiles):
                with utils.timer(
                        logger, 'staged upload files packed', step='pack',
                        upload_size=self.upload_files.size):
                    self.upload_files.pack(calcs)

            with utils.timer(
                    logger, 'index updated', step='index',
                    upload_size=self.upload_files.size):
                search.publish(calcs)

            if isinstance(self.upload_files, StagingUploadFiles):
                with utils.timer(
                        logger, 'staged upload deleted', step='delete staged',
                        upload_size=self.upload_files.size):
                    self.upload_files.delete()
                    self.published = True
                    self.publish_time = datetime.utcnow()
                    self.last_update = datetime.utcnow()
                    self.save()
            else:
                self.last_update = datetime.utcnow()
                self.save()

    @process
    def re_process_upload(self):
        '''
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        '''
        assert self.published

        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')
        public_upload_files = cast(PublicUploadFiles, self.upload_files)
        staging_upload_files = public_upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if staging_upload_files is not None and staging_upload_files.exists():
                staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        ''' A *process* that repacks the raw and archive data based on the current embargo data. '''
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.entries_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        ''' A *process* that performs the initial upload processing. '''
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        ''' A no-op *task* as a stand-in for receiving upload data. '''
        pass

    @property
    def upload_files(self) -> UploadFiles:
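        # staging files while the upload is unpublished; packed public files once published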
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        '''
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        '''
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        '''
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        '''
        if os.path.basename(path).startswith('POTCAR'):
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # create stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
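            # the awk program below appends only the POTCAR header sections (everything up to
            # each 'END of PSCTR' line, resuming at 'End of Dataset') to the stripped file,
            # omitting the actual pseudopotential data in between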
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))

    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        '''
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of mainfile and the matching parser
        '''
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            self._preprocess_files(filename)
            try:
                parser = match_parser(upload_files.raw_file_object(filename).os_path)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)

    @task
    def parse_all(self):
        '''
        The *task* that identifies mainfile/parser combinations among the upload's files,
        creates the respective :class:`Calc` instances, and triggers their processing.
        '''
        logger = self.get_logger()

        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

                calc.process_calc()

    def on_process_complete(self, process_name):
        if process_name == 'process_upload' or process_name == 're_process_upload':
            self.check_join()

    def check_join(self):
        '''
        Performs an evaluation of the join condition and triggers the :func:`cleanup`
        task if necessary. The join condition ensures that ``cleanup`` only runs after
        all calculations have been processed. The upload processing stops after all
        calculation processing has been triggered (:func:`parse_all` or
        :func:`re_process_upload`). The cleanup task is then run within the last
        calculation process (the one that triggered the join by calling this method).
        '''
        total_calcs = self.total_calcs
        processed_calcs = self.processed_calcs

        self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
        # check if process is not running anymore, i.e. not still spawning new processes to join
        # check the join condition, i.e. all calcs have been processed
        if not self.process_running and processed_calcs >= total_calcs:
            # this can easily be called multiple times, e.g. upload finished after all calcs finished
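            # the atomic find_one_and_update below ensures only one of the concurrent
            # callers actually flips the 'joined' flag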
            modified_upload = self._get_collection().find_one_and_update(
                {'_id': self.upload_id, 'joined': {'$ne': True}},
                {'$set': {'joined': True}})