data.py 28.4 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user related
data. These are information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc
    :members:
.. autoclass:: Upload
    :members:
"""

27
from typing import List, Any, ContextManager, Tuple, Generator, Dict, cast
28
from mongoengine import StringField, DateTimeField, DictField, BooleanField
Markus Scheidgen's avatar
Markus Scheidgen committed
29
import logging
30
from structlog import wrap_logger
31
from contextlib import contextmanager
32
import os.path
33
34
from datetime import datetime
from pymongo import UpdateOne
Markus Scheidgen's avatar
Markus Scheidgen committed
35

36
from nomad import utils, coe_repo, config, infrastructure, search
37
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles
38
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
39
from nomad.parsing import parser_dict, match_parser, LocalBackend
Markus Scheidgen's avatar
Markus Scheidgen committed
40
from nomad.normalizing import normalizers
Markus Scheidgen's avatar
Markus Scheidgen committed
41
from nomad.datamodel import UploadWithMetadata, CalcWithMetadata
Markus Scheidgen's avatar
Markus Scheidgen committed
42
43


Markus Scheidgen's avatar
Markus Scheidgen committed
44
class Calc(Proc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculations processing and its state.

    The attribute list does not include the various repository properties generated
    while parsing, including ``program_name``, ``program_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
        metadata: the metadata record with calc and user metadata, see :class:`CalcWithMetadata`
    """
    # calc_id doubles as the mongo document id
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()
    # serialized CalcWithMetadata dict; written during processing
    metadata = DictField()

    meta: Any = {
        'indexes': [
            'upload_id', 'mainfile', 'parser', 'tasks_status', 'process_status'
        ]
    }

    def __init__(self, *args, **kwargs):
        """Initialize the document and reset all per-processing caches."""
        super().__init__(*args, **kwargs)
        # lazily populated caches, valid only for a single processing run
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._parser_backend: LocalBackend = None
        # per-calc archive log writer and the context manager that owns it
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx: ContextManager = None
Markus Scheidgen's avatar
Markus Scheidgen committed
82
83
84

    @classmethod
    def get(cls, id):
        """Retrieve the :class:`Calc` identified by the given ``calc_id``."""
        calc_id = id
        return cls.get_by_id(calc_id, 'calc_id')
Markus Scheidgen's avatar
Markus Scheidgen committed
86

Markus Scheidgen's avatar
Markus Scheidgen committed
87
    @property
    def mainfile_file(self) -> PathObject:
        """The raw file object for this calc's mainfile inside the upload."""
        return self.upload_files.raw_file_object(self.mainfile)
Markus Scheidgen's avatar
Markus Scheidgen committed
90

91
92
93
94
    @property
    def upload(self) -> 'Upload':
        """The :class:`Upload` this calc belongs to (cached after first access)."""
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            # propagate the worker hostname so logs of upload and calc agree
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

98
99
100
    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """The staging files object for this calc's upload (cached after first access)."""
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

