data.py 19.4 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user related
data. These are information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc
    :members:
.. autoclass:: Upload
    :members:
"""

27
from typing import List, Any, ContextManager, Tuple, Generator
28
from mongoengine import StringField, DateTimeField, DictField
Markus Scheidgen's avatar
Markus Scheidgen committed
29
import logging
30
from structlog import wrap_logger
31
from contextlib import contextmanager
Markus Scheidgen's avatar
Markus Scheidgen committed
32

33
from nomad import utils, coe_repo, datamodel, config, infrastructure
34
from nomad.files import PathObject, ArchiveBasedStagingUploadFiles, ExtractError, Calc as FilesCalc
35
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
Markus Scheidgen's avatar
Markus Scheidgen committed
36
from nomad.parsing import parsers, parser_dict
Markus Scheidgen's avatar
Markus Scheidgen committed
37
38
39
from nomad.normalizing import normalizers


40
class Calc(Proc, datamodel.Calc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculations processing and its state.

    The attribute list, does not include the various repository properties generated
    while parsing, including ``program_name``, ``program_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
    """
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    meta: Any = {
        # mongoengine expects the key 'indexes' (as used by Upload below);
        # 'indices' was silently ignored and no secondary indexes were created
        'indexes': [
            # NOTE(review): 'code' is not a declared field on this document —
            # verify whether this index entry is intentional
            'upload_id', 'mainfile', 'code', 'parser', 'tasks_status'
        ]
    }

    def __init__(self, *args, **kwargs):
        """ Initializes the calc and the lazily created processing resources. """
        super().__init__(*args, **kwargs)
        # backend of the parser run; populated by the parsing task
        self._parser_backend = None
        # cached Upload document, resolved on first access of .upload
        self._upload: Upload = None
        # cached staging files wrapper, resolved on first access of .upload_files
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        # open file handle for the per-calc processing log (opened in get_logger)
        self._calc_proc_logwriter = None
        # the context manager that owns the log file handle; exited in archiving()
        self._calc_proc_logwriter_ctx: ContextManager = None
Markus Scheidgen's avatar
Markus Scheidgen committed
74
75
76

    @classmethod
    def get(cls, id):
        """ Return the :class:`Calc` identified by the given calc id. """
        primary_key_field = 'calc_id'
        return cls.get_by_id(id, primary_key_field)
Markus Scheidgen's avatar
Markus Scheidgen committed
78

Markus Scheidgen's avatar
Markus Scheidgen committed
79
    @property
    def mainfile_file(self) -> PathObject:
        """ The :class:`PathObject` of this calc's mainfile within the upload's raw files. """
        files = self.upload_files
        return files.raw_file_object(self.mainfile)
Markus Scheidgen's avatar
Markus Scheidgen committed
82

83
84
85
86
87
88
    @property
    def upload(self) -> 'Upload':
        """ The :class:`Upload` this calculation belongs to (loaded once, then cached). """
        if self._upload is None:
            self._upload = Upload.get(self.upload_id)
        return self._upload

89
90
91
    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """ The staging files of this calc's upload; created lazily and cached. """
        if self._upload_files is None:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True,
                local_path=self.upload.local_path)
        return self._upload_files

