data.py 18.5 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user related
data. These are information about users, their uploads and datasets, the associated
calculations, and files


.. autoclass:: Calc
    :members:
.. autoclass:: Upload
    :members:
"""

27
from typing import List, Any, ContextManager, Tuple, Generator
28
from mongoengine import StringField, DateTimeField, DictField
Markus Scheidgen's avatar
Markus Scheidgen committed
29
import logging
30
from structlog import wrap_logger
31
from contextlib import contextmanager
Markus Scheidgen's avatar
Markus Scheidgen committed
32

33
from nomad import utils, coe_repo, datamodel
34
from nomad.files import PathObject, ArchiveBasedStagingUploadFiles, ExtractError
35
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
Markus Scheidgen's avatar
Markus Scheidgen committed
36
from nomad.parsing import parsers, parser_dict
Markus Scheidgen's avatar
Markus Scheidgen committed
37
38
39
from nomad.normalizing import normalizers


40
class Calc(Proc, datamodel.Calc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing state.

    The attribute list does not include the various repository properties generated
    while parsing, including ``program_name``, ``program_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
    """
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    meta: Any = {
        # FIX: mongoengine only honors the meta key 'indexes' (as Upload below
        # already uses); the previous key 'indices' was silently ignored, so
        # these fields were never actually indexed.
        'indexes': [
            'upload_id', 'mainfile', 'code', 'parser', 'tasks_status'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # lazily created processing state; see the respective properties below
        self._parser_backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx: ContextManager = None

    @classmethod
    def get(cls, id):
        """ Returns the calc with the given ``calc_id``. """
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        """ The raw-file object for this calc's mainfile. """
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        """ The :class:`Upload` this calc belongs to (loaded lazily). """
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
        return self._upload

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """ The staging upload files of the upload this calc belongs to (created lazily). """
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, is_authorized=lambda: True, local_path=self.upload.local_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        """ Returns a logger bound to this calc's upload_id, mainfile, and calc_id. """
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        return logger

    def get_calc_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        """
        logger = self.get_logger(**kwargs)

        if self._calc_proc_logwriter is None:
            # open the archive log file once; it is closed in process_calc/archiving
            self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
            self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101

        def save_to_calc_log(logger, method_name, event_dict):
            # structlog processor: mirror each log event into the calc's archive log
            program = event_dict.get('normalizer', 'parser')
            event = event_dict.get('event', '')
            entry = '[%s] %s: %s' % (method_name, program, event)
            if len(entry) > 120:
                # keep archive log lines bounded; long events are truncated
                self._calc_proc_logwriter.write(entry[:120])
                self._calc_proc_logwriter.write('...')
            else:
                self._calc_proc_logwriter.write(entry)
            self._calc_proc_logwriter.write('\n')
            return event_dict

        return wrap_logger(logger, processors=[save_to_calc_log])

    @process
    def process_calc(self):
        """
        The *process* that runs the parsing, normalizing, and archiving tasks for
        this calc, and informs the parent upload when done.
        """
        logger = self.get_logger()
        if self.upload is None:
            # NOTE(review): this only logs and then continues; presumably
            # Upload.get raises before we get here — confirm intended behavior.
            logger.error('calculation upload does not exist')

        try:
            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

            # inform parent proc about completion
            self.upload.completed_child()

    @task
    def parsing(self):
        """
        The *task* that runs this calc's parser on its mainfile and records
        calculation and repository info sections on the parser backend.
        """
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_calc_logger(**context)
        parser = parser_dict[self.parser]

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            self._parser_backend = parser.run(
                self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)

        # record identifying metadata for this calculation
        self._parser_backend.openNonOverlappingSection('section_calculation_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.upload_files.calc_hash(self.mainfile))
        self._parser_backend.addValue('main_file', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            logger.error(self._parser_backend.status[1])
            error = self._parser_backend.status[1]
            self._parser_backend.addValue('parse_status', 'ParseFailure')
            self.fail(error, level=logging.DEBUG, **context)
        else:
            self._parser_backend.addValue('parse_status', 'ParseSuccess')

        self._parser_backend.closeNonOverlappingSection('section_calculation_info')

        # record repository related metadata
        self._parser_backend.openNonOverlappingSection('section_repository_info')
        self._parser_backend.addValue('repository_archive_gid', '%s/%s' % (self.upload_id, self.calc_id))
        self._parser_backend.addValue(
            'repository_filepaths', self.upload_files.calc_files(self.mainfile))
        self._parser_backend.closeNonOverlappingSection('section_repository_info')

        self.add_processor_info(self.parser)

    @contextmanager
    def use_parser_backend(self, processor_name):
        """
        Context manager that resets the backend status before use and records the
        processor's outcome (:meth:`add_processor_info`) afterwards.
        """
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        """
        Appends an ``section_archive_processing_info`` entry for the given
        processor (parser or normalizer) based on the backend's current status.
        """
        self._parser_backend.openContext('/section_calculation_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('archive_processor_warning_number', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            # NOTE(review): every other status check here reads
            # self._parser_backend.status; 'tasks_status' looks like a stray
            # rename artifact — confirm against the backend's API.
            errors = self._parser_backend.tasks_status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_calculation_info/0')

    @task
    def normalizing(self):
        """ The *task* that runs all normalizers over the parser backend; stops at the first failure. """
        for normalizer in normalizers:
            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_calc_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    normalizer(backend).normalize(logger=logger)

            failed = self._parser_backend.status[0] != 'ParseSuccess'
            if failed:
                logger.error(self._parser_backend.status[1])
                error = self._parser_backend.status[1]
                self.fail(error, level=logging.WARNING, **context)
                # do not run the remaining normalizers on a failed backend
                break
            else:
                logger.debug(
                    'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        """
        The *task* that persists the repository metadata and the archive, and
        closes and archives the calculation processing log.
        """
        logger = self.get_logger()

        # persist the repository metadata
        with utils.timer(logger, 'indexed', step='index'):
            self.upload_files.metadata.insert(self._parser_backend.metadata())

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:
            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                self._parser_backend.write_json(out, pretty=True)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='archive_log',
                    input_size=self.mainfile_file.size) as log_data:
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)
class Upload(Chord, datamodel.Upload):
    """
    Represents uploads in the databases. Provides persistence access to the files storage,
    and processing state.

    Attributes:
        name: optional user provided upload name
        local_path: optional local path, e.g. for files that are already somewhere on the server
        metadata: optional user provided additional meta data
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
    """
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)

    name = StringField(default=None)
    local_path = StringField(default=None)
    metadata = DictField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)

    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # lazily created staging upload files, see the upload_files property
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @classmethod
    def get(cls, id):
        """ Returns the upload with the given ``upload_id``. """
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
        """ Returns all uploads for the given user. Currently returns all uploads. """
        return cls.objects(user_id=str(user.user_id))

    @property
    def uploader(self):
        """ The :class:`coe_repo.User` that created this upload. """
        return coe_repo.User.from_user_id(self.user_id)

    def get_logger(self, **kwargs):
        """ Returns a logger bound to this upload's upload_id. """
        logger = super().get_logger()
        logger = logger.bind(upload_id=self.upload_id, **kwargs)
        return logger

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user, a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user (coe_repo.User): The user that created the upload.
        """
        # FIX: pop the user in one step instead of read + del(...)
        user: coe_repo.User = kwargs.pop('user')
        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=str(user.user_id))
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def delete(self):
        """ Deletes this upload process state entry and its calcs. """
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    @process
    def delete_upload(self):
        """
        Deletes the upload, including its processing state and staging files.
        """
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            with utils.timer(
                    logger, 'staged upload deleted', step='delete',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()
                self.delete()

        return True  # do not save the process status on the delete upload

    @process
    def commit_upload(self):
        """
        Moves the upload out of staging to add it to the coe repository. It will
        pack the staging upload files in to public upload files, add entries to the
        coe repository db and remove this instance and its calculation from the
        processing state db.
        """
        logger = self.get_logger()

        with utils.lnr(logger, 'commit failed'):
            with utils.timer(
                    logger, 'upload added to repository', step='commit',
                    upload_size=self.upload_files.size):
                coe_repo.Upload.add(self, self.metadata)

            with utils.timer(
                    logger, 'staged upload files packed', step='commit',
                    upload_size=self.upload_files.size):
                self.upload_files.pack()

            with utils.timer(
                    logger, 'staged upload deleted', step='commit',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()
                self.delete()

        return True  # do not save the process status on the delete upload

    @process
    def process_upload(self):
        """ The *process* that performs the upload's extraction and parsing tasks. """
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        """ Placeholder task; the actual upload happens before processing starts. """
        pass

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """ The staging upload files for this upload (created lazily). """
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, is_authorized=lambda: True, local_path=self.local_path)
        return self._upload_files

    @task
    def extracting(self):
        """
        Task performed before the actual parsing/normalizing. Extracting and bagging
        the uploaded files, computing all keys, create an *upload* entry in the NOMAD-coe
        repository db, etc.
        """
        # extract the uploaded file, this will also create a bagit bag.
        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()
        except KeyError:
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        """
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of mainfile, filename, and parsers
        """
        for filename in self.upload_files.raw_file_manifest():
            for parser in parsers:
                try:
                    with self.upload_files.raw_file(filename) as mainfile_f:
                        if parser.is_mainfile(filename, lambda fn: mainfile_f):
                            yield filename, parser
                except Exception as e:
                    # a failure to match one file must not stop the matching of the rest
                    self.get_logger().error(
                        'exception while matching pot. mainfile',
                        mainfile=filename, exc_info=e)

    @task
    def parse_all(self):
        """
        Identified mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        # TODO: deal with multiple possible parser specs
        # NOTE(review): the timer message 'upload extracted' looks copied from
        # extracting(); step='matching' suggests it should say so — confirm.
        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            total_calcs = 0
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    upload_id=self.upload_id)
                calc.process_calc()
                total_calcs += 1

        # have to save the total_calcs information for chord management
        # NOTE(review): 'spwaned_childred' is a typo, but the name is defined by
        # the Chord base class and cannot be renamed here.
        self.spwaned_childred(total_calcs)

    def join(self):
        """ Chord callback invoked when all child calcs completed. """
        self.cleanup()

    @task
    def cleanup(self):
        """ The final task after all calcs completed; packing is not implemented yet. """
        # TODO issue #83
        with utils.timer(
                self.get_logger(), 'pack staging upload', step='cleaning',
                upload_size=self.upload_files.size):
            pass

    @property
    def processed_calcs(self):
        """ Number of this upload's calcs that finished (successfully or not). """
        return Calc.objects(upload_id=self.upload_id, tasks_status__in=[SUCCESS, FAILURE]).count()

    @property
    def total_calcs(self):
        """ Total number of calcs in this upload. """
        return Calc.objects(upload_id=self.upload_id).count()

    @property
    def failed_calcs(self):
        """ Number of this upload's calcs that failed. """
        return Calc.objects(upload_id=self.upload_id, tasks_status=FAILURE).count()

    @property
    def pending_calcs(self):
        """ Number of this upload's calcs that are still pending. """
        return Calc.objects(upload_id=self.upload_id, tasks_status=PENDING).count()

    def all_calcs(self, start, end, order_by='mainfile'):
        """ Returns a slice [start:end] of this upload's calcs, ordered by ``order_by``. """
        return Calc.objects(upload_id=self.upload_id)[start:end].order_by(order_by)

    @property
    def calcs(self):
        """ All calcs of this upload. """
        return Calc.objects(upload_id=self.upload_id)