Markus Scheidgen's avatar
Markus Scheidgen committed
105
    def get_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        """
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        # open the archive log file once per processing run; the context manager is
        # kept so archiving() can close it via __exit__ later
        if self._calc_proc_logwriter_ctx is None:
            try:
                self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
                self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101
            except KeyError:
                # cannot open log file; fall back to plain logging below
                pass

        if self._calc_proc_logwriter_ctx is None:
            return logger
        else:
            def save_to_calc_log(logger, method_name, event_dict):
                # structlog processor: mirror each event into the archive log file
                if self._calc_proc_logwriter is not None:
                    program = event_dict.get('normalizer', 'parser')
                    event = event_dict.get('event', '')
                    entry = '[%s] %s: %s' % (method_name, program, event)
                    # truncate long entries to keep the archive log compact
                    if len(entry) > 120:
                        self._calc_proc_logwriter.write(entry[:120])
                        self._calc_proc_logwriter.write('...')
                    else:
                        self._calc_proc_logwriter.write(entry)
                    self._calc_proc_logwriter.write('\n')

                return event_dict

            return wrap_logger(logger, processors=[save_to_calc_log])
140

Markus Scheidgen's avatar
Markus Scheidgen committed
141
    @process
    def process_calc(self):
        """
        Processes this calc: stores preliminary metadata, then runs the
        parsing, normalizing, and archiving tasks in order.
        """
        logger = self.get_logger()
        # NOTE(review): this only logs and does not return — execution continues
        # even if the upload is missing; confirm whether a return was intended
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            calc_with_metadata = CalcWithMetadata(
                upload_id=self.upload_id,
                calc_id=self.calc_id,
                calc_hash=self.upload_files.calc_hash(self.mainfile),
                mainfile=self.mainfile)
            calc_with_metadata.published = False
            calc_with_metadata.uploader = self.upload.uploader.to_popo()
            calc_with_metadata.nomad_version = config.version
            calc_with_metadata.last_processing = datetime.now()
            calc_with_metadata.files = self.upload_files.calc_files(self.mainfile)
            self.metadata = calc_with_metadata.to_dict()

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
        """
        Marks processing as failed: indexes a minimum metadata record with all
        domain quantities set to the configured not-processed placeholder, then
        delegates to the base class failure handling.
        """
        try:
            calc_with_metadata = CalcWithMetadata(**self.metadata)
            placeholder = config.services.not_processed_value
            # blank out all domain quantities that a successful run would provide
            for attribute in (
                    'formula', 'basis_set', 'xc_functional', 'system',
                    'crystal_system', 'spacegroup', 'spacegroup_symbol',
                    'code_name', 'code_version'):
                setattr(calc_with_metadata, attribute, placeholder)
            calc_with_metadata.processed = False
            self.metadata = calc_with_metadata.to_dict()
            search.Entry.from_calc_with_metadata(calc_with_metadata).save()
        except Exception as e:
            self.get_logger().error('could not index after processing failure', exc_info=e)

        super().fail(*errors, log_level=log_level, **kwargs)

196
197
198
199
200
201
202
    def on_process_complete(self, process_name):
        """Hook run after a process ends; triggers the upload's join check."""
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()
Markus Scheidgen's avatar
Markus Scheidgen committed
203
204
205

    @task
    def parsing(self):
        """
        Runs the matched parser on the mainfile and records calc- and
        repository-level metadata in the parser backend.
        """
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail(
                    'parser failed with exception', level=logging.ERROR,
                    exc_info=e, error=str(e), **context)
                # backend is unusable after a parser crash; skip the metadata steps
                return

        # add the non code specific calc metadata to the backend
        # all other quantities have been determined by parsers/normalizers
        self._parser_backend.openNonOverlappingSection('section_calculation_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.metadata['calc_hash'])
        self._parser_backend.addValue('main_file', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)

        # backend status is a (status, message) pair set by the parser
        if self._parser_backend.status[0] != 'ParseSuccess':
            logger.error(self._parser_backend.status[1])
            error = self._parser_backend.status[1]
            self._parser_backend.addValue('parse_status', 'ParseFailure')
            self.fail(error, level=logging.INFO, **context)
        else:
            self._parser_backend.addValue('parse_status', 'ParseSuccess')

        self._parser_backend.closeNonOverlappingSection('section_calculation_info')

        self._parser_backend.openNonOverlappingSection('section_repository_info')
        self._parser_backend.addValue('repository_archive_gid', '%s/%s' % (self.upload_id, self.calc_id))
        self._parser_backend.addValue('repository_filepaths', self.metadata['files'])
        self._parser_backend.closeNonOverlappingSection('section_repository_info')

        self.add_processor_info(self.parser)

    @contextmanager
    def use_parser_backend(self, processor_name):
        """
        Context manager that resets the backend status before use and records
        processor info for *processor_name* afterwards.
        """
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        """
        Appends a ``section_archive_processing_info`` entry for *processor_name*
        to the backend, recording success, warnings, or the error message.
        """
        self._parser_backend.openContext('/section_calculation_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            # backends may not define _warnings; treat missing as no warnings
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('archive_processor_warning_number', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_calculation_info/0')
Markus Scheidgen's avatar
Markus Scheidgen committed
271
272
273
274
275

    @task
    def normalizing(self):
        """Runs all registered normalizers on the parsed backend, stopping at the first failure."""
        for normalizer in normalizers:
            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self.fail(
                            'normalizer failed with exception', level=logging.ERROR,
                            exc_info=e, error=str(e), **context)
                        # mark the backend failed so the check below aborts the loop
                        self._parser_backend.status = ['ParseFailure', str(e)]

            failed = self._parser_backend.status[0] != 'ParseSuccess'
            if failed:
                logger.error(self._parser_backend.status[1])
                error = self._parser_backend.status[1]
                self.fail(error, level=logging.WARNING, error=error, **context)
                # do not run remaining normalizers on a broken backend
                break
            else:
                logger.debug(
                    'completed normalizer successfully', normalizer=normalizer_name)
Markus Scheidgen's avatar
Markus Scheidgen committed
299
300
301

    @task
    def archiving(self):
        """
        Persists the processing results: final calc metadata, the search index
        entry, the archive JSON, and the per-calc processing log.
        """
        logger = self.get_logger()

        calc_with_metadata = CalcWithMetadata(**self.metadata)
        calc_with_metadata.apply_domain_metadata(self._parser_backend)
        calc_with_metadata.parser_name = self.parser
        calc_with_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.metadata = calc_with_metadata.to_dict()

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            search.Entry.from_calc_with_metadata(calc_with_metadata).save()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:
            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                self._parser_backend.write_json(out, pretty=True)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler opened by get_logger() and record the final log size
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='logs',
                    input_size=self.mainfile_file.size) as log_data:
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)
Markus Scheidgen's avatar
Markus Scheidgen committed
335

336
    def __str__(self):
        """Human readable representation including calc and upload ids."""
        # fix: the format string read 'upload_id%s' (missing '='), producing
        # e.g. 'calc ... upload_idabc' instead of 'upload_id=abc'
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)
338