Markus Scheidgen's avatar
Markus Scheidgen committed
95
    def get_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.

        Arguments:
            **kwargs: additional context values bound to every log entry.
        """
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        # open the per-calc log file once; __enter__ is called manually because the
        # writer must outlive this call — it is closed later (archiving/process_calc)
        if self._calc_proc_logwriter_ctx is None:
            self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
            self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101

        def save_to_calc_log(logger, method_name, event_dict):
            # structlog processor: mirrors each log event into the calc's archive log
            if self._calc_proc_logwriter is not None:
                program = event_dict.get('normalizer', 'parser')
                event = event_dict.get('event', '')
                entry = '[%s] %s: %s' % (method_name, program, event)
                # keep archive log lines bounded; truncate long entries at 120 chars
                if len(entry) > 120:
                    self._calc_proc_logwriter.write(entry[:120])
                    self._calc_proc_logwriter.write('...')
                else:
                    self._calc_proc_logwriter.write(entry)
                self._calc_proc_logwriter.write('\n')

            # processors must return the event dict for the next processor in the chain
            return event_dict

        return wrap_logger(logger, processors=[save_to_calc_log])
123

Markus Scheidgen's avatar
Markus Scheidgen committed
124
    @process
    def process_calc(self):
        """
        Runs the full processing chain for this calc: parsing, normalizing, and
        archiving. Always closes the processing log and notifies the parent
        upload (chord) of completion, even on failure.
        """
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

            # inform parent proc about completion
            self.upload.completed_child()
Markus Scheidgen's avatar
Markus Scheidgen committed
145
146
147

    @task
    def parsing(self):
        """
        Runs the configured parser on this calc's mainfile and decorates the
        resulting backend with calculation and repository metadata. Fails the
        task if the parser did not report ``ParseSuccess``.
        """
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_logger(**context)
        parser = parser_dict[self.parser]

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            self._parser_backend = parser.run(
                self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)

        # record identifying metadata of this calc in the archive
        self._parser_backend.openNonOverlappingSection('section_calculation_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.upload_files.calc_hash(self.mainfile))
        self._parser_backend.addValue('main_file', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            logger.error(self._parser_backend.status[1])
            error = self._parser_backend.status[1]
            self._parser_backend.addValue('parse_status', 'ParseFailure')
            self.fail(error, level=logging.DEBUG, **context)
        else:
            self._parser_backend.addValue('parse_status', 'ParseSuccess')

        self._parser_backend.closeNonOverlappingSection('section_calculation_info')

        # record repository-related metadata (archive gid, aux file paths)
        self._parser_backend.openNonOverlappingSection('section_repository_info')
        self._parser_backend.addValue('repository_archive_gid', '%s/%s' % (self.upload_id, self.calc_id))
        self._parser_backend.addValue(
            'repository_filepaths', self.upload_files.calc_files(self.mainfile))
        self._parser_backend.closeNonOverlappingSection('section_repository_info')

        # record the parser's own processing info section
        self.add_processor_info(self.parser)

    @contextmanager
    def use_parser_backend(self, processor_name):
        """
        Context manager that yields the parser backend with a reset status and
        records the processor's info section afterwards.

        Arguments:
            processor_name: name recorded in ``section_archive_processing_info``.
        """
        self._parser_backend.reset_status()
        yield self._parser_backend
        # NOTE: intentionally no try/finally — if the body raises, no processing
        # info is recorded for this processor
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        """
        Appends a ``section_archive_processing_info`` to this calc's
        ``section_calculation_info`` recording name, status, and warnings or
        errors of the given processing step (parser or normalizer).

        Arguments:
            processor_name: the name of the parser/normalizer that just ran.
        """
        self._parser_backend.openContext('/section_calculation_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('archive_processor_warning_number', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            # read the error from the backend's status, matching the success check
            # above; ``tasks_status`` is a Proc field, not a backend attribute
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_calculation_info/0')
Markus Scheidgen's avatar
Markus Scheidgen committed
206
207
208
209
210

    @task
    def normalizing(self):
        """
        Runs all configured normalizers (in order) on the parsed backend.
        Stops at and fails on the first normalizer that does not report
        ``ParseSuccess``.
        """
        for normalizer in normalizers:
            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                # use_parser_backend resets the status and records processing info
                with self.use_parser_backend(normalizer_name) as backend:
                    normalizer(backend).normalize(logger=logger)

            failed = self._parser_backend.status[0] != 'ParseSuccess'
            if failed:
                logger.error(self._parser_backend.status[1])
                error = self._parser_backend.status[1]
                self.fail(error, level=logging.WARNING, **context)
                # remaining normalizers are skipped after the first failure
                break
            else:
                logger.debug(
                    'completed normalizer successfully', normalizer=normalizer_name)
Markus Scheidgen's avatar
Markus Scheidgen committed
228
229
230

    @task
    def archiving(self):
        """
        Persists the processing results: repository metadata, the archive JSON,
        and the processing log, then closes the log writer.
        """
        logger = self.get_logger()

        # persist the repository metadata
        with utils.timer(logger, 'indexed', step='index'):
            self.upload_files.metadata.insert(self._parser_backend.metadata())

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:
            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                self._parser_backend.write_json(out, pretty=True)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='archive_log',
                    input_size=self.mainfile_file.size) as log_data:
                # exit the context manager opened in get_logger (manual __enter__ there)
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)
Markus Scheidgen's avatar
Markus Scheidgen committed
255

256
257
258
259
260
261
    def to_calc_with_metadata(self):
        """ Converts this calc into a calc-with-metadata via its files representation. """
        files_calc = self.to(FilesCalc)
        return files_calc.to_calc_with_metadata()


# register how processing Calc documents map onto datamodel CalcWithMetadata
datamodel.CalcWithMetadata.register_mapping(Calc, Calc.to_calc_with_metadata)

Markus Scheidgen's avatar
Markus Scheidgen committed
262

263
class Upload(Chord, datamodel.Upload):
    """
    Represents uploads in the databases. Provides persistence access to the files storage,
    and processing state.

    Attributes:
        name: optional user provided upload name
        local_path: optional local path, e.g. for files that are already somewhere on the server
        metadata: optional user provided additional meta data
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
    """
    # name of the primary key field, used by the Proc/Chord base machinery
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)

    name = StringField(default=None)
    local_path = StringField(default=None)
    metadata = DictField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)

    meta: Any = {
        # secondary indexes for the most common queries (per-user, per-status)
        'indexes': [
            'user_id', 'tasks_status'
        ]
    }

    def __init__(self, **kwargs):
        """ Initializes the upload and its lazily created staging files wrapper. """
        super().__init__(**kwargs)
        # cached staging files wrapper, resolved on first access of .upload_files
        self._upload_files: ArchiveBasedStagingUploadFiles = None
Markus Scheidgen's avatar
Markus Scheidgen committed
295
296
297
298
299
300

    @classmethod
    def get(cls, id):
        """ Return the :class:`Upload` identified by the given upload id. """
        primary_key_field = 'upload_id'
        return cls.get_by_id(id, primary_key_field)

    @classmethod
    def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
        """ Returns all uploads for the given user. Currently returns all uploads. """
        owner_id = str(user.user_id)
        return cls.objects(user_id=owner_id)
Markus Scheidgen's avatar
Markus Scheidgen committed
304

305
306
307
308
    @property
    def uploader(self):
        """ The coe repository user that owns this upload. """
        owner_id = self.user_id
        return coe_repo.User.from_user_id(owner_id)

Markus Scheidgen's avatar
Markus Scheidgen committed
309
310
311
312
313
314
315
316
317
318
319
    def get_logger(self, **kwargs):
        """ Returns the proc logger bound to this upload's id and the given context. """
        bound_logger = super().get_logger().bind(upload_id=self.upload_id, **kwargs)
        return bound_logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user, a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user (coe_repo.User): The user that created the upload.
        """
        # the user object itself is not persisted; only its id is stored below
        user: coe_repo.User = kwargs.pop('user')
        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=str(user.user_id))
        self = super().create(**kwargs)

        # move the processing state machine into its first task
        self._continue_with('uploading')

        return self

335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
    def delete(self):
        """ Deletes this upload process state entry and its calcs. """
        # remove the children first so no orphaned calc entries remain
        child_calcs = Calc.objects(upload_id=self.upload_id)
        child_calcs.delete()
        super().delete()

    @process
    def delete_upload(self):
        """
        Deletes the upload, including its processing state and
        staging files.
        """
        logger = self.get_logger()

        # log-and-reraise wrapper combined with timing around the deletion work
        with utils.lnr(logger, 'staged upload delete failed'), utils.timer(
                logger, 'staged upload deleted', step='delete',
                upload_size=self.upload_files.size):
            self.upload_files.delete()
            self.delete()

        return True  # do not save the process status on the delete upload
356

357
    @process
    def publish_upload(self):
        """
        Moves the upload out of staging to add it to the coe repository. It will
        pack the staging upload files in to public upload files, add entries to the
        coe repository db and remove this instance and its calculation from the
        processing state db.
        """
        logger = self.get_logger()

        with utils.lnr(logger, 'publish failed'):
            # step 1: create the coe repository db entries
            with utils.timer(
                    logger, 'upload added to repository', step='publish',
                    upload_size=self.upload_files.size):
                coe_repo.Upload.add(self, self.metadata)

            # step 2: pack the staging files into their public form
            with utils.timer(
                    logger, 'staged upload files packed', step='publish',
                    upload_size=self.upload_files.size):
                self.upload_files.pack()

            # step 3: remove staging files and this processing state entry
            with utils.timer(
                    logger, 'staged upload deleted', step='publish',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()
                self.delete()

        return True  # do not save the process status on the delete upload
385

Markus Scheidgen's avatar
Markus Scheidgen committed
386
    @process
    def process_upload(self):
        """ Runs the upload's processing chain: extraction, then parsing of all mainfiles. """
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        # no work to do here; the actual file transfer happens elsewhere —
        # this task only gives the state machine an explicit 'uploading' step
        # (see ``_continue_with('uploading')`` in create())
        pass

