data.py 17.4 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user related
data. This includes information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc
    :members:
.. autoclass:: Upload
    :members:
"""

27
from typing import List, Any, ContextManager, Tuple, Generator
Markus Scheidgen's avatar
Markus Scheidgen committed
28
from mongoengine import StringField, BooleanField, DateTimeField, DictField, IntField
Markus Scheidgen's avatar
Markus Scheidgen committed
29
import logging
30
from structlog import wrap_logger
31
from contextlib import contextmanager
Markus Scheidgen's avatar
Markus Scheidgen committed
32

33
from nomad import utils, coe_repo, datamodel
34
from nomad.files import PathObject, ArchiveBasedStagingUploadFiles
35
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
Markus Scheidgen's avatar
Markus Scheidgen committed
36
from nomad.parsing import parsers, parser_dict
Markus Scheidgen's avatar
Markus Scheidgen committed
37
38
39
40
41
42
from nomad.normalizing import normalizers


class NotAllowedDuringProcessing(Exception):
    """
    Raised by :class:`Upload` operations (``delete``, ``unstage``) when the upload is
    neither completed nor still in its ``uploading`` task.
    """


43
class Calc(Proc, datamodel.Calc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various repository properties generated
    while parsing, including ``program_name``, ``program_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
    """
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    # NOTE(review): Upload declares its indexes under the ``indexes`` meta key, this
    # class uses ``indices``; mongoengine presumably ignores the latter, so these
    # indexes may never be created. Also, ``code`` is not a field declared in this
    # class. Confirm against the mongoengine docs and the base classes before renaming.
    meta: Any = {
        'indices': [
            'upload_id', 'mainfile', 'code', 'parser', 'status'
        ]
    }

    def __init__(self, *args, **kwargs):
        """ Initializes the document and the lazily populated, non persistent caches. """
        super().__init__(*args, **kwargs)
        self._parser_backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        # writer for the calc processing log and the context manager that produced it
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx: ContextManager = None

    @classmethod
    def get(cls, id):
        """ Returns the calc with the given ``calc_id`` via the inherited ``get_by_id``. """
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        """ The raw file object of this calc's mainfile within the upload files. """
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        """ The :class:`Upload` of this calc, loaded on first access and then cached. """
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
        return self._upload

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """
        The staging upload files of this calc's upload, created on first access
        (using the upload's ``local_path``) and then cached.
        """
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True,
                local_path=self.upload.local_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        """
        Returns the inherited logger bound to this calc's ``upload_id``, ``mainfile``,
        ``calc_id``, and the given additional key/values.
        """
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        return logger

    def get_calc_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        """
        logger = self.get_logger(**kwargs)

        # The log file is opened once on first use. It is closed again either at the
        # end of :func:`archiving` or in the ``finally`` of :func:`process`.
        if self._calc_proc_logwriter is None:
            self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
            self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101

        def save_to_calc_log(logger, method_name, event_dict):
            # structlog processor: writes one line per event, cut off after 120 chars
            program = event_dict.get('normalizer', 'parser')
            event = event_dict.get('event', '')
            entry = '[%s] %s: %s' % (method_name, program, event)
            if len(entry) > 120:
                self._calc_proc_logwriter.write(entry[:120])
                self._calc_proc_logwriter.write('...')
            else:
                self._calc_proc_logwriter.write(entry)
            self._calc_proc_logwriter.write('\n')
            return event_dict

        return wrap_logger(logger, processors=[save_to_calc_log])

    @process
    def process(self):
        """
        Runs parsing, normalizing, and archiving for this calc. Afterwards, also in
        case of exceptions, a still open processing log is closed and the upload is
        informed about the completion of this child.
        """
        logger = self.get_logger()
        if self.upload is None:
            # NOTE(review): only logs; processing continues with the tasks below
            logger.error('calculation upload does not exist')

        try:
            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

            # inform parent proc about completion
            self.upload.completed_child()

    @task
    def parsing(self):
        """
        Runs this calc's parser on the mainfile and adds ``section_calculation_info``
        and ``section_repository_info`` to the resulting backend. If the backend does
        not report ``ParseSuccess``, the calc is failed, but the sections are still
        completed.
        """
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_calc_logger(**context)
        parser = parser_dict[self.parser]

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            self._parser_backend = parser.run(
                self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)

        self._parser_backend.openNonOverlappingSection('section_calculation_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.upload_files.calc_hash(self.mainfile))
        self._parser_backend.addValue('main_file', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            logger.error(self._parser_backend.status[1])
            error = self._parser_backend.status[1]
            self._parser_backend.addValue('parse_status', 'ParseFailure')
            self.fail(error, level=logging.DEBUG, **context)
        else:
            self._parser_backend.addValue('parse_status', 'ParseSuccess')

        self._parser_backend.closeNonOverlappingSection('section_calculation_info')

        self._parser_backend.openNonOverlappingSection('section_repository_info')
        self._parser_backend.addValue(
            'repository_archive_gid', '%s/%s' % (self.upload_id, self.calc_id))
        self._parser_backend.addValue(
            'repository_filepaths', self.upload_files.calc_files(self.mainfile))
        self._parser_backend.closeNonOverlappingSection('section_repository_info')

        self.add_processor_info(self.parser)

    @contextmanager
    def use_parser_backend(self, processor_name):
        """
        Context manager that resets the backend status, yields the backend, and adds
        the processor info for ``processor_name`` afterwards. The processor info is
        not added if the managed block raises.
        """
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        """
        Adds a ``section_archive_processing_info`` for the given processor to
        ``/section_calculation_info/0``. Depending on the current backend status, the
        section holds the success/warning status or the error.
        """
        self._parser_backend.openContext('/section_calculation_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('archive_processor_warning_number', len(warnings))
                self._parser_backend.addArrayValues(
                    'archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_calculation_info/0')

    @task
    def normalizing(self):
        """
        Runs all normalizers on the parser backend, one after the other. Fails the calc
        and stops at the first normalizer that does not leave the backend in
        ``ParseSuccess`` status.
        """
        for normalizer in normalizers:
            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_calc_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    normalizer(backend).normalize(logger=logger)

            failed = self._parser_backend.status[0] != 'ParseSuccess'
            if failed:
                logger.error(self._parser_backend.status[1])
                error = self._parser_backend.status[1]
                self.fail(error, level=logging.WARNING, **context)
                break
            else:
                logger.debug(
                    'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        """
        Persists the backend's metadata and its JSON archive in the upload files, and
        closes the calc processing log if one was opened.
        """
        logger = self.get_logger()

        # persist the repository metadata
        with utils.timer(logger, 'indexed', step='index'):
            self.upload_files.metadata.insert(self._parser_backend.metadata())

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:
            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                self._parser_backend.write_json(out, pretty=True)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='archive_log',
                    input_size=self.mainfile_file.size) as log_data:
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)
Markus Scheidgen's avatar
Markus Scheidgen committed
261

Markus Scheidgen's avatar
Markus Scheidgen committed
262

263
class Upload(Chord, datamodel.Upload):
    """
    Represents uploads in the databases. Provides persistence access to the files storage,
    and processing state.

    Attributes:
        name: optional user provided upload name
        local_path: optional local path, e.g. for files that are already somewhere on the server
        additional_metadata: optional user provided additional meta data
        upload_id: the upload id generated by the database
        is_private: true if the upload and its derivatives are only visible to the uploader
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
    """
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)

    name = StringField(default=None)
    local_path = StringField(default=None)
    additional_metadata = DictField(default=None)

    is_private = BooleanField(default=False)

    upload_time = DateTimeField()

    user_id = StringField(required=True)

    coe_repo_upload_id = IntField(default=None)

    _initiated_parsers = IntField(default=-1)

    meta: Any = {
        'indexes': [
            'user_id', 'status'
        ]
    }

    def __init__(self, **kwargs):
        """ Initializes the document and the lazily populated upload files cache. """
        super().__init__(**kwargs)
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @classmethod
    def get(cls, id):
        """ Returns the upload with the given ``upload_id`` via the inherited ``get_by_id``. """
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
        """ Returns all uploads whose ``user_id`` matches the given user. """
        return cls.objects(user_id=str(user.user_id))

    @property
    def uploader(self):
        """ The :class:`coe_repo.User` that belongs to this upload's ``user_id``. """
        return coe_repo.User.from_user_id(self.user_id)

    def get_logger(self, **kwargs):
        """
        Returns the inherited logger bound to this upload's ``upload_id`` and the
        given additional key/values.
        """
        logger = super().get_logger()
        logger = logger.bind(upload_id=self.upload_id, **kwargs)
        return logger

    def delete(self):
        """
        Deletes all calcs of this upload and the upload itself.

        Raises:
            NotAllowedDuringProcessing: if the upload is neither completed nor in
                its ``uploading`` task.
        """
        if not (self.completed or self.current_task == 'uploading'):
            raise NotAllowedDuringProcessing()
        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user. All other keyword arguments, e.g. a
        user given name, are passed on to the inherited ``create``. An ``upload_id``
        is generated if none is given. The new upload is continued with the
        ``uploading`` task.

        Arguments:
            user (coe_repo.User): The user that created the upload.
        """
        # ``user`` is not a field; it is replaced by the respective ``user_id``
        user: coe_repo.User = kwargs.pop('user')
        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=str(user.user_id))
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def unstage(self, meta_data):
        """
        Adds this upload with the given meta data to the coe repository, packs and
        deletes the staging upload files, and deletes this upload with its calcs.

        Raises:
            NotAllowedDuringProcessing: if the upload is neither completed nor in
                its ``uploading`` task.
        """
        self.get_logger().info('unstage')

        if not (self.completed or self.current_task == 'uploading'):
            raise NotAllowedDuringProcessing()

        coe_repo.Upload.add(self, meta_data)

        self.upload_files.pack()
        self.upload_files.delete()
        self.delete()

    @process
    def process(self):
        """ Extracts the uploaded files and triggers the processing of all mainfiles. """
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        """ Task without own behavior; :func:`create` continues new uploads with it. """
        pass

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """
        The staging upload files of this upload, created on first access (using
        ``local_path``) and then cached.
        """
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(
                self.upload_id, is_authorized=lambda: True, local_path=self.local_path)
        return self._upload_files

    @task
    def extracting(self):
        """
        Task performed before the actual parsing/normalizing. Extracts the uploaded
        files. The upload is failed if this raises a ``KeyError``.
        """
        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()
        except KeyError:
            self.fail('process request for non existing upload', level=logging.ERROR)
            return

    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        """
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles. A file is yielded once for each parser
        that accepts it.

        Yields:
            Tuples of mainfile filename and matching parser
        """
        for filename in self.upload_files.raw_file_manifest():
            for parser in parsers:
                try:
                    with self.upload_files.raw_file(filename) as mainfile_f:
                        if parser.is_mainfile(filename, lambda fn: mainfile_f):
                            yield filename, parser
                except Exception as e:
                    # a file that cannot be matched must not abort the whole matching
                    self.get_logger().error(
                        'exception while matching pot. mainfile',
                        mainfile=filename, exc_info=e)

    @task
    def parse_all(self):
        """
        Identifies mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        # TODO: deal with multiple possible parser specs
        # NOTE(review): the event name equals the one used in extracting, only the
        # step differs; looks like a copy/paste leftover. Kept as is.
        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            total_calcs = 0
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    upload_id=self.upload_id)

                calc.process()
                total_calcs += 1

        # have to save the total_calcs information for chord management
        self.spwaned_childred(total_calcs)

    def join(self):
        """ Runs the :func:`cleanup` task. """
        self.cleanup()

    @task
    def cleanup(self):
        """ Currently only logs a timer event, no actual cleanup is performed. """
        # TODO issue #83
        with utils.timer(
                self.get_logger(), 'pack staging upload', step='cleaning',
                upload_size=self.upload_files.size):
            pass

    @property
    def processed_calcs(self):
        """ The number of this upload's calcs with status ``SUCCESS`` or ``FAILURE``. """
        return Calc.objects(upload_id=self.upload_id, status__in=[SUCCESS, FAILURE]).count()

    @property
    def total_calcs(self):
        """ The number of all calcs of this upload. """
        return Calc.objects(upload_id=self.upload_id).count()

    @property
    def failed_calcs(self):
        """ The number of this upload's calcs with status ``FAILURE``. """
        return Calc.objects(upload_id=self.upload_id, status=FAILURE).count()

    @property
    def pending_calcs(self):
        """ The number of this upload's calcs with status ``PENDING``. """
        return Calc.objects(upload_id=self.upload_id, status=PENDING).count()

    def all_calcs(self, start, end, order_by='mainfile'):
        """ Returns the ``[start:end]`` slice of this upload's calcs, ordered by ``order_by``. """
        return Calc.objects(upload_id=self.upload_id)[start:end].order_by(order_by)

    @property
    def calcs(self):
        """ All calcs of this upload. """
        return Calc.objects(upload_id=self.upload_id)