data.py 17.4 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user-related
data: information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc
    :members:
.. autoclass:: Upload
    :members:
"""

27
from typing import List, Any, ContextManager, Tuple, Generator
Markus Scheidgen's avatar
Markus Scheidgen committed
28
from mongoengine import StringField, BooleanField, DateTimeField, DictField, IntField
Markus Scheidgen's avatar
Markus Scheidgen committed
29
import logging
30
from structlog import wrap_logger
31
from contextlib import contextmanager
Markus Scheidgen's avatar
Markus Scheidgen committed
32

33
from nomad import utils, coe_repo, datamodel
34
from nomad.files import PathObject, ArchiveBasedStagingUploadFiles, ExtractError
35
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
Markus Scheidgen's avatar
Markus Scheidgen committed
36
from nomad.parsing import parsers, parser_dict
Markus Scheidgen's avatar
Markus Scheidgen committed
37
38
39
40
41
42
from nomad.normalizing import normalizers


class NotAllowedDuringProcessing(Exception):
    """ Raised when an operation requires the upload's processing to be completed first. """


43
class Calc(Proc, datamodel.Calc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing and its state.

    The attribute list does not include the various repository properties generated
    while parsing, including ``program_name``, ``program_version``, etc.

    Attributes:
        calc_id: the calc_id of this calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
    """
    calc_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()

    # NOTE(review): mongoengine reads index definitions from the ``'indexes'`` meta key
    # (as :class:`Upload` does); ``'indices'`` is presumably ignored, i.e. these indexes
    # are probably never created. Renaming the key makes mongoengine resolve every listed
    # name as a field, so ``'code'`` must exist on this document first -- confirm.
    meta: Any = {
        'indices': [
            'upload_id', 'mainfile', 'code', 'parser', 'status'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # transient per-instance state; plain attributes, not document fields
        self._parser_backend = None
        self._upload: Upload = None
        self._upload_files: ArchiveBasedStagingUploadFiles = None
        # the open archive log file and the context manager that produced it,
        # managed by get_calc_logger and _close_calc_proc_log
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx: ContextManager = None

    @classmethod
    def get(cls, id):
        """ Returns the calc with the given ``calc_id``. """
        return cls.get_by_id(id, 'calc_id')

    @property
    def mainfile_file(self) -> PathObject:
        """ The raw file object of this calc's mainfile. """
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def upload(self) -> 'Upload':
        """ The :class:`Upload` of this calc; cached after the first successful lookup. """
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
        return self._upload

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """ The staging files of this calc's upload (authorization always granted); cached. """
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, is_authorized=lambda: True, local_path=self.upload.local_path)
        return self._upload_files

    def get_logger(self, **kwargs):
        """ Returns the proc logger bound to this calc's upload_id, mainfile, and calc_id. """
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)

        return logger

    def get_calc_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.

        The archive log file is opened on first use and stays open until it is closed
        by :func:`archiving` or, if processing ended early, at the end of :func:`process`.
        Entries logged after the file was closed are not written to it.
        Entries longer than 120 characters are truncated and marked with ``...``.
        """
        logger = self.get_logger(**kwargs)

        if self._calc_proc_logwriter is None:
            self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
            self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101

        def save_to_calc_log(logger, method_name, event_dict):
            writer = self._calc_proc_logwriter
            if writer is None:
                # the log file was already closed; pass the event on unchanged
                return event_dict

            program = event_dict.get('normalizer', 'parser')
            event = event_dict.get('event', '')
            entry = '[%s] %s: %s' % (method_name, program, event)
            if len(entry) > 120:
                writer.write(entry[:120])
                writer.write('...')
            else:
                writer.write(entry)
            writer.write('\n')
            return event_dict

        return wrap_logger(logger, processors=[save_to_calc_log])

    @process
    def process(self):
        """
        Parses, normalizes, and archives this calc. Afterwards, the calc proc log is
        closed (if still open) and the upload is informed about the completion.
        """
        logger = self.get_logger()
        if self.upload is None:
            # without the upload there are no files to process and no parent to inform
            logger.error('calculation upload does not exist')
            return

        try:
            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                self._close_calc_proc_log()
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

            # inform parent proc about completion
            self.upload.completed_child()

    def _close_calc_proc_log(self) -> None:
        """ Exits the archive log file context opened by :func:`get_calc_logger`, if open. """
        if self._calc_proc_logwriter is None:
            return

        ctx = self._calc_proc_logwriter_ctx
        # reset first, so calc loggers stop writing to a file that is being closed
        self._calc_proc_logwriter = None
        self._calc_proc_logwriter_ctx = None
        ctx.__exit__(None, None, None)  # pylint: disable=E1101

    @task
    def parsing(self):
        """
        Runs this calc's parser on the mainfile.

        Adds the ``section_calculation_info`` (ids, hash, mainfile, parser, parse status)
        and ``section_repository_info`` sections to the resulting backend. Fails the calc
        if the parser did not report ``ParseSuccess``. The sections are added in either case.
        """
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_calc_logger(**context)
        parser = parser_dict[self.parser]

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            self._parser_backend = parser.run(
                self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)

        self._parser_backend.openNonOverlappingSection('section_calculation_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('calc_id', self.calc_id)
        self._parser_backend.addValue('calc_hash', self.upload_files.calc_hash(self.mainfile))
        self._parser_backend.addValue('main_file', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            error = self._parser_backend.status[1]
            logger.error(error)
            self._parser_backend.addValue('parse_status', 'ParseFailure')
            # NOTE(review): here and in normalizing ``level=`` is passed to fail(), while
            # Upload.extracting passes ``log_level=``; one of them is presumably not the
            # keyword Proc.fail expects -- check its signature.
            self.fail(error, level=logging.DEBUG, **context)
        else:
            self._parser_backend.addValue('parse_status', 'ParseSuccess')

        self._parser_backend.closeNonOverlappingSection('section_calculation_info')

        self._parser_backend.openNonOverlappingSection('section_repository_info')
        self._parser_backend.addValue('repository_archive_gid', '%s/%s' % (self.upload_id, self.calc_id))
        self._parser_backend.addValue(
            'repository_filepaths', self.upload_files.calc_files(self.mainfile))
        self._parser_backend.closeNonOverlappingSection('section_repository_info')

        self.add_processor_info(self.parser)

    @contextmanager
    def use_parser_backend(self, processor_name):
        """
        Resets the backend status and yields the parser backend. If the ``with`` body
        completes without raising, processor info for ``processor_name`` is recorded.
        """
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        """
        Adds a ``section_archive_processing_info`` for ``processor_name`` to the first
        ``section_calculation_info``.

        On ``ParseSuccess`` it records the status ``Success`` or ``WithWarnings`` (with
        the backend's warnings). Otherwise it records the backend's error.
        """
        self._parser_backend.openContext('/section_calculation_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('archive_processor_warning_number', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_calculation_info/0')

    @task
    def normalizing(self):
        """
        Runs all normalizers in order on the parser backend. Stops at the first
        normalizer that leaves a non-success backend status and fails the calc.
        """
        for normalizer in normalizers:
            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_calc_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    normalizer(backend).normalize(logger=logger)

            failed = self._parser_backend.status[0] != 'ParseSuccess'
            if failed:
                error = self._parser_backend.status[1]
                logger.error(error)
                self.fail(error, level=logging.WARNING, **context)
                break
            else:
                logger.debug(
                    'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
        """
        Persists the repository metadata and the archive JSON of this calc. Then closes
        the calc proc log (if open), logging the archive and log sizes.
        """
        logger = self.get_logger()

        # persist the repository metadata
        with utils.timer(logger, 'indexed', step='index'):
            self.upload_files.metadata.insert(self._parser_backend.metadata())

        # persist the archive
        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:
            with self.upload_files.archive_file(self.calc_id, 'wt') as out:
                self._parser_backend.write_json(out, pretty=True)

            log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='archive_log',
                    input_size=self.mainfile_file.size) as log_data:
                self._close_calc_proc_log()
                log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)
Markus Scheidgen's avatar
Markus Scheidgen committed
261

Markus Scheidgen's avatar
Markus Scheidgen committed
262

263
class Upload(Chord, datamodel.Upload):
    """
    Represents uploads in the databases. Provides persistence access to the files storage,
    and processing state.

    Attributes:
        name: optional user provided upload name
        local_path: optional local path, e.g. for files that are already somewhere on the server
        additional_metadata: optional user provided additional meta data
        upload_id: the upload id; :func:`create` generates a uuid if none is given
        is_private: true if the upload and its derivatives are only visible to the uploader
        upload_time: the timestamp when the system realised the upload
        user_id: the id of the user that created this upload
    """
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)

    name = StringField(default=None)
    local_path = StringField(default=None)
    additional_metadata = DictField(default=None)

    is_private = BooleanField(default=False)

    upload_time = DateTimeField()

    user_id = StringField(required=True)

    coe_repo_upload_id = IntField(default=None)

    _initiated_parsers = IntField(default=-1)

    meta: Any = {
        'indexes': [
            'user_id', 'status'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # cached staging files; a plain attribute, not a document field
        self._upload_files: ArchiveBasedStagingUploadFiles = None

    @classmethod
    def get(cls, id):
        """ Returns the upload with the given ``upload_id``. """
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
        """ Returns all uploads of the given user (matched by the user's ``user_id``). """
        return cls.objects(user_id=str(user.user_id))

    @property
    def uploader(self):
        """ The coe repository user that created this upload. """
        return coe_repo.User.from_user_id(self.user_id)

    def get_logger(self, **kwargs):
        """ Returns the proc logger bound to this upload's upload_id. """
        logger = super().get_logger()
        logger = logger.bind(upload_id=self.upload_id, **kwargs)
        return logger

    def delete(self, force: bool = False):
        """
        Deletes all calcs of this upload and the upload itself.

        Raises:
            NotAllowedDuringProcessing: if processing is not completed and ``force`` is not set
        """
        if not (self.completed or force):
            raise NotAllowedDuringProcessing()

        Calc.objects(upload_id=self.upload_id).delete()
        super().delete()

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user; a user given name is optional.
        The upload is saved to the database and continues with the ``uploading`` task.

        Arguments:
            user (coe_repo.User): The user that created the upload.
            upload_id: optional id; a new uuid is generated if not given.
        """
        user: coe_repo.User = kwargs.pop('user')
        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=str(user.user_id))
        self = super().create(**kwargs)

        self._continue_with('uploading')

        return self

    def unstage(self, meta_data):
        """
        Adds this upload with ``meta_data`` to the coe repository. It then packs the
        upload files, deletes the staging files, and deletes this upload with its calcs.

        Raises:
            NotAllowedDuringProcessing: if processing is not completed
        """
        self.get_logger().info('unstage')

        if not self.completed:
            raise NotAllowedDuringProcessing()

        coe_repo.Upload.add(self, meta_data)

        self.upload_files.pack()
        self.upload_files.delete()
        self.delete()

    @process
    def process(self):
        """ Extracts the upload and creates/processes a calc for every found mainfile. """
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
        """ Placeholder task the upload continues with after :func:`create`. """
        pass

    @property
    def upload_files(self) -> ArchiveBasedStagingUploadFiles:
        """ The staging files of this upload (authorization always granted); cached. """
        if not self._upload_files:
            self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, is_authorized=lambda: True, local_path=self.local_path)
        return self._upload_files

    @task
    def extracting(self):
        """
        Task performed before the actual parsing/normalizing: extracts the uploaded
        files.

        Fails the upload if it does not exist (``KeyError``) or if it is not a valid
        .zip/.tar file (``ExtractError``).
        """
        # extract the uploaded file, this will also create a bagit bag.
        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_files.size):
                self.upload_files.extract()
        except KeyError:
            self.fail('process request for non existing upload', log_level=logging.ERROR)
            return
        except ExtractError:
            self.fail('bad .zip/.tar file', log_level=logging.INFO)
            return

    def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
        """
        Generator function that matches all files in the upload to the parsers to
        determine the upload's mainfiles.

        Each file is tried against the parsers in the order of ``parsers``, and only the
        first matching parser is used. Calc ids are derived from the mainfile alone (see
        :func:`parse_all`), so yielding several parsers for one file would create
        several calcs with the same id. Errors while matching are logged and that parser
        is skipped.

        Yields:
            Tuples of mainfile (filename) and the matching parser.
        """
        logger = self.get_logger()
        for filename in self.upload_files.raw_file_manifest():
            for parser in parsers:
                try:
                    with self.upload_files.raw_file(filename) as mainfile_f:
                        is_mainfile = parser.is_mainfile(filename, lambda fn: mainfile_f)
                except Exception as e:
                    logger.error(
                        'exception while matching pot. mainfile',
                        mainfile=filename, exc_info=e)
                    continue

                # yield outside of the open file, so it is not held open by the consumer
                if is_mainfile:
                    yield filename, parser
                    break

    @task
    def parse_all(self):
        """
        Identifies mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        # match_mainfiles yields at most one (the first matching) parser per mainfile
        # NOTE(review): the event name duplicates the one in extracting(); ``step`` differs
        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_files.size):
            total_calcs = 0
            for filename, parser in self.match_mainfiles():
                calc = Calc.create(
                    calc_id=self.upload_files.calc_id(filename),
                    mainfile=filename, parser=parser.name,
                    upload_id=self.upload_id)

                calc.process()
                total_calcs += 1

        # have to save the total_calcs information for chord management
        self.spwaned_childred(total_calcs)

    def join(self):
        """ Runs the cleanup task; presumably called by the chord once all calcs completed. """
        self.cleanup()

    @task
    def cleanup(self):
        """ Final processing task; packing the staging upload is still to do (issue #83). """
        # TODO issue #83
        with utils.timer(
                self.get_logger(), 'pack staging upload', step='cleaning',
                upload_size=self.upload_files.size):
            pass

    @property
    def processed_calcs(self):
        """ Number of this upload's calcs that finished with success or failure. """
        return Calc.objects(upload_id=self.upload_id, status__in=[SUCCESS, FAILURE]).count()

    @property
    def total_calcs(self):
        """ Number of all calcs of this upload. """
        return Calc.objects(upload_id=self.upload_id).count()

    @property
    def failed_calcs(self):
        """ Number of this upload's failed calcs. """
        return Calc.objects(upload_id=self.upload_id, status=FAILURE).count()

    @property
    def pending_calcs(self):
        """ Number of this upload's pending calcs. """
        return Calc.objects(upload_id=self.upload_id, status=PENDING).count()

    def all_calcs(self, start, end, order_by='mainfile'):
        """ Returns this upload's calcs sorted by ``order_by``, sliced to ``[start:end]``. """
        return Calc.objects(upload_id=self.upload_id).order_by(order_by)[start:end]

    @property
    def calcs(self):
        """ Query set of all calcs of this upload. """
        return Calc.objects(upload_id=self.upload_id)