# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user related
data: information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc
    :members:
.. autoclass:: Upload
    :members:
"""

from typing import cast, List, Any, ContextManager, Tuple, Generator, Dict
from mongoengine import StringField, DateTimeField, DictField, BooleanField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
import hashlib

from nomad import utils, coe_repo, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, LocalBackend
from nomad.normalizing import normalizers
from nomad.datamodel import UploadWithMetadata, Domain


class Calc(Proc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various repository properties generated
    while parsing, including ``program_name``, ``program_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`datamodel.CalcWithMetadata`
    """
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser'),
            ('upload_id', 'tasks_status'),
            ('upload_id', 'process_status'),
            ('upload_id', 'metadata.nomad_version')
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend: LocalBackend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx: ContextManager = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        """
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logwriter_ctx is None:
            try:
                self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
                self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101
            except KeyError:
                # cannot open log file
                pass

        if self._calc_proc_logwriter_ctx is None:
            return logger
        else:
            def save_to_calc_log(logger, method_name, event_dict):
                if self._calc_proc_logwriter is not None:
                    program = event_dict.get('normalizer', 'parser')
                    event = event_dict.get('event', '')
                    entry = '[%s] %s: %s' % (method_name, program, event)
                    if len(entry) > 120:
                        self._calc_proc_logwriter.write(entry[:120])
                        self._calc_proc_logwriter.write('...')
                    else:
                        self._calc_proc_logwriter.write(entry)
                    self._calc_proc_logwriter.write('\n')

                return event_dict

            return wrap_logger(logger, processors=[save_to_calc_log])
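    # Illustrative example (added comment): a call such as
    #     self.get_logger().info('normalizing', normalizer='SystemNormalizer')
    # ends up in the calc's archive log as
    #     [info] SystemNormalizer: normalizing
    # with entries truncated to 120 characters.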

    @process
    def re_process_calc(self):
        """
        Processes a calculation again. This means there already is metadata and,
        instead of creating it from scratch, we just update the existing
        records.
        """
        logger = self.get_logger()

        try:
            calc_with_metadata = datamodel.CalcWithMetadata(**self.metadata)
            calc_with_metadata.upload_id = self.upload_id
            calc_with_metadata.calc_id = self.calc_id
            calc_with_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
            calc_with_metadata.mainfile = self.mainfile
            calc_with_metadata.nomad_version = config.version
            calc_with_metadata.nomad_commit = config.commit
            calc_with_metadata.last_processing = datetime.now()
            calc_with_metadata.files = self.upload_files.calc_files(self.mainfile)
            self.metadata = calc_with_metadata.to_dict()

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

    @process
    def process_calc(self):
        """
        Processes a new calculation that has no prior records in the mongo, elastic,
        or filesystem storage. It will create an initial set of (user) metadata.
        """
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            calc_with_metadata = datamodel.CalcWithMetadata(
                upload_id=self.upload_id,
                calc_id=self.calc_id,
                calc_hash=self.upload_files.calc_hash(self.mainfile),
                mainfile=self.mainfile)
            calc_with_metadata.published = False
            calc_with_metadata.uploader = self.upload.uploader.to_popo()
            calc_with_metadata.upload_time = self.upload.upload_time
            calc_with_metadata.nomad_version = config.version
            calc_with_metadata.nomad_commit = config.commit
            calc_with_metadata.last_processing = datetime.now()
            calc_with_metadata.files = self.upload_files.calc_files(self.mainfile)
            self.metadata = calc_with_metadata.to_dict()

            if len(calc_with_metadata.files) >= config.auxfile_cutoff:
                self.warning(
                    'This calc has many aux files in its directory. '
                    'Have you placed many calculations in the same directory?')

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            calc_with_metadata = datamodel.CalcWithMetadata(**self.metadata)
            calc_with_metadata.formula = config.services.not_processed_value
            calc_with_metadata.basis_set = config.services.not_processed_value
            calc_with_metadata.xc_functional = config.services.not_processed_value
            calc_with_metadata.system = config.services.not_processed_value
            calc_with_metadata.crystal_system = config.services.not_processed_value
            calc_with_metadata.spacegroup = config.services.not_processed_value
            calc_with_metadata.spacegroup_symbol = config.services.not_processed_value
            calc_with_metadata.code_version = config.services.not_processed_value

            calc_with_metadata.code_name = config.services.not_processed_value
            if self.parser is not None:
                parser = parser_dict[self.parser]
                if hasattr(parser, 'code_name'):
                    calc_with_metadata.code_name = parser.code_name

            calc_with_metadata.processed = False
            self.metadata = calc_with_metadata.to_dict()
            search.Entry.from_calc_with_metadata(calc_with_metadata).save()
        except Exception as e:
            self.get_logger().error('could not index after processing failure', exc_info=e)

        super().fail(*errors, log_level=log_level, **kwargs)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]
        self.metadata['parser_name'] = self.parser

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail(
                    'parser failed with exception', level=logging.ERROR,
                    exc_info=e, error=str(e), **context)
                return

        # add the non code specific calc metadata to the backend
        # all other quantities have been determined by parsers/normalizers
        self._parser_backend.openNonOverlappingSection('section_entry_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.metadata['calc_hash'])
        self._parser_backend.addValue('mainfile', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)
        filepaths = self.metadata['files']
        self._parser_backend.addValue('number_of_files', len(filepaths))
        self._parser_backend.addValue('filepaths', filepaths)
        uploader = self.upload.uploader
        self._parser_backend.addValue(
            'entry_uploader_name', '%s, %s' % (uploader.first_name, uploader.last_name))
        self._parser_backend.addValue(
            'entry_uploader_id', str(uploader.user_id))
        self._parser_backend.addValue('entry_upload_time', int(self.upload.upload_time.timestamp()))
        self._parser_backend.closeNonOverlappingSection('section_entry_info')

        self.add_processor_info(self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            logger.error(self._parser_backend.status[1])
            error = self._parser_backend.status[1]
            self.fail(error, level=logging.INFO, **context)

    @contextmanager
    def use_parser_backend(self, processor_name):
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)
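    # note (added comment): normalizers reuse the parser's backend via `use_parser_backend`;
    # after each step, `add_processor_info` records that step's status and warnings in the
    # archive's `section_archive_processing_info`.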

    def add_processor_info(self, processor_name: str) -> None:
        self._parser_backend.openContext('/section_entry_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('number_of_archive_processor_warnings', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_entry_info/0')

    @task
    def normalizing(self):
        for normalizer in normalizers:
            if normalizer.domain != config.domain:
                continue

            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self.fail(
                            'normalizer failed with exception', level=logging.ERROR,
                            exc_info=e, error=str(e), **context)
                        self._parser_backend.status = ['ParseFailure', str(e)]

            failed = self._parser_backend.status[0] != 'ParseSuccess'
            if failed:
                logger.error(self._parser_backend.status[1])
                error = self._parser_backend.status[1]
                self.fail(error, level=logging.WARNING, error=error, **context)
                break
            else:
                logger.debug(
                    'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        logger = self.get_logger()

        calc_with_metadata = datamodel.CalcWithMetadata(**self.metadata)
        calc_with_metadata.apply_domain_metadata(self._parser_backend)
        calc_with_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.metadata = calc_with_metadata.to_dict()

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            search.Entry.from_calc_with_metadata(calc_with_metadata).save()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:
            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                self._parser_backend.write_json(out, pretty=True, root_sections=Domain.instance.root_sections)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='logs',
                    input_size=self.mainfile_file.size) as log_data:
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)

    def __str__(self):
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)


class Upload(Proc):
    """
    Represents uploads in the databases. Provides persistent access to the files storage
    and processing state.

    Attributes:
        name: optional user provided upload name
406
407
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        metadata: optional user provided additional metadata
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
    """
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)

    name = StringField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()
    last_update = DateTimeField()

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @property
    def metadata(self) -> dict:
        # TODO user_metadata needs to be stored in the public bucket, since staging data might not be shared
        try:
            upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
        except KeyError:
            return None
        return upload_files.user_metadata

    @metadata.setter
    def metadata(self, data: dict) -> None:
        # TODO user_metadata needs to be stored in the public bucket, since staging data might not be shared
        upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
        upload_files.user_metadata = data

    @classmethod
    def get(cls, id: str, include_published: bool = True) -> 'Upload':
        upload = cls.get_by_id(id, 'upload_id')
        if upload is not None:
            return upload

        raise KeyError()

    @classmethod
    def user_uploads(cls, user: coe_repo.User, **kwargs) -> List['Upload']:
        """ Returns all uploads for the given user. Kwargs are passed to mongo query. """
        return cls.objects(user_id=str(user.user_id), **kwargs)

    @property
    def uploader(self):
        return coe_repo.User.from_user_id(self.user_id)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        logger = logger.bind(upload_id=self.upload_id, upload_name=self.name, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user; a user given name is optional.
        It will populate the record with a signed url and a pending :class:`UploadProc`.
        The upload will already be saved to the database.

        Arguments:
            user (coe_repo.User): The user that created the upload.
        """
        user: coe_repo.User = kwargs['user']
        del(kwargs['user'])
        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=str(user.user_id))
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        """ Deletes this upload process state entry and its calcs. """
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    def delete_upload_local(self, with_coe_repo: bool = False):
        """
        Deletes the upload, including its processing state and
        staging files. Local version without celery processing.
        """
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):

            if with_coe_repo and self.published:
                with utils.timer(
                        logger, 'upload deleted from repo db', step='repo',
                        upload_size=self.upload_files.size):
                    coe_repo.Upload.delete(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'staged upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()
                self.delete()

    @process
    def delete_upload(self, with_coe_repo: bool = False):
        """
        Deletes the upload, including its processing state and
        staging files. This starts the celery process of deleting the upload.
        """
        self.delete_upload_local(with_coe_repo=with_coe_repo)

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        """
        Moves the upload out of staging to add it to the coe repository. It will
        pack the staging upload files into public upload files, add entries to the
        coe repository db and remove this instance and its calculations from the
        processing state db.

        If the upload is already published (i.e. on re-publish), it will update the user metadata
        from the repository db, publish to the repository db if it does not yet exist there, and
        update the search index.
        """
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, '(re-)publish failed'):
            upload_with_metadata = self.to_upload_with_metadata(self.metadata)
            calcs = upload_with_metadata.calcs

            if config.repository_db.publish_enabled:
                if config.repository_db.mode == 'coe' and isinstance(self.upload_files, StagingUploadFiles):
                    with utils.timer(
                            logger, 'coe extracted raw-file copy created', step='repo',
                            upload_size=self.upload_files.size):

                        self.upload_files.create_extracted_copy()

                coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
                if coe_upload is None:
                    with utils.timer(
                            logger, 'upload added to repository', step='repo',
                            upload_size=self.upload_files.size):
                        coe_upload = coe_repo.Upload.publish(upload_with_metadata)

                with utils.timer(
                        logger, 'upload PIDs read from repository', step='repo',
                        upload_size=self.upload_files.size):
                    for calc, coe_calc in zip(calcs, coe_upload.calcs):
                        calc.pid = coe_calc.coe_calc_id

            with utils.timer(
                    logger, 'upload metadata updated', step='metadata',
                    upload_size=self.upload_files.size):

                def create_update(calc):
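                    # mark the calc as published and prepare a mongo update that writes its
                    # metadata (including a PID, if one was assigned above) back to the calc document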
                    calc.published = True
                    return UpdateOne(
                        {'_id': calc.calc_id},
                        {'$set': {'metadata': calc.to_dict()}})

                Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

            if isinstance(self.upload_files, StagingUploadFiles):
                with utils.timer(
                        logger, 'staged upload files packed', step='pack',
                        upload_size=self.upload_files.size):
                    self.upload_files.pack(upload_with_metadata)

            with utils.timer(
                    logger, 'index updated', step='index',
                    upload_size=self.upload_files.size):
                search.publish(calcs)

            if isinstance(self.upload_files, StagingUploadFiles):
                with utils.timer(
                        logger, 'staged upload deleted', step='delete staged',
                        upload_size=self.upload_files.size):
                    self.upload_files.delete()
                    self.published = True
                    self.publish_time = datetime.now()
                    self.last_update = datetime.now()
                    self.save()
            else:
                self.last_update = datetime.now()
                self.save()

    @process
    def re_process_upload(self):
        """
        Runs the distributed process of fully reparsing/renormalizing an existing and
        already published upload. Will renew the archive part of the upload and update
        mongo and elastic search entries.

        TODO this implementation does not do any re-matching. This will be more complex
        due to handling of new or missing matches.
        """
        assert self.published

        logger = self.get_logger()
        logger.info('started to re-process')

        # mock the steps of actual processing
        self._continue_with('uploading')

        # extract the published raw files into a staging upload files instance
        self._continue_with('extracting')
        public_upload_files = cast(PublicUploadFiles, self.upload_files)
        public_upload_files.to_staging_upload_files(create=True)

        self._continue_with('parse_all')
        try:
            # we use a copy of the mongo queryset; reasons are cursor timeouts and
            # changing results on modifying the calc entries
            calcs = list(Calc.objects(upload_id=self.upload_id))
            for calc in calcs:
                if calc.process_running:
                    if calc.current_process == 're_process_calc':
                        logger.warn('re_process_calc is already running', calc_id=calc.calc_id)
                    else:
                        logger.warn('a process is already running on calc', calc_id=calc.calc_id)

                    continue

                calc.reset()
                calc.re_process_calc()
        except Exception as e:
            # try to remove the staging copy in failure case
            staging_upload_files = self.upload_files.to_staging_upload_files()
            if staging_upload_files.exist():
                staging_upload_files.delete()

            raise e

        # the packing and removing of the staging upload files will be triggered by
        # the 'cleanup' task after processing all calcs

    @process
    def process_upload(self):
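        """ Runs the initial processing of a new upload: file extraction and parsing of all matched mainfiles. """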
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        pass

    @property
    def upload_files(self) -> UploadFiles:
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
        kwargs = dict(upload_path=self.upload_path) if not self.published else {}

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, **kwargs)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        """
        Task performed before the actual parsing/normalizing: extracting and bagging
        the uploaded files, computing all keys, creating an *upload* entry in the NOMAD-coe
        repository db, etc.
        """
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def _preprocess_files(self, path):
        """
        Some files need preprocessing. Currently we need to add a stripped POTCAR version
        and always restrict/embargo the original.
        """
        if os.path.basename(path) == 'POTCAR':
            # create checksum
            hash = hashlib.sha224()
            with open(self.staging_upload_files.raw_file_object(path).os_path, 'rb') as orig_f:
                for line in orig_f.readlines():
                    hash.update(line)

            checksum = hash.hexdigest()

            # create the stripped POTCAR
            stripped_path = path + '.stripped'
            with open(self.staging_upload_files.raw_file_object(stripped_path).os_path, 'wt') as stripped_f:
                stripped_f.write('Stripped POTCAR file. Checksum of original file (sha224): %s\n' % checksum)
            os.system(
                '''
                    awk < %s >> %s '
                    BEGIN { dump=1 }
                    /End of Dataset/ { dump=1 }
                    dump==1 { print }
                    /END of PSCTR/ { dump=0 }'
                ''' % (
                    self.staging_upload_files.raw_file_object(path).os_path,
                    self.staging_upload_files.raw_file_object(stripped_path).os_path))
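            # note (added comment): the awk program above keeps each dataset's header up to
            # 'END of PSCTR' and the 'End of Dataset' marker lines, but drops the copyrighted
            # pseudopotential data in between.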

    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        """
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of the mainfile (path within the upload) and the matching parser
        """
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            self._preprocess_files(filename)
            try:
                parser = match_parser(filename, upload_files)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)

    @task
    def parse_all(self):
        """
        Identifies mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

                calc.process_calc()

    def on_process_complete(self, process_name):
        if process_name == 'process_upload' or process_name == 're_process_upload':
            self.check_join()

    def check_join(self):
        total_calcs = self.total_calcs
        processed_calcs = self.processed_calcs

        self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
        if not self.process_running and processed_calcs >= total_calcs:
            self.get_logger().debug('join')
            self.join()

    def join(self):
        self.cleanup()

    @property
    def gui_url(self):
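        # note (added comment): config.api_url() is assumed to end in '.../api'; that suffix
        # (and a possible trailing slash) is stripped to obtain the GUI base URL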
        base = config.api_url()[:-3]
        if base.endswith('/'):
            base = base[:-1]
        return '%s/uploads/' % base

    def _cleanup_after_processing(self):
        # send email about process finish
        user = self.uploader
        name = '%s %s' % (user.first_name, user.last_name)
        message = '\n'.join([
            'Dear %s,' % name,
            '',
            'your data %suploaded at %s has completed processing.' % (
                '"%s" ' % self.name if self.name else '', self.upload_time.isoformat()),  # pylint: disable=no-member
            'You can review your data on your upload page: %s' % self.gui_url,
            '',
            'If you encounter any issues with your upload, please let us know and reply to this email.',
            '',
            'The nomad team'
        ])
        try:
            infrastructure.send_mail(
                name=name, email=user.email, message=message, subject='Processing completed')
        except Exception as e:
            # probably due to email configuration problems
            # don't fail or present this error to clients
            self.logger.error('could not send after processing email', exc_info=e)

    def _cleanup_after_re_processing(self):
        logger = self.get_logger()
        logger.info('started to repack re-processed upload')

        staging_upload_files = self.upload_files.to_staging_upload_files()

        with utils.timer(
                logger, 'reprocessed staged upload packed', step='delete staged',
                upload_size=self.upload_files.size):

            staging_upload_files.pack(self.to_upload_with_metadata())

        with utils.timer(
                logger, 'reprocessed staged upload deleted', step='delete staged',
                upload_size=self.upload_files.size):

            staging_upload_files.delete()
            self.last_update = datetime.now()
            self.save()

    @task
    def cleanup(self):
        search.refresh()

        if self.current_process == 're_process_upload':
            self._cleanup_after_re_processing()
        else:
            self._cleanup_after_processing()

    def get_calc(self, calc_id) -> Calc:
        return Calc.objects(upload_id=self.upload_id, calc_id=calc_id).first()

    @property
    def processed_calcs(self):
        return Calc.objects(upload_id=self.upload_id, tasks_status__in=[SUCCESS, FAILURE]).count()

    @property
    def total_calcs(self):
        return Calc.objects(upload_id=self.upload_id).count()

    @property
    def failed_calcs(self):
        return Calc.objects(upload_id=self.upload_id, tasks_status=FAILURE).count()

    @property
    def pending_calcs(self):
        return Calc.objects(upload_id=self.upload_id, tasks_status=PENDING).count()

    def all_calcs(self, start, end, order_by=None):
        query = Calc.objects(upload_id=self.upload_id)[start:end]
        return query.order_by(order_by) if order_by is not None else query

    @property
    def outdated_calcs(self):
        return Calc.objects(
            upload_id=self.upload_id, tasks_status=SUCCESS,
            metadata__nomad_version__ne=config.version)

    @property
    def calcs(self):
        return Calc.objects(upload_id=self.upload_id, tasks_status=SUCCESS)

    def to_upload_with_metadata(self, user_metadata: dict = None) -> UploadWithMetadata:
        # prepare user metadata per upload and per calc
        if user_metadata is not None:
            calc_metadatas: Dict[str, Any] = dict()
            upload_metadata: Dict[str, Any] = dict()

            upload_metadata.update(user_metadata)
            if 'calculations' in upload_metadata:
                del(upload_metadata['calculations'])

            for calc in user_metadata.get('calculations', []):  # pylint: disable=no-member
                calc_metadatas[calc['mainfile']] = calc

            user_upload_time = upload_metadata.get('_upload_time', None)

            def get_metadata(calc: Calc):
                """
                Assemble metadata from calc's processed calc metadata and the uploads
                user metadata.
                """
                calc_data = calc.metadata
                calc_with_metadata = datamodel.CalcWithMetadata(**calc_data)
                calc_metadata = dict(upload_metadata)
                calc_metadata.update(calc_metadatas.get(calc.mainfile, {}))
                calc_with_metadata.apply_user_metadata(calc_metadata)

                return calc_with_metadata
        else:
            user_upload_time = None

            def get_metadata(calc: Calc):
                return datamodel.CalcWithMetadata(**calc.metadata)

        result = UploadWithMetadata(
            upload_id=self.upload_id,
            uploader=utils.POPO(id=int(self.user_id)),
            upload_time=self.upload_time if user_upload_time is None else user_upload_time)

        result.calcs = [get_metadata(calc) for calc in Calc.objects(upload_id=self.upload_id)]

        return result

    def compress_and_set_metadata(self, metadata: Dict[str, Any]) -> None:
        """
        Stores the given user metadata in the upload document. This is the metadata
        adhering to the API model (``UploadMetaData``). Most quantities can be stored
        for the upload and for each calculation. This method will try to move same values
        from the calculation to the upload to "compress" the data.
        """
        compressed = {
            key: value for key, value in metadata.items() if key != 'calculations'}
        calculations: List[Dict[str, Any]] = []
        compressed['calculations'] = calculations

        for calc in metadata.get('calculations', []):
            compressed_calc: Dict[str, Any] = {}
            calculations.append(compressed_calc)
            for key, value in calc.items():
                if key in ['_pid', 'mainfile']:
                    # these quantities are explicitly calc specific and have to stay with
                    # the calc
                    compressed_calc[key] = value
                else:
                    if key not in compressed:
                        compressed[key] = value
                    elif compressed[key].__repr__() != value.__repr__():  # compare values via their repr
                        compressed_calc[key] = value
                    else:
                        compressed[key] = value

        self.metadata = compressed

    def __str__(self):
        return 'upload %s upload_id=%s' % (super().__str__(), self.upload_id)