# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user-related
data: information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

"""

from typing import cast, List, Any, ContextManager, Tuple, Generator, Dict
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import json

from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, LocalBackend
from nomad.normalizing import normalizers
from nomad.datamodel import UploadWithMetadata


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]
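
# A minimal sketch (not used by this module itself) of how these processors can be
# attached to a logger via structlog's ``wrap_logger``; ``sketch_logger`` is a
# hypothetical name, and the pattern mirrors what Calc.get_logger does below:
#
#     sketch_logger = wrap_logger(logging.getLogger(__name__), processors=_log_processors)
#     sketch_logger.info('parsing started')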


_all_root_sections = []
for domain in datamodel.Domain.instances.values():
    for root_section in domain.root_sections:
        _all_root_sections.append(root_section)


class Calc(Proc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.CalcWithMetadata`
    """
    calc_id = StringField(primary_key=True)

    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version'),
            'metadata.published',
            'metadata.datasets',
            'metadata.pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: LocalBackend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx: ContextManager = None

    @classmethod
    def from_calc_with_metadata(cls, calc_with_metadata):
        calc = Calc.create(
            calc_id=calc_with_metadata.calc_id,
            upload_id=calc_with_metadata.upload_id,
            mainfile=calc_with_metadata.mainfile,
            metadata=calc_with_metadata.to_dict())

        return calc
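
    # A hedged usage sketch: building a Calc document from an existing
    # ``datamodel.CalcWithMetadata`` record, e.g. when mirroring already processed
    # data into mongo. ``calc_with_metadata`` is an assumed, fully populated record:
    #
    #     calc = Calc.from_calc_with_metadata(calc_with_metadata)
    #     assert calc.calc_id == calc_with_metadata.calc_id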

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        """
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logwriter_ctx is None:
            try:
                self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
                self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101
            except KeyError:
                # cannot open log file
                pass

        if self._calc_proc_logwriter_ctx is None:
            return logger
        else:
            def save_to_calc_log(logger, method_name, event_dict):
                if self._calc_proc_logwriter is not None:
                    try:
                        dump_dict = dict(event_dict)
                        dump_dict.update(level=method_name.upper())
                        json.dump(dump_dict, self._calc_proc_logwriter, sort_keys=True)
                        self._calc_proc_logwriter.write('\n')

                    except Exception:
                        # Exceptions here will cause indefinite loop
                        pass

                return event_dict

            return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def re_process_calc(self):
        """
        Processes a calculation again. This means there is already metadata and
        instead of creating it initially, we are just updating the existing
        records.
        """
        parser = match_parser(self.mainfile, self.upload_files, strict=False)

        if parser is None and not config.reprocess_unmatched:
            # Remove the log file and set a fake logwriter to avoid creating a log file,
            # because we will probably remove this calc and don't want to have ghost logfiles.
            if self._calc_proc_logwriter_ctx is not None:
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)
                self.upload_files.archive_log_file_object(self.calc_id).delete()

            self._calc_proc_logwriter_ctx = open('/dev/null', 'wt')
            self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()
            self.get_logger().error(
                'no parser matches during re-process, will not re-process this calc')

            self.errors = ['no parser matches during re-process, will not re-process this calc']

            # mock the steps of actual processing
            self._continue_with('parsing')
            self._continue_with('normalizing')
            self._continue_with('archiving')
            self._complete()
            return

        logger = self.get_logger()
        if parser is None:
            self.get_logger().error('no parser matches during re-process, use the old parser')
            self.errors = ['no matching parser found during re-processing']
        if self.parser != parser.name:
            self.parser = parser.name
            logger.info(
                'different parser matches during re-process, use new parser',
                parser=parser.name)

        try:
            calc_with_metadata = datamodel.CalcWithMetadata(**self.metadata)
            calc_with_metadata.upload_id = self.upload_id
            calc_with_metadata.calc_id = self.calc_id
            calc_with_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            calc_with_metadata.mainfile = self.mainfile
            calc_with_metadata.nomad_version = config.version
            calc_with_metadata.nomad_commit = config.commit
            calc_with_metadata.last_processing = datetime.utcnow()
            calc_with_metadata.files = self.upload_files.calc_files(self.mainfile)
            self.metadata = calc_with_metadata.to_dict()

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

    @process
    def process_calc(self):
        """
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        """
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            calc_with_metadata = datamodel.CalcWithMetadata(
                domain=parser_dict[self.parser].domain,
                upload_id=self.upload_id,
                calc_id=self.calc_id,
                calc_hash=self.upload_files.calc_hash(self.mainfile),
                mainfile=self.mainfile)
            calc_with_metadata.published = False
            calc_with_metadata.uploader = self.upload.user_id
            calc_with_metadata.upload_time = self.upload.upload_time
            calc_with_metadata.upload_name = self.upload.name
            calc_with_metadata.nomad_version = config.version
            calc_with_metadata.nomad_commit = config.commit
            calc_with_metadata.last_processing = datetime.utcnow()
            calc_with_metadata.files = self.upload_files.calc_files(self.mainfile)
            self.metadata = calc_with_metadata.to_dict()

            if len(calc_with_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            calc_with_metadata = datamodel.CalcWithMetadata(**self.metadata)
            calc_with_metadata.formula = config.services.not_processed_value
            calc_with_metadata.basis_set = config.services.not_processed_value
            calc_with_metadata.xc_functional = config.services.not_processed_value
            calc_with_metadata.system = config.services.not_processed_value
            calc_with_metadata.crystal_system = config.services.not_processed_value
            calc_with_metadata.spacegroup = config.services.not_processed_value
            calc_with_metadata.spacegroup_symbol = config.services.not_processed_value
            calc_with_metadata.code_version = config.services.not_processed_value

            calc_with_metadata.code_name = config.services.not_processed_value
            if self.parser is not None:
                parser = parser_dict[self.parser]
                if hasattr(parser, 'code_name'):
                    calc_with_metadata.code_name = parser.code_name

            calc_with_metadata.processed = False
            self.metadata = calc_with_metadata.to_dict()
            search.Entry.from_calc_with_metadata(calc_with_metadata).save()
        except Exception as e:
            self.get_logger().error('could not index after processing failure', exc_info=e)

        super().fail(*errors, log_level=log_level, **kwargs)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        """ The *task* that encapsulates all parsing related actions. """
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self.metadata['parser_name'] = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
                return
            except SystemExit:
                self.fail('parser raised system exit', error='system exit', **context)
                return

        # add the non code specific calc metadata to the backend
        # all other quantities have been determined by parsers/normalizers
        self._parser_backend.openNonOverlappingSection('section_entry_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.metadata['calc_hash'])
        self._parser_backend.addValue('mainfile', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)
        filepaths = self.metadata['files']
        self._parser_backend.addValue('number_of_files', len(filepaths))
        self._parser_backend.addValue('filepaths', filepaths)
        uploader = self.upload.uploader
        self._parser_backend.addValue(
            'entry_uploader_name', '%s, %s' % (uploader.first_name, uploader.last_name))
        self._parser_backend.addValue(
            'entry_uploader_id', str(uploader.user_id))
        self._parser_backend.addValue('entry_upload_time', int(self.upload.upload_time.timestamp()))
        self._parser_backend.closeNonOverlappingSection('section_entry_info')

        self.add_processor_info(self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            self.fail('parser failed', error=error, **context)

    @contextmanager
    def use_parser_backend(self, processor_name):
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        self._parser_backend.openContext('/section_entry_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('number_of_archive_processor_warnings', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_entry_info/0')
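
        # Rough sketch (an assumption derived from the addValue calls above, not a
        # verified archive excerpt) of the resulting section in the archive JSON:
        #
        #     "section_archive_processing_info": [{
        #         "archive_processor_name": "<processor_name>",
        #         "archive_processor_status": "Success" or "WithWarnings",
        #         "number_of_archive_processor_warnings": 0,
        #         "archive_processor_warnings": ["..."],
        #         "archive_processor_error": "..."   (only present on failure)
        #     }]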

    @task
    def normalizing(self):
        """ The *task* that encapsulates all normalizing related actions. """
        for normalizer in normalizers:
            if normalizer.domain != parser_dict[self.parser].domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self._parser_backend.finishedParsingSession('ParseFailure', [str(e)])
                        self.fail(
                            'normalizer failed with exception', exc_info=e, error=str(e), **context)
                        break
                    else:
                        if self._parser_backend.status[0] != 'ParseSuccess':
                            error = self._parser_backend.status[1]
                            self.fail('normalizer failed', error=error, **context)
                            break
                        else:
                            logger.debug(
                                'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        """ The *task* that encapsulates all archival related actions. """
        logger = self.get_logger()

        calc_with_metadata = datamodel.CalcWithMetadata(**self.metadata)
        calc_with_metadata.apply_domain_metadata(self._parser_backend)
        calc_with_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.metadata = calc_with_metadata.to_dict()

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            search.Entry.from_calc_with_metadata(calc_with_metadata).save()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:
            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                self._parser_backend.write_json(out, pretty=True, root_sections=_all_root_sections)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='logs',
                    input_size=self.mainfile_file.size) as log_data:
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    """
    Represents uploads in the databases. Provides persistence access to the files storage
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
        published: Boolean that indicates the publish status
        publish_time: Date when the upload was initially published
        last_update: Date of the last publishing/re-processing
        joined: Boolean that indicates if the running processing has joined (:func:`check_join`)
    """
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)
    embargo_length = IntField(default=36)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    joined = BooleanField(default=False)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        """
        Getter, setter for user metadata. Metadata is pickled to and from the public
        bucket to allow sharing among all processes. Usually uploads do not have (much)
        user defined metadata, but users provide all metadata per upload as part of
        the publish process. This will change, when we introduce editing functionality
        and metadata will be provided through different means.
        """
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data
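
    # A hedged usage sketch for this property; ``some_upload`` is a hypothetical,
    # already published Upload instance. The getter returns None while no public
    # upload files exist yet:
    #
    #     current = some_upload.metadata
    #     some_upload.metadata = {'comment': 'user provided metadata'}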

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: datamodel.User, **kwargs) -> List['Upload']:
        """ Returns all uploads for the given user. Kwargs are passed to mongo query. """
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return datamodel.User.get(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        user = self.uploader
        user_name = '%s %s' % (user.first_name, user.last_name)
        # We are not using 'user_id' because logstash (?) will filter these entries ?!
        logger = logger.bind(
            upload_id=self.upload_id, upload_name=self.name, user_name=user_name,
            user=user.user_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user, a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user: The user that created the upload.
        """
        # use kwargs to keep compatibility with super method
        user: datamodel.User = kwargs['user']
        del(kwargs['user'])

        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=user.user_id)
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self
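
    # A hedged sketch of the intended call sequence (the real flow is driven by the
    # API layer; ``some_user`` and the upload file path are assumptions):
    #
    #     upload = Upload.create(
    #         user=some_user, name='my upload', upload_path='/path/to/upload.zip')
    #     upload.process_upload()  # spawns the extracting/parse_all tasks as a celery process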

    def delete(self):
        """ Deletes this upload process state entry and its calcs. """
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self):
        """
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        """
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'staged upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()

            self.delete()

    @process
    def delete_upload(self):
        """
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        """
        self.delete_upload_local()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        """
        Moves the upload out of staging to the public area. It will
        pack the staging upload files into public upload files.
        """
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            upload_with_metadata = self.to_upload_with_metadata(self.metadata)
            calcs = upload_with_metadata.calcs

            with utils.timer(
                    logger, 'upload metadata updated', step='metadata',
                    upload_size=self.upload_files.size):

                def create_update(calc):
                    calc.published = True
                    calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
                    return UpdateOne(
                        {'_id': calc.calc_id},
                        {'$set': {'metadata': calc.to_dict()}})

                Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

            if isinstance(self.upload_files, StagingUploadFiles):
                with utils.timer(
                        logger, 'staged upload files packed', step='pack',
                        upload_size=self.upload_files.size):
                    self.upload_files.pack(upload_with_metadata)

            with utils.timer(
                    logger, 'index updated', step='index',
                    upload_size=self.upload_files.size):
                search.publish(calcs)

            if isinstance(self.upload_files, StagingUploadFiles):
                with utils.timer(
                        logger, 'staged upload deleted', step='delete staged',
                        upload_size=self.upload_files.size):
                    self.upload_files.delete()
                    self.published = True
                    self.publish_time = datetime.utcnow()
                    self.last_update = datetime.utcnow()
                    self.save()
            else:
                self.last_update = datetime.utcnow()
                self.save()

    @process
    def re_process_upload(self):
        """
        A *process* that performs the re-processing of an earlier processed
        upload.

        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        """
        assert self.published

        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')
        public_upload_files = cast(PublicUploadFiles, self.upload_files)
        staging_upload_files = public_upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # check if a calc is already/still processing
            processing = Calc.objects(
                upload_id=self.upload_id,
                **Calc.process_running_mongoengine_query()).count()

            if processing > 0:
                logger.warn(
                    'processes are still/already running on calc, they will be reset',
                    count=processing)

            # reset all calcs
            Calc._get_collection().update_many(
                dict(upload_id=self.upload_id),
                {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})

            # process all calcs
            Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])

            logger.info('completed to trigger re-process of all calcs')
        except Exception as e:
            # try to remove the staging copy in failure case
            logger.error('failed to trigger re-process of all calcs', exc_info=e)

            if staging_upload_files is not None and staging_upload_files.exists():
                staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def re_pack(self):
        """ A *process* that repacks the raw and archive data based on the current embargo data. """
        assert self.published

        # mock the steps of actual processing
        self._continue_with('uploading')
        self._continue_with('extracting')
        self._continue_with('parse_all')
        self._continue_with('cleanup')

        self.upload_files.re_pack(self.to_upload_with_metadata())
        self.joined = True
        self._complete()

    @process
    def process_upload(self):
        """ A *process* that performs the initial upload processing. """
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        """ A no-op *task* as a stand-in for receiving upload data. """
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        """
        The *task* performed before the actual parsing/normalizing: extracting
        the uploaded files.
        """
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        """
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        """
        if os.path.basename(path).startswith('POTCAR'):
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # created stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
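
            # The awk program below appends only the non-proprietary parts to the
            # stripped file: it keeps everything up to and including each 'END of PSCTR'
            # line and resumes at the 'End of Dataset' markers, dropping the numeric
            # pseudopotential data in between.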
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))

    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        """
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of mainfile (filename relative to the upload) and the matching parser.
        """
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            self._preprocess_files(filename)
            try:
                parser = match_parser(filename, upload_files)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)

    @task
    def parse_all(self):
        """
        The *task* that identifies mainfile/parser combinations among the upload's files,
        creates respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

                calc.process_calc()

    def on_process_complete(self, process_name):
        if process_name == 'process_upload' or process_name == 're_process_upload':
            self.check_join()

    def check_join(self):
        """
        Performs an evaluation of the join condition and triggers the :func:`cleanup`
        task if necessary. The join condition allows to run the ``cleanup`` after
        all calculations have been processed. The upload processing stops after all
        calculation processings have been triggered (:func:`parse_all` or
        :func:`re_process_upload`). The cleanup task is then run within the last
        calculation process (the one that triggered the join by calling this method).
        """
        total_calcs = self.total_calcs
        processed_calcs = self.processed_calcs

        self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
        # check if process is not running anymore, i.e. not still spawning new processes to join
        # check the join condition, i.e. all calcs have been processed
        if not self.process_running and processed_calcs >= total_calcs:
            # this can easily be called multiple times, e.g. upload finished after all calcs finished
            modified_upload = self._get_collection().find_one_and_update(
                {'_id': self.upload_id, 'joined': {'$ne': True}},
                {'$set': {'joined': True}})
            if modified_upload is not None:
                self.get_logger().debug('join')
                self.cleanup()
            else:
                # the join was already done due to a prior call
                pass

    def reset(self):
        self.joined = False
        super().reset()

    @classmethod
    def reset_pymongo_update(cls, worker_hostname: str = None):
        update = super().reset_pymongo_update()
        update.update(joined=False)
        return update

    def _cleanup_after_processing(self):
        # send email about process finish
        user = self.uploader
        name = '%s %s' % (user.first_name, user.last_name)
        message = '\n'.join([
            'Dear %s,' % name,
            '',
928
            'your data %suploaded at %s has completed processing.' % (
                '"%s" ' % self.name if self.name else '', self.upload_time.isoformat()),  # pylint: disable=no-member
            'You can review your data on your upload page: %s' % config.gui_url(),
            '',
            'If you encounter any issues with your upload, please let us know and reply to this email.',
            '',
            'The nomad team'
        ])
        try:
            infrastructure.send_mail(
                name=name, email=user.email, message=message, subject='Processing completed')
        except Exception as e:
            # probably due to email configuration problems
            # don't fail or present this error to clients
            self.logger.error('could not send after processing email', exc_info=e)

    def _cleanup_after_re_processing(self):
        logger = self.get_logger()
        logger.info('started to repack re-processed upload')

        staging_upload_files = self.upload_files.to_staging_upload_files()

        with utils.timer(
                logger, 'reprocessed staged upload packed', step='delete staged',
                upload_size=self.upload_files.size):

            staging_upload_files.pack(self.to_upload_with_metadata(), skip_raw=True)

        with utils.timer(
                logger, 'reprocessed staged upload deleted', step='delete staged',
                upload_size=self.upload_files.size):

            staging_upload_files.delete()
            self.last_update = datetime.utcnow()