395
    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """ The staging files of this upload; created lazily and cached. """
        if self._upload_files is None:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True,
                local_path=self.local_path)
        return self._upload_files
400

Markus Scheidgen's avatar
Markus Scheidgen committed
401
402
    @task
    def extracting(self):
        """
        Task performed before the actual parsing/normalizing. Extracting and bagging
        the uploaded files, computing all keys, create an *upload* entry in the NOMAD-coe
        repository db, etc.
        """
        # extract the uploaded file, this will also create a bagit bag.
        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()
        except KeyError:
            # raised when there are no files for this upload id
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            # the uploaded archive could not be unpacked
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

422
    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        """
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of (mainfile path, matching parser). A file that matches
            several parsers is yielded once per matching parser.
        """
        for filename in self.upload_files.raw_file_manifest():
            for parser in parsers:
                try:
                    with self.upload_files.raw_file(filename) as mainfile_f:
                        if parser.is_mainfile(filename, lambda fn: mainfile_f):
                            yield filename, parser
                except Exception as e:
                    # matching must not abort the whole scan; log and continue
                    self.get_logger().error(
                        'exception while matching pot. mainfile',
                        mainfile=filename, exc_info=e)

Markus Scheidgen's avatar
Markus Scheidgen committed
441
442
    @task
    def parse_all(self):
        """
        Identified mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        # TODO: deal with multiple possible parser specs
        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            total_calcs = 0
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    upload_id=self.upload_id)

                # asynchronously process each calc; completion is reported back
                # via completed_child() to this chord
                calc.process_calc()
                total_calcs += 1

        # have to save the total_calcs information for chord management
        # (method name is spelled this way in the Chord base class)
        self.spwaned_childred(total_calcs)

    def join(self):
        # NOTE(review): presumably invoked by the Chord base once all child
        # calcs have completed — confirm in nomad.processing.base
        self.cleanup()
Markus Scheidgen's avatar
Markus Scheidgen committed
468
469
470

    @task
    def cleanup(self):
        """
        Final task of the upload processing chord: notifies the uploader by
        email that processing has completed. Email failures are logged but
        never surfaced to clients.
        """
        # send email about process finish
        user = self.uploader
        name = '%s %s' % (user.first_name, user.last_name)
        message = '\n'.join([
            'Dear %s,' % name,
            '',
            # append a space after the upload name so it does not run into 'uploaded'
            'your data %suploaded %s has completed processing.' % (
                self.name + ' ' if self.name else '', self.upload_time.isoformat()),
            'You can review your data on your upload page: %s' % config.services.upload_url
        ])
        try:
            infrastructure.send_mail(
                name=name, email=user.email, message=message, subject='Processing completed')
        except Exception as e:
            # probably due to email configuration problems
            # don't fail or present this error to clients
            # use get_logger() for a bound logger, consistent with the rest of this module
            self.get_logger().error('could not send after processing email', exc_info=e)
Markus Scheidgen's avatar
Markus Scheidgen committed
488
489

    @property
    def processed_calcs(self):
        """ Number of this upload's calcs that finished processing (success or failure). """
        finished_states = [SUCCESS, FAILURE]
        return Calc.objects(
            upload_id=self.upload_id, tasks_status__in=finished_states).count()
492
493
494
495
496
497
498

    @property
    def total_calcs(self):
        """ Total number of calcs belonging to this upload, regardless of state. """
        all_of_upload = Calc.objects(upload_id=self.upload_id)
        return all_of_upload.count()

    @property
    def failed_calcs(self):
        """ Number of this upload's calcs whose processing failed. """
        failed = Calc.objects(upload_id=self.upload_id, tasks_status=FAILURE)
        return failed.count()
500

501
502
    @property
    def pending_calcs(self):
        """ Number of this upload's calcs that have not started processing yet. """
        pending = Calc.objects(upload_id=self.upload_id, tasks_status=PENDING)
        return pending.count()
504

505
506
    def all_calcs(self, start, end, order_by='mainfile'):
        """ This upload's calcs in the slice [start, end), ordered by the given field. """
        query = Calc.objects(upload_id=self.upload_id)
        return query[start:end].order_by(order_by)
507
508
509

    @property
    def calcs(self):
        """ All successfully processed calcs of this upload. """
        successful = Calc.objects(upload_id=self.upload_id, tasks_status=SUCCESS)
        return successful