# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user related
data. These are information about users, their uploads and datasets, the associated
calculations, and files


.. autoclass:: Calc
Markus Scheidgen's avatar
Markus Scheidgen committed
22

Markus Scheidgen's avatar
Markus Scheidgen committed
23
.. autoclass:: Upload
Markus Scheidgen's avatar
Markus Scheidgen committed
24

Markus Scheidgen's avatar
Markus Scheidgen committed
25
26
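
A minimal, illustrative usage sketch (it assumes a fully configured mongo,
elastic, and file storage infrastructure; ``some_user`` and the file path are
placeholders):

.. code-block:: python

    upload = Upload.create(user=some_user, upload_path='/data/example.zip')
    upload.process_upload()  # runs extracting and parse_all via celery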
"""

from typing import cast, List, Any, ContextManager, Tuple, Generator, Dict
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import json

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, LocalBackend
from nomad.normalizing import normalizers
from nomad.datamodel import UploadWithMetadata, Domain


def _pack_log_event(logger, method_name, event_dict):
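    """
    A structlog processor that merges the logger's bound context (except standard
    fields such as ``upload_id`` or ``calc_id``) and the logger name into the
    event dict, so they appear in the serialized log entry.
    """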
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False)]


class Calc(Proc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
        metadata: the metadata record with calc and user metadata, see :class:`datamodel.CalcWithMetadata`
    """
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: LocalBackend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx: ContextManager = None

    @classmethod
    def from_calc_with_metadata(cls, calc_with_metadata):
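        """ Creates a new calc entry in mongodb from a :class:`datamodel.CalcWithMetadata` instance. """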
        calc = Calc.create(
            calc_id=calc_with_metadata.calc_id,
            upload_id=calc_with_metadata.upload_id,
            mainfile=calc_with_metadata.mainfile,
            metadata=calc_with_metadata.to_dict())

        return calc

    @classmethod
    def get(cls, id):
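        """ Returns the calc with the given id (the ``calc_id``). """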
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
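        """ The :class:`PathObject` for this calc's mainfile within the upload's raw files. """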
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        """
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logwriter_ctx is None:
            try:
                self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
                self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101
            except KeyError:
                # cannot open log file
                pass

        if self._calc_proc_logwriter_ctx is None:
            return logger
        else:
            def save_to_calc_log(logger, method_name, event_dict):
                if self._calc_proc_logwriter is not None:
                    try:
                        dump_dict = dict(event_dict)
                        dump_dict.update(level=method_name.upper())
                        json.dump(dump_dict, self._calc_proc_logwriter, sort_keys=True)
                        self._calc_proc_logwriter.write('\n')
                    except Exception:
                        # Exceptions here will cause indefinite loop
                        pass

                return event_dict

            return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        """
        Processes a calculation again. Since metadata already exists from the
        first processing, the existing records are updated instead of being
        created anew.
        """
        logger = self.get_logger()

        parser = match_parser(self.mainfile, self.upload_files, strict=False)
        if parser is None:
            logger.error(
                'no parser matches during re-process, use the old parser',
                calc_id=self.calc_id)
        elif self.parser != parser.name:
            self.parser = parser.name
            logger.info(
                'different parser matches during re-process, use new parser',
                calc_id=self.calc_id, parser=parser.name)

        try:
            calc_with_metadata = datamodel.CalcWithMetadata(**self.metadata)
            calc_with_metadata.upload_id = self.upload_id
            calc_with_metadata.calc_id = self.calc_id
            calc_with_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            calc_with_metadata.mainfile = self.mainfile
            calc_with_metadata.nomad_version = config.version
            calc_with_metadata.nomad_commit = config.commit
            calc_with_metadata.last_processing = datetime.utcnow()
            calc_with_metadata.files = self.upload_files.calc_files(self.mainfile)
            self.metadata = calc_with_metadata.to_dict()

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

    @process
    def process_calc(self):
        """
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        """
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            calc_with_metadata = datamodel.CalcWithMetadata(
                upload_id=self.upload_id,
                calc_id=self.calc_id,
                calc_hash=self.upload_files.calc_hash(self.mainfile),
                mainfile=self.mainfile)
            calc_with_metadata.published = False
            calc_with_metadata.uploader = self.upload.user_id
            calc_with_metadata.upload_time = self.upload.upload_time
            calc_with_metadata.upload_name = self.upload.name
            calc_with_metadata.nomad_version = config.version
            calc_with_metadata.nomad_commit = config.commit
            calc_with_metadata.last_processing = datetime.utcnow()
            calc_with_metadata.files = self.upload_files.calc_files(self.mainfile)
            self.metadata = calc_with_metadata.to_dict()

            if len(calc_with_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            calc_with_metadata = datamodel.CalcWithMetadata(**self.metadata)
            calc_with_metadata.formula = config.services.not_processed_value
            calc_with_metadata.basis_set = config.services.not_processed_value
            calc_with_metadata.xc_functional = config.services.not_processed_value
            calc_with_metadata.system = config.services.not_processed_value
            calc_with_metadata.crystal_system = config.services.not_processed_value
            calc_with_metadata.spacegroup = config.services.not_processed_value
            calc_with_metadata.spacegroup_symbol = config.services.not_processed_value
            calc_with_metadata.code_version = config.services.not_processed_value

            calc_with_metadata.code_name = config.services.not_processed_value
            if self.parser is not None:
                parser = parser_dict[self.parser]
                if hasattr(parser, 'code_name'):
                    calc_with_metadata.code_name = parser.code_name

            calc_with_metadata.processed = False
            self.metadata = calc_with_metadata.to_dict()
            search.Entry.from_calc_with_metadata(calc_with_metadata).save()
        except Exception as e:
            self.get_logger().error('could not index after processing failure', exc_info=e)

        super().fail(*errors, log_level=log_level, **kwargs)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name in ('process_calc', 're_process_calc') or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        """ The *task* that encapsulates all parsing-related actions. """
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self.metadata['parser_name'] = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        # add the non code specific calc metadata to the backend
        # all other quantities have been determined by parsers/normalizers
        self._parser_backend.openNonOverlappingSection('section_entry_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.metadata['calc_hash'])
        self._parser_backend.addValue('mainfile', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)
        filepaths = self.metadata['files']
        self._parser_backend.addValue('number_of_files', len(filepaths))
        self._parser_backend.addValue('filepaths', filepaths)
        uploader = self.upload.uploader
        self._parser_backend.addValue(
            'entry_uploader_name', '%s, %s' % (uploader.first_name, uploader.last_name))
        self._parser_backend.addValue(
            'entry_uploader_id', str(uploader.user_id))
        self._parser_backend.addValue('entry_upload_time', int(self.upload.upload_time.timestamp()))
        self._parser_backend.closeNonOverlappingSection('section_entry_info')

        self.add_processor_info(self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    @contextmanager
    def use_parser_backend(self, processor_name):
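        """
        Context manager that yields the parser backend for a processing step and
        records that step's processing info in the archive on exit.
        """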
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
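        """ Adds the processor's name, status, warnings, and errors to the archive's ``section_archive_processing_info``. """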
        self._parser_backend.openContext('/section_entry_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('number_of_archive_processor_warnings', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_entry_info/0')

    @task
    def normalizing(self):
        """ The *task* that encapsulates all normalizing-related actions. """
        for normalizer in normalizers:
            if normalizer.domain != config.domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        self.fail(
                            'normalizer failed with exception', exc_info=e, error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        """ The *task* that encapsulates all archival-related actions. """
        logger = self.get_logger()

        calc_with_metadata = datamodel.CalcWithMetadata(**self.metadata)
        calc_with_metadata.apply_domain_metadata(self._parser_backend)
        calc_with_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.metadata = calc_with_metadata.to_dict()

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            search.Entry.from_calc_with_metadata(calc_with_metadata).save()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:
            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                self._parser_backend.write_json(out, pretty=True, root_sections=Domain.instance.root_sections)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='logs',
                    input_size=self.mainfile_file.size) as log_data:
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    """
    Represents uploads in the database. Provides persistent access to the files storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system received the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean that indicates if the running processing has joined (:func:`check_join`)
    """
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        """
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change, when we introduce editing functionality
        and metadata will be provided through different means.
        """
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
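        """ Returns the upload with the given id (the ``upload_id``). """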
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        """ Returns all uploads for the given user. Kwargs are passed to mongo query. """
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # 'user_id' is not used as a bound key here because logstash appears to filter entries with that key
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user, a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.
541
542

        Arguments:
543
            user: The user that created the upload.
Markus Scheidgen's avatar
Markus Scheidgen committed
544
        """
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del kwargs['user']

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        """ Deletes this upload process state entry and its calcs. """
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        """
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        """
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'staged upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        """
        Deletes of the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        """
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        """
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        """
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            upload_with_metadata = self.to_upload_with_metadata(self.metadata)
            calcs = upload_with_metadata.calcs

            with utils.timer(
                    logger, 'upload metadata updated', step='metadata',
                    upload_size=self.upload_files.size):

                def create_update(calc):
                    calc.published = True
                    calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                    return UpdateOne(
                        {'_id': calc.calc_id},
                        {'$set': {'metadata': calc.to_dict()}})

                Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

            if isinstance(self.upload_files, StagingUploadFiles):
                with utils.timer(
                        logger, 'staged upload files packed', step='pack',
                        upload_size=self.upload_files.size):
                    self.upload_files.pack(upload_with_metadata)

            with utils.timer(
                    logger, 'index updated', step='index',
                    upload_size=self.upload_files.size):
                search.publish(calcs)

            if isinstance(self.upload_files, StagingUploadFiles):
                with utils.timer(
                        logger, 'staged upload deleted', step='delete staged',
                        upload_size=self.upload_files.size):
                    self.upload_files.delete()
                    self.published = True
                    self.publish_time = datetime.utcnow()
                    self.last_update = datetime.utcnow()
                    self.save()
            else:
                self.last_update = datetime.utcnow()
                self.save()

    @process
    def re_process_upload(self):
        """
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        """
        assert self.published

        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')
        public_upload_files = cast(PublicUploadFiles, self.upload_files)
        staging_upload_files = public_upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if staging_upload_files is not None and staging_upload_files.exists():
                staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        """ A *process* that repacks the raw and archive data based on the current embargo data. """
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.to_upload_with_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        """ A *process* that performs the initial upload processing. """
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        """ A no-op *task* as a stand-in for receiving upload data. """
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        """
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        """
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        """
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        """
        if os.path.basename(path).startswith('POTCAR'):
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # create stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
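            # The awk program below copies the original file but skips everything
            # between the 'END of PSCTR' and 'End of Dataset' markers, i.e. the
            # licensed pseudopotential data itself.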
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))

    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        """
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of mainfile, filename, and parsers
        """
812
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            self._preprocess_files(filename)
            try:
                parser = match_parser(filename, upload_files)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)

    @task
    def parse_all(self):
        """
        The *task* that identifies mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        with utils.timer(
                logger, 'matched all files', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

                calc.process_calc()

    def on_process_complete(self, process_name):
        if process_name in ('process_upload', 're_process_upload'):
            self.check_join()

    def check_join(self):
        """
        Performs an evaluation of the join condition and triggers the :func:`cleanup`
        task if necessary. The join condition allows to run the ``cleanup`` after
        all calculations have been processed. The upload processing stops after all
        calculation processings have been triggered (:func:`parse_all` or
        :func:`re_process_upload`). The cleanup task is then run within the last
        calculation process (the one that triggered the join by calling this method).
        """
        total_calcs = self.total_calcs
        processed_calcs = self.processed_calcs

        self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
        # check if process is not running anymore, i.e. not still spawning new processes to join
        # check the join condition, i.e. all calcs have been processed
        if not self.process_running and processed_calcs >= total_calcs:
            # this can easily be called multiple times, e.g. upload finished after all calcs finished
            modified_upload = self._get_collection().find_one_and_update(
                {'_id': self.upload_id, 'joined': {'$ne': True}},
                {'$set': {'joined': True}})
            if modified_upload is not None:
                self.get_logger().debug('join')
                self.cleanup()
            else:
                # the join was already done due to a prior call
                pass

    def reset(self):
        self.joined = False
        super().reset()

    @classmethod
    def reset_pymongo_update(cls, worker_hostname: str = None):
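        """ Returns a pymongo update document that resets the processing state, including the upload-specific ``joined`` flag. """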
        update = super().reset_pymongo_update()
        update.update(joined=False)
        return update

    def _cleanup_after_processing(self):
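        """ Sends a notification email to the uploader after the initial processing has completed. """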
        # send email about process finish
        user = self.uploader
        name = '%s %s' % (user.first_name, user.last_name)
        message = '\n'.join([
            'Dear %s,' % name,
            '',
            'your data %suploaded at %s has completed processing.' % (
                '"%s" ' % self.name if self.name else '', self.upload_time.isoformat()),  # pylint: disable=no-member
            'You can review your data on your upload page: %s' % config.gui_url(),
            '',
            'If you encounter any issues with your upload, please let us know and reply to this email.',
            '',
            'The nomad team'
        ])
        try:
            infrastructure.send_mail(
                name=name, email=user.email, message=message, subject='Processing completed')
        except Exception as e:
            # probably due to email configuration problems
            # don't fail or present this error to clients
            self.logger.error('could not send after processing email', exc_info=e)

    def _cleanup_after_re_processing(self):
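        """ Packs the re-processed staging files back into the public area and deletes the staging copy. """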
        logger = self.get_logger()
        logger.info('started to repack re-processed upload')

        staging_upload_files = self.upload_files.to_staging_upload_files()

        with utils.timer(
                logger, 'reprocessed staged upload packed', step='pack',
                upload_size=self.upload_files.size):

            staging_upload_files.pack(self.to_upload_with_metadata(), skip_raw=True)

        with utils.timer(
                logger, 'reprocessed staged upload deleted', step='delete staged',
                upload_size=self.upload_files.size):

            staging_upload_files.delete()
            self.last_update = datetime.utcnow()
            self.save()

    @task
    def cleanup(self):
        """
        The *task* that "cleans" the processing, i.e. removed obsolete files and performs
        pending archival operations. Depends on the type of processing.
        """
        search.refresh()

        if self.current_process == 're_process_upload':
            self._cleanup_after_re_processing()
        else:
            self._cleanup_after_processing()

    def get_calc(self, calc_id) -> Calc:
        """ Returns the upload calc with the given id or ``None``. """
        return Calc.objects(upload_id=self.upload_id, calc_id=calc_id).first()

    @property
    def processed_calcs(self):
        """
        The number of successfully or not successfully processed calculations. I.e.
        calculations that have finished processing.
        """
        return Calc.objects(upload_id=self.upload_id, tasks_status__in=[SUCCESS, FAILURE]).count()
    @property
    def total_calcs(self):
        """ The number of all calculations. """
        return Calc.objects(upload_id=self.upload_id).count()

    @property
    def failed_calcs(self):
        """ The number of calculations with failed processing. """
        return Calc.objects(upload_id=self.upload_id, tasks_status=FAILURE).count()