#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
This module comprises a set of persistent document classes that hold all user-related
data: information about users, their uploads and datasets, the associated
calculations, and files.


.. autoclass:: Calc

.. autoclass:: Upload

'''

from typing import cast, Any, List, Tuple, Set, Iterator, Dict, Iterable, Sequence, Union
from mongoengine import (
    StringField, DateTimeField, BooleanField, IntField, ListField)
from pymongo import UpdateOne
from structlog import wrap_logger
from contextlib import contextmanager
import copy
import os.path
from datetime import datetime, timedelta
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import yaml
import json
from functools import lru_cache
import requests
from fastapi.exceptions import RequestValidationError
from pydantic.error_wrappers import ErrorWrapper

from nomad import utils, config, infrastructure, search, datamodel, metainfo, parsing, client
from nomad.files import (
    PathObject, UploadFiles, PublicUploadFiles, StagingUploadFiles, UploadBundle, create_tmp_dir)
from nomad.processing.base import Proc, process, ProcessStatus, ProcessFailure, ProcessAlreadyRunning
from nomad.parsing import Parser
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import (
    EntryArchive, EntryMetadata, MongoUploadMetadata, MongoEntryMetadata, MongoSystemMetadata,
    EditableUserMetadata, UploadMetadata, AuthLevel)
from nomad.archive import (
    write_partial_archive_to_mongo, delete_partial_archives_from_mongo)
from nomad.app.v1.models import (
    MetadataEditRequest, And, Aggregation, TermsAggregation, MetadataPagination, MetadataRequired)
from nomad.search import update_metadata as es_update_metadata

section_metadata = datamodel.EntryArchive.metadata.name
section_workflow = datamodel.EntryArchive.workflow.name
section_results = datamodel.EntryArchive.results.name


_mongo_upload_metadata = tuple(
    quantity.name for quantity in MongoUploadMetadata.m_def.definitions)
_mongo_entry_metadata = tuple(
    quantity.name for quantity in MongoEntryMetadata.m_def.definitions)
_mongo_system_metadata = tuple(
    quantity.name for quantity in MongoSystemMetadata.m_def.definitions)
_mongo_entry_metadata_except_system_fields = tuple(
    quantity_name for quantity_name in _mongo_entry_metadata
    if quantity_name not in _mongo_system_metadata)
_editable_metadata: Dict[str, metainfo.Definition] = {
    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions
    if isinstance(quantity, metainfo.Quantity)}


def _pack_log_event(logger, method_name, event_dict):
    try:
        log_data = dict(event_dict)
        log_data.update(**{
            key: value
            for key, value in getattr(logger, '_context', {}).items()
            if key not in ['service', 'deployment', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
        log_data.update(logger=logger.name)

        return log_data
    except Exception:
        # raising an exception would cause an indefinite loop
        return event_dict


_log_processors = [
    StackInfoRenderer(),
    _pack_log_event,
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]


def check_user_ids(user_ids: Iterable[str], error_message: str):
    '''
    Checks if all user_ids provided in the Iterable `user_ids` are valid. If not, raises an
    AssertionError with the specified error message. The string {id} in `error_message` is
    replaced with the bad value.
    '''
    for user_id in user_ids:
        user = datamodel.User.get(user_id=user_id)
        assert user is not None, error_message.replace('{id}', user_id)


def keys_exist(data: Dict[str, Any], required_keys: Iterable[str], error_message: str):
    '''
    Checks if the specified keys exist in the provided dictionary structure `data`.
    Supports dot-notation to access subkeys. If a key is missing, an AssertionError is
    raised with the specified `error_message`, in which the string {key} is replaced with the missing key.
    '''
    for key in required_keys:
        current = data
        for sub_key in key.split('.'):
            assert sub_key in current, error_message.replace('{key}', key)
            current = current[sub_key]
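
# Usage sketch (hypothetical data): keys_exist({'a': {'b': 1}}, ['a.b'], 'missing {key}')
# passes silently, while additionally requiring 'a.c' would raise AssertionError('missing a.c').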


def generate_entry_id(upload_id: str, mainfile: str) -> str:
    '''
    Generates an id for an entry.
    Arguments:
        upload_id: The id of the upload
        mainfile: The mainfile path (relative to the raw directory).
    Returns:
        The generated entry id
    '''
    return utils.hash(upload_id, mainfile)
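
# For example (hypothetical values), generate_entry_id('some_upload_id', 'calcs/vasp/vasprun.xml')
# yields a stable, web-safe hash that only changes if the upload id or the mainfile path changes.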


class MetadataEditRequestHandler:
    '''
    Class for handling a request to edit metadata. The request may originate either from
    metadata files in the raw directory or from a json dictionary complying with the
    :class:`MetadataEditRequest` format. If the edit request is limited to a specific upload,
    `upload_id` should be specified (only when this is the case can upload level metadata be edited).
    '''
    @classmethod
    def edit_metadata(
            cls, edit_request_json: Dict[str, Any], upload_id: str,
            user: datamodel.User) -> Dict[str, Any]:
        '''
        Method to verify and execute a generic request to edit metadata from a certain user.
        The request is specified as a json dictionary. Optionally, the request can be restricted
        to a single upload by specifying `upload_id` (this is necessary when editing upload
        level attributes). If `edit_request_json` has `verify_only` set to True, only
        verification is carried out (i.e. nothing is actually updated). Running just the
        verification should be quick compared to actually executing the request (which
        may take some time and requires one or more @process to finish). If the request passes
        verification and `verify_only` is not set to True, the request is sent for execution
        by initiating the @process :func:`edit_upload_metadata` for each affected upload.

        The method returns a json dictionary with verified data (references resolved to explicit
        IDs, list actions always expressed as dicts with "op" and "values", etc.), or raises
        an exception, namely:
         -  A :class:`ValidationError` if the request json can't be parsed by pydantic
         -  A :class:`RequestValidationError` with information about validation failures and
            their location (most errors should be of this type, provided that the json is valid)
         -  A :class:`ProcessAlreadyRunning` exception if one of the affected uploads has
            a running process
         -  Some other type of exception, if something goes wrong unexpectedly (should hopefully
            never happen)
        '''
        logger = utils.get_logger('nomad.processing.edit_metadata')
        handler = MetadataEditRequestHandler(
            logger, user, edit_request_json=edit_request_json, upload_id=upload_id)
        # Validate the request
        handler.validate_request()  # Should raise errors if something looks wrong

        if not edit_request_json.get('verify_only'):
            # Check if any of the affected uploads are processing
            for upload in handler.affected_uploads:
                upload.reload()
                if upload.process_running:
                    raise ProcessAlreadyRunning(f'Upload {upload.upload_id} is currently processing')
            # Looks good, try to trigger processing
            for upload in handler.affected_uploads:
                upload.edit_upload_metadata(edit_request_json, user.user_id)  # Trigger the process
        # All went well, return a verified json as response
        verified_json = copy.deepcopy(handler.edit_request_json)
        verified_json['metadata'] = handler.root_metadata
        verified_json['entries'] = handler.entries_metadata
        return verified_json
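
    # A minimal sketch of an edit request as handled by this method (field names taken from
    # the code in this class; the concrete values are hypothetical):
    #
    #     edit_request_json = {
    #         'verify_only': False,
    #         'metadata': {
    #             'comment': 'new upload comment',
    #             'datasets': {'op': 'add', 'values': ['my_dataset_id']}},
    #         'entries': {'some_entry_id': {'comment': 'entry specific comment'}},
    #         'entries_key': 'entry_id'}
    #     MetadataEditRequestHandler.edit_metadata(edit_request_json, 'some_upload_id', user)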

    def __init__(
            self, logger, user: datamodel.User,
            edit_request_json: Dict[str, Any] = None,
            upload_files: StagingUploadFiles = None,
            upload_id: str = None):
        # Initialization
        assert user, 'Must specify `user`'
        assert (edit_request_json is None) != (upload_files is None), (
            'Must specify either `edit_request` or `upload_files`')
        self.logger = logger
        self.user = user
        self.edit_request_json = edit_request_json
        self.upload_files = upload_files
        self.upload_id = upload_id

        self.errors: List[ErrorWrapper] = []  # A list of all encountered errors, if any
        self.edit_attempt_locs: List[Tuple[str, ...]] = []  # locs where user has attempted to edit something
        self.required_auth_level = AuthLevel.none  # Maximum required auth level for the edit
        self.required_auth_level_locs: List[Tuple[str, ...]] = []  # locs where maximal auth level is needed
        self.encountered_users: Dict[str, str] = {}  # { ref: user_id | None }, ref = user_id | username | email
        self.encountered_datasets: Dict[str, datamodel.Dataset] = {}  # { ref : dataset | None }, ref = dataset_id | dataset_name
        self.root_metadata: Dict[str, Any] = None  # The metadata specified at the top/root level

        # Specific to the MetadataEditRequest case
        self.edit_request: MetadataEditRequest = None
        self.affected_uploads: List['Upload'] = None  # A MetadataEditRequest may involve multiple uploads
        self.entries_metadata: Dict[str, Dict[str, Any]] = {}  # Metadata specified for individual entries

    def validate_metadata_files(self):
        pass  # TODO

    def validate_request(self):
        ''' Validates the provided edit_request_json. '''
        # Validate the request json. Will raise ValidationError if json is malformed
        self.edit_request = MetadataEditRequest(**self.edit_request_json)
        try:
            if not self.upload_id and not self.edit_request.query:
                return self._loc_error('Must specify `query`', 'query')
            if self.edit_request.entries and not self.edit_request.entries_key:
                return self._loc_error('Must specify `entries_key` when specifying `entries`', 'entries_key')

            can_edit_upload_fields = bool(self.upload_id and not self.edit_request.query)
            if self.edit_request.metadata:
                self.root_metadata = self._verify_metadata_edit_actions(
                    self.edit_request_json['metadata'], ('metadata',), can_edit_upload_fields)
            if self.edit_request.entries:
                for key, entry_metadata in self.edit_request_json['entries'].items():
                    verified_metadata = self._verify_metadata_edit_actions(
                        entry_metadata, ('entries', key), False)
                    self.entries_metadata[key] = verified_metadata

            if not self.edit_attempt_locs:
                return self._loc_error('No fields to update specified', 'metadata')
            if self.required_auth_level == AuthLevel.admin and not self.user.is_admin:
                for loc in self.required_auth_level_locs:
                    self._loc_error('Admin rights required', loc)
                return

            embargo_length: int = (self.root_metadata or {}).get('embargo_length')
            try:
                self.affected_uploads = self._find_request_uploads()
            except Exception as e:
                return self._loc_error('Could not evaluate query: ' + str(e), 'query')
            if not self.affected_uploads:
                if self.edit_request.query:
                    return self._loc_error('No matching entries found', 'query')
                return self._loc_error('No matching upload found', 'upload_id')
            for upload in self.affected_uploads:
                # Check permissions
                coauthor = upload.coauthors and self.user.user_id in upload.coauthors
                main_author = self.user.user_id == upload.main_author
                admin = self.user.is_admin
                if self.required_auth_level == AuthLevel.coauthor:
                    has_access = coauthor or main_author or admin
                elif self.required_auth_level == AuthLevel.main_author:
                    has_access = main_author or admin
                elif self.required_auth_level == AuthLevel.admin:
                    has_access = admin
                else:
                    assert False, 'Invalid required_auth_level'  # Should not happen
                if not has_access:
                    for loc in self.required_auth_level_locs:
                        self._loc_error(
                            f'{self.required_auth_level} access required for upload '
                            f'{upload.upload_id}', loc)
                    return
                # Other checks
                if embargo_length is not None:
                    if upload.published and not admin and embargo_length != 0:
                        self._loc_error(
                            f'Upload {upload.upload_id} is published, embargo can only be lifted',
                            ('metadata', 'embargo_length'))
                if upload.published and not admin:
                    has_invalid_edit = False
                    for edit_loc in self.edit_attempt_locs:
                        if edit_loc[-1] not in ('embargo_length', 'datasets'):
                            has_invalid_edit = True
                            self._loc_error(
                                f'Cannot update, upload {upload.upload_id} is published.', edit_loc)
                    if has_invalid_edit:
                        return
        except Exception as e:
            # Something unexpected has gone wrong
            self.logger.error(e)
            raise
        finally:
            if self.errors:
                raise RequestValidationError(errors=self.errors)

    def get_upload_metadata_to_set(self, upload: 'Upload') -> Dict[str, Any]:
        '''
        Returns a dictionary with verified metadata to update on the Upload object. The
        values have the correct type for mongo. Assumes that the corresponding validation method
        (i.e. :func:`validate_metadata_files` or :func:`validate_request`) has been run.
        '''
        rv: Dict[str, Any] = {}
        if self.root_metadata:
            self._applied_mongo_actions(upload, self.root_metadata, rv)
        return rv

    def get_entry_metadata_to_set(self, upload: 'Upload', entry: 'Calc') -> Dict[str, Any]:
        '''
        Returns a dictionary with verified metadata to update on the entry object. The
        values have the correct type for mongo. Assumes that the corresponding validation method
        (i.e. :func:`validate_metadata_files` or :func:`validate_request`) has been run.
        '''
        rv: Dict[str, Any] = {}
        if self.root_metadata:
            self._applied_mongo_actions(entry, self.root_metadata, rv)
        if self.edit_request:
            # Source = edit_request
            if self.entries_metadata:
                entry_key = self._get_entry_key(entry, self.edit_request.entries_key)
                entry_metadata = self.entries_metadata.get(entry_key)
                if entry_metadata:
                    # We also have actions for this particular entry specified
                    self._applied_mongo_actions(entry, entry_metadata, rv)
        else:
            # Source = metadata files
            pass  # TODO
        return rv

    def _loc_error(self, msg: str, loc: Union[str, Tuple[str, ...]]):
        ''' Registers a located error. '''
        self.errors.append(ErrorWrapper(Exception(msg), loc=loc))
        self.logger.error(msg, loc=loc)

    def _verify_metadata_edit_actions(
            self, metadata_edit_actions: Dict[str, Any], loc: Tuple[str, ...],
            can_edit_upload_fields: bool, auth_level: AuthLevel = None) -> Dict[str, Any]:
        '''
        Performs *basic* validation of a dictionary with metadata edit actions, and returns a
        dictionary with the same structure, but containing only the *verified* actions. Verified
        actions are actions that pass validation. Moreover:
            1)  For actions on lists, the verified action value is always expressed as a
                list operation (a dictionary with `op` and `values`)
            2)  User references (which can be specified by a user_id, a username, or an email)
                are always converted to user_id
            3)  dataset references (which can be specified either by dataset_id or dataset_name)
                are always replaced with dataset_ids, and it is verified that none of the
                datasets has a doi.
            4)  Only `add` and `remove` operations are allowed for datasets.
        '''
        rv = {}
        for quantity_name, action in metadata_edit_actions.items():
            if action is not None:
                success, verified_action = self._verify_metadata_edit_action(
                    quantity_name, action, loc + (quantity_name,), can_edit_upload_fields, auth_level)
                if success:
                    rv[quantity_name] = verified_action
        return rv

    def _verify_metadata_edit_action(
            self, quantity_name: str, action: Any, loc: Tuple[str, ...],
            can_edit_upload_fields: bool, auth_level: AuthLevel) -> Tuple[bool, Any]:
        '''
        Performs basic validation of a single action. Returns (success, verified_action).
        '''
        definition = _editable_metadata.get(quantity_name)
        if not definition:
            self._loc_error('Unknown quantity', loc)
            return False, None

        self.edit_attempt_locs.append(loc)

        field_auth_level = getattr(definition, 'a_auth_level', AuthLevel.coauthor)

        if auth_level is not None:
            # Our auth level is known, check it immediately
            if field_auth_level > auth_level:
                self._loc_error(f'{field_auth_level} privileges required', loc)
                return False, None
        if field_auth_level > self.required_auth_level:
            self.required_auth_level = field_auth_level
            self.required_auth_level_locs = [loc]
        if quantity_name in _mongo_upload_metadata and not can_edit_upload_fields:
            self._loc_error('Quantity can only be edited on the upload level', loc)
            return False, None

        try:
            if definition.is_scalar:
                return True, self._verified_value(definition, action)
            else:
                # We have a non-scalar quantity
                if type(action) == dict:
                    # Action is a dict - expected to contain op and values
                    assert action.keys() == {'op', 'values'}, 'Expected keys `op` and `values`'
                    op = action['op']
                    values = action['values']
                    assert op in ('set', 'add', 'remove'), 'op should be `set`, `add` or `remove`'
                    if quantity_name == 'datasets' and op == 'set':
                        self._loc_error(
                            'Only `add` and `remove` operations permitted for datasets', loc)
                        return False, None
                else:
                    op = 'set'
                    values = action
                    if quantity_name == 'datasets':
                        op = 'add'  # Just specifying a list will be interpreted as add, rather than fail.
                values = values if type(values) == list else [values]
                verified_values = [self._verified_value(definition, v) for v in values]
                return True, dict(op=op, values=verified_values)
        except Exception as e:
            self._loc_error(str(e), loc)
            return False, None
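    # For example (hypothetical input): a plain list value for `coauthors` such as
    # ['someone@example.com'] is normalized by the method above to
    # {'op': 'set', 'values': ['<resolved user_id>']}, whereas a plain list for
    # `datasets` is interpreted as an `add` operation rather than a `set`.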

    def _verified_value(
            self, definition: metainfo.Definition, value: Any) -> Any:
        '''
        Verifies a *singular* action value (i.e. for list quantities we should run this method
        for each value in the list, not with the list itself as input). Returns the verified
        value, which may be different from the original value. It:
            1) ensures a return value of a primitive type (str, int, float, bool or None),
            2) ensures that user refs exist,
            3) ensures that dataset refs exist and do not have a doi,
            4) translates user refs to user_id and dataset refs to dataset_id, if needed.
        Raises an exception in case of failure.
        '''
        if definition.type in (str, int, float, bool):
            assert value is None or type(value) == definition.type, f'Expected a {definition.type.__name__}'
            if definition.name == 'embargo_length':
                assert 0 <= value <= 36, 'Value should be between 0 and 36'
            return None if value == '' else value
        elif definition.type == metainfo.Datetime:
            if value is not None:
                datetime.fromisoformat(value)  # Throws exception if badly formatted timestamp
            return None if value == '' else value
        elif isinstance(definition.type, metainfo.MEnum):
            assert type(value) == str, 'Expected a string value'
            if value == '':
                return None
            assert value in definition.type._values, f'Bad enum value {value}'
            return value
        elif isinstance(definition.type, metainfo.Reference):
            assert type(value) == str, 'Expected a string value'
            reference_type = definition.type.target_section_def.section_cls
            if reference_type in [datamodel.User, datamodel.Author]:
                if value in self.encountered_users:
                    user_id = self.encountered_users[value]
                else:
                    # New user reference encountered, try to fetch it
                    user_id = None
                    try:
                        user_id = datamodel.User.get(user_id=value).user_id
                    except KeyError:
                        try:
                            user_id = datamodel.User.get(username=value).user_id
                        except KeyError:
                            if '@' in value:
                                try:
                                    user_id = datamodel.User.get(email=value).user_id
                                except KeyError:
                                    pass
                    self.encountered_users[value] = user_id
                assert user_id is not None, f'User reference not found: `{value}`'
                return user_id
            elif reference_type == datamodel.Dataset:
                dataset = self._get_dataset(value)
                assert dataset is not None, f'Dataset reference not found: `{value}`'
                assert self.user.is_admin or dataset.user_id == self.user.user_id, (
                    f'Dataset `{value}` does not belong to you')
                assert not dataset.doi, f'Dataset `{value}` has a doi and cannot be changed'
                return dataset.dataset_id
        else:
            assert False, 'Unhandled value type'  # Should not happen

    def _applied_mongo_actions(
            self, mongo_doc: Union['Upload', 'Calc'],
            verified_actions: Dict[str, Any], applied_actions: Dict[str, Any]):
        '''
        Calculates the upload or entry level *applied actions*, i.e. key-value pairs with
        data to set on the provided `mongo_doc` in order to carry out the actions specified
        by `verified_actions`. The result is added to `applied_actions`.
        '''
        for quantity_name, verified_action in verified_actions.items():
            if isinstance(mongo_doc, Calc) and quantity_name not in _mongo_entry_metadata:
                continue
            elif isinstance(mongo_doc, Upload) and quantity_name not in _mongo_upload_metadata:
                continue
            applied_actions[quantity_name] = self._applied_mongo_action(
                mongo_doc, quantity_name, verified_action)

    def _applied_mongo_action(self, mongo_doc, quantity_name: str, verified_action: Any) -> Any:
        definition = _editable_metadata[quantity_name]
        if definition.is_scalar:
            if definition.type == metainfo.Datetime and verified_action:
                return datetime.fromisoformat(verified_action)
            return verified_action
        # Non-scalar property. The verified action should be a dict with op and values
        op, values = verified_action['op'], verified_action['values']
        old_list = getattr(mongo_doc, quantity_name, [])
        new_list = [] if op == 'set' else old_list.copy()
        for v in values:
            if op == 'add' or op == 'set':
                if v not in new_list:
                    if quantity_name in ('coauthors', 'reviewers') and v == mongo_doc.main_author:
                        continue  # Prevent adding the main author to coauthors or reviewers
                    new_list.append(v)
            elif op == 'remove':
                if v in new_list:
                    new_list.remove(v)
        return new_list
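
    # For instance (hypothetical values): applying {'op': 'add', 'values': ['u2']} to a
    # document whose coauthors are ['u1'] yields ['u1', 'u2'], while 'remove' with the
    # same values yields ['u1'] again; the main author is never added to these lists.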

    def _get_entry_key(self, entry: 'Calc', entries_key: str) -> str:
        if entries_key == 'calc_id' or entries_key == 'entry_id':
            return entry.calc_id
        elif entries_key == 'mainfile':
            return entry.mainfile
        assert False, f'Invalid entries_key: {entries_key}'

    def _get_dataset(self, ref: str) -> datamodel.Dataset:
        '''
        Gets a dataset. Datasets can be identified either by dataset_id or dataset_name, but
        only datasets belonging to the user can be specified by name. If no matching
        dataset can be found, None is returned.
        '''
        if ref in self.encountered_datasets:
            return self.encountered_datasets[ref]
        else:
            # First time we encounter this ref
            try:
                dataset = datamodel.Dataset.m_def.a_mongo.get(dataset_id=ref)
            except KeyError:
                try:
                    dataset = datamodel.Dataset.m_def.a_mongo.get(
                        user_id=self.user.user_id, dataset_name=ref)
                except KeyError:
                    dataset = None
            self.encountered_datasets[ref] = dataset
        return dataset

    def _restricted_request_query(self, upload_id: str = None):
        '''
        Gets the query of the request, if it has any. If we have a query and if an `upload_id`
        is specified, we return a modified query, by restricting the original query to this upload.
        '''
        query = self.edit_request.query
        if upload_id and query:
            # Restrict query to the specified upload
            return And(**{'and': [{'upload_id': upload_id}, query]})
        return query
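
    # E.g. (hypothetical values): with a request query {'results.method.simulation.program_name': 'VASP'}
    # and upload_id 'some_upload_id', the query returned above is the conjunction
    # And(**{'and': [{'upload_id': 'some_upload_id'}, <original query>]}).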

    def _find_request_uploads(self) -> List['Upload']:
        ''' Returns a list of :class:`Upload`s matching the edit request. '''
        query = self._restricted_request_query(self.upload_id)
        if query:
            # Perform the search, aggregating by upload_id
            search_response = search.search(
                user_id=self.user.user_id,
                owner=self.edit_request.owner,
                query=query,
                aggregations=dict(agg=Aggregation(terms=TermsAggregation(quantity='upload_id'))),
                pagination=MetadataPagination(page_size=0))
            terms = search_response.aggregations['agg'].terms  # pylint: disable=no-member
            return [Upload.get(bucket.value) for bucket in terms.data]  # type: ignore
        elif self.upload_id:
            # Request just specifies an upload_id, no query
            try:
                return [Upload.get(self.upload_id)]
            except KeyError:
                pass
        return []

    def find_request_entries(self, upload: 'Upload') -> Iterable['Calc']:
        ''' Finds the entries of the specified upload which are affected by the request. '''
        query = self._restricted_request_query(upload.upload_id)
        if query:
            # We have a query. Execute it to get the entries.
            search_result = search.search_iterator(
                user_id=self.user.user_id,
                owner=self.edit_request.owner,
                query=query,
                required=MetadataRequired(include=['calc_id']))
            for result in search_result:
                yield Calc.get(result['calc_id'])
        else:
            # We have no query. Return all entries for the upload
            for entry in Calc.objects(upload_id=upload.upload_id):
                yield entry


class Calc(Proc):
    '''
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing state.

    The attribute list does not include the various metadata properties generated
    while parsing, including ``code_name``, ``code_version``, etc.

    Attributes:
        upload_id: the id of the upload to which this entry belongs
        calc_id: the calc_id of this calc
        calc_hash: the hash of the entry files
        entry_create_time: the date and time of the creation of the entry
        last_processing_time: the date and time of the last processing
        last_edit_time: the date and time the user metadata was last edited
        mainfile: the mainfile (including path in upload) that was used to create this calc
        parser_name: the name of the parser used to process this calc
        pid: the legacy NOMAD pid of the entry
        external_id: a user provided external id. Usually the id for an entry in an
            external database where the data was imported from
        nomad_version: the NOMAD version used for the last processing
        nomad_commit: the NOMAD commit used for the last processing
        comment: a user provided comment for this entry
        references: user provided references (URLs) for this entry
        entry_coauthors: a user provided list of co-authors specific for this entry. Note
            that normally, coauthors should be set on the upload level.
        datasets: a list of user curated datasets this entry belongs to
    '''
    upload_id = StringField(required=True)
    calc_id = StringField(primary_key=True)
    calc_hash = StringField()
    entry_create_time = DateTimeField(required=True)
    last_processing_time = DateTimeField()
    last_edit_time = DateTimeField()
    mainfile = StringField()
    parser_name = StringField()
    pid = StringField()
    external_id = StringField()
    nomad_version = StringField()
    nomad_commit = StringField()
    comment = StringField()
    references = ListField(StringField(), default=None)
    entry_coauthors = ListField(default=None)
    datasets = ListField(StringField(), default=None)

    meta: Any = {
        'strict': False,
        'indexes': [
            'upload_id',
            'parser_name',
            ('upload_id', 'mainfile'),
            ('upload_id', 'parser_name'),
            ('upload_id', 'process_status'),
            ('upload_id', 'nomad_version'),
            'process_status',
            'last_processing_time',
            'datasets',
            'pid'
        ]
    }

    def __init__(self, *args, **kwargs):
        kwargs.setdefault('entry_create_time', datetime.utcnow())
        super().__init__(*args, **kwargs)
        self._parser_results: EntryArchive = None
        self._is_initial_processing: bool = False
        self._upload: Upload = None
        self._upload_files: StagingUploadFiles = None
        self._calc_proc_logs: List[Any] = None

        self._entry_metadata: EntryMetadata = None
        self._perform_index = True

    @classmethod
    def get(cls, id) -> 'Calc':
        return cls.get_by_id(id, 'calc_id')

    @property
    def entry_id(self) -> str:
        ''' Just an alias for calc_id. '''
        return self.calc_id

    @property
    def mainfile_file(self) -> PathObject:
        return self.upload_files.raw_file_object(self.mainfile)

    @property
    def processed(self) -> bool:
        return self.process_status == ProcessStatus.SUCCESS

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
            self._upload.worker_hostname = self.worker_hostname
        return self._upload

    def _initialize_metadata_for_processing(self):
        '''
        Initializes self._entry_metadata and self._parser_results in preparation for processing.
        Existing values in mongo are loaded first, then generated system values are
        applied.
        '''
        self._entry_metadata = EntryMetadata()
        self._apply_metadata_from_mongo(self.upload, self._entry_metadata)
        self._apply_metadata_from_process(self._entry_metadata)

        self._parser_results = EntryArchive()
        self._parser_results.metadata = self._entry_metadata

    def _apply_metadata_from_file(self, logger):
        # The metadata file name (nomad_metadata.yaml/json) is defined in nomad.config. Such a
        # file can be placed in the directory containing the mainfile or in any directory above it;
        # the directory containing the mainfile has the highest priority.
        metadata_file = config.process.metadata_file_name
        metadata_dir = os.path.dirname(self.mainfile_file.os_path)
        upload_raw_dir = self.upload_files._raw_dir.os_path

        metadata = {}
        metadata_part = None
        # apply the nomad files of the current directory and parent directories
        while True:
            metadata_part = self.upload.metadata_file_cached(
                os.path.join(metadata_dir, metadata_file))
            for key, val in metadata_part.items():
                if key in ['entries', 'oasis_datasets']:
                    continue
                metadata.setdefault(key, val)

            if metadata_dir == upload_raw_dir:
                break

            metadata_dir = os.path.dirname(metadata_dir)

        # The top-level nomad file can also contain an entries dict with entry
        # metadata keyed by mainfile. This takes precedence over the other files.
        entries = metadata_part.get('entries', {})
        metadata_part = entries.get(self.mainfile, {})
        for key, val in metadata_part.items():
            metadata[key] = val

        if len(metadata) > 0:
            logger.info('Apply user metadata from nomad.yaml/json file(s)')

        for key, val in metadata.items():
            if key == 'entries':
                continue

            definition = _editable_metadata.get(key, None)

            if definition is None:
                logger.warn('Users cannot set metadata', quantity=key)
                continue

            try:
                self._entry_metadata.m_set(definition, val)
            except Exception as e:
                logger.error(
                    'Could not apply user metadata from nomad.yaml/json file',
                    quantity=definition.name, exc_info=e)

    def _apply_metadata_from_process(self, entry_metadata: EntryMetadata):
        '''
        Applies metadata generated when processing or re-processing an entry to `entry_metadata`.
        '''
        entry_metadata.nomad_version = config.meta.version
        entry_metadata.nomad_commit = config.meta.commit
        entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
        entry_metadata.files = self.upload_files.calc_files(self.mainfile)
        entry_metadata.last_processing_time = datetime.utcnow()
        entry_metadata.processing_errors = []

    def _apply_metadata_from_mongo(self, upload: 'Upload', entry_metadata: EntryMetadata):
        '''
        Loads entry metadata from mongo (that is: from `self` and the provided `upload` object)
        and applies the values to `entry_metadata`.
        '''
        assert upload.upload_id == self.upload_id, 'Could not apply metadata: upload_id mismatch'
        # Upload metadata
        for quantity_name in _mongo_upload_metadata:
            setattr(entry_metadata, quantity_name, getattr(upload, quantity_name))
        # Entry metadata
        for quantity_name in _mongo_entry_metadata:
            setattr(entry_metadata, quantity_name, getattr(self, quantity_name))
        # Special case: domain. May be derivable from mongo, or may have to be read from the archive
        if self.parser_name is not None:
            parser = parser_dict[self.parser_name]
            if parser.domain:
                entry_metadata.domain = parser.domain

    def _apply_metadata_to_mongo_entry(self, entry_metadata: EntryMetadata):
        '''
        Applies the metadata fields that are stored on the mongo entry level to self.
        In other words, basically the reverse operation of :func:`_apply_metadata_from_mongo`,
        but excluding upload level metadata and system fields (like mainfile, parser_name etc.).
        '''
        entry_metadata_dict = entry_metadata.m_to_dict(include_defaults=True)
        for quantity_name in _mongo_entry_metadata_except_system_fields:
            setattr(self, quantity_name, entry_metadata_dict.get(quantity_name))

    def set_mongo_entry_metadata(self, *args, **kwargs):
        '''
        Sets the entry level metadata in mongo. Expects either a positional argument
        which is an instance of :class:`EntryMetadata` or keyword arguments with data to set.
        '''
        assert not (args and kwargs), 'Cannot provide both keyword arguments and a positional argument'
        if args:
            assert len(args) == 1 and isinstance(args[0], EntryMetadata), (
                'Expected exactly one positional argument of type `EntryMetadata`')
            self._apply_metadata_to_mongo_entry(args[0])
        else:
            for key, value in kwargs.items():
                if key in _mongo_entry_metadata_except_system_fields:
                    setattr(self, key, value)
                else:
                    assert False, f'Cannot set metadata quantity: {key}'
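
    # Usage sketch (hypothetical values): pass either a complete EntryMetadata instance
    # or individual mongo-level fields as keyword arguments, e.g.
    #     entry.set_mongo_entry_metadata(comment='manually curated')
    #     entry.set_mongo_entry_metadata(EntryMetadata(comment='manually curated'))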

    def full_entry_metadata(self, upload: 'Upload') -> EntryMetadata:
        '''
        Returns a complete set of :class:`EntryMetadata` including
        both the mongo metadata and the metadata from the archive.

        Arguments:
            upload: The :class:`Upload` to which this entry belongs. Upload level metadata
                and the archive files will be read from this object.
        '''
        assert upload.upload_id == self.upload_id, 'Mismatching upload_id encountered'
        archive = upload.upload_files.read_archive(self.calc_id)
        try:
            # instead of loading the whole archive, it should be enough to load the
            # parts that are referenced by section_metadata/EntryMetadata
            # TODO somehow it should determine which root sections to load from the metainfo
            # or configuration
            calc_archive = archive[self.calc_id]
            entry_archive_dict = {section_metadata: calc_archive[section_metadata].to_dict()}
            if section_workflow in calc_archive:
                for workflow in calc_archive[section_workflow]:
                    entry_archive_dict.setdefault(section_workflow, [])
                    entry_archive_dict[section_workflow].append(workflow.to_dict())
            if section_results in calc_archive:
                entry_archive_dict[section_results] = calc_archive[section_results].to_dict()
            entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]
            self._apply_metadata_from_mongo(upload, entry_metadata)
            return entry_metadata
        except KeyError:
            # Due to hard processing failures, it is possible that an entry does not
            # have an archive. Return the metadata that is available.
            if self._entry_metadata is not None:
                return self._entry_metadata
            else:
                return self.mongo_metadata(upload)

    def mongo_metadata(self, upload: 'Upload') -> EntryMetadata:
        '''
        Returns a :class:`EntryMetadata` with mongo metadata only
        (fetched from `self` and `upload`), no archive metadata.
        '''
        assert upload.upload_id == self.upload_id, 'Mismatching upload_id encountered'
        entry_metadata = EntryMetadata()
        self._apply_metadata_from_mongo(upload, entry_metadata)
        return entry_metadata

    @property
    def upload_files(self) -> StagingUploadFiles:
        if not self._upload_files:
            self._upload_files = StagingUploadFiles(self.upload_id)
        return self._upload_files

    def get_logger(self, **kwargs):
        '''
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        '''
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id,
            parser=self.parser_name, **kwargs)

        if self._calc_proc_logs is None:
            self._calc_proc_logs = []

        def save_to_calc_log(logger, method_name, event_dict):
            try:
                # sanitize the event_dict, because all kinds of values might have been added
                dump_dict = {key: str(value) for key, value in event_dict.items()}
                dump_dict.update(level=method_name.upper())
                self._calc_proc_logs.append(dump_dict)

                if method_name == 'error':
                    error = event_dict.get('event', None)
                    if error is not None:
                        self._entry_metadata.processing_errors.append(error)

            except Exception:
                # Exceptions here will cause indefinite loop
                pass

            return event_dict

        return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])

    @process
    def process_calc(self, reprocess_settings: Dict[str, Any] = None):
        '''
        Processes (or reprocesses) a calculation.

        Arguments:
            reprocess_settings: An optional dictionary specifying the behaviour when reprocessing.
                Settings that are not specified are defaulted. See `config.reprocess` for
                available options and the configured default values.
        '''
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        settings = config.reprocess.customize(reprocess_settings)  # Add default settings

        # 1. Determine if we should parse or not
        self.set_last_status_message('Determining action')
        # If this entry has been processed before, or imported from a bundle, nomad_version
        # should be set. If not, this is the initial processing.
        self._is_initial_processing = self.nomad_version is None
        self._perform_index = self._is_initial_processing or settings.get('index_invidiual_entries', True)
        if not self.upload.published or self._is_initial_processing:
            should_parse = True
        elif not settings.reprocess_existing_entries:
            should_parse = False
        else:
            if settings.rematch_published and not settings.use_original_parser:
                with utils.timer(logger, 'parser matching executed'):
                    parser = match_parser(
                        self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
            else:
                parser = parser_dict[self.parser_name]

            if parser is None:
                # Should only be possible if the upload is published and rematching found no parser
                logger.warn('no parser matches during process, not updating the entry')
                self.warnings = ['no matching parser found during processing']
            else:
                should_parse = True
                parser_changed = self.parser_name != parser.name and parser_dict[self.parser_name].name != parser.name
                if parser_changed:
                    if not settings.use_original_parser:
                        logger.info(
                            'different parser matches during process, use new parser',
                            parser=parser.name)
                        self.parser_name = parser.name  # Parser renamed

        # 2. Either parse the entry, or preserve it as it is.
        if should_parse:
            # 2a. Parse (or reparse) it
            try:
                self.set_last_status_message('Initializing metadata')
                self._initialize_metadata_for_processing()

                if len(self._entry_metadata.files) >= config.auxfile_cutoff:
                    self.warning(
                        'This calc has many aux files in its directory. '
                        'Have you placed many calculations in the same directory?')

                self.parsing()
                self.normalizing()
                self.archiving()
            finally:
                # close loghandler that was not closed due to failures
                try:
                    if self._parser_results and self._parser_results.m_resource:
                        self._parser_results.metadata = None
                        self._parser_results.m_resource.unload()
                except Exception as e:
                    logger.error('could not unload processing results', exc_info=e)
        elif self.upload.published:
            # 2b. Keep published entry as it is
            self.set_last_status_message('Preserving entry data')
            try:
                upload_files = PublicUploadFiles(self.upload_id)
                with upload_files.read_archive(self.calc_id) as archive:
                    self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())

            except Exception as e:
                logger.error('could not copy archive for non-reprocessed entry', exc_info=e)
                raise
        else:
            # 2b. Keep staging entry as it is
            pass

    def on_fail(self):
        # in case of failure, create a minimum set of metadata and mark
        # processing failure
        try:
            if self._entry_metadata is None:
                self._initialize_metadata_for_processing()
            self._entry_metadata.processed = False

            try:
                self._apply_metadata_to_mongo_entry(self._entry_metadata)
            except Exception as e:
                self.get_logger().error(
                    'could not apply entry metadata to entry', exc_info=e)

            try:
                self._entry_metadata.apply_archvie_metadata(self._parser_results)
            except Exception as e:
                self.get_logger().error(
                    'could not apply domain metadata to entry', exc_info=e)
        except Exception as e:
            self._parser_results = EntryArchive(
                entry_id=self._parser_results.entry_id,
                metadata=self._parser_results.metadata,
                processing_logs=self._parser_results.processing_logs)

            self.get_logger().error(
                'could not create minimal metadata after processing failure', exc_info=e)

        if self._perform_index:
            try: