data.py 26.3 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user-related
data: information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc
    :members:
.. autoclass:: Upload
    :members:
"""

27
from typing import List, Any, ContextManager, Tuple, Generator, Dict, cast
28
from mongoengine import StringField, DateTimeField, DictField, BooleanField
Markus Scheidgen's avatar
Markus Scheidgen committed
29
import logging
30
from structlog import wrap_logger
31
from contextlib import contextmanager
32
import os.path
33
34
from datetime import datetime
from pymongo import UpdateOne
Markus Scheidgen's avatar
Markus Scheidgen committed
35

36
from nomad import utils, coe_repo, config, infrastructure, search
37
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles
38
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
39
from nomad.parsing import parser_dict, match_parser, LocalBackend
Markus Scheidgen's avatar
Markus Scheidgen committed
40
from nomad.normalizing import normalizers
Markus Scheidgen's avatar
Markus Scheidgen committed
41
from nomad.datamodel import UploadWithMetadata, CalcWithMetadata
Markus Scheidgen's avatar
Markus Scheidgen committed
42
43


Markus Scheidgen's avatar
Markus Scheidgen committed
44
class Calc(Proc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculations processing and its state.

    The attribute list does not include the various repository properties generated
    while parsing, including ``program_name``, ``program_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc

        metadata: the metadata record with calc and user metadata, see :class:`CalcWithMetadata`
    """
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    metadata = DictField()

    queue = 'calcs'

    meta: Any = {
        'indexes': [
            'upload_id', 'mainfile', 'parser', 'tasks_status'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # lazily populated caches; all start unset and are created on first access
        self._parser_backend: LocalBackend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx: ContextManager = None

    @classmethod
    def get(cls, id):
        """ Returns the calc with the given ``calc_id``. """
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        """ The raw-file object for this calc's mainfile. """
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        """ The :class:`Upload` this calc belongs to (cached after first lookup). """
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            # propagate the worker so upload-side logging/joining is attributed correctly
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """ The staging upload-files accessor for this calc (cached after first use). """
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        """
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        if self._calc_proc_logwriter_ctx is None:
            try:
                self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
                self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101
            except KeyError:
                # cannot open log file; fall back to plain logging below
                pass

        if self._calc_proc_logwriter_ctx is None:
            return logger

        def save_to_calc_log(logger, method_name, event_dict):
            # structlog processor: mirror each event into the archive log file,
            # truncating overly long entries to keep the log readable
            if self._calc_proc_logwriter is not None:
                program = event_dict.get('normalizer', 'parser')
                event = event_dict.get('event', '')
                entry = '[%s] %s: %s' % (method_name, program, event)
                if len(entry) > 120:
                    self._calc_proc_logwriter.write(entry[:120])
                    self._calc_proc_logwriter.write('...')
                else:
                    self._calc_proc_logwriter.write(entry)
                self._calc_proc_logwriter.write('\n')

            return event_dict

        return wrap_logger(logger, processors=[save_to_calc_log])

    @process
    def process_calc(self):
        """
        The main calc processing chain: parsing, normalizing, archiving. Always
        attempts to close the per-calc log writer, even after a failed task.
        """
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

    def on_process_complete(self, process_name):
        # the save might be necessary to correctly read the join condition from the db
        self.save()
        # in case of error, the process_name might be unknown
        if process_name == 'process_calc' or process_name is None:
            self.upload.reload()
            self.upload.check_join()

    @task
    def parsing(self):
        """ Runs the configured parser on the mainfile and records basic calc info. """
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            try:
                self._parser_backend = parser.run(
                    self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
            except Exception as e:
                self.fail(
                    'parser failed with exception', level=logging.ERROR,
                    exc_info=e, error=str(e), **context)
                return

        self._parser_backend.openNonOverlappingSection('section_calculation_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.upload_files.calc_hash(self.mainfile))
        self._parser_backend.addValue('main_file', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            logger.error(self._parser_backend.status[1])
            error = self._parser_backend.status[1]
            self._parser_backend.addValue('parse_status', 'ParseFailure')
            self.fail(error, level=logging.INFO, **context)
        else:
            self._parser_backend.addValue('parse_status', 'ParseSuccess')

        self._parser_backend.closeNonOverlappingSection('section_calculation_info')

        self._parser_backend.openNonOverlappingSection('section_repository_info')
        self._parser_backend.addValue('repository_archive_gid', '%s/%s' % (self.upload_id, self.calc_id))
        self._parser_backend.addValue(
            'repository_filepaths', self.upload_files.calc_files(self.mainfile))
        self._parser_backend.closeNonOverlappingSection('section_repository_info')

        self.add_processor_info(self.parser)

    @contextmanager
    def use_parser_backend(self, processor_name):
        """
        Context manager that resets the backend status before use and records the
        given processor's outcome into the archive afterwards.
        """
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        """ Writes processing status/warnings of the given processor into the archive. """
        self._parser_backend.openContext('/section_calculation_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('archive_processor_warning_number', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_calculation_info/0')

    @task
    def normalizing(self):
        """ Runs all registered normalizers on the parser backend, stopping at the first failure. """
        for normalizer in normalizers:
            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    try:
                        normalizer(backend).normalize(logger=logger)
                    except Exception as e:
                        self.fail(
                            'normalizer failed with exception', level=logging.ERROR,
                            exc_info=e, error=str(e), **context)
                        # mark the backend failed so the status check below aborts the chain
                        self._parser_backend.status = ['ParseFailure', str(e)]

            failed = self._parser_backend.status[0] != 'ParseSuccess'
            if failed:
                logger.error(self._parser_backend.status[1])
                error = self._parser_backend.status[1]
                self.fail(error, level=logging.WARNING, error=error, **context)
                break
            else:
                logger.debug(
                    'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        """
        Persists the processing results: repository metadata on this document,
        the search index entry, the archive file, and the processing log.
        """
        logger = self.get_logger()

        calc_with_metadata = self._parser_backend.to_calc_with_metadata()
        calc_with_metadata.published = False
        calc_with_metadata.uploader = self.upload.uploader.to_popo()
        calc_with_metadata.processed = True
        calc_with_metadata.last_processing = datetime.now()
        calc_with_metadata.nomad_version = config.version

        # persist the repository metadata
        with utils.timer(logger, 'saved repo metadata', step='metadata'):
            self.metadata = calc_with_metadata.to_dict()
            self.save()

        # index in search
        with utils.timer(logger, 'indexed', step='index'):
            search.Entry.from_calc_with_metadata(calc_with_metadata).save()

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:
            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                self._parser_backend.write_json(out, pretty=True)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='logs',
                    input_size=self.mainfile_file.size) as log_data:
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)

    def __str__(self):
        # fixed: original format string read 'upload_id%s' (missing '='),
        # fusing the label with the id in log output
        return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)
306

307

308
class Upload(Proc):
    """
    Represents uploads in the databases. Provides persistence access to the files storage,
    and processing state.

    Attributes:
        name: optional user provided upload name
        upload_path: the path where the uploaded files were stored
        temporary: True if the uploaded file should be removed after extraction
        metadata: optional user provided additional meta data
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
    """
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)
    upload_path = StringField(default=None)
    temporary = BooleanField(default=False)

    name = StringField(default=None)
    metadata = DictField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)
    published = BooleanField(default=False)
    publish_time = DateTimeField()

    queue = 'uploads'

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # lazily created files accessor, see :attr:`upload_files`
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @classmethod
    def get(cls, id: str, include_published: bool = False) -> 'Upload':
        """
        Returns the upload with the given id. Published uploads are only returned
        when ``include_published`` is set; otherwise raises :class:`KeyError`.
        """
        upload = cls.get_by_id(id, 'upload_id')
        # TODO published uploads should not be hidden by this and API
        if upload is not None and (not upload.published or include_published):
            return upload

        raise KeyError()

    @classmethod
    def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
        """ Returns all uploads for the given user. Currently returns all uploads. """
        return cls.objects(user_id=str(user.user_id), published=False)

    @property
    def uploader(self):
        """ The :class:`coe_repo.User` that created this upload. """
        return coe_repo.User.from_user_id(self.user_id)

    def get_logger(self, **kwargs):
        """ Returns a logger bound to this upload's id and name. """
        logger = super().get_logger()
        logger = logger.bind(upload_id=self.upload_id, upload_name=self.name, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user, a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user (coe_repo.User): The user that created the upload.
        """
        user: coe_repo.User = kwargs['user']
        del kwargs['user']  # idiomatic del; was del(...)
        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=str(user.user_id))
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        """ Deletes this upload process state entry and its calcs. """
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    @process
    def delete_upload(self):
        """
        Deletes of the upload, including its processing state and
        staging files.
        """
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            with utils.timer(
                    logger, 'upload deleted from repo db', step='repo',
                    upload_size=self.upload_files.size):
                coe_repo.Upload.delete(self.upload_id)

            with utils.timer(
                    logger, 'upload deleted from index', step='index',
                    upload_size=self.upload_files.size):
                search.delete_upload(self.upload_id)

            with utils.timer(
                    logger, 'staged upload deleted', step='files',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()
                self.delete()

        return True  # do not save the process status on the delete upload

    @process
    def publish_upload(self):
        """
        Moves the upload out of staging to add it to the coe repository. It will
        pack the staging upload files in to public upload files, add entries to the
        coe repository db and remove this instance and its calculation from the
        processing state db.
        """
        assert self.processed_calcs > 0

        logger = self.get_logger()
        logger.info('started to publish')

        with utils.lnr(logger, 'publish failed'):
            upload_with_metadata = self.to_upload_with_metadata()

            if config.repository_db.publish_enabled:
                with utils.timer(
                        logger, 'upload added to repository', step='repo',
                        upload_size=self.upload_files.size):
                    coe_repo.Upload.publish(upload_with_metadata)

            # determine the calcs to publish: either as stored in the coe repo db,
            # or straight from the upload metadata when repo publishing is disabled
            if config.repository_db.publish_enabled:
                coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
                if coe_upload is not None:
                    calcs = [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs]
                else:
                    calcs = []
            else:
                calcs = upload_with_metadata.calcs

            with utils.timer(
                    logger, 'upload metadata updated', step='metadata',
                    upload_size=self.upload_files.size):

                def create_update(calc):
                    calc.published = True
                    return UpdateOne(
                        {'_id': calc.calc_id},
                        {'$set': {'metadata': calc.to_dict()}})

                Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

            with utils.timer(
                    logger, 'staged upload files packed', step='pack',
                    upload_size=self.upload_files.size):
                self.upload_files.pack(upload_with_metadata)

            with utils.timer(
                    logger, 'index updated', step='index',
                    upload_size=self.upload_files.size):
                search.publish(calcs)

            with utils.timer(
                    logger, 'staged upload deleted', step='delete staged',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()
                self.published = True
                self.publish_time = datetime.now()
                self.save()

    @process
    def process_upload(self):
        """ The main upload processing chain: extracting, then parsing all mainfiles. """
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        # placeholder task; the actual upload happens outside the worker
        pass

    @property
    def upload_files(self) -> UploadFiles:
        """
        The files accessor for this upload: staging files while unpublished,
        public files after publishing (re-created when the published state changes).
        """
        upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles

        if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
            self._upload_files = upload_files_class(
                self.upload_id, is_authorized=lambda: True, upload_path=self.upload_path)

        return self._upload_files

    @property
    def staging_upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """ The staging files accessor; only valid while the upload is unpublished. """
        assert not self.published
        return cast(ArchiveBasedStagingUploadFiles, self.upload_files)

    @task
    def extracting(self):
        """
        Task performed before the actual parsing/normalizing. Extracting and bagging
        the uploaded files, computing all keys, create an *upload* entry in the NOMAD-coe
        repository db, etc.
        """
        # extract the uploaded file
        self._upload_files = ArchiveBasedStagingUploadFiles(
            upload_id=self.upload_id, is_authorized=lambda: True, create=True,
            upload_path=self.upload_path)

        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()

            if self.temporary:
                os.remove(self.upload_path)
                self.upload_path = None
        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        """
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of mainfile, filename, and parsers
        """
        directories_with_match: Dict[str, str] = dict()
        upload_files = self.staging_upload_files
        for filename in upload_files.raw_file_manifest():
            try:
                parser = match_parser(filename, upload_files)
                if parser is not None:
                    directory = os.path.dirname(filename)
                    if directory in directories_with_match:
                        # TODO this might give us the chance to store directory based relationship
                        # between calcs for the future?
                        pass
                    else:
                        directories_with_match[directory] = filename

                    yield filename, parser
            except Exception as e:
                # a broken candidate file must not abort matching of the whole upload
                self.get_logger().error(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)

    @task
    def parse_all(self):
        """
        Identified mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    worker_hostname=self.worker_hostname,
                    upload_id=self.upload_id)

                calc.process_calc()

    def on_process_complete(self, process_name):
        if process_name == 'process_upload':
            self.check_join()

    def check_join(self):
        """ Joins the upload (i.e. triggers cleanup) once all calcs are processed. """
        total_calcs = self.total_calcs
        processed_calcs = self.processed_calcs

        self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
        if not self.process_running and processed_calcs >= total_calcs:
            self.get_logger().debug('join')
            self.join()

    def join(self):
        self.cleanup()

    @task
    def cleanup(self):
        """ Final task: refreshes the search index and notifies the uploader by email. """
        search.refresh()

        # send email about process finish
        user = self.uploader
        name = '%s %s' % (user.first_name, user.last_name)
        # fixed: original fused the upload name with 'uploaded' ("dataNAMEuploaded");
        # append a separating space when a name is present
        upload_name = '%s ' % self.name if self.name else ''
        message = '\n'.join([
            'Dear %s,' % name,
            '',
            'your data %suploaded %s has completed processing.' % (
                upload_name, self.upload_time.isoformat()),
            'You can review your data on your upload page: %s' % config.upload_url()
        ])
        try:
            infrastructure.send_mail(
                name=name, email=user.email, message=message, subject='Processing completed')
        except Exception as e:
            # probably due to email configuration problems
            # don't fail or present this error to clients
            self.logger.error('could not send after processing email', exc_info=e)

    def get_calc(self, calc_id) -> Calc:
        """ Returns the upload's calc with the given id, or None. """
        return Calc.objects(upload_id=self.upload_id, calc_id=calc_id).first()

    @property
    def processed_calcs(self):
        """ Number of calcs that finished processing (successfully or not). """
        return Calc.objects(upload_id=self.upload_id, tasks_status__in=[SUCCESS, FAILURE]).count()

    @property
    def total_calcs(self):
        """ Total number of calcs in this upload. """
        return Calc.objects(upload_id=self.upload_id).count()

    @property
    def failed_calcs(self):
        """ Number of calcs whose processing failed. """
        return Calc.objects(upload_id=self.upload_id, tasks_status=FAILURE).count()

    @property
    def pending_calcs(self):
        """ Number of calcs still waiting to be processed. """
        return Calc.objects(upload_id=self.upload_id, tasks_status=PENDING).count()

    def all_calcs(self, start, end, order_by='mainfile'):
        """ Returns the slice [start:end] of this upload's calcs, ordered by ``order_by``. """
        return Calc.objects(upload_id=self.upload_id)[start:end].order_by(order_by)

    @property
    def calcs(self):
        """ All successfully processed calcs of this upload. """
        return Calc.objects(upload_id=self.upload_id, tasks_status=SUCCESS)

    def to_upload_with_metadata(self) -> UploadWithMetadata:
        """
        Builds the :class:`UploadWithMetadata` for this upload, applying user
        provided upload- and calc-level metadata to each successfully processed calc.
        """
        # prepare user metadata per upload and per calc
        calc_metadatas: Dict[str, Any] = dict()
        upload_metadata: Dict[str, Any] = dict()

        if self.metadata is not None:
            upload_metadata.update(self.metadata)
            if 'calculations' in upload_metadata:
                del upload_metadata['calculations']  # idiomatic del; was del(...)

            for calc in self.metadata.get('calculations', []):
                calc_metadatas[calc['mainfile']] = calc

        user_upload_time = upload_metadata.get('_upload_time', None)
        result = UploadWithMetadata(
            upload_id=self.upload_id,
            uploader=utils.POPO(id=int(self.user_id)),
            upload_time=self.upload_time if user_upload_time is None else user_upload_time)

        def apply_metadata(calc):
            # upload-level metadata first, overridden by per-calc metadata
            calc_data = calc.metadata
            calc_with_metadata = CalcWithMetadata(**calc_data)

            calc_metadata = dict(upload_metadata)
            calc_metadata.update(calc_metadatas.get(calc.mainfile, {}))
            calc_with_metadata.apply_user_metadata(calc_metadata)

            return calc_with_metadata

        # TODO publish failed calcs
        # result.calcs = [apply_metadata(calc) for calc in Calc.objects(upload_id=self.upload_id)]
        result.calcs = [apply_metadata(calc) for calc in self.calcs]

        return result

    def __str__(self):
        # fixed: original format string read 'upload_id%s' (missing '='),
        # fusing the label with the id in log output
        return 'upload %s upload_id=%s' % (super().__str__(), self.upload_id)