data.py 28.4 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user related
data. This includes information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc
    :members:
.. autoclass:: Upload
    :members:
"""

27
from typing import List, Any, ContextManager, Tuple, Generator, Dict, cast
28
from mongoengine import StringField, DateTimeField, DictField, BooleanField
Markus Scheidgen's avatar
Markus Scheidgen committed
29
import logging
30
from structlog import wrap_logger
31
from contextlib import contextmanager
32
import os.path
33
34
from datetime import datetime
from pymongo import UpdateOne
Markus Scheidgen's avatar
Markus Scheidgen committed
35

36
from nomad import utils, coe_repo, config, infrastructure, search
37
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles
38
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
39
from nomad.parsing import parser_dict, match_parser, LocalBackend
Markus Scheidgen's avatar
Markus Scheidgen committed
40
from nomad.normalizing import normalizers
Markus Scheidgen's avatar
Markus Scheidgen committed
41
from nomad.datamodel import UploadWithMetadata, CalcWithMetadata
Markus Scheidgen's avatar
Markus Scheidgen committed
42
43


Markus Scheidgen's avatar
Markus Scheidgen committed
44
class Calc(Proc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculations processing and its state.

    The attribute list does not include the various repository properties generated
    while parsing, including ``program_name``, ``program_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
        metadata: the metadata record with calc and user metadata, see :class:`CalcWithMetadata`
    """
    # mongoengine fields; calc_id doubles as the mongo primary key
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()
    metadata = DictField()

    # task queue used for calc processing (consumed by the Proc machinery)
    queue = 'calcs'

    meta: Any = {
        'indexes': [
            'upload_id', 'mainfile', 'parser', 'tasks_status', 'process_status'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # lazily populated caches; see the respective properties below
        self._parser_backend: LocalBackend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        # open file handle (and its context manager) for the per-calc processing log
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx: ContextManager = None
Markus Scheidgen's avatar
Markus Scheidgen committed
84
85
86

    @classmethod
    def get(cls, id):
        """Retrieve the calc identified by the given ``calc_id``."""
        calc = cls.get_by_id(id, 'calc_id')
        return calc
Markus Scheidgen's avatar
Markus Scheidgen committed
88

Markus Scheidgen's avatar
Markus Scheidgen committed
89
    @property
    def mainfile_file(self) -> PathObject:
        """The raw-file object for this calc's mainfile within its upload."""
        files = self.upload_files
        return files.raw_file_object(self.mainfile)
Markus Scheidgen's avatar
Markus Scheidgen committed
92

93
94
95
96
    @property
    def upload(self) -> 'Upload':
        """The :class:`Upload` this calc belongs to (cached after first access)."""
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            # keep the upload's worker_hostname in sync with this calc's
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

100
101
102
    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """Staging file access for this calc's upload, created on first use."""
        if self._upload_files:
            return self._upload_files

        self._upload_files = ArchiveBasedStagingUploadFiles(
            self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

Markus Scheidgen's avatar
Markus Scheidgen committed
107
    def get_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        """
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        # open the archive log file once and keep it open; it is closed either in
        # process_calc's finally block or in archiving()
        if self._calc_proc_logwriter_ctx is None:
            try:
                self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
                self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101
            except KeyError:
                # cannot open log file
                pass

        if self._calc_proc_logwriter_ctx is None:
            return logger
        else:
            # structlog processor that mirrors each log entry into the calc's
            # archive log file, truncating each entry to 120 characters
            def save_to_calc_log(logger, method_name, event_dict):
                if self._calc_proc_logwriter is not None:
                    program = event_dict.get('normalizer', 'parser')
                    event = event_dict.get('event', '')
                    entry = '[%s] %s: %s' % (method_name, program, event)
                    if len(entry) > 120:
                        self._calc_proc_logwriter.write(entry[:120])
                        self._calc_proc_logwriter.write('...')
                    else:
                        self._calc_proc_logwriter.write(entry)
                    self._calc_proc_logwriter.write('\n')

                return event_dict

            return wrap_logger(logger, processors=[save_to_calc_log])
142

Markus Scheidgen's avatar
Markus Scheidgen committed
143
    @process
    def process_calc(self):
        """
        Main calc process: stores preliminary metadata, then runs the parsing,
        normalizing and archiving tasks; always closes the processing log afterwards.
        """
        logger = self.get_logger()
        if self.upload is None:
            # NOTE(review): Upload.get raises KeyError rather than returning None,
            # so this guard probably never triggers; it also does not return on
            # error — confirm the intended behavior.
            logger.error('calculation upload does not exist')

        try:
            # save preliminary minimum calc metadata in case processing fails
            # successful processing will replace it with the actual metadata
            calc_with_metadata = CalcWithMetadata(
                upload_id=self.upload_id,
                calc_id=self.calc_id,
                calc_hash=self.upload_files.calc_hash(self.mainfile),
                mainfile=self.mainfile)
            calc_with_metadata.published = False
            calc_with_metadata.uploader = self.upload.uploader.to_popo()
            calc_with_metadata.nomad_version = config.version
            calc_with_metadata.last_processing = datetime.now()
            calc_with_metadata.files = self.upload_files.calc_files(self.mainfile)
            self.metadata = calc_with_metadata.to_dict()

            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
        """
        Mark this calc as failed. Before delegating to the base ``fail``, a minimal
        metadata record is indexed so the entry shows up in search as not processed.
        """
        # in case of failure, index a minimum set of metadata and mark
        # processing failure
        try:
            calc_with_metadata = CalcWithMetadata(**self.metadata)
            unavailable = config.services.not_processed_value
            # blank all parser-derived quantities with the 'not processed' marker
            for quantity in (
                    'formula', 'basis_set', 'xc_functional', 'system',
                    'crystal_system', 'spacegroup', 'spacegroup_symbol',
                    'code_name', 'code_version'):
                setattr(calc_with_metadata, quantity, unavailable)
            calc_with_metadata.processed = False
            self.metadata = calc_with_metadata.to_dict()
            search.Entry.from_calc_with_metadata(calc_with_metadata).save()
        except Exception as e:
            self.get_logger().error('could not index after processing failure', exc_info=e)

        super().fail(*errors, log_level=log_level, **kwargs)

198
199
200
201
202
203
204
    def on_process_complete(self, process_name):
        """Trigger the upload's join check once this calc's process has finished."""
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name != 'process_calc' and process_name is not None:
            return
        self.upload.reload()
        self.upload.check_join()
Markus Scheidgen's avatar
Markus Scheidgen committed
205
206
207

    @task
    def parsing(self):
        """
        Parsing task: runs this calc's parser on the mainfile and records the
        general calculation and repository metadata in the resulting backend.
        """
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail(
                    'parser failed with exception', level=logging.ERROR,
                    exc_info=e, error=str(e), **context)
                return

        # add the non code specific calc metadata to the backend
        # all other quantities have been determined by parsers/normalizers
        self._parser_backend.openNonOverlappingSection('section_calculation_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.metadata['calc_hash'])
        self._parser_backend.addValue('main_file', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            logger.error(self._parser_backend.status[1])
            error = self._parser_backend.status[1]
            self._parser_backend.addValue('parse_status', 'ParseFailure')
            # NOTE(review): execution continues after fail() so the repository
            # info below is still recorded — confirm this is intended
            self.fail(error, level=logging.INFO, **context)
        else:
            self._parser_backend.addValue('parse_status', 'ParseSuccess')

        self._parser_backend.closeNonOverlappingSection('section_calculation_info')

        self._parser_backend.openNonOverlappingSection('section_repository_info')
        self._parser_backend.addValue('repository_archive_gid', '%s/%s' % (self.upload_id, self.calc_id))
        self._parser_backend.addValue('repository_filepaths', self.metadata['files'])
        self._parser_backend.closeNonOverlappingSection('section_repository_info')

        self.add_processor_info(self.parser)

    @contextmanager
    def use_parser_backend(self, processor_name):
        """
        Context manager that resets the backend status before the wrapped step and
        records the processor's archive processing info afterwards.
        """
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        """
        Append a ``section_archive_processing_info`` for the given processor to
        this calc's ``section_calculation_info``, based on the backend's status.
        """
        self._parser_backend.openContext('/section_calculation_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            # _warnings may not exist on all backends, hence the getattr default
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('archive_processor_warning_number', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_calculation_info/0')
Markus Scheidgen's avatar
Markus Scheidgen committed
273
274
275
276
277

    @task
    def normalizing(self):
        """Normalizing task: runs all registered normalizers over the parser backend."""
        for normalizer in normalizers:
            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self.fail(
                            'normalizer failed with exception', level=logging.ERROR,
                            exc_info=e, error=str(e), **context)
                        # record the failure on the backend so the check below trips
                        self._parser_backend.status = ['ParseFailure', str(e)]

            # stop at the first normalizer that left the backend in a failed state
            failed = self._parser_backend.status[0] != 'ParseSuccess'
            if failed:
                logger.error(self._parser_backend.status[1])
                error = self._parser_backend.status[1]
                self.fail(error, level=logging.WARNING, error=error, **context)
                break
            else:
                logger.debug(
                    'completed normalizer successfully', normalizer=normalizer_name)
Markus Scheidgen's avatar
Markus Scheidgen committed
301
302
303

    @task
    def archiving(self):
        """
        Archiving task: persists the processed calc metadata, indexes the calc in
        search, writes the archive JSON, and closes/archives the processing log.
        """
        logger = self.get_logger()

        calc_with_metadata = self._parser_backend.to_calc_with_metadata()
        calc_with_metadata.update(**self.metadata)
        calc_with_metadata.processed = True

        # persist the calc metadata
        with utils.timer(logger, 'saved calc metadata', step='metadata'):
            self.metadata = calc_with_metadata.to_dict()

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            search.Entry.from_calc_with_metadata(calc_with_metadata).save()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:
            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                self._parser_backend.write_json(out, pretty=True)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='logs',
                    input_size=self.mainfile_file.size) as log_data:
                # closing via the context manager flushes the log into the archive
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)
Markus Scheidgen's avatar
Markus Scheidgen committed
336

337
    def __str__(self):
        """Debug representation with the proc state plus calc and upload ids."""
        # fix: the original format string read 'upload_id%s' (missing '=')
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)
339

340

341
class Upload(Proc):
    """
    Represents uploads in the databases. Provides persistence access to the files storage,
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded file was stored
        temporary: True if the uploaded file should be removed after extraction
        metadata: optional user provided additional meta data
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
        published: True once this upload was published out of staging
        publish_time: the timestamp of publication
    """
    # name of this document's primary key field (consumed by the Proc base — confirm)
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)

    name = StringField(default=None)
    metadata = DictField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()

    # task queue used for upload processing (consumed by the Proc machinery)
    queue = 'uploads'

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status', 'process_status', 'published'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # lazily created file access; see the upload_files property
        self._upload_files: ArchiveBasedStagingUploadFiles = None
Markus Scheidgen's avatar
Markus Scheidgen committed
379
380

    @classmethod
    def get(cls, id: str, include_published: bool = False) -> 'Upload':
        """
        Return the upload with the given id.

        Arguments:
            id: the upload_id to look up
            include_published: also return the upload if it was already published

        Raises:
            KeyError: if no such upload exists, or it is published and
                ``include_published`` is False.
        """
        upload = cls.get_by_id(id, 'upload_id')
        # TODO published uploads should not be hidden by this and API
        if upload is not None and (not upload.published or include_published):
            return upload

        # fix: carry the id in the exception instead of raising a bare KeyError()
        raise KeyError(id)
Markus Scheidgen's avatar
Markus Scheidgen committed
388
389

    @classmethod
    def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
        """ Returns all uploads for the given user. Currently returns all uploads. """
        user_id = str(user.user_id)
        return cls.objects(user_id=user_id, published=False)
Markus Scheidgen's avatar
Markus Scheidgen committed
393

394
395
396
397
    @property
    def uploader(self):
        """The coe repository user record for this upload's creator."""
        user_id = self.user_id
        return coe_repo.User.from_user_id(user_id)

Markus Scheidgen's avatar
Markus Scheidgen committed
398
399
    def get_logger(self, **kwargs):
        """Return the proc logger bound to this upload's id and name."""
        bound_logger = super().get_logger().bind(
            upload_id=self.upload_id, upload_name=self.name, **kwargs)
        return bound_logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user, a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user (coe_repo.User): The user that created the upload.
        """
        # the user is required, but only its id is stored on the document;
        # pop raises KeyError like the original kwargs['user'] access did
        user: coe_repo.User = kwargs.pop('user')
        kwargs.setdefault('upload_id', utils.create_uuid())
        kwargs.update(user_id=str(user.user_id))
        self = super().create(**kwargs)

        # mark the implicit 'uploading' task as the current/continued task
        self._continue_with('uploading')

        return self

424
425
426
427
428
429
430
431
432
433
434
435
436
437
    def delete(self):
        """ Deletes this upload process state entry and its calcs. """
        # remove the calc proc state first so no orphaned calcs remain
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    @process
    def delete_upload(self):
        """
        Deletes the upload, including its processing state and
        staging files.
        """
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            # remove the upload from the coe repository db
            with utils.timer(
                    logger, 'upload deleted from repo db', step='repo',
                    upload_size=self.upload_files.size):
                coe_repo.Upload.delete(self.upload_id)

            # remove all of the upload's entries from the search index
            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            # remove the staged files and the processing state (incl. calcs)
            with utils.timer(
                    logger, 'staged upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()
                self.delete()

        return True  # do not save the process status on the delete upload
456

457
    @process
    def publish_upload(self):
        """
        Moves the upload out of staging to add it to the coe repository. It will
        pack the staging upload files in to public upload files, add entries to the
        coe repository db and remove this instance and its calculation from the
        processing state db.
        """
        # publishing requires at least one processed calc
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            upload_with_metadata = self.to_upload_with_metadata()

            if config.repository_db.publish_enabled:
                with utils.timer(
                        logger, 'upload added to repository', step='repo',
                        upload_size=self.upload_files.size):
                    coe_repo.Upload.publish(upload_with_metadata)

            # NOTE(review): this repeats the condition above; the two blocks could
            # be merged — kept separate here to preserve the original flow
            if config.repository_db.publish_enabled:
                # prefer the calc metadata as stored in the coe repository db
                coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
                if coe_upload is not None:
                    calcs = [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs]
                else:
                    calcs = []
            else:
                calcs = upload_with_metadata.calcs

            with utils.timer(
                    logger, 'upload metadata updated', step='metadata',
                    upload_size=self.upload_files.size):

                def create_update(calc):
                    # mark the calc published and store its full metadata in mongo
                    calc.published = True
                    return UpdateOne(
                        {'_id': calc.calc_id},
                        {'$set': {'metadata': calc.to_dict()}})

                Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

            with utils.timer(
                    logger, 'staged upload files packed', step='pack',
                    upload_size=self.upload_files.size):
                self.upload_files.pack(upload_with_metadata)

            with utils.timer(
                    logger, 'index updated', step='index',
                    upload_size=self.upload_files.size):
                search.publish(calcs)

            with utils.timer(
                    logger, 'staged upload deleted', step='delete staged',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()
                self.published = True
                self.publish_time = datetime.now()
                self.save()
517

Markus Scheidgen's avatar
Markus Scheidgen committed
518
    @process
    def process_upload(self):
        """Main upload process: extracts the uploaded files and starts calc processing."""
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        # intentionally empty: the upload itself happens before processing; this
        # task is completed via _continue_with('uploading') in create()
        pass

527
    @property
    def upload_files(self) -> UploadFiles:
        """File access for this upload: staging before, public after publication."""
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles

        # re-create the instance when missing or when its class no longer matches
        # the published state
        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload_path)

        return self._upload_files
536

537
538
539
540
541
    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """The upload files narrowed to the staging type; only valid before publication."""
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

Markus Scheidgen's avatar
Markus Scheidgen committed
542
543
    @task
    def extracting(self):
        """
        Task performed before the actual parsing/normalizing. Extracting and bagging
        the uploaded files, computing all keys, create an *upload* entry in the NOMAD-coe
        repository db, etc.
        """
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            # a temporary upload file is deleted once its contents are extracted
            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None

        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

572
    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        """
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of mainfile (filename) and the parser that matched it
        """
        # records the first matched mainfile per directory; further matches in the
        # same directory are still yielded, only the dict keeps the first one
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            try:
                parser = match_parser(filename, upload_files)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                # a single broken file must not abort matching of the whole upload
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)
599

Markus Scheidgen's avatar
Markus Scheidgen committed
600
601
    @task
    def parse_all(self):
        """
        Identified mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        # NOTE(review): the timer message 'upload extracted' looks copied from
        # extracting(); the step here is 'matching' — confirm the intended message
        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

                # async: each calc is processed as its own Proc process
                calc.process_calc()
Markus Scheidgen's avatar
Markus Scheidgen committed
619

620
621
622
623
624
625
626
627
628
629
630
631
    def on_process_complete(self, process_name):
        """Run the join check once this upload's own process has completed."""
        if process_name != 'process_upload':
            return
        self.check_join()

    def check_join(self):
        """
        Join (finish) the upload once its own process is done and all of its
        calcs have been processed.
        """
        total_calcs = self.total_calcs
        processed_calcs = self.processed_calcs

        self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
        if not self.process_running and processed_calcs >= total_calcs:
            self.get_logger().debug('join')
            self.join()
632
633
634

    def join(self):
        # all calcs are done; run the final cleanup task
        self.cleanup()
Markus Scheidgen's avatar
Markus Scheidgen committed
635
636
637

    @task
    def cleanup(self):
        """
        Final upload task: refreshes the search index and notifies the uploader
        by email that processing has completed.
        """
        search.refresh()

        # send email about process finish
        user = self.uploader
        name = '%s %s' % (user.first_name, user.last_name)
        # fix: append a space after the upload name, otherwise the message read
        # 'your data <name>uploaded ...'
        upload_name_str = '%s ' % self.name if self.name else ''
        message = '\n'.join([
            'Dear %s,' % name,
            '',
            'your data %suploaded %s has completed processing.' % (
                upload_name_str, self.upload_time.isoformat()),
            'You can review your data on your upload page: %s' % config.upload_url()
        ])
        try:
            infrastructure.send_mail(
                name=name, email=user.email, message=message, subject='Processing completed')
        except Exception as e:
            # probably due to email configuration problems
            # don't fail or present this error to clients
            # fix: use get_logger() for consistency with the rest of this module;
            # the original `self.logger` attribute is not set anywhere in this file
            self.get_logger().error('could not send after processing email', exc_info=e)
Markus Scheidgen's avatar
Markus Scheidgen committed
657

658
659
660
    def get_calc(self, calc_id) -> Calc:
        """Return this upload's calc with the given calc_id."""
        query = Calc.objects(upload_id=self.upload_id, calc_id=calc_id)
        return query.first()

Markus Scheidgen's avatar
Markus Scheidgen committed
661
    @property
    def processed_calcs(self):
        """Number of this upload's calcs that finished processing (success or failure)."""
        return Calc.objects(upload_id=self.upload_id, tasks_status__in=[SUCCESS, FAILURE]).count()
664
665
666
667
668
669
670

    @property
    def total_calcs(self):
        """Total number of calcs that belong to this upload."""
        return Calc.objects(upload_id=self.upload_id).count()

    @property
    def failed_calcs(self):
        """Number of this upload's calcs whose processing failed."""
        return Calc.objects(upload_id=self.upload_id, tasks_status=FAILURE).count()
672

673
674
    @property
    def pending_calcs(self):
        """Number of this upload's calcs that are still pending."""
        return Calc.objects(upload_id=self.upload_id, tasks_status=PENDING).count()
676

677
678
    def all_calcs(self, start, end, order_by='mainfile'):
        """Return the slice [start:end] of this upload's calcs, ordered by ``order_by``."""
        return Calc.objects(upload_id=self.upload_id)[start:end].order_by(order_by)
679
680
681

    @property
    def calcs(self):
        """This upload's successfully processed calcs."""
        return Calc.objects(upload_id=self.upload_id, tasks_status=SUCCESS)
Markus Scheidgen's avatar
Markus Scheidgen committed
683
684

    def to_upload_with_metadata(self) -> UploadWithMetadata:
        """
        Assemble an :class:`UploadWithMetadata` from this upload's state, merging
        the user-provided upload/calc metadata into each calc's processed metadata.
        """
        # prepare user metadata per upload and per calc
        calc_metadatas: Dict[str, Any] = dict()
        upload_metadata: Dict[str, Any] = dict()

        if self.metadata is not None:
            upload_metadata.update(self.metadata)
            # 'calculations' holds per-calc overrides; it is not upload-level metadata
            if 'calculations' in upload_metadata:
                del(upload_metadata['calculations'])

            for calc in self.metadata.get('calculations', []):
                calc_metadatas[calc['mainfile']] = calc

        # a user supplied '_upload_time' overrides the system's upload_time
        user_upload_time = upload_metadata.get('_upload_time', None)
        result = UploadWithMetadata(
            upload_id=self.upload_id,
            uploader=utils.POPO(id=int(self.user_id)),
            upload_time=self.upload_time if user_upload_time is None else user_upload_time)

        def get_metadata(calc: Calc):
            """
            Assemble metadata from calc's processed calc metadata and the uploads
            user metadata.
            """
            calc_data = calc.metadata
            calc_with_metadata = CalcWithMetadata(**calc_data)
            # upload-level metadata first, then per-calc overrides on top
            calc_metadata = dict(upload_metadata)
            calc_metadata.update(calc_metadatas.get(calc.mainfile, {}))
            calc_with_metadata.apply_user_metadata(calc_metadata)

            return calc_with_metadata

        result.calcs = [get_metadata(calc) for calc in Calc.objects(upload_id=self.upload_id)]

        return result
719
720

    def __str__(self):
        """Debug representation with the proc state and the upload id."""
        # fix: the original format string read 'upload_id%s' (missing '=')
        return 'upload %s upload_id=%s' % (super().__str__(), self.upload_id)