339

340
class Upload(Proc):
    """
    Represents uploads in the databases. Provides persistence access to the files storage,
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        metadata: optional user provided additional meta data
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
        published: True once this upload was published to the repository
        publish_time: the timestamp of publication
    """
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)

    name = StringField(default=None)
    metadata = DictField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # cached files object, (re)created on demand by the upload_files property
        self._upload_files: ArchiveBasedStagingUploadFiles = None
Markus Scheidgen's avatar
Markus Scheidgen committed
376
377

    @classmethod
    def get(cls, id: str, include_published: bool = False) -> 'Upload':
        """
        Returns the upload with the given id. Raises KeyError when it does not
        exist or is published and ``include_published`` is False.
        """
        upload = cls.get_by_id(id, 'upload_id')
        # TODO published uploads should not be hidden by this and API
        if upload is None:
            raise KeyError()
        if upload.published and not include_published:
            raise KeyError()
        return upload
Markus Scheidgen's avatar
Markus Scheidgen committed
385
386

    @classmethod
    def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
        """ Returns all non-published uploads for the given user. """
        return cls.objects(user_id=str(user.user_id), published=False)
Markus Scheidgen's avatar
Markus Scheidgen committed
390

391
392
393
394
    @property
    def uploader(self):
        """The coe repository user that created this upload."""
        return coe_repo.User.from_user_id(self.user_id)

