data.py 18.6 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user related
data. This includes information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc
    :members:
.. autoclass:: Upload
    :members:
"""

27
from typing import List, Any, ContextManager, Tuple, Generator
28
from mongoengine import StringField, DateTimeField, DictField
Markus Scheidgen's avatar
Markus Scheidgen committed
29
import logging
30
from structlog import wrap_logger
31
from contextlib import contextmanager
Markus Scheidgen's avatar
Markus Scheidgen committed
32

33
from nomad import utils, coe_repo, datamodel
34
from nomad.files import PathObject, ArchiveBasedStagingUploadFiles, ExtractError, Calc as FilesCalc
35
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
Markus Scheidgen's avatar
Markus Scheidgen committed
36
from nomad.parsing import parsers, parser_dict
Markus Scheidgen's avatar
Markus Scheidgen committed
37
38
39
from nomad.normalizing import normalizers


40
class Calc(Proc, datamodel.Calc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculations processing and its state.

    The attribute list does not include the various repository properties generated
    while parsing, including ``program_name``, ``program_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
    """
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    # mongoengine only recognizes the meta key 'indexes'; the previous key
    # 'indices' was silently ignored and no secondary indexes were created.
    # This now also matches the Upload document's meta declaration.
    meta: Any = {
        'indexes': [
            'upload_id', 'mainfile', 'code', 'parser', 'tasks_status'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Transient, per-instance processing state; not persisted by mongoengine.
        self._parser_backend = None
        # Lazily populated caches, see the `upload` and `upload_files` properties.
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        # Open archive-log file handle and the context manager that owns it;
        # opened in get_calc_logger, closed in process_calc/archiving.
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx: ContextManager = None
Markus Scheidgen's avatar
Markus Scheidgen committed
74
75
76

    @classmethod
    def get(cls, id):
        """Retrieve the :class:`Calc` with the given ``calc_id`` primary key."""
        calc = cls.get_by_id(id, 'calc_id')
        return calc
Markus Scheidgen's avatar
Markus Scheidgen committed
78

Markus Scheidgen's avatar
Markus Scheidgen committed
79
    @property
    def mainfile_file(self) -> PathObject:
        """The raw-file path object for this calc's mainfile."""
        upload_files = self.upload_files
        return upload_files.raw_file_object(self.mainfile)
Markus Scheidgen's avatar
Markus Scheidgen committed
82

83
84
85
86
87
88
    @property
    def upload(self) -> 'Upload':
        """The parent :class:`Upload`, fetched lazily and cached."""
        upload = self._upload
        if not upload:
            upload = Upload.get(self.upload_id)
            self._upload = upload
        return upload

89
90
91
    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """Staging upload files for this calc's upload, created lazily and cached."""
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id,
                is_authorized=lambda: True,
                local_path=self.upload.local_path)
        return self._upload_files

Markus Scheidgen's avatar
Markus Scheidgen committed
95
96
97
    def get_logger(self, **kwargs):
        """Return the proc logger bound with this calc's identifying fields."""
        return super().get_logger().bind(
            upload_id=self.upload_id,
            mainfile=self.mainfile,
            calc_id=self.calc_id,
            **kwargs)

102
103
104
105
106
107
108
109
    def get_calc_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        """
        logger = self.get_logger(**kwargs)

        if self._calc_proc_logwriter is None:
            # Open the archive log file once; keep both the context manager and the
            # entered file object. The handle is closed later (process_calc/archiving),
            # not here, which is why __enter__ is called manually.
            self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
            self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101

        def save_to_calc_log(logger, method_name, event_dict):
            # structlog processor: mirror every log event into the calc's archive log.
            program = event_dict.get('normalizer', 'parser')
            event = event_dict.get('event', '')
            entry = '[%s] %s: %s' % (method_name, program, event)
            # truncate long entries to 120 chars to keep the log file readable
            if len(entry) > 120:
                self._calc_proc_logwriter.write(entry[:120])
                self._calc_proc_logwriter.write('...')
            else:
                self._calc_proc_logwriter.write(entry)
            self._calc_proc_logwriter.write('\n')
            # returning the dict lets the remaining processor chain run unchanged
            return event_dict

        return wrap_logger(logger, processors=[save_to_calc_log])
126

Markus Scheidgen's avatar
Markus Scheidgen committed
127
    @process
    def process_calc(self):
        """Run the calc's processing chain: parsing -> normalizing -> archiving."""
        logger = self.get_logger()
        if self.upload is None:
            # NOTE(review): processing still continues after this error is logged —
            # confirm whether an early return/fail was intended here.
            logger.error('calculation upload does not exist')

        try:
            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

            # inform parent proc about completion
            self.upload.completed_child()
Markus Scheidgen's avatar
Markus Scheidgen committed
148
149
150

    @task
    def parsing(self):
        """
        Runs the calc's parser on its mainfile and records calculation and
        repository info sections on the resulting parser backend.
        """
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_calc_logger(**context)
        parser = parser_dict[self.parser]

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            self._parser_backend = parser.run(
                self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)

        # record identifying information about this calc on the backend
        self._parser_backend.openNonOverlappingSection('section_calculation_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.upload_files.calc_hash(self.mainfile))
        self._parser_backend.addValue('main_file', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            logger.error(self._parser_backend.status[1])
            error = self._parser_backend.status[1]
            self._parser_backend.addValue('parse_status', 'ParseFailure')
            # parse failures are expected for some files, hence only DEBUG level
            self.fail(error, level=logging.DEBUG, **context)
        else:
            self._parser_backend.addValue('parse_status', 'ParseSuccess')

        self._parser_backend.closeNonOverlappingSection('section_calculation_info')

        # record repository-related metadata (archive gid, auxiliary file paths)
        self._parser_backend.openNonOverlappingSection('section_repository_info')
        self._parser_backend.addValue('repository_archive_gid', '%s/%s' % (self.upload_id, self.calc_id))
        self._parser_backend.addValue(
            'repository_filepaths', self.upload_files.calc_files(self.mainfile))
        self._parser_backend.closeNonOverlappingSection('section_repository_info')

        self.add_processor_info(self.parser)

    @contextmanager
    def use_parser_backend(self, processor_name):
        """
        Context manager that yields the parser backend with a freshly reset status
        and records the processor's archive processing info when the block exits.
        """
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        """
        Appends a ``section_archive_processing_info`` entry for the given processor
        (parser or normalizer) to the backend's calculation info section, recording
        success/warning/error status.
        """
        self._parser_backend.openContext('/section_calculation_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('archive_processor_warning_number', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            # NOTE(review): sibling checks read `status[1]` while this reads
            # `tasks_status[1]` — confirm this difference is intentional.
            errors = self._parser_backend.tasks_status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_calculation_info/0')
Markus Scheidgen's avatar
Markus Scheidgen committed
209
210
211
212
213

    @task
    def normalizing(self):
        """Runs all configured normalizers in order over the parser backend."""
        for normalizer in normalizers:
            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_calc_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                # use_parser_backend resets the backend status and records
                # processing info for this normalizer on exit
                with self.use_parser_backend(normalizer_name) as backend:
                    normalizer(backend).normalize(logger=logger)

            failed = self._parser_backend.status[0] != 'ParseSuccess'
            if failed:
                logger.error(self._parser_backend.status[1])
                error = self._parser_backend.status[1]
                self.fail(error, level=logging.WARNING, **context)
                # stop at the first failing normalizer
                break
            else:
                logger.debug(
                    'completed normalizer successfully', normalizer=normalizer_name)
Markus Scheidgen's avatar
Markus Scheidgen committed
231
232
233

    @task
    def archiving(self):
        """Persists the repository metadata, the archive JSON, and the calc's processing log."""
        logger = self.get_logger()

        # persist the repository metadata
        with utils.timer(logger, 'indexed', step='index'):
            self.upload_files.metadata.insert(self._parser_backend.metadata())

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:
            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                self._parser_backend.write_json(out, pretty=True)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='archive_log',
                    input_size=self.mainfile_file.size) as log_data:
                # exit the context manager that was entered in get_calc_logger
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)
Markus Scheidgen's avatar
Markus Scheidgen committed
258

259
260
261
262
263
264
    def to_calc_with_metadata(self):
        """Convert this document to a calc-with-metadata via its files representation."""
        files_calc = self.to(FilesCalc)
        return files_calc.to_calc_with_metadata()


# register the conversion so CalcWithMetadata can be derived from Calc instances
datamodel.CalcWithMetadata.register_mapping(Calc, Calc.to_calc_with_metadata)

Markus Scheidgen's avatar
Markus Scheidgen committed
265

266
class Upload(Chord, datamodel.Upload):
    """
    Represents uploads in the databases. Provides persistence access to the files storage,
    and processing state.

    Attributes:
        name: optional user provided upload name
        local_path: optional local path, e.g. for files that are already somewhere on the server
        metadata: optional user provided additional meta data
        upload_id: the upload id generated by the database
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
    """
    # name of this document's id field
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)

    name = StringField(default=None)
    local_path = StringField(default=None)
    metadata = DictField(default=None)
    upload_time = DateTimeField()
    user_id = StringField(required=True)

    # mongoengine secondary indexes for the common query patterns
    meta: Any = {
        'indexes': [
            'user_id', 'tasks_status'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # lazily created by the `upload_files` property; not persisted
        self._upload_files: ArchiveBasedStagingUploadFiles = None
Markus Scheidgen's avatar
Markus Scheidgen committed
298
299
300
301
302
303

    @classmethod
    def get(cls, id):
        """Retrieve the :class:`Upload` with the given ``upload_id`` primary key."""
        upload = cls.get_by_id(id, 'upload_id')
        return upload

    @classmethod
    def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
        """ Returns all uploads for the given user. Currently returns all uploads. """
        user_id = str(user.user_id)
        return cls.objects(user_id=user_id)
Markus Scheidgen's avatar
Markus Scheidgen committed
307

308
309
310
311
    @property
    def uploader(self):
        """The coe-repository user record that owns this upload."""
        user_id = self.user_id
        return coe_repo.User.from_user_id(user_id)

Markus Scheidgen's avatar
Markus Scheidgen committed
312
313
314
315
316
317
318
319
320
321
322
    def get_logger(self, **kwargs):
        """Return the proc logger bound with this upload's id (plus any extras)."""
        return super().get_logger().bind(upload_id=self.upload_id, **kwargs)

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user, a user given name is optional.
        It will populate the record with a signed url and pending :class:`UploadProc`.
        The upload will be already saved to the database.

        Arguments:
            user (coe_repo.User): The user that created the upload.
        """
        # replace the transient `user` argument with the persisted `user_id` field
        user: coe_repo.User = kwargs.pop('user')
        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=str(user.user_id))
        self = super().create(**kwargs)

        # mark the `uploading` task as the current processing step
        self._continue_with('uploading')

        return self

338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
    def delete(self):
        """ Deletes this upload process state entry and its calcs. """
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    @process
    def delete_upload(self):
        """
        Deletes the upload, including its processing state and
        staging files.
        """
        logger = self.get_logger()

        with utils.lnr(logger, 'staged upload delete failed'):
            with utils.timer(
                    logger, 'staged upload deleted', step='delete',
                    upload_size=self.upload_files.size):
                # remove files first, then the mongo documents
                self.upload_files.delete()
                self.delete()

        return True  # do not save the process status on the deleted upload
359

360
    @process
    def publish_upload(self):
        """
        Moves the upload out of staging to add it to the coe repository. It will
        pack the staging upload files in to public upload files, add entries to the
        coe repository db and remove this instance and its calculation from the
        processing state db.
        """
        logger = self.get_logger()

        with utils.lnr(logger, 'publish failed'):
            # add the upload (and its user metadata) to the coe repository db
            with utils.timer(
                    logger, 'upload added to repository', step='publish',
                    upload_size=self.upload_files.size):
                coe_repo.Upload.add(self, self.metadata)

            # pack staging files into their public form
            with utils.timer(
                    logger, 'staged upload files packed', step='publish',
                    upload_size=self.upload_files.size):
                self.upload_files.pack()

            # drop the staging files and this processing-state document
            with utils.timer(
                    logger, 'staged upload deleted', step='publish',
                    upload_size=self.upload_files.size):
                self.upload_files.delete()
                self.delete()

        return True  # do not save the process status on the deleted upload
388

Markus Scheidgen's avatar
Markus Scheidgen committed
389
    @process
    def process_upload(self):
        """Runs the upload's processing chain: extracting, then parsing all calcs."""
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        # upload happens outside this process; the task only marks the state
        pass

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """Staging upload files for this upload, created lazily and cached."""
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, is_authorized=lambda: True, local_path=self.local_path)
        return self._upload_files
403

Markus Scheidgen's avatar
Markus Scheidgen committed
404
405
    @task
    def extracting(self):
        """
        Task performed before the actual parsing/normalizing. Extracting and bagging
        the uploaded files, computing all keys, create an *upload* entry in the NOMAD-coe
        repository db, etc.
        """
        # extract the uploaded file, this will also create a bagit bag.
        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()
        except KeyError:
            # NOTE(review): Calc.parsing passes `level=` to self.fail while this
            # passes `log_level=` — confirm which keyword Proc.fail expects.
            self.fail('processing requested for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            # a corrupt upload archive is a user error, not a system error
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

425
    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        """
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of filename and parser.
        """
        for filename in self.upload_files.raw_file_manifest():
            for parser in parsers:
                try:
                    with self.upload_files.raw_file(filename) as mainfile_f:
                        # the callable hands the already-open file to the parser
                        # regardless of the requested name
                        if parser.is_mainfile(filename, lambda fn: mainfile_f):
                            yield filename, parser
                except Exception as e:
                    # a single unreadable file must not abort the whole matching run
                    self.get_logger().error(
                        'exception while matching pot. mainfile',
                        mainfile=filename, exc_info=e)

Markus Scheidgen's avatar
Markus Scheidgen committed
444
445
    @task
    def parse_all(self):
        """
        Identifies mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        # TODO: deal with multiple possible parser specs
        # NOTE(review): the timer message says 'upload extracted' although this
        # step is 'matching' — confirm the intended log message.
        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            total_calcs = 0
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    upload_id=self.upload_id)

                # async: each calc is processed in its own task
                calc.process_calc()
                total_calcs += 1

        # have to save the total_calcs information for chord management
        # (sic) spelling presumably as defined by the Chord base class — verify
        self.spwaned_childred(total_calcs)

    def join(self):
        # called when all child calcs have completed; run the final cleanup task
        self.cleanup()

    @task
    def cleanup(self):
        # nothing todo with the current processing setup
        pass
Markus Scheidgen's avatar
Markus Scheidgen committed
476
477

    @property
    def processed_calcs(self):
        # number of this upload's calcs that finished, successfully or not
        return Calc.objects(upload_id=self.upload_id, tasks_status__in=[SUCCESS, FAILURE]).count()

    @property
    def total_calcs(self):
        # number of all calcs belonging to this upload
        return Calc.objects(upload_id=self.upload_id).count()

    @property
    def failed_calcs(self):
        # number of this upload's calcs that failed processing
        return Calc.objects(upload_id=self.upload_id, tasks_status=FAILURE).count()

    @property
    def pending_calcs(self):
        # number of this upload's calcs that have not started processing yet
        return Calc.objects(upload_id=self.upload_id, tasks_status=PENDING).count()

    def all_calcs(self, start, end, order_by='mainfile'):
        # page [start:end] of this upload's calcs, sorted by the given field
        return Calc.objects(upload_id=self.upload_id)[start:end].order_by(order_by)

    @property
    def calcs(self):
        # queryset of this upload's successfully processed calcs
        return Calc.objects(upload_id=self.upload_id, tasks_status=SUCCESS)