Commit 876069cc authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Implemented insertion of parsed calculation data into the legacy NOMAD-coe repository db.

parent 4dbbecd5
......@@ -44,7 +44,7 @@
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/.pyenv/bin/pytest",
"args": [
"-sv", "tests/test_api.py::test_repo_calcs_user"
"-sv", "tests/test_coe_repo.py::test_add_upload"
]
},
{
......
......@@ -48,9 +48,9 @@ class Repo extends React.Component {
})
static rowConfig = {
chemical_composition_bulk_reduced: 'Formula',
chemical_composition: 'Formula',
program_name: 'Code',
program_basis_set_type: 'Basis set',
basis_set_type: 'Basis set',
system_type: 'System',
crystal_system: 'Crystal',
space_group_number: 'Space group',
......
......@@ -29,7 +29,7 @@ from flask_httpauth import HTTPBasicAuth
import os.path
from nomad import config, infrastructure
from nomad.user import User
from nomad.coe_repo import User
from nomad.processing import Upload
base_path = config.services.api_base_path
......
......@@ -67,8 +67,8 @@ class RepoCalcRes(Resource):
"mainfile":"RopD3Mo8oMV_-E5bh8uW5PiiCRkH1/data/BrK_svSi/TFCC010.CAB/vasprun.xml.relax1",
"program_name":"VASP",
"program_version":"4.6.35 3Apr08 complex parallel LinuxIFC",
"chemical_composition_bulk_reduced":"BrKSi2",
"program_basis_set_type":"plane waves",
"chemical_composition":"BrKSi2",
"basis_set_type":"plane waves",
"atom_species":[
35,
19,
......@@ -135,8 +135,8 @@ class RepoCalcsRes(Resource):
"mainfile":"RopD3Mo8oMV_-E5bh8uW5PiiCRkH1/data/BrK_svSi/TFCC010.CAB/vasprun.xml.relax1",
"program_name":"VASP",
"program_version":"4.6.35 3Apr08 complex parallel LinuxIFC",
"chemical_composition_bulk_reduced":"BrKSi2",
"program_basis_set_type":"plane waves",
"chemical_composition":"BrKSi2",
"basis_set_type":"plane waves",
"atom_species":[
35,
19,
......@@ -176,12 +176,12 @@ class RepoCalcsRes(Resource):
if g.user is None:
abort(401, message='Authentication required for owner value user.')
search = RepoCalc.search().query('match_all')
search = search.filter('term', user_id=g.user.email)
search = search.filter('term', user_id=str(g.user.user_id))
elif owner == 'staging':
if g.user is None:
abort(401, message='Authentication required for owner value user.')
search = RepoCalc.search().query('match_all')
search = search.filter('term', user_id=g.user.email).filter('term', staging=True)
search = search.filter('term', user_id=str(g.user.user_id)).filter('term', staging=True)
else:
abort(400, message='Invalid owner value. Valid values are all|user|staging, default is all')
......
......@@ -233,7 +233,7 @@ class UploadRes(Resource):
except KeyError:
abort(404, message='Upload with id %s does not exist.' % upload_id)
if upload.user_id != g.user.email:
if upload.user_id != str(g.user.user_id):
abort(404, message='Upload with id %s does not exist.' % upload_id)
try:
......@@ -300,7 +300,7 @@ class UploadRes(Resource):
except KeyError:
abort(404, message='Upload with id %s does not exist.' % upload_id)
if upload.user_id != g.user.email:
if upload.user_id != str(g.user.user_id):
abort(404, message='Upload with id %s does not exist.' % upload_id)
json_data = request.get_json()
......@@ -341,7 +341,7 @@ class UploadRes(Resource):
except KeyError:
abort(404, message='Upload with id %s does not exist.' % upload_id)
if upload.user_id != g.user.email:
if upload.user_id != str(g.user.user_id):
abort(404, message='Upload with id %s does not exist.' % upload_id)
try:
......
......@@ -13,13 +13,12 @@
# limitations under the License.
"""
Module with some prototypes/placeholder for future user management in nomad@FAIR.
It is currently based on the NOMAD-coe repository postgres API. This module allows
to authenticate users based on user password or session tokens. It allows to access
the user data like names and user_id.
Interface to the NOMAD-coe repository postgres database. This implementation is based on
SQLAlchemy. There are model classes that represent entries in the *users* and *session*
tables.
This implementation is based on SQLAlchemy. There are model classes that represent
entries in the *users* and *session* tables.
This module allows to authenticate users based on user password or session tokens.
It allows to access the user data like names and user_id.
.. autoclass:: User
:members:
......@@ -30,18 +29,199 @@ entries in the *users* and *session* tables.
:undoc-members:
.. autofunction:: ensure_test_user
This module also provides functionality to add parsed calculation data to the db:
.. autofunction:: add_upload
"""
from passlib.hash import bcrypt
from sqlalchemy import Column, Integer, String
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import BYTEA
from nomad import infrastructure
from nomad import utils, infrastructure
from nomad.repo import RepoCalc
Base = declarative_base()
def add_upload(upload, restricted: bool) -> int:
    """
    Add the processed upload to the NOMAD-coe repository db. It creates an
    uploads-entry and the respective calculation and property entries, everything
    in one transaction. Triggers an update of the NOMAD-coe repository elastic
    search index after success.

    Arguments:
        upload: The processed upload; must provide ``upload_id``, ``upload_hash``,
            ``upload_time``, and ``user_id``.
        restricted: Whether the upload data is restricted.
            TODO deal with the restricted parameter

    Returns:
        The id of the created repository upload entry, or None if the upload
        contained no calculations (nothing is written in that case).
    """
    repo_db = infrastructure.repository_db
    logger = utils.get_logger(
        __name__,
        upload_id=upload.upload_id,
        upload_hash=upload.upload_hash)

    result = None
    try:
        # create upload
        coe_upload = Upload(
            upload_name=upload.upload_hash,
            created=upload.upload_time,
            user_id=int(upload.user_id),
            is_processed=True)

        # add calculations and metadata
        has_calcs = False
        for repo_calc in RepoCalc.upload_calcs(upload.upload_id):
            has_calcs = True
            add_calculation(upload, coe_upload, repo_calc)

        # commit only if there was at least one calculation; an empty upload
        # leaves no trace in the repository db
        if has_calcs:
            repo_db.commit()
            result = coe_upload.upload_id
        else:
            # empty upload case
            repo_db.rollback()
    except Exception as e:
        logger.error('Unexpected exception.', exc_info=e)
        repo_db.rollback()
        # re-raise with the original traceback intact
        raise

    # TODO trigger index update
    return result
def add_calculation(upload, coe_upload, calc: RepoCalc) -> None:
    """
    Create the NOMAD-coe repository db entries (calculation, metadata,
    spacegroup, topic tags) for one parsed calculation of the given upload.
    Entries are only added to the current session; committing is left to the
    caller.
    """
    session = infrastructure.repository_db

    # table based properties
    db_calc = Calc(checksum=calc.calc_hash, upload=coe_upload)
    session.add(db_calc)

    version_content = calc.program_version  # TODO shorten version names
    version = session.query(CodeVersion).filter_by(content=version_content).first()
    if version is None:
        # first calculation with this code version: create the row
        version = CodeVersion(content=version_content)
        session.add(version)

    session.add(CalcMetaData(
        calc=db_calc,
        added=upload.upload_time,
        chemical_formula=calc.chemical_composition,
        filenames=','.join(calc.aux_files).encode('utf-8'),  # TODO fix paths, has to be aligned with API
        location=calc.mainfile,  # TODO fix paths, has to be aligned with API
        version=version))

    session.add(Spacegroup(
        calc=db_calc,
        n=int(calc.space_group_number)))

    # topic based properties
    db_calc.set_value(topic_code, calc.program_name)
    for atom in set(calc.atom_species):
        db_calc.set_value(topic_atoms, str(atom))  # TODO atom label not number
    db_calc.set_value(topic_system_type, calc.system_type)
    db_calc.set_value(topic_xc_treatment, calc.XC_functional_name)  # TODO function->treatment
    db_calc.set_value(topic_crystal_system, calc.crystal_system)
    db_calc.set_value(topic_basis_set_type, calc.basis_set_type)
class Calc(Base):  # type: ignore
    """ SQLAlchemy model for the NOMAD-coe repository *calculations* table. """
    __tablename__ = 'calculations'

    calc_id = Column(Integer, primary_key=True, autoincrement=True)
    # the upload this calculation originates from
    origin_id = Column(Integer, ForeignKey('uploads.upload_id'))
    upload = relationship('Upload')
    checksum = Column(String)

    def set_value(self, topic_cid: int, value: str) -> None:
        """
        Tag this calculation with the given topic value. Looks up an existing
        *topics* row for *value* (creating one under category *topic_cid* if
        missing) and adds a *tags* row linking it to this calculation.
        Does nothing if *value* is None. Rows are only added to the session,
        not committed.
        """
        if value is None:
            return

        repo_db = infrastructure.repository_db
        # NOTE(review): the lookup matches on topic value only, not on cid —
        # presumably topic values are unique across categories; verify.
        topic = repo_db.query(Topics).filter_by(topic=value).first()
        if not topic:
            topic = Topics(cid=topic_cid, topic=value)
            repo_db.add(topic)

        tag = Tag(calc=self, topic=topic)
        repo_db.add(tag)
class CalcMetaData(Base):  # type: ignore
    """ SQLAlchemy model for the NOMAD-coe repository *metadata* table. """
    __tablename__ = 'metadata'

    calc_id = Column(Integer, ForeignKey('calculations.calc_id'), primary_key=True)
    calc = relationship('Calc')
    added = Column(DateTime)
    chemical_formula = Column(String)
    # utf-8 encoded, comma-joined auxiliary file names (see add_calculation)
    filenames = Column(BYTEA)
    location = Column(String)
    version_id = Column(Integer, ForeignKey('codeversions.version_id'))
    version = relationship('CodeVersion')
class CodeVersion(Base):  # type: ignore
    """ SQLAlchemy model for the NOMAD-coe repository *codeversions* table. """
    __tablename__ = 'codeversions'

    version_id = Column(Integer, primary_key=True, autoincrement=True)
    # the program version string as reported by the parser
    content = Column(String)
class Spacegroup(Base):  # type: ignore
    """ SQLAlchemy model for the NOMAD-coe repository *spacegroups* table. """
    __tablename__ = 'spacegroups'

    calc_id = Column(Integer, ForeignKey('calculations.calc_id'), primary_key=True)
    calc = relationship('Calc')
    # the space group number (see add_calculation)
    n = Column(Integer)
class Tag(Base):  # type: ignore
    """
    SQLAlchemy model for the NOMAD-coe repository *tags* table, linking a
    calculation to a topic value.
    """
    __tablename__ = 'tags'

    calc_id = Column(Integer, ForeignKey('calculations.calc_id'), primary_key=True)
    calc = relationship('Calc')
    tid = Column(Integer, ForeignKey('topics.tid'), primary_key=True)
    topic = relationship('Topics')

    def __repr__(self):
        # fixed: the format string was missing the closing quote after the second %d
        return '<Tag(calc_id="%d", tid="%d")>' % (self.calc_id, self.tid)
# Topic category ids (``cid`` in the *topics* table) used when tagging
# calculations via Calc.set_value.
# NOTE(review): values presumably mirror the legacy NOMAD-coe repository's
# category numbering — confirm against the repository schema.
topic_code = 220
topic_atoms = 10
topic_system_type = 50
topic_xc_treatment = 75
topic_crystal_system = 90
topic_basis_set_type = 80
class Topics(Base):  # type: ignore
    """ SQLAlchemy model for the NOMAD-coe repository *topics* table. """
    __tablename__ = 'topics'

    tid = Column(Integer, primary_key=True, autoincrement=True)
    # topic category id, one of the topic_* constants above
    cid = Column(Integer)
    # the topic value, e.g. a program name or crystal system
    topic = Column(String)
class Upload(Base):  # type: ignore
    """ SQLAlchemy model for the NOMAD-coe repository *uploads* table. """
    __tablename__ = 'uploads'

    upload_id = Column(Integer, primary_key=True, autoincrement=True)
    # set to the processed upload's upload_hash (see add_upload)
    upload_name = Column(String)
    user_id = Column(Integer, ForeignKey('users.user_id'))
    user = relationship('User')
    is_processed = Column(Boolean)
    created = Column(DateTime)
class Session(Base): # type: ignore
__tablename__ = 'sessions'
......@@ -84,8 +264,8 @@ class User(Base): # type: ignore
assert False, 'Login functions are done by the NOMAD-coe repository GUI'
def get_auth_token(self):
repository_db = infrastructure.repository_db
session = repository_db.query(Session).filter_by(user_id=self.user_id).first()
repo_db = infrastructure.repository_db
session = repo_db.query(Session).filter_by(user_id=self.user_id).first()
if not session:
raise LoginException('No session, user probably not logged in at NOMAD-coe repository GUI')
......@@ -93,8 +273,8 @@ class User(Base): # type: ignore
@staticmethod
def verify_user_password(email, password):
repository_db = infrastructure.repository_db
user = repository_db.query(User).filter_by(email=email).first()
repo_db = infrastructure.repository_db
user = repo_db.query(User).filter_by(email=email).first()
if not user:
return None
......@@ -105,12 +285,12 @@ class User(Base): # type: ignore
@staticmethod
def verify_auth_token(token):
    """
    Resolve a session token to its user.

    Returns the :class:`User` owning the session with the given token, or
    None if no such session exists.
    """
    # deduplicated: the diff left both the old ``repository_db`` and the new
    # ``repo_db`` statement pairs in place; keep only the new version
    repo_db = infrastructure.repository_db
    session = repo_db.query(Session).filter_by(token=token).first()
    if session is None:
        return None

    user = repo_db.query(User).filter_by(user_id=session.user_id).first()
    assert user, 'User in sessions must exist.'
    return user
......@@ -121,11 +301,11 @@ def ensure_test_user(email):
Returns:
The user as :class:`User` instance.
"""
existing = infrastructure.repository_db.query(User).filter_by(
email=email).first()
repo_db = infrastructure.repository_db
existing = repo_db.query(User).filter_by(email=email).first()
assert existing, 'Test user %s does not exist.' % email
session = infrastructure.repository_db.query(Session).filter_by(
session = repo_db.query(Session).filter_by(
user_id=existing.user_id).first()
assert session, 'Test user %s has no session.' % email
assert session.token == email, 'Test user %s session has unexpected token.' % email
......
......@@ -22,6 +22,8 @@ import shutil
from contextlib import contextmanager
import psycopg2
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from elasticsearch.exceptions import RequestError
from elasticsearch_dsl import connections
from mongoengine import connect
......@@ -37,7 +39,9 @@ mongo_client = None
""" The pymongo mongodb client. """
repository_db = None
""" The repository postgres db sqlalchemy client. """
""" The repository postgres db sqlalchemy session. """
repository_db_conn = None
""" The repository postgres db sqlalchemy connection. """
def setup():
......@@ -106,18 +110,19 @@ def setup_repository_db():
if not exists:
reset_repository_db()
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
global repository_db
global repository_db_conn
url = 'postgresql://%s:%s@%s:%d/%s' % (
config.repository_db.user,
config.repository_db.password,
config.repository_db.host,
config.repository_db.port,
config.repository_db.dbname)
engine = create_engine(url, echo=False, isolation_level='AUTOCOMMIT')
repository_db = sessionmaker(bind=engine)()
engine = create_engine(url, echo=False)
repository_db_conn = engine.connect()
repository_db = Session(bind=repository_db_conn)
logger.info('setup repository db')
......@@ -150,7 +155,14 @@ def repository_db_connection():
config.repository_db.password)
conn = psycopg2.connect(conn_str)
yield conn
try:
yield conn
except Exception as e:
logger.error('Unhandled exception within repository db connection.', exc_info=e)
conn.rollback()
conn.close()
return
conn.commit()
conn.close()
......
......@@ -34,10 +34,9 @@ import time
from structlog import wrap_logger
from contextlib import contextmanager
from nomad import config, utils
from nomad import config, utils, coe_repo
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile, File
from nomad.repo import RepoCalc
from nomad.user import User
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE, RUNNING
from nomad.parsing import parsers, parser_dict
from nomad.normalizing import normalizers
......@@ -284,7 +283,7 @@ class Calc(Proc):
additional=additional,
upload_hash=upload_hash,
calc_hash=calc_hash,
upload_id=self.upload_id)
upload_id=self.upload_id).persist()
with utils.timer(
logger, 'archived', step='archive',
......@@ -342,6 +341,8 @@ class Upload(Chord):
upload_url = StringField(default=None)
upload_command = StringField(default=None)
coe_repo_upload_id = IntField(default=None)
_initiated_parsers = IntField(default=-1)
meta: Any = {
......@@ -359,9 +360,9 @@ class Upload(Chord):
return cls.get_by_id(id, 'upload_id')
@classmethod
def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
    """ Returns all staging uploads for the given user. """
    # deduplicated: the diff left both the old (user.email) and the new
    # (str(user.user_id)) variants in place; keep only the new version
    return cls.objects(user_id=str(user.user_id), in_staging=True)
def get_logger(self, **kwargs):
logger = super().get_logger()
......@@ -413,13 +414,13 @@ class Upload(Chord):
The upload will be already saved to the database.
Arguments:
user (User): The user that created the upload.
user (coe_repo.User): The user that created the upload.
"""
user: User = kwargs['user']
user: coe_repo.User = kwargs['user']
del(kwargs['user'])
if 'upload_id' not in kwargs:
kwargs.update(upload_id=utils.create_uuid())
kwargs.update(user_id=user.email)
kwargs.update(user_id=str(user.user_id))
self = super().create(**kwargs)
basic_auth_token = base64.b64encode(b'%s:' % user.get_auth_token()).decode('utf-8')
......@@ -443,6 +444,7 @@ class Upload(Chord):
self.get_logger().info('unstage')
self.in_staging = False
RepoCalc.unstage(upload_id=self.upload_id)
# coe_repo.add_upload(self, restricted=False) # TODO allow users to choose restricted
self.save()
@property
......@@ -479,6 +481,12 @@ class Upload(Chord):
@task
def extracting(self):
"""
Task performed before the actual parsing/normalizing. Extracting and bagging
the uploaded files, computing all keys, create an *upload* entry in the NOMAD-coe
repository db, etc.
"""
# extract the uploaded file, this will also create a bagit bag.
logger = self.get_logger()
try:
with utils.timer(
......@@ -489,12 +497,14 @@ class Upload(Chord):
self.fail('process request for non existing upload', level=logging.INFO)
return
# create and save a hash for the upload
try:
self.upload_hash = self.upload_file.upload_hash()
except Exception as e:
self.fail('could not create upload hash', e)
return
# check if the file was already uploaded and processed before
if RepoCalc.upload_exists(self.upload_hash):
self.fail('The same file was already uploaded and processed.', level=logging.INFO)
return
......
......@@ -86,8 +86,7 @@ class RepoCalc(ElasticDocument):
@classmethod
def create_from_backend(
cls, backend: LocalBackend, additional: Dict[str, Any],
upload_id: str, upload_hash: str, calc_hash: str,
**kwargs) -> 'RepoCalc':
upload_id: str, upload_hash: str, calc_hash: str) -> 'RepoCalc':
"""
Create a new calculation instance in elastic search. The data from the given backend
will be used. Additional meta-data can be given as *kwargs*. ``upload_id``,
......@@ -100,12 +99,9 @@ class RepoCalc(ElasticDocument):
upload_hash: The upload hash of the originating upload.
upload_id: The upload id of the originating upload.
calc_hash: The upload unique hash for this calculation.
kwargs: Arguments are passed to elasticsearch index operation.
Raises:
AlreadyExists: If the calculation already exists in elastic search. We use
the elastic document lock here. The elastic document is IDed via the
``archive_id``.
Returns:
The created instance.
"""
assert upload_hash is not None and calc_hash is not None and upload_id is not None
additional.update(dict(upload_hash=upload_hash, calc_hash=calc_hash, upload_id=upload_id))
......@@ -113,33 +109,45 @@ class RepoCalc(ElasticDocument):
# prepare the entry with all necessary properties from the backend
calc = cls(meta=dict(id='%s/%s' % (upload_hash, calc_hash)))
for property in cls._doc_type.mapping:
property = key_mappings.get(property, property)
mapped_property = key_mappings.get(property, property)
if property in additional:
value = additional[property]
if mapped_property in additional:
value = additional[mapped_property]
else:
try:
value = backend.get_value(property, 0)
value = backend.get_value(mapped_property, 0)
if value is None:
raise KeyError
except KeyError:
try:
program_name = backend.get_value('program_name', 0)
except KeyError:
program_name = 'unknown'
logger.warning(
'Missing property value', property=property, upload_id=upload_id,
'Missing property value', property=mapped_property, upload_id=upload_id,
upload_hash=upload_hash, calc_hash=calc_hash, code=program_name)
continue
setattr(calc, property, value)
# persist to elastic search
return calc
def persist(self, **kwargs):
"""
Persist this entry to elastic search. Kwargs are passed to elastic search.
Raises:
AlreadyExists: If the calculation already exists in elastic search. We use
the elastic document lock here. The elastic document is IDed via the
``archive_id``.
"""
try:
# In practice es operations might fail due to timeout under heavy loads/
# bad configuration. Retries with a small delay is a pragmatic solution.
e_after_retries = None
for _ in range(0, 2):
try:
calc.save(op_type='create', **kwargs)
self.save(op_type='create', **kwargs)
e_after_retries = None
break
except ConnectionTimeout as e:
......@@ -154,9 +162,7 @@ class RepoCalc(ElasticDocument):
# if we had and exception and could not fix with retries, throw it
raise e_after_retries # pylint: disable=E0702
except ConflictError:
raise AlreadyExists('Calculation %s does already exist.' % (calc.archive_id))
return calc
raise AlreadyExists('Calculation %s does already exist.' % (self.archive_id))
@staticmethod
def delete_upload(upload_id):
......@@ -208,6 +214,13 @@ class RepoCalc(ElasticDocument):
return len(search) > 0
@staticmethod
def upload_calcs(upload_id):
    """ Returns an iterable over all repository entries of the given upload. """
    search = Search(
        using=infrastructure.elastic_client,
        index=config.elastic.calc_index)
    search = search.query('match', upload_id=upload_id)
    return search.scan()
@property