Markus Scheidgen's avatar
Markus Scheidgen committed
395
396
    def get_logger(self, **kwargs):
        """Returns the process logger bound with this upload's id and name."""
        base_logger = super().get_logger()
        return base_logger.bind(upload_id=self.upload_id, upload_name=self.name, **kwargs)

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user, a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user (coe_repo.User): The user that created the upload.

        Raises:
            KeyError: If no ``user`` keyword argument was given.
        """
        # pop replaces the old read-then-del(kwargs['user']) pattern; same
        # KeyError behavior when 'user' is missing, less code
        user: coe_repo.User = kwargs.pop('user')
        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=str(user.user_id))
        self = super().create(**kwargs)

        # mark the implicit 'uploading' task as the current one
        self._continue_with('uploading')

        return self

421
422
423
424
425
426
427
428
429
430
431
432
433
434
    def delete(self):
        """ Deletes this upload process state entry and its calcs. """
        # delete the child calc documents first, then this upload document
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    @process
    def delete_upload(self):
        """
        Deletes the upload, including its processing state and
        staging files.
        """
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            # remove from the coe repository db first
            with utils.timer(
                    logger, 'upload deleted from repo db', step='repo',
                    upload_size=self.upload_files.size):
                coe_repo.Upload.delete(self.upload_id)

            # then remove the search index entries
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            # finally remove the staged files and this processing state entry
            with utils.timer(
                    logger, 'staged upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()
                self.delete()

        return True  # do not save the process status on the delete upload
453

454
    @process
    def publish_upload(self):
        """
        Moves the upload out of staging to add it to the coe repository. It will
        pack the staging upload files in to public upload files, add entries to the
        coe repository db and remove this instance and its calculation from the
        processing state db.
        """
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            upload_with_metadata = self.to_upload_with_metadata()

            if config.repository_db.publish_enabled:
                with utils.timer(
                        logger, 'upload added to repository', step='repo',
                        upload_size=self.upload_files.size):
                    coe_repo.Upload.publish(upload_with_metadata)

            # NOTE(review): this repeats the publish_enabled check from above;
            # the two branches could likely be merged — confirm before changing
            if config.repository_db.publish_enabled:
                # prefer the metadata as stored by the repository db, if available
                coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
                if coe_upload is not None:
                    calcs = [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs]
                else:
                    calcs = []
            else:
                calcs = upload_with_metadata.calcs

            # mark all calcs as published via a single bulk write
            with utils.timer(
                    logger, 'upload metadata updated', step='metadata',
                    upload_size=self.upload_files.size):

                def create_update(calc):
                    calc.published = True
                    return UpdateOne(
                        {'_id': calc.calc_id},
                        {'$set': {'metadata': calc.to_dict()}})

                Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

            with utils.timer(
                    logger, 'staged upload files packed', step='pack',
                    upload_size=self.upload_files.size):
                self.upload_files.pack(upload_with_metadata)

            with utils.timer(
                    logger, 'index updated', step='index',
                    upload_size=self.upload_files.size):
                search.publish(calcs)

            # remove the staging files and flip this document to published
            with utils.timer(
                    logger, 'staged upload deleted', step='delete staged',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()
                self.published = True
                self.publish_time = datetime.now()
                self.save()
514

Markus Scheidgen's avatar
Markus Scheidgen committed
515
    @process
    def process_upload(self):
        """Runs the upload processing pipeline: extraction, then parsing of all calcs."""
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        # intentional no-op: the actual upload happens outside of processing;
        # this task only exists so the task state machine records the step
        pass

524
    @property
    def upload_files(self) -> UploadFiles:
        """
        The files object for this upload; a staging files object before
        publication, a public files object afterwards. Cached, and re-created
        when the published state no longer matches the cached instance.
        """
        if self.published:
            files_cls = PublicUploadFiles
        else:
            files_cls = ArchiveBasedStagingUploadFiles

        cached = self._upload_files
        if not cached or not isinstance(cached, files_cls):
            cached = files_cls(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload_path)
            self._upload_files = cached

        return cached
533

534
535
536
537
538
    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """The upload's files as staging files; only valid before publication."""
        assert not self.published
        staging_files = cast(ArchiveBasedStagingUploadFiles, self.upload_files)
        return staging_files

Markus Scheidgen's avatar
Markus Scheidgen committed
539
540
    @task
    def extracting(self):
        """
        Task performed before the actual parsing/normalizing. Extracting and bagging
        the uploaded files, computing all keys, create an *upload* entry in the NOMAD-coe
        repository db, etc.
        """
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            # remove the original uploaded file if it was only kept temporarily
            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

569
    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        """
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Yields:
            Tuples of (mainfile path, matched parser).
        """
        # remember the first match per directory; currently only used to detect
        # further matches in the same directory
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            try:
                parser = match_parser(filename, upload_files)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                # a failing match on one file must not abort matching the rest
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)
596

Markus Scheidgen's avatar
Markus Scheidgen committed
597
598
    @task
    def parse_all(self):
        """
        Identifies mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        # NOTE(review): the timer message says 'upload extracted' although this is
        # the matching step (step='matching') — likely a copy/paste of the message
        # from extracting(); confirm before changing, log consumers may rely on it
        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

                # asynchronously processes each calc (@process on Calc)
                calc.process_calc()
Markus Scheidgen's avatar
Markus Scheidgen committed
616

617
618
619
620
621
622
623
624
625
626
627
628
    def on_process_complete(self, process_name):
        """Hook run after a process ends; checks whether all calcs have joined."""
        if process_name == 'process_upload':
            self.check_join()

    def check_join(self):
        """Joins (runs cleanup) once all calcs finished and no process is running."""
        total_calcs = self.total_calcs
        processed_calcs = self.processed_calcs

        self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
        # >= because calc counts are read from the db and may race slightly ahead
        if not self.process_running and processed_calcs >= total_calcs:
            self.get_logger().debug('join')
            self.join()

    def join(self):
        """Final step after all calc processing completed."""
        self.cleanup()
Markus Scheidgen's avatar
Markus Scheidgen committed
632
633
634

    @task
    def cleanup(self):
        """
        Final task after all calcs joined: refreshes the search index and
        notifies the uploader by email that processing has completed.
        """
        search.refresh()

        # send email about process finish
        user = self.uploader
        name = '%s %s' % (user.first_name, user.last_name)
        message = '\n'.join([
            'Dear %s,' % name,
            '',
            # fix: the original '%suploaded' ran the upload name straight into
            # 'uploaded' (e.g. 'your data foouploaded ...'); add the missing space
            'your data %suploaded %s has completed processing.' % (
                '%s ' % self.name if self.name else '', self.upload_time.isoformat()),
            'You can review your data on your upload page: %s/uploads' % config.api_url()[:-3]
        ])
        try:
            infrastructure.send_mail(
                name=name, email=user.email, message=message, subject='Processing completed')
        except Exception as e:
            # probably due to email configuration problems
            # don't fail or present this error to clients
            # fix: was self.logger, which is never assigned anywhere in this
            # module; use get_logger() like every other error path here
            self.get_logger().error('could not send after processing email', exc_info=e)
Markus Scheidgen's avatar
Markus Scheidgen committed
654

655
656
657
    def get_calc(self, calc_id) -> Calc:
        """Returns this upload's calc with the given id, or None if there is none."""
        return Calc.objects(upload_id=self.upload_id, calc_id=calc_id).first()

Markus Scheidgen's avatar
Markus Scheidgen committed
658
    @property
    def processed_calcs(self):
        """Number of this upload's calcs that finished (successfully or not)."""
        return Calc.objects(upload_id=self.upload_id, tasks_status__in=[SUCCESS, FAILURE]).count()

    @property
    def total_calcs(self):
        """Total number of calcs belonging to this upload."""
        return Calc.objects(upload_id=self.upload_id).count()

    @property
    def failed_calcs(self):
        """Number of this upload's calcs that failed processing."""
        return Calc.objects(upload_id=self.upload_id, tasks_status=FAILURE).count()

    @property
    def pending_calcs(self):
        """Number of this upload's calcs that are still pending."""
        return Calc.objects(upload_id=self.upload_id, tasks_status=PENDING).count()

    def all_calcs(self, start, end, order_by='mainfile'):
        """Returns the [start:end] slice of this upload's calcs, ordered by *order_by*."""
        return Calc.objects(upload_id=self.upload_id)[start:end].order_by(order_by)

    @property
    def calcs(self):
        """Queryset of this upload's successfully processed calcs."""
        return Calc.objects(upload_id=self.upload_id, tasks_status=SUCCESS)
Markus Scheidgen's avatar
Markus Scheidgen committed
680
681

    def to_upload_with_metadata(self) -> UploadWithMetadata:
        """
        Builds an :class:`UploadWithMetadata` from this upload, merging the
        processed calc metadata with the user provided upload/calc metadata.
        """
        # prepare user metadata per upload and per calc
        calc_metadatas: Dict[str, Any] = dict()
        upload_metadata: Dict[str, Any] = dict()

        if self.metadata is not None:
            upload_metadata.update(self.metadata)
            # 'calculations' holds per-calc overrides; keep it out of the
            # upload-wide defaults applied to every calc
            if 'calculations' in upload_metadata:
                del(upload_metadata['calculations'])

            for calc in self.metadata.get('calculations', []):
                calc_metadatas[calc['mainfile']] = calc

        # a user supplied '_upload_time' overrides the system's upload time
        user_upload_time = upload_metadata.get('_upload_time', None)
        result = UploadWithMetadata(
            upload_id=self.upload_id,
            uploader=utils.POPO(id=int(self.user_id)),
            upload_time=self.upload_time if user_upload_time is None else user_upload_time)

        def get_metadata(calc: Calc):
            """
            Assemble metadata from calc's processed calc metadata and the uploads
            user metadata.
            """
            calc_data = calc.metadata
            calc_with_metadata = CalcWithMetadata(**calc_data)
            # upload-wide defaults first, per-calc overrides on top
            calc_metadata = dict(upload_metadata)
            calc_metadata.update(calc_metadatas.get(calc.mainfile, {}))
            calc_with_metadata.apply_user_metadata(calc_metadata)

            return calc_with_metadata

        result.calcs = [get_metadata(calc) for calc in Calc.objects(upload_id=self.upload_id)]

        return result
716
717

    def __str__(self):
        """Human readable representation including the upload id."""
        # fix: the format string read 'upload_id%s' (missing '='), producing
        # e.g. 'upload ... upload_idabc' instead of 'upload_id=abc'
        return 'upload %s upload_id=%s' % (super().__str__(), self.upload_id)