Commit 41e4044c authored by Markus Scheidgen

Refactored test fixtures. Added search tests.

parent 76d899d2
Pipeline #43627 failed with stages in 1 minute and 22 seconds
......@@ -44,7 +44,7 @@
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/.pyenv/bin/pytest",
"args": [
"-sv", "tests/test_api.py::TestUploads::test_put[None-multipart-tests/data/proc/examples_template.zip]"
"-sv", "tests/test_api.py::TestUploads::test_post[tests/data/proc/empty.zip]"
]
},
{
......
......@@ -57,3 +57,7 @@ nomad.utils
nomad.migration
---------------
.. automodule:: nomad.migration
tests
-----
.. automodule:: tests
......@@ -17,6 +17,7 @@ import json
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, aliased
from sqlalchemy.sql.expression import literal
from datetime import datetime
from nomad import infrastructure, utils
from nomad.datamodel import CalcWithMetadata
......@@ -149,9 +150,16 @@ class Calc(Base):
code_version_obj = CodeVersion(content=source_code_version)
repo_db.add(code_version_obj)
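# prefer the calc's own upload time; fall back to the upload's time, then to now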
if calc.upload_time is not None:
added_time = calc.upload_time
elif self.upload is not None and self.upload.upload_time is not None:
added_time = self.upload.upload_time
else:
added_time = datetime.now()
metadata = CalcMetaData(
calc=self,
added=calc.upload_time if calc.upload_time is not None else self.upload.upload_time,
added=added_time,
chemical_formula=calc.formula,
filenames=('[%s]' % ','.join(['"%s"' % filename for filename in calc.files])).encode('utf-8'),
location=calc.mainfile,
......@@ -183,41 +191,45 @@ class Calc(Base):
self._set_value(base.topic_basis_set_type, calc.basis_set)
# user relations
if calc.uploader is not None:
uploader = repo_db.query(User).get(calc.uploader.id)
else:
uploader = self.upload.user
def add_users_to_relation(source_users, relation):
for source_user in source_users:
coe_user = repo_db.query(User).get(source_user.id)
source_user.update(coe_user.to_popo())
relation.append(coe_user)
self.owners.append(uploader)
for coauthor in calc.coauthors:
self.coauthors.append(repo_db.query(User).get(coauthor.id))
if calc.uploader is not None:
add_users_to_relation([calc.uploader], self.owners)
elif self.upload is not None and self.upload.user is not None:
self.owners.append(self.upload.user)
calc.uploader = self.upload.user.to_popo()
for shared_with in calc.shared_with:
self.shared_with.append(repo_db.query(User).get(shared_with.id))
add_users_to_relation(calc.coauthors, self.coauthors)
add_users_to_relation(calc.shared_with, self.shared_with)
# datasets
for dataset in calc.datasets:
dataset_id = dataset.id
coe_dataset = repo_db.query(Calc).get(dataset_id)
if coe_dataset is None:
coe_dataset = Calc(coe_calc_id=dataset_id)
repo_db.add(coe_dataset)
coe_dataset_calc: Calc = repo_db.query(Calc).get(dataset_id)
if coe_dataset_calc is None:
coe_dataset_calc = Calc(coe_calc_id=dataset_id)
repo_db.add(coe_dataset_calc)
metadata = CalcMetaData(
calc=coe_dataset,
calc=coe_dataset_calc,
added=self.upload.upload_time,
chemical_formula=dataset.name)
repo_db.add(metadata)
if dataset.doi is not None:
self._add_citation(coe_dataset, dataset.doi['value'], 'INTERNAL')
self._add_citation(coe_dataset_calc, dataset.doi['value'], 'INTERNAL')
# cause a flush to avoid future inconsistencies
coe_dataset = repo_db.query(Calc).get(dataset_id)
coe_dataset_calc = repo_db.query(Calc).get(dataset_id)
dataset = CalcSet(parent_calc_id=dataset_id, children_calc_id=self.coe_calc_id)
repo_db.add(dataset)
coe_dataset_rel = CalcSet(parent_calc_id=dataset_id, children_calc_id=self.coe_calc_id)
repo_db.add(coe_dataset_rel)
dataset.update(DataSet(coe_dataset_calc).to_popo())
# references
for reference in calc.references:
......@@ -284,9 +296,7 @@ class Calc(Base):
result.pid = self.pid
result.uploader = self.uploader.to_popo()
result.upload_time = self.calc_metadata.added
result.datasets = list(
utils.POPO(id=ds.id, doi=ds.doi.to_popo(), name=ds.name)
for ds in datasets)
result.datasets = list(ds.to_popo() for ds in datasets)
result.with_embargo = self.with_embargo
result.comment = self.comment
result.references = list(
......@@ -320,3 +330,6 @@ class DataSet:
@property
def name(self):
return self._dataset_calc.calc_metadata.chemical_formula
def to_popo(self):
return utils.POPO(id=self.id, doi=self.doi.to_popo(), name=self.name)
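For reference, to_popo yields a plain-old-Python-object view of the dataset, roughly of this shape (all values hypothetical):

    utils.POPO(id=42, doi=utils.POPO(value='10.1000/example'), name='example dataset name')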
......@@ -150,7 +150,7 @@ class User(Base): # type: ignore
def to_popo(self) -> utils.POPO:
return utils.POPO(
user_id=self.user_id,
id=self.user_id,
first_name=self.first_name,
last_name=self.last_name,
email=self.email,
......
......@@ -122,3 +122,23 @@ class CalcWithMetadata():
key: value for key, value in self.__dict__.items()
if value is not None
}
def apply_user_metadata(self, metadata: dict):
"""
Applies a user-provided metadata dict to this calc.
"""
self.pid = metadata.get('_pid')
self.comment = metadata.get('comment')
self.upload_time = metadata.get('_upload_time')
uploader_id = metadata.get('_uploader')
if uploader_id is not None:
self.uploader = utils.POPO(id=uploader_id)
self.references = [utils.POPO(value=ref) for ref in metadata.get('references', [])]
self.with_embargo = metadata.get('with_embargo', False)
self.coauthors = [
utils.POPO(id=user) for user in metadata.get('coauthors', [])]
self.shared_with = [
utils.POPO(id=user) for user in metadata.get('shared_with', [])]
self.datasets = [
utils.POPO(id=ds['id'], doi=utils.POPO(value=ds.get('_doi')), name=ds.get('_name'))
for ds in metadata.get('datasets', [])]
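A minimal usage sketch for apply_user_metadata; all values are hypothetical, and the bare CalcWithMetadata() construction is an assumption. The keys are exactly the ones read above:

    calc = CalcWithMetadata()  # assumed bare construction; adapt as needed
    calc.apply_user_metadata(dict(
        comment='a user comment',
        references=['http://external.ref/one'],
        _uploader=1,
        coauthors=[2, 3],
        shared_with=[4],
        with_embargo=True,
        _pid=256,
        datasets=[dict(id=1, _doi='10.1000/example', _name='example dataset')]))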
......@@ -90,6 +90,8 @@ def setup_elastic():
try:
from nomad.search import Entry
Entry.init(index=config.elastic.index_name)
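# also repoint the class-level elasticsearch_dsl index object at the configured name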
Entry._index._name = config.elastic.index_name
logger.info('initialized elastic index', index_name=config.elastic.index_name)
except RequestError as e:
if e.status_code == 400 and 'resource_already_exists_exception' in e.error:
......@@ -99,6 +101,8 @@ def setup_elastic():
else:
logger.info('init elastic index')
return elastic_client
def setup_repository_db(**kwargs):
""" Creates a connection and stores it in the module variables. """
......
......@@ -423,7 +423,7 @@ class NomadCOEMigration:
""" Transforms to a dict that fullfils the API's uploade metadata model. """
return dict(
_upload_time=source.upload_time,
_uploader=source.uploader['user_id'],
_uploader=source.uploader['id'],
_pid=source.pid,
references=[ref['value'] for ref in source.references],
datasets=[dict(
......@@ -433,8 +433,8 @@ class NomadCOEMigration:
mainfile=source.mainfile,
with_embargo=source.with_embargo,
comment=source.comment,
coauthors=list(user['user_id'] for user in source.coauthors),
shared_with=list(user['user_id'] for user in source.shared_with)
coauthors=list(user['id'] for user in source.coauthors),
shared_with=list(user['id'] for user in source.shared_with)
)
def index(self, *args, **kwargs):
......
......@@ -30,7 +30,7 @@ import logging
from structlog import wrap_logger
from contextlib import contextmanager
from nomad import utils, coe_repo, config, infrastructure
from nomad import utils, coe_repo, config, infrastructure, search
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parsers, parser_dict
......@@ -231,14 +231,19 @@ class Calc(Proc):
def archiving(self):
logger = self.get_logger()
calc_with_metadata = self._parser_backend.to_calc_with_metadata()
# persist the repository metadata
with utils.timer(logger, 'indexed', step='index'):
self.upload_files.metadata.insert(
self._parser_backend.to_calc_with_metadata().to_dict())
with utils.timer(logger, 'saved repo metadata', step='persist'):
self.upload_files.metadata.insert(calc_with_metadata.to_dict())
# index in search
with utils.timer(logger, 'indexed', step='persist'):
search.Entry.from_calc_with_metadata(calc_with_metadata, published=False).persist()
# persist the archive
with utils.timer(
logger, 'archived', step='archive',
logger, 'archived', step='persist',
input_size=self.mainfile_file.size) as log_data:
with self.upload_files.archive_file(self.calc_id, 'wt') as out:
self._parser_backend.write_json(out, pretty=True)
......@@ -248,7 +253,7 @@ class Calc(Proc):
# close loghandler
if self._calc_proc_logwriter is not None:
with utils.timer(
logger, 'archived log', step='archive_log',
logger, 'archived log', step='persist',
input_size=self.mainfile_file.size) as log_data:
self._calc_proc_logwriter_ctx.__exit__(None, None, None) # pylint: disable=E1101
self._calc_proc_logwriter = None
......@@ -350,6 +355,11 @@ class Upload(Chord):
logger = self.get_logger()
with utils.lnr(logger, 'staged upload delete failed'):
with utils.timer(
logger, 'upload deleted from index', step='delete',
upload_size=self.upload_files.size):
search.Entry.delete_upload(self.upload_id)
with utils.timer(
logger, 'staged upload deleted', step='delete',
upload_size=self.upload_files.size):
......@@ -369,16 +379,23 @@ class Upload(Chord):
logger = self.get_logger()
with utils.lnr(logger, 'publish failed'):
upload_with_metadata = self.to_upload_with_metadata()
with utils.timer(
logger, 'upload added to repository', step='publish',
upload_size=self.upload_files.size):
coe_repo.Upload.add(self.to_upload_with_metadata())
coe_repo.Upload.add(upload_with_metadata)
with utils.timer(
logger, 'staged upload files packed', step='publish',
upload_size=self.upload_files.size):
self.upload_files.pack()
with utils.timer(
logger, 'index updated', step='publish',
upload_size=self.upload_files.size):
search.Entry.publish_upload(upload_with_metadata)
with utils.timer(
logger, 'staged upload deleted', step='publish',
upload_size=self.upload_files.size):
......@@ -523,25 +540,8 @@ class Upload(Chord):
def apply_metadata(calc):
metadata = calc_metadata.get(calc.mainfile, self.metadata)
if metadata is None:
return calc
calc.pid = metadata.get('_pid')
calc.comment = metadata.get('comment')
calc.upload_time = metadata.get('_upload_time')
uploader_id = metadata.get('_uploader')
if uploader_id is not None:
calc.uploader = utils.POPO(id=uploader_id)
calc.references = [utils.POPO(value=ref) for ref in metadata.get('references', [])]
calc.with_embargo = metadata.get('with_embargo', False)
calc.coauthors = [
utils.POPO(id=user) for user in metadata.get('coauthors', [])]
calc.shared_with = [
utils.POPO(id=user) for user in metadata.get('shared_with', [])]
calc.datasets = [
utils.POPO(id=ds['id'], doi=utils.POPO(value=ds.get('_doi')), name=ds.get('_name'))
for ds in metadata.get('datasets', [])]
if metadata is not None:
calc.apply_user_metadata(metadata)
return calc
result = UploadWithMetadata(
......
......@@ -20,20 +20,27 @@ from elasticsearch.exceptions import ConflictError, ConnectionTimeout
from datetime import datetime
import time
from elasticsearch_dsl import Document, InnerDoc, Keyword, Text, Date, \
Nested
Nested, Boolean, Search
from nomad import config, datamodel, infrastructure, datamodel
from nomad import config, datamodel, infrastructure, coe_repo
class AlreadyExists(Exception): pass
class User(InnerDoc):
def __init__(self, user):
super().__init__(
id=user.user_id,
name='%s %s' % (user.first_name, user.last_name),
name_keyword='%s %s' % (user.first_name, user.last_name))
@classmethod
def from_user_popo(cls, user):
self = cls(id=user.id)
if 'first_name' not in user:
user = coe_repo.User.from_user_id(user.id).to_popo()
self.name = '%s %s' % (user['first_name'], user['last_name'])
self.name_keyword = '%s %s' % (user['first_name'], user['last_name'])
return self
id = Keyword()
name = Text()
......@@ -41,8 +48,10 @@ class User(InnerDoc):
class Dataset(InnerDoc):
def __init__(self, dataset):
super().__init__(
@classmethod
def from_dataset_popo(cls, dataset):
return cls(
id=dataset.id,
doi=dataset.doi.value if dataset.doi is not None else None,
name=dataset.name)
......@@ -57,15 +66,17 @@ class Entry(Document):
name = config.elastic.index_name
upload_id = Keyword()
upload_time = Date(format='epoch_millis')
upload_time = Date()
calc_id = Keyword()
calc_hash = Keyword()
pid = Keyword()
mainfile = Keyword()
files = Keyword()
files = Keyword(multi=True)
uploader = Nested(User)
with_embargo = Keyword()
with_embargo = Boolean()
published = Boolean()
coauthors = Nested(User)
shared_with = Nested(User)
comment = Text()
......@@ -73,7 +84,7 @@ class Entry(Document):
datasets = Nested(Dataset)
formula = Keyword()
atoms = Keyword()
atoms = Keyword(multi=True)
basis_set = Keyword()
xc_functional = Keyword()
system = Keyword()
......@@ -83,7 +94,7 @@ class Entry(Document):
code_version = Keyword()
@classmethod
def from_calc_with_metadata(cls, source: datamodel.CalcWithMetadata) -> 'Entry':
def from_calc_with_metadata(cls, source: datamodel.CalcWithMetadata, published: bool = False) -> 'Entry':
return Entry(
meta=dict(id=source.calc_id),
upload_id=source.upload_id,
......@@ -93,14 +104,15 @@ class Entry(Document):
pid=str(source.pid),
mainfile=source.mainfile,
files=source.files,
uploader=User(source.uploader) if source.uploader is not None else None,
uploader=User.from_user_popo(source.uploader) if source.uploader is not None else None,
with_embargo=source.with_embargo,
coauthors=[User(user) for user in source.coauthors],
shared_with=[User(user) for user in source.shared_with],
published=published,
coauthors=[User.from_user_popo(user) for user in source.coauthors],
shared_with=[User.from_user_popo(user) for user in source.shared_with],
comment=source.comment,
references=[ref.value for ref in source.references],
datasets=[Dataset(ds) for ds in source.datasets],
datasets=[Dataset.from_dataset_popo(ds) for ds in source.datasets],
formula=source.formula,
atoms=list(set(source.atoms)),
......@@ -112,6 +124,11 @@ class Entry(Document):
code_name=source.code_name,
code_version=source.code_version)
@classmethod
def add_upload(cls, source: datamodel.UploadWithMetadata):
for calc in source.calcs:
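# op_type='create' makes elasticsearch reject an already indexed document with a ConflictError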
cls.from_calc_with_metadata(calc).save(op_type='create')
def persist(self, **kwargs):
"""
Persist this entry to elasticsearch. Kwargs are passed on to elasticsearch.
......@@ -160,6 +177,16 @@ class Entry(Document):
}
conn.update_by_query(index, doc_type=[doc_type], body=body)
@classmethod
def publish_upload(cls, upload: datamodel.UploadWithMetadata):
cls.update_by_query(upload.upload_id, 'ctx._source["published"] = true')
# TODO run update on all calcs with their new metadata
@classmethod
def delete_upload(cls, upload_id):
index = cls._default_index()
Search(index=index).query('match', upload_id=upload_id).delete()
@staticmethod
def es_search(body):
""" Perform an elasticsearch and not elasticsearch_dsl search on the Calc index. """
......@@ -175,6 +202,3 @@ class Entry(Document):
data['upload_time'] = data['upload_time'].isoformat()
return {key: value for key, value in data.items() if value is not None}
# Entry.register_mapping(datamodel.CalcWithMetadata, Entry.from_calc_with_metadata)
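Taken together with the processing changes above, the search Entry now follows a small lifecycle. A summary sketch, using only calls that appear in this commit:

    # after a single calc is archived in staging (not yet published)
    search.Entry.from_calc_with_metadata(calc_with_metadata, published=False).persist()

    # when a staged upload is published
    search.Entry.publish_upload(upload_with_metadata)

    # when a staged upload is deleted
    search.Entry.delete_upload(upload_id)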
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The nomad@FAIRDI tests are based on the pytest library. Pytest uses *fixtures* to
modularize setup and teardown of mocks, infrastructure, and other context objects.
The following depicts the hierarchy of fixtures used:
.. image:: test_fixtures.png
Otherwise, the test submodules follow the names of the nomad code modules.
"""
from nomad import config
# For convenience we test the api without a path prefix.
# This should be set up with a fixture in conftest.py, but that would be too late:
# after importing the api module, the config values have already been used, and
# changing them afterwards has no effect.
services_config = config.services._asdict()
services_config.update(api_base_path='')
config.services = config.NomadServicesConfig(**services_config)
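Since config.services supports _asdict, it behaves like a typing.NamedTuple, so the same override could presumably be written more compactly with _replace:

    config.services = config.services._replace(api_base_path='')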
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Tuple
import pytest
import logging
from sqlalchemy.orm import Session
......@@ -10,9 +25,20 @@ from threading import Lock, Thread
import asyncore
import time
import pytest
import elasticsearch.exceptions
import shutil
import os.path
import datetime
import base64
from bravado.client import SwaggerClient
from nomad import config, infrastructure
from nomad import config, infrastructure, files, parsing, processing, coe_repo, api
from tests import test_parsing, test_normalizing
from tests.processing import test_data as test_processing
from tests.test_files import example_file, empty_file
from tests.bravado_flask import FlaskTestHttpClient
example_files = [empty_file, example_file]
@pytest.fixture(scope="session")
......@@ -23,12 +49,6 @@ def monkeysession(request):
mpatch.undo()
@pytest.fixture(scope='session', autouse=True)
def nomad_files(monkeysession):
monkeysession.setattr('nomad.config.fs', config.FSConfig(
tmp='.volumes/test_fs/tmp', objects='.volumes/test_fs/objects'))
@pytest.fixture(scope='session', autouse=True)
def nomad_logging():
config.logstash = config.logstash._replace(enabled=False)
......@@ -36,6 +56,36 @@ def nomad_logging():
infrastructure.setup_logging()
@pytest.fixture(scope='session', autouse=True)
def raw_files_infra(monkeysession):
monkeysession.setattr('nomad.config.fs', config.FSConfig(
tmp='.volumes/test_fs/tmp', objects='.volumes/test_fs/objects'))
@pytest.fixture(scope='function')
def raw_files(raw_files_infra):
""" Provides cleaned out files directory structure per function. Clears files after test. """
try:
yield
finally:
try:
shutil.rmtree(config.fs.objects)
except FileNotFoundError:
pass
try:
shutil.rmtree(config.fs.tmp)
except FileNotFoundError:
pass
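A test that needs a pristine directory tree simply depends on the fixture. A sketch with a hypothetical test name:

    def test_with_clean_files(raw_files):
        # anything written under config.fs.objects or config.fs.tmp
        # is removed again once the test finishes
        ...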
@pytest.fixture(scope='function')
def client(monkeysession):
api.app.config['TESTING'] = True
client = api.app.test_client()
yield client
@pytest.fixture(scope='session')
def celery_includes():
return ['nomad.processing.base']
......@@ -49,57 +99,14 @@ def celery_config():
@pytest.fixture(scope='session')
def purged_app(celery_session_app):
"""
Purges all pending tasks of the celery app before the test. This is necessary to
remove tasks from the queue that might be 'left over' from prior tests.
"""
celery_session_app.control.purge()
yield celery_session_app
@pytest.fixture()
def patched_celery(monkeypatch):
# There is a bug in celery that prevents using the celery_worker fixture for multiple
# tests: https://github.com/celery/celery/issues/4088
# The bug was fixed in Aug 2018, but the fix has not been released yet (TODO).
# We monkeypatch a similar solution here.
def add_reader(self, fds, callback, *args):
from kombu.utils.eventio import ERR, READ, poll
if self.poller is None:
self.poller = poll()
return self.add(fds, callback, READ | ERR, args)
monkeypatch.setattr('kombu.asynchronous.hub.Hub.add_reader', add_reader)
yield
@pytest.fixture(scope='session')