Commit f9d36ffd authored by Markus Scheidgen

Removed the nomad@FAIRDI repository search code for now.

parent 207bff0e
@@ -17,39 +17,42 @@ The repository API of the nomad@FAIRDI APIs. Currently allows resolving repository
meta-data.
"""
from elasticsearch.exceptions import NotFoundError
from flask import g, request
from flask_restplus import Resource, abort, fields
from nomad.repo import RepoCalc
from nomad.files import UploadFiles, Restricted
from .app import api
from .auth import login_if_available
from .auth import login_if_available, create_authorization_predicate
from .common import pagination_model, pagination_request_parser, calc_route
ns = api.namespace('repo', description='Access repository metadata, edit user metadata.')
ns = api.namespace('repo', description='Access repository metadata.')
@calc_route(ns)
class RepoCalcResource(Resource):
@api.response(404, 'The upload or calculation does not exist')
@api.response(401, 'Not authorized to access the calculation')
@api.response(200, 'Metadata sent')
@api.doc('get_repo_calc')
@login_if_available
def get(self, upload_id, calc_id):
"""
Get calculation metadata in repository form.
Repository metadata only entails the quantities shown in the repository.
This is basically the elastic search index entry for the
requested calculations. Calcs are referenced via *upload_id*, *calc_id*
pairs.
Calcs are referenced via *upload_id*, *calc_id* pairs.
"""
# TODO use elastic search instead of the files
upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id, calc_id))
if upload_files is None:
abort(404, message='There is no upload %s' % upload_id)
try:
return RepoCalc.get(id='%s/%s' % (upload_id, calc_id)).json_dict, 200
except NotFoundError:
return upload_files.metadata.get(calc_id), 200
except Restricted:
abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
except KeyError:
abort(404, message='There is no calculation for %s/%s' % (upload_id, calc_id))
except Exception as e:
abort(500, message=str(e))
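A hypothetical client-side sketch of calling this endpoint; the /repo/<upload_id>/<calc_id> route (implied by the @calc_route(ns) decorator) and the bearer-token header are assumptions, not confirmed by this diff:

import requests

def get_repo_calc(base_url, upload_id, calc_id, token=None):
    # 200 returns the staging metadata dict; 401/404 mirror the abort calls above
    headers = {'Authorization': 'Bearer %s' % token} if token else {}
    response = requests.get(
        '%s/repo/%s/%s' % (base_url, upload_id, calc_id), headers=headers)
    response.raise_for_status()
    return response.json()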
repo_calcs_model = api.model('RepoCalculations', {
@@ -73,38 +76,41 @@ class RepoCalcsResource(Resource):
def get(self):
"""
Get *'all'* calculations in repository form, paginated.
"""
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 10))
owner = request.args.get('owner', 'all')
try:
assert page >= 1
assert per_page > 0
except AssertionError:
abort(400, message='invalid pagination')
if owner == 'all':
search = RepoCalc.search().query('match_all')
elif owner == 'user':
if g.user is None:
abort(401, message='Authentication required for owner value user.')
search = RepoCalc.search().query('match_all')
search = search.filter('term', user_id=str(g.user.user_id))
elif owner == 'staging':
if g.user is None:
abort(401, message='Authentication required for owner value user.')
search = RepoCalc.search().query('match_all')
search = search.filter('term', user_id=str(g.user.user_id)).filter('term', staging=True)
else:
abort(400, message='Invalid owner value. Valid values are all|user|staging, default is all')
search = search[(page - 1) * per_page: page * per_page]
return {
'pagination': {
'total': search.count(),
'page': page,
'per_page': per_page
},
'results': [result.json_dict for result in search]
}, 200
This is currently not implemented!
"""
return [], 200
# page = int(request.args.get('page', 1))
# per_page = int(request.args.get('per_page', 10))
# owner = request.args.get('owner', 'all')
# try:
# assert page >= 1
# assert per_page > 0
# except AssertionError:
# abort(400, message='invalid pagination')
# if owner == 'all':
# search = RepoCalc.search().query('match_all')
# elif owner == 'user':
# if g.user is None:
# abort(401, message='Authentication required for owner value user.')
# search = RepoCalc.search().query('match_all')
# search = search.filter('term', user_id=str(g.user.user_id))
# elif owner == 'staging':
# if g.user is None:
# abort(401, message='Authentication required for owner value staging.')
# search = RepoCalc.search().query('match_all')
# search = search.filter('term', user_id=str(g.user.user_id)).filter('term', staging=True)
# else:
# abort(400, message='Invalid owner value. Valid values are all|user|staging, default is all')
# search = search[(page - 1) * per_page: page * per_page]
# return {
# 'pagination': {
# 'total': search.count(),
# 'page': page,
# 'per_page': per_page
# },
# 'results': [result.json_dict for result in search]
# }, 200
@@ -43,12 +43,11 @@ This module also provides functionality to add parsed calculation data to the db
"""
from typing import Type
import itertools
import datetime
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from nomad import utils, infrastructure, datamodel
from nomad.repo import RepoUpload, RepoCalc
from nomad import utils, infrastructure, datamodel, files
from .user import User
from .calc import Calc
@@ -155,9 +154,9 @@ class Upload(Base, datamodel.Upload):  # type: ignore
# add calculations and metadata
has_calcs = False
for calc in upload.to(RepoUpload).calcs:
for calc in upload.calcs:
has_calcs = True
coe_upload._add_calculation(calc.to(RepoCalc), upload_meta_data.get(calc.mainfile))
coe_upload._add_calculation(calc.to(files.Calc), upload_meta_data.get(calc.mainfile))
# commit
if has_calcs:
@@ -176,7 +175,7 @@ class Upload(Base, datamodel.Upload):  # type: ignore
return result
def _add_calculation(self, calc: RepoCalc, calc_meta_data: dict) -> None:
def _add_calculation(self, calc: files.Calc, calc_meta_data: dict) -> None:
repo_db = infrastructure.repository_db
# table based properties
@@ -192,13 +191,11 @@ class Upload(Base, datamodel.Upload):  # type: ignore
code_version = CodeVersion(content=program_version)
repo_db.add(code_version)
filenames = itertools.chain([calc.mainfile], calc.aux_files)
metadata = CalcMetaData(
calc=coe_calc,
added=calc_meta_data.get('_upload_time', self.upload_time),
chemical_formula=calc.chemical_composition,
filenames=('[%s]' % ','.join(['"%s"' % filename for filename in filenames])).encode('utf-8'),
filenames=('[%s]' % ','.join(['"%s"' % filename for filename in calc.files])).encode('utf-8'),
location=calc.mainfile,
version=code_version)
repo_db.add(metadata)
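To illustrate the filenames encoding used above, a minimal sketch with made-up paths (not part of the diff):

files = ['a/mainfile.out', 'a/aux.dat']
encoded = ('[%s]' % ','.join(['"%s"' % f for f in files])).encode('utf-8')
assert encoded == b'["a/mainfile.out","a/aux.dat"]'  # JSON-like list of quoted paths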
@@ -36,7 +36,7 @@ almost read-only (besides metadata) storage.
"""
from abc import ABCMeta
from typing import IO, Generator, Dict, Iterator, Iterable, Callable
from typing import IO, Generator, Dict, Iterator, Iterable, Callable, List
import ujson
import os.path
import os
@@ -48,7 +48,7 @@ import base64
import io
import gzip
from nomad import config, utils
from nomad import config, utils, datamodel
class PathObject:
@@ -335,6 +335,8 @@ class StagingUploadFiles(UploadFiles):
@property
def metadata(self) -> StagingMetadata:
if not self._is_authorized():
raise Restricted
return self._metadata
def _file(self, path_object: PathObject, *args, **kwargs) -> IO:
@@ -492,7 +494,8 @@ class StagingUploadFiles(UploadFiles):
def calc_files(self, mainfile: str, with_mainfile: bool = True) -> Iterable[str]:
"""
Returns all the auxfiles and mainfile for a given mainfile. This implements
nomad's logic about what is part of a calculation and what not.
nomad's logic about what is part of a calculation and what is not. The mainfile
is the first entry; the rest are sorted.
Arguments:
mainfile: The mainfile relative to upload
with_mainfile: Do include the mainfile, default is True
@@ -501,12 +504,16 @@ class StagingUploadFiles(UploadFiles):
if not mainfile_object.exists():
raise KeyError()
mainfile = os.path.basename(mainfile)
mainfile_basename = os.path.basename(mainfile)
calc_dir = os.path.dirname(mainfile_object.os_path)
calc_relative_dir = calc_dir[len(self._raw_dir.os_path) + 1:]
return sorted(
aux_files = sorted(
os.path.join(calc_relative_dir, path) for path in os.listdir(calc_dir)
if os.path.isfile(os.path.join(calc_dir, path)) and (with_mainfile or path != mainfile))
if os.path.isfile(os.path.join(calc_dir, path)) and path != mainfile_basename)
if with_mainfile:
return [mainfile] + aux_files
else:
return aux_files
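A small sketch of the ordering contract documented above, assuming upload_files is an existing StagingUploadFiles instance and the path is illustrative:

files = list(upload_files.calc_files('some/dir/mainfile.out'))
assert files[0] == 'some/dir/mainfile.out'   # mainfile comes first
assert files[1:] == sorted(files[1:])        # aux files follow in sorted order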
def _websave_hash(self, hash: bytes, length: int = 0) -> str:
if length > 0:
@@ -650,3 +657,68 @@ class PublicUploadFiles(UploadFiles):
the restrictions on calculations. This is potentially a long-running operation.
"""
pass
class Calc(datamodel.Calc):
@classmethod
def load_from(cls, obj):
return Calc(obj.upload.upload_id, obj.calc_id)
def __init__(self, upload_id: str, calc_id: str) -> None:
self._calc_id = calc_id
upload_files = UploadFiles.get(upload_id, is_authorized=lambda: True)
if upload_files is None:
raise KeyError
self._data = upload_files.metadata.get(calc_id)
@property
def calc_data(self) -> dict:
return self._data['section_repository_info']['section_repository_parserdata']
@property
def calc_id(self) -> str:
return self._calc_id
@property
def mainfile(self) -> str:
return self.files[0]
@property
def files(self) -> List[str]:
return self._data['section_repository_info']['repository_filepaths']
@property
def program_name(self) -> str:
return self.calc_data['repository_program_name']
@property
def program_version(self) -> str:
return self.calc_data['repository_code_version']
@property
def chemical_composition(self) -> str:
return self.calc_data['repository_chemical_formula']
@property
def space_group_number(self) -> int:
return self.calc_data['repository_spacegroup_nr']
@property
def atom_species(self) -> list:
return self.calc_data['repository_atomic_elements']
@property
def system_type(self) -> str:
return self.calc_data['repository_system_type']
@property
def XC_functional_name(self) -> str:
return self.calc_data['repository_xc_treatment']
@property
def crystal_system(self) -> str:
return self.calc_data['repository_crystal_system']
@property
def basis_set_type(self) -> str:
return self.calc_data['repository_basis_set_type']
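A hedged usage sketch for the new files-backed Calc above; the ids are placeholders:

calc = Calc('some_upload_id', 'some_calc_id')  # raises KeyError for an unknown upload
print(calc.mainfile, calc.program_name, calc.chemical_composition)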
@@ -84,8 +84,6 @@ def setup_elastic():
logger.info('setup elastic connection')
try:
from nomad.repo import RepoCalc
RepoCalc.init()
from nomad.search import Entry
Entry.init()
except RequestError as e:
@@ -178,8 +176,8 @@ def reset():
if not elastic_client:
setup_elastic()
elastic_client.indices.delete(index=config.elastic.index_name)
from nomad.repo import RepoCalc
RepoCalc.init()
from nomad.search import Entry
Entry.init()
logger.info('elastic index reset')
except Exception as e:
logger.error('exception resetting elastic', exc_info=e)
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from nomad.parsing import BadContextURI
from .normalizer import Normalizer
@@ -24,13 +26,20 @@ class RepositoryNormalizer(Normalizer):
super().normalize(logger)
b = self._backend
b.openNonOverlappingSection('section_repository_info')
repository_info_context = '/section_repository_info/0'
try:
b.openContext(repository_info_context)
except BadContextURI:
b.openNonOverlappingSection('section_repository_info')
repository_info_context = None
b.openNonOverlappingSection('section_repository_parserdata')
b.addValue('repository_checksum', b.get_value('calc_hash', 0))
b.addValue('repository_chemical_formula', b.get_value('chemical_composition_bulk_reduced', 0))
b.addValue('repository_parser_id', b.get_value('parser_name', 0))
atoms = b.get_value('atom_labels', 0)
# TODO make list unique?
b.addValue('repository_atomic_elements', atoms)
b.addValue('repository_atomic_elements_count', len(atoms))
b.addValue('repository_basis_set_type', b.get_value('program_basis_set_type', 0))
@@ -44,5 +53,8 @@ class RepositoryNormalizer(Normalizer):
b.addValue('repository_xc_treatment', b.get_value('XC_functional_name', 0))
b.closeNonOverlappingSection('section_repository_parserdata')
b.closeNonOverlappingSection('section_repository_info')
if repository_info_context is None:
b.closeNonOverlappingSection('section_repository_info')
else:
b.closeContext(repository_info_context)
b.finishedParsingSession("ParseSuccess", None)
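The open/close pairing above amounts to a reuse-or-create pattern; a condensed sketch (function name illustrative, backend calls as in the diff):

def open_repository_info(backend):
    context = '/section_repository_info/0'
    try:
        backend.openContext(context)  # reuse the section written during processing
        return context
    except BadContextURI:
        # no existing section: create a fresh one; the caller must then close
        # it with closeNonOverlappingSection instead of closeContext
        backend.openNonOverlappingSection('section_repository_info')
        return None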
@@ -32,7 +32,6 @@ from contextlib import contextmanager
from nomad import utils, coe_repo, datamodel
from nomad.files import PathObject, ArchiveBasedStagingUploadFiles
from nomad.repo import RepoCalc, RepoUpload
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parsers, parser_dict
from nomad.normalizing import normalizers
@@ -177,6 +176,12 @@ class Calc(Proc, datamodel.Calc):
self._parser_backend.closeNonOverlappingSection('section_calculation_info')
self._parser_backend.openNonOverlappingSection('section_repository_info')
self._parser_backend.addValue('repository_archive_gid', '%s/%s' % (self.upload_id, self.calc_id))
self._parser_backend.addValue(
'repository_filepaths', self.upload_files.calc_files(self.mainfile))
self._parser_backend.closeNonOverlappingSection('section_repository_info')
self.add_processor_info(self.parser)
@contextmanager
@@ -231,26 +236,10 @@ class Calc(Proc, datamodel.Calc):
def archiving(self):
logger = self.get_logger()
additional = dict(
mainfile=self.mainfile,
upload_time=self.upload.upload_time,
staging=True,
restricted=False,
user_id=self.upload.user_id,
aux_files=list(self.upload_files.calc_files(self.mainfile, with_mainfile=False)))
# persist the repository metadata
with utils.timer(logger, 'indexed', step='index'):
self.upload_files.metadata.insert(self._parser_backend.metadata())
with utils.timer(logger, 'indexed', step='index'):
repo_calc = RepoCalc.create_from_backend(
self._parser_backend,
additional=additional,
calc_id=self.calc_id,
upload_id=self.upload_id)
repo_calc.persist()
# persist the archive
with utils.timer(
logger, 'archived', step='archive',
@@ -364,14 +353,11 @@ class Upload(Chord, datamodel.Upload):
if not (self.completed or self.current_task == 'uploading'):
raise NotAllowedDuringProcessing()
self.delete()
self.to(RepoUpload).unstage()
coe_repo.Upload.add(self, meta_data)
self.save()
self.upload_files.pack()
self.upload_files.delete()
self.delete()
@process
def process(self):
@@ -406,11 +392,6 @@ class Upload(Chord, datamodel.Upload):
self.fail('process request for non existing upload', level=logging.ERROR)
return
# check if the file was already uploaded and processed before
if self.to(RepoUpload).exists():
self.fail('The same file was already uploaded and processed.', level=logging.INFO)
return
def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
"""
Generator function that matches all files in the upload to all parsers to
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module is about maintaining the repository search index and providing all
data to the repository-related parts of nomad.
We use *elasticsearch_dsl* to interface with elastic search. The class :class:`RepoCalc`
is an elasticsearch_dsl document that is used to represent repository index entries.
.. autoclass:: nomad.repo.RepoCalc
:members:
"""
from typing import Dict, Any
from elasticsearch.exceptions import ConflictError, ConnectionTimeout
from elasticsearch_dsl import Document as ElasticDocument, Search, Date, Keyword, Boolean
from datetime import datetime
import time
from nomad import config, infrastructure, datamodel
from nomad.parsing import LocalBackend
from nomad.utils import get_logger
logger = get_logger(__name__)
key_mappings = {
'basis_set_type': 'program_basis_set_type',
'chemical_composition': 'chemical_composition_bulk_reduced'
}
class AlreadyExists(Exception): pass
class RepoUpload(datamodel.Entity):
def __init__(self, upload_id):
self.upload_id = upload_id
@classmethod
def load_from(cls, obj):
return RepoUpload(obj.upload_id)
@property
def calcs(self):
return Search(using=infrastructure.elastic_client, index=config.elastic.index_name) \
.query('match', upload_id=self.upload_id) \
.scan()
def delete(self):
""" Deletes all repo entries of the given upload. """
RepoCalc.search().query('match', upload_id=self.upload_id).delete()
def exists(self):
""" Returns true if there are already calcs from the given upload. """
# TODO this is deprecated and should be verified via repository files
search = Search(using=infrastructure.elastic_client, index=config.elastic.index_name) \
.query('match', upload_id=self.upload_id) \
.execute()
return len(search) > 0
def unstage(self, staging=False):
""" Update the staging property for all repo entries of the given upload. """
RepoCalc.update_by_query(self.upload_id, {
'inline': 'ctx._source.staging=%s' % ('true' if staging else 'false'),
'lang': 'painless'
})
class RepoCalc(ElasticDocument, datamodel.Entity):
"""
Elastic search document that represents a calculation. It is supposed to be a
component of :class:`Calc`. Should only be created by its parent :class:`Calc`
instance and only via the :func:`create_from_backend` factory method.
"""
class Index:
name = config.elastic.index_name
calc_id = Keyword()
mainfile = Keyword()
upload_id = Keyword()
upload_time = Date()
staging = Boolean()
restricted = Boolean()
user_id = Keyword()
program_name = Keyword()
program_version = Keyword()
chemical_composition = Keyword()
basis_set_type = Keyword()
atom_species = Keyword()
system_type = Keyword()
crystal_system = Keyword()
space_group_number = Keyword()
configuration_raw_gid = Keyword()
XC_functional_name = Keyword()
aux_files = Keyword()
@property
def upload(self):
return RepoUpload(self.upload_id)
@classmethod
def create_from_backend(
cls, backend: LocalBackend, additional: Dict[str, Any],
upload_id: str, calc_id: str) -> 'RepoCalc':
"""
Create a new calculation instance in elastic search. The data from the given backend
will be used. Additional meta-data can be given via the *additional* dict.
``upload_id`` and ``calc_id`` are mandatory.
Arguments:
backend: The parsing/normalizing backend that contains the calculation data.
additional: Additional arguments not stored in the backend. E.g. ``user_id``,
``staging``, ``restricted``
upload_id: The upload id of the originating upload.
calc_id: The upload-unique id for this calculation.
Returns:
The created instance.
"""
assert calc_id is not None and upload_id is not None
additional.update(dict(calc_id=calc_id, upload_id=upload_id))
# prepare the entry with all necessary properties from the backend
calc = cls(meta=dict(id=calc_id))
for property in cls._doc_type.mapping:
mapped_property = key_mappings.get(property, property)
if mapped_property in additional:
value = additional[mapped_property]
else:
try:
value = backend.get_value(mapped_property, 0)
if value is None:
raise KeyError
except KeyError:
try:
program_name = backend.get_value('program_name', 0)
except KeyError:
program_name = 'unknown'
logger.warning(
'Missing property value', property=mapped_property, upload_id=upload_id,
calc_id=calc_id, code=program_name)
continue
setattr(calc, property, value)
return calc
def persist(self, **kwargs):
"""
Persist this entry to elastic search. Kwargs are passed to elastic search.
Raises:
AlreadyExists: If the calculation already exists in elastic search. We use
the elastic document lock here. The elastic document is IDed via the
``calc_id``.
"""
try:
# In practice, ES operations might fail due to timeouts under heavy load or
# bad configuration. Retrying with a small delay is a pragmatic solution.