Commit eacff261 authored by Markus Scheidgen

Removed non-user metadata from mongodb, #298.

parent 1bb21611
Pipeline #71367 passed with stages in 30 minutes and 50 seconds
@@ -146,12 +146,12 @@ class RepoEntryView extends React.Component {
<Quantity quantity="upload_id" label='upload id' {...quantityProps} noWrap withClipboard />
<Quantity quantity="upload_time" label='upload time' noWrap {...quantityProps} >
<Typography noWrap>
{new Date(calcData.upload_time * 1000).toLocaleString()}
{new Date(calcData.upload_time).toLocaleString()}
</Typography>
</Quantity>
<Quantity quantity="last_processing" label='last processing' loading={loading} placeholder="not processed" noWrap {...quantityProps}>
<Typography noWrap>
{new Date(calcData.last_processing * 1000).toLocaleString()}
{new Date(calcData.last_processing).toLocaleString()}
</Typography>
</Quantity>
<Quantity quantity="last_processing" label='processing version' loading={loading} noWrap placeholder="not processed" {...quantityProps}>
......
@@ -26,7 +26,6 @@ import json
import orjson
import importlib
import urllib.parse
from collections.abc import Mapping
import metainfo
@@ -100,9 +99,10 @@ class ArchiveCalcResource(Resource):
try:
with upload_files.read_archive(calc_id) as archive:
return {
key: value.to_dict()
for key, value in archive[calc_id].items()
if isinstance(value, Mapping)}
key: value
for key, value in archive[calc_id].to_dict().items()
if key != 'processing_logs'}
except Restricted:
abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
except KeyError:
......
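For illustration, the effect of the new endpoint code reduced to a plain dict operation; a sketch that reuses only names from the hunk above (the pop call is an equivalent alternative to the key filter, not what the commit itself uses):

    with upload_files.read_archive(calc_id) as archive:
        data = archive[calc_id].to_dict()    # the whole archive entry as a plain dict
        data.pop('processing_logs', None)    # processing logs stay out of the API payload
        return data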
@@ -25,7 +25,7 @@ from elasticsearch.exceptions import NotFoundError
import elasticsearch.helpers
from datetime import datetime
from nomad import search, utils, datamodel, processing as proc, infrastructure
from nomad import search, utils, datamodel, processing as proc, infrastructure, files
from nomad.metainfo import search_extension
from nomad.datamodel import Dataset, User, EditableUserMetadata
from nomad.app import common
@@ -315,6 +315,7 @@ def edit(parsed_query: Dict[str, Any], mongo_update: Dict[str, Any] = None, re_i
apply_search_parameters(search_request, parsed_query)
upload_ids = set()
calc_ids = []
for hit in search_request.execute_scan():
calc_ids.append(hit['calc_id'])
upload_ids.add(hit['upload_id'])
@@ -330,12 +331,24 @@ def edit(parsed_query: Dict[str, Any], mongo_update: Dict[str, Any] = None, re_i
with utils.timer(common.logger, 'edit elastic update executed', size=len(calc_ids)):
if re_index:
def elastic_updates():
upload_files_cache: Dict[str, files.UploadFiles] = dict()
for calc in proc.Calc.objects(calc_id__in=calc_ids):
entry_metadata = datamodel.EntryMetadata.m_from_dict(calc['metadata'])
upload_id = calc.upload_id
upload_files = upload_files_cache.get(upload_id)
if upload_files is None:
upload_files = files.UploadFiles.get(upload_id, is_authorized=lambda: True)
upload_files_cache[upload_id] = upload_files
entry_metadata = calc.entry_metadata(upload_files)
entry = entry_metadata.a_elastic.create_index_entry().to_dict(include_meta=True)
entry['_op_type'] = 'index'
yield entry
for upload_files in upload_files_cache.values():
upload_files.close()
_, failed = elasticsearch.helpers.bulk(
infrastructure.elastic_client, elastic_updates(), stats_only=True)
search.refresh()
......
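The cache-and-close pattern above, reduced to its core as a sketch; the try/finally is an addition here to make the close robust against errors during the scan, and yielding the metadata object itself (instead of the elastic index entry) is a simplification:

    upload_files_cache: Dict[str, files.UploadFiles] = {}
    try:
        for calc in proc.Calc.objects(calc_id__in=calc_ids):
            upload_files = upload_files_cache.get(calc.upload_id)
            if upload_files is None:
                upload_files = files.UploadFiles.get(calc.upload_id, is_authorized=lambda: True)
                upload_files_cache[calc.upload_id] = upload_files
            yield calc.entry_metadata(upload_files)
    finally:
        for upload_files in upload_files_cache.values():
            upload_files.close()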
@@ -27,7 +27,7 @@ import os
import io
from functools import wraps
from nomad import config, utils, files, datamodel
from nomad import config, utils, files, search
from nomad.processing import Upload, FAILURE
from nomad.processing import ProcessAlreadyRunning
from nomad.app import common
@@ -42,13 +42,6 @@ ns = api.namespace(
'uploads',
description='Uploading data and tracing uploaded data and its processing.')
class CalcMetadata(fields.Raw):
def format(self, value):
entry_metadata = datamodel.EntryMetadata.m_from_dict(value)
return entry_metadata.a_elastic.create_index_entry().to_dict()
proc_model = api.model('Processing', {
'tasks': fields.List(fields.String),
'current_task': fields.String,
@@ -96,7 +89,9 @@ calc_model = api.inherit('UploadCalculationProcessing', proc_model, {
'mainfile': fields.String,
'upload_id': fields.String,
'parser': fields.String,
'metadata': CalcMetadata(description='The repository metadata for this entry.')
'metadata': fields.Raw(
attribute='_entry_metadata',
description='The repository metadata for this entry.')
})
upload_with_calcs_model = api.inherit('UploadWithPaginatedCalculations', upload_model, {
@@ -381,13 +376,24 @@ class UploadResource(Resource):
order_by = ('-%s' if order == -1 else '+%s') % order_by
calcs = upload.all_calcs((page - 1) * per_page, page * per_page, order_by=order_by)
# load upload's calcs
calcs = list(upload.all_calcs(
(page - 1) * per_page, page * per_page, order_by=order_by))
calc_ids = [calc.calc_id for calc in calcs]
search_results = {
hit['calc_id']: hit
for hit in search.SearchRequest().search_parameter('calc_id', calc_ids).execute_scan()}
for calc in calcs:
calc._entry_metadata = search_results.get(calc.calc_id)
failed_calcs = upload.failed_calcs
result = ProxyUpload(upload, {
'pagination': dict(
total=upload.total_calcs, page=page, per_page=per_page,
successes=upload.processed_calcs - failed_calcs, failures=failed_calcs),
'results': [calc for calc in calcs]
'results': calcs
})
return result, 200
......
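The swagger model now simply returns whatever dict was attached to each calc; a minimal flask-restplus sketch of that mechanism (the model name and field set are illustrative, not taken from the diff):

    from flask_restplus import fields, marshal

    entry_model = api.model('EntryWithMetadata', {
        'calc_id': fields.String,
        'metadata': fields.Raw(attribute='_entry_metadata')})

    calc._entry_metadata = search_results.get(calc.calc_id)  # may be None if not indexed yet
    marshal(calc, entry_model)  # -> {'calc_id': ..., 'metadata': <the search hit, passed through>}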
@@ -12,11 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Dict, Any
from flask_restplus import Resource, abort
from flask import request
from elasticsearch_dsl import Q
from nomad import search
from nomad import search, files, datamodel
from nomad.datamodel import OptimadeEntry
from .api import api, url
@@ -46,6 +47,31 @@ def base_search_request():
Q('exists', field='dft.optimade.elements')) # TODO use the elastic annotations when done
def to_calc_with_metadata(results: List[Dict[str, Any]]):
''' Translates search results into :class:`EntryMetadata` objects read from archive. '''
upload_files_cache: Dict[str, files.UploadFiles] = {}
def transform(result):
calc_id, upload_id = result['calc_id'], result['upload_id']
upload_files = upload_files_cache.get(upload_id)
if upload_files is None:
upload_files = files.UploadFiles.get(upload_id)
upload_files_cache[upload_id] = upload_files
archive = upload_files.read_archive(calc_id) # , access='public')
metadata = archive[calc_id]['section_metadata'].to_dict()
return datamodel.EntryMetadata.m_from_dict(metadata)
result = [transform(result) for result in results]
for upload_files in upload_files_cache.values():
upload_files.close()
return result
@ns.route('/calculations')
class CalculationList(Resource):
@api.doc('list_calculations')
@@ -65,7 +91,7 @@ class CalculationList(Resource):
except Exception:
abort(400, message='bad parameter types') # TODO Specific json API error handling
search_request = base_search_request().include('calc_id')
search_request = base_search_request().include('calc_id', 'upload_id')
if filter is not None:
try:
@@ -79,7 +105,7 @@ class CalculationList(Resource):
# order_by='optimade.%s' % sort) # TODO map the Optimade property
available = result['pagination']['total']
results = search.to_calc_with_metadata(result['results'])
results = to_calc_with_metadata(result['results'])
assert len(results) == len(result['results']), 'Mongodb and elasticsearch are not consistent'
return dict(
@@ -115,7 +141,7 @@ class Calculation(Resource):
per_page=1)
available = result['pagination']['total']
results = search.to_calc_with_metadata(result['results'])
results = to_calc_with_metadata(result['results'])
assert len(results) == len(result['results']), 'Mongodb and elasticsearch are not consistent'
if available == 0:
......
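Reading one entry's repository metadata straight from its archive file, as the new helper does per search result; a sketch whose with-form mirrors the archive API hunk earlier in this commit, whereas the helper above keeps the files open until all results are transformed:

    upload_files = files.UploadFiles.get(upload_id)
    with upload_files.read_archive(calc_id) as archive:
        entry_metadata = datamodel.EntryMetadata.m_from_dict(
            archive[calc_id]['section_metadata'].to_dict())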
@@ -158,7 +158,8 @@ def lift_embargo(dry, parallel):
uploads_to_repack.append(upload)
upload.save()
search.index_all(upload.entries_metadata())
with upload.entries_metadata() as entries:
search.index_all(entries)
if not dry:
__run_processing(uploads_to_repack, parallel, lambda upload: upload.re_pack(), 're-packing')
......
@@ -146,16 +146,17 @@ def chown(ctx, username, uploads):
upload.user_id = user.user_id
calcs = upload.entries_metadata()
def create_update(calc):
def create_update(calc_id):
return UpdateOne(
{'_id': calc.calc_id},
{'_id': calc_id},
{'$set': {'metadata.uploader': user.user_id}})
proc.Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])
proc.Calc._get_collection().bulk_write(
[create_update(calc_id) for calc_id in upload.entry_ids()])
upload.save()
calcs = upload.entries_metadata()
search.index_all(calcs, do_refresh=False)
with upload.entries_metadata() as calcs:
search.index_all(calcs, do_refresh=False)
search.refresh()
@@ -192,9 +193,9 @@ def index(ctx, uploads):
i, failed = 0, 0
for upload in uploads:
calcs = upload.entries_metadata()
failed += search.index_all(calcs)
i += 1
with upload.entries_metadata() as calcs:
failed += search.index_all(calcs)
i += 1
print(' indexed %d of %d uploads, failed to index %d entries' % (i, uploads_count, failed))
......
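Since entry metadata now lives in the archive files, entries_metadata is used as a context manager throughout the CLI; the pattern reduced to a sketch (that it opens and closes the upload's archive files is inferred from this commit, not stated in it):

    with upload.entries_metadata() as calcs:   # archive files opened here
        failed += search.index_all(calcs)      # ... and closed when the block exits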
@@ -318,7 +318,8 @@ def mirror(
proc.Calc._get_collection().insert(upload_data.calcs)
# index es
search.index_all(upload.entries_metadata())
with upload.entries_metadata() as entries:
search.index_all(entries)
print(
'Mirrored %s with %d calcs at %s' %
......
@@ -4,7 +4,7 @@ import json
import click
import sys
from nomad import config, utils
from nomad import utils
from nomad.parsing import Backend, parser_dict, match_parser, MatchingParser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryMetadata
@@ -48,14 +48,6 @@ def parse(
if not parser_backend.status[0] == 'ParseSuccess':
logger.error('parsing was not successful', status=parser_backend.status)
parser_backend.openNonOverlappingSection('section_entry_info')
parser_backend.addValue('upload_id', config.services.unavailable_value)
parser_backend.addValue('calc_id', config.services.unavailable_value)
parser_backend.addValue('calc_hash', "no hash")
parser_backend.addValue('mainfile', mainfile)
parser_backend.addValue('parser_name', parser_name)
parser_backend.closeNonOverlappingSection('section_entry_info')
logger.info('ran parser')
return parser_backend
......
@@ -40,8 +40,12 @@ from collections import Sequence
import requests
from urllib.parse import urlparse
from nomad import config, metainfo, parsing
from nomad import config
from nomad.cli.client.client import KeycloakAuthenticator
from nomad.datamodel import EntryArchive
# TODO this import is necessary to load all metainfo definitions that the parsers are using
from nomad import parsing # pylint: disable=unused-import
class ArchiveQuery(Sequence):
@@ -123,13 +127,7 @@ class ArchiveQuery(Sequence):
results = data.get('results', [])
for result in results:
parser_name = result['parser_name']
parser = parsing.parser_dict[parser_name]
metainfo_env = parser.metainfo_env
root_section_key = next(iter(result['archive']))
section_def = metainfo_env.resolve_definition(root_section_key, metainfo.Section)
archive = section_def.section_cls.m_from_dict(result['archive'][root_section_key])
archive = EntryArchive.m_from_dict(result['archive'])
self._results.append(archive)
......
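Client-side, each result now deserializes through the new root section in a single step; a sketch assuming the API ships the serialized EntryArchive under result['archive'] as used above:

    archive = EntryArchive.m_from_dict(result['archive'])
    for run in archive.section_run:       # repeating sub-section, see the datamodel change below
        pass
    metadata = archive.section_metadata   # EntryMetadata, when included by the API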
@@ -32,9 +32,9 @@ The class :class:`Dataset` is used to represent datasets and their attributes.
.. autoclass:: nomad.datamodel.Dataset
:members:
The class :class:`UserMetadata` is used to represent user determined entry metadata.
The class :class:`MongoMetadata` is used to tag metadata stored in mongodb.
.. autoclass:: nomad.datamodel.UserMetadata
.. autoclass:: nomad.datamodel.MongoMetadata
:members:
The class :class:`EntryMetadata` is used to represent all metadata about an entry.
@@ -56,7 +56,7 @@ In addition there are domain specific metadata classes:
from .dft import DFTMetadata
from .ems import EMSMetadata
from .datamodel import Dataset, User, EditableUserMetadata, UserMetadata, EntryMetadata
from .datamodel import Dataset, User, EditableUserMetadata, MongoMetadata, EntryMetadata, EntryArchive
from .optimade import OptimadeEntry, Species
domains = {
......
@@ -26,6 +26,8 @@ from nomad.metainfo.mongoengine_extension import Mongo, MongoDocument
from .dft import DFTMetadata
from .ems import EMSMetadata
from .metainfo.public import section_run
from .metainfo.general_experimental import section_experiment
def _only_atoms(atoms):
@@ -189,8 +191,8 @@ class EditableUserMetadata(metainfo.MCategory):
''' NOMAD entry quantities that can be edited by the user after publish. '''
class UserMetadata(metainfo.MCategory):
''' NOMAD entry quantities that are given by the user or determined by user actions. '''
class MongoMetadata(metainfo.MCategory):
''' NOMAD entry quantities that are stored in mongodb and not necessarily in the archive. '''
pass
@@ -249,6 +251,7 @@ class EntryMetadata(metainfo.MSection):
calc_hash = metainfo.Quantity(
type=str,
description='A raw file content based checksum/hash.',
categories=[MongoMetadata],
a_search=Search(
many_or='append', metric_name='unique_entries', metric='cardinality'))
@@ -280,39 +283,48 @@ class EntryMetadata(metainfo.MSection):
pid = metainfo.Quantity(
type=int,
description='The unique, sequentially enumerated, integer persistent identifier',
categories=[MongoMetadata],
a_search=Search(many_or='append'))
raw_id = metainfo.Quantity(
type=str,
description='A raw format specific id that was acquired from the files of this entry',
categories=[MongoMetadata],
a_search=Search(many_or='append'))
domain = metainfo.Quantity(
type=metainfo.MEnum('dft', 'ems'),
description='The material science domain',
categories=[MongoMetadata],
a_search=Search())
published = metainfo.Quantity(
type=bool, default=False,
description='Indicates if the entry is published',
categories=[MongoMetadata],
a_search=Search())
processed = metainfo.Quantity(
type=bool, default=False,
description='Indicates that the entry is successfully processed.',
categories=[MongoMetadata],
a_search=Search())
last_processing = metainfo.Quantity(
type=metainfo.Datetime,
description='The datetime of the last attempted processing.')
description='The datetime of the last attempted processing.',
categories=[MongoMetadata],
a_search=Search())
nomad_version = metainfo.Quantity(
type=str,
description='The NOMAD version used for the last processing attempt.',
categories=[MongoMetadata],
a_search=Search(many_or='append'))
nomad_commit = metainfo.Quantity(
type=str,
description='The NOMAD commit used for the last processing attempt.',
categories=[MongoMetadata],
a_search=Search(many_or='append'))
parser_name = metainfo.Quantity(
type=str,
@@ -320,17 +332,17 @@ class EntryMetadata(metainfo.MSection):
a_search=Search(many_or='append'))
comment = metainfo.Quantity(
type=str, categories=[UserMetadata, EditableUserMetadata],
type=str, categories=[MongoMetadata, EditableUserMetadata],
description='A user provided comment.',
a_search=Search(mapping=Text()))
references = metainfo.Quantity(
type=str, shape=['0..*'], categories=[UserMetadata, EditableUserMetadata],
type=str, shape=['0..*'], categories=[MongoMetadata, EditableUserMetadata],
description='User provided references (URLs).',
a_search=Search())
uploader = metainfo.Quantity(
type=user_reference, categories=[UserMetadata],
type=user_reference, categories=[MongoMetadata],
description='The uploader of the entry',
a_flask=dict(admin_only=True, verify=User),
a_search=[
@@ -343,7 +355,7 @@ class EntryMetadata(metainfo.MSection):
])
coauthors = metainfo.Quantity(
type=user_reference, shape=['0..*'], default=[], categories=[UserMetadata, EditableUserMetadata],
type=user_reference, shape=['0..*'], default=[], categories=[MongoMetadata, EditableUserMetadata],
description='A user provided list of co-authors.',
a_flask=dict(verify=User))
@@ -357,7 +369,7 @@ class EntryMetadata(metainfo.MSection):
many_or='append', search_field='authors.name.keyword', statistic_size=1000))
shared_with = metainfo.Quantity(
type=user_reference, shape=['0..*'], default=[], categories=[UserMetadata, EditableUserMetadata],
type=user_reference, shape=['0..*'], default=[], categories=[MongoMetadata, EditableUserMetadata],
description='A user provided list of users to share the entry with.',
a_flask=dict(verify=User))
@@ -370,24 +382,24 @@ class EntryMetadata(metainfo.MSection):
many_or='append', search_field='owners.name.keyword'))
with_embargo = metainfo.Quantity(
type=bool, default=False, categories=[UserMetadata, EditableUserMetadata],
type=bool, default=False, categories=[MongoMetadata, EditableUserMetadata],
description='Indicates if this entry is under an embargo',
a_search=Search())
upload_time = metainfo.Quantity(
type=metainfo.Datetime, categories=[UserMetadata],
type=metainfo.Datetime, categories=[MongoMetadata],
description='The datetime this entry was uploaded to nomad',
a_flask=dict(admin_only=True),
a_search=Search(order_default=True))
upload_name = metainfo.Quantity(
type=str, categories=[UserMetadata],
type=str, categories=[MongoMetadata],
description='The user provided upload name',
a_search=Search(many_or='append'))
datasets = metainfo.Quantity(
type=dataset_reference, shape=['0..*'], default=[],
categories=[UserMetadata, EditableUserMetadata],
categories=[MongoMetadata, EditableUserMetadata],
description='A list of user curated datasets this entry belongs to.',
a_flask=dict(verify=Dataset),
a_search=[
@@ -401,12 +413,12 @@ class EntryMetadata(metainfo.MSection):
description='Search for a particular dataset by its id.')])
external_id = metainfo.Quantity(
type=str, categories=[UserMetadata],
type=str, categories=[MongoMetadata],
description='A user provided external id.',
a_search=Search(many_or='split'))
last_edit = metainfo.Quantity(
type=metainfo.Datetime, categories=[UserMetadata],
type=metainfo.Datetime, categories=[MongoMetadata],
description='The datetime the user metadata was edited last.',
a_search=Search())
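One way the new category tag can be consumed, e.g. to decide which quantities still belong in mongodb; a sketch that assumes the metainfo package exposes a category's tagged definitions via m_def.definitions (not shown in this diff):

    mongo_quantities = [
        quantity.name for quantity in MongoMetadata.m_def.definitions]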
@@ -441,7 +453,24 @@ class EntryMetadata(metainfo.MSection):
def apply_domain_metadata(self, backend):
assert self.domain is not None, 'all entries must have a domain'
domain_section_def = self.m_def.all_sub_sections.get(self.domain).sub_section
domain_sub_section_def = self.m_def.all_sub_sections.get(self.domain)
domain_section_def = domain_sub_section_def.sub_section
assert domain_section_def is not None, 'unknown domain %s' % self.domain
domain_section = self.m_create(domain_section_def.section_cls)
# add domain section if not already there
domain_section = self.m_get_sub_section(domain_sub_section_def, -1)
if domain_section is None:
domain_section = self.m_create(domain_section_def.section_cls)
domain_section.apply_domain_metadata(backend)
class EntryArchive(metainfo.MSection):
section_run = metainfo.SubSection(sub_section=section_run, repeats=True)
section_experiment = metainfo.SubSection(sub_section=section_experiment)
section_metadata = metainfo.SubSection(sub_section=EntryMetadata)
processing_logs = metainfo.Quantity(
type=Any, shape=['0..*'],
description='The processing logs for this entry as a list of structlog entries.')
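A small round-trip sketch for the new root section; only m_from_dict appears in this commit, so the sub-section assignment and m_to_dict are assumed standard metainfo usage:

    archive = EntryArchive()
    archive.section_metadata = EntryMetadata(domain='dft', comment='example entry')
    serialized = archive.m_to_dict()                 # the form stored in the archive files
    restored = EntryArchive.m_from_dict(serialized)  # the form ArchiveQuery reconstructs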
@@ -24,7 +24,6 @@ from nomad.metainfo.search_extension import Search
from .common import get_optional_backend_value
from .optimade import OptimadeEntry
from .metainfo.public import section_run
xc_treatments = {
@@ -235,14 +234,11 @@ class DFTMetadata(MSection):
n_total_energies = 0
n_geometries = 0
for root_section in backend.resource.contents:
if not root_section.m_follows(section_run.m_def):
continue
quantities.add(root_section.m_def.name)
for section_run in backend.entry_archive.section_run:
quantities.add(section_run.m_def.name)
n_quantities += 1
for section, property_def, _ in root_section.m_traverse():
for section, property_def, _ in section_run.m_traverse():
property_name = property_def.name
quantities.add(property_name)
n_quantities += 1
@@ -284,8 +280,3 @@ class DFTMetadata(MSection):
if aflow_id is not None and aflow_label is not None:
self.labels.append(Label(label=aflow_label, type='prototype', source='aflow_prototype_library'))
self.labels.append(Label(label=aflow_id, type='prototype_id', source='aflow_prototype_library'))
# optimade
optimade = backend.get_mi2_section(OptimadeEntry.m_def)
if optimade is not None:
self.optimade = optimade.m_copy()
@@ -21,7 +21,6 @@ from nomad.metainfo import Quantity, MSection, Section, Datetime
from nomad.metainfo.search_extension import Search
from .common import get_optional_backend_value
from .metainfo.general_experimental import section_experiment
class EMSMetadata(MSection):
@@ -107,12 +106,9 @@ class EMSMetadata(MSection):
quantities = set()
for root_section in backend.resource.contents:
if not root_section.m_follows(section_experiment.m_def):
continue
quantities.add(root_section.m_def.name)
for _, property_def, _ in root_section.m_traverse():
quantities.add(property_def.name)
root_section = backend.entry_archive.section_experiment
quantities.add(root_section.m_def.name)
for _, property_def, _ in root_section.m_traverse():
quantities.add(property_def.name)
self.quantities = list(quantities)
@@ -230,7 +230,7 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
with open(self._user_metadata_file.os_path, 'wb') as f:
pickle.dump(data, f)
def to_staging_upload_files(self, create: bool = False) -> 'StagingUploadFiles':
def to_staging_upload_files(self, create: bool = False, **kwargs) -> 'StagingUploadFiles':
''' Casts to or creates corresponding staging upload files or returns None. '''
raise NotImplementedError()
@@ -308,7 +308,7 @@ class StagingUploadFiles(UploadFiles):
self._size = 0
self._shared = DirectoryObject(config.fs.public, upload_id, create=create)
def to_staging_upload_files(self, create: bool = False) -> 'StagingUploadFiles':