test_optimade.py 6.88 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an"AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List
16
import json
17
18
import numpy as np
import pytest
19
20

from nomad.processing import Upload
21
from nomad import search, processing as proc
22
23
24
from nomad.parsing import LocalBackend
from nomad.datamodel import CalcWithMetadata

25
from nomad.app.optimade import parse_filter, url
26

27
from tests.app.test_app import BlueprintClient
28
29
from tests.test_normalizing import run_normalize
from tests.conftest import clear_elastic
30
31


32
33
34
@pytest.fixture(scope='session')
def api(nomad_app):
    return BlueprintClient(nomad_app, '/optimade')
35
36


37
38
39
40
41
42
def test_get_entry(published: Upload):
    calc_id = list(published.calcs)[0].calc_id

    with published.upload_files.archive_file(calc_id) as f:
        data = json.load(f)

43
    assert 'OptimadeEntry' in data
44
    search_result = search.SearchRequest().search_parameter('calc_id', calc_id).execute_paginated()['results'][0]
45
    assert 'optimade' in search_result
46
47


48
49
50
51
def create_test_structure(
        meta_info, id: int, h: int, o: int, extra: List[str], periodicity: int,
        without_optimade: bool = False):

52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
    atom_labels = ['H' for i in range(0, h)] + ['O' for i in range(0, o)] + extra
    test_vector = np.array([0, 0, 0])

    backend = LocalBackend(meta_info, False, True)  # type: ignore
    backend.openSection('section_run')
    backend.addValue('program_name', 'test_code')
    backend.openSection('section_system')

    backend.addArrayValues('atom_labels', np.array(atom_labels))
    backend.addArrayValues(
        'atom_positions', np.array([test_vector for i in range(0, len(atom_labels))]))
    backend.addArrayValues(
        'lattice_vectors', np.array([test_vector, test_vector, test_vector]))
    backend.addArrayValues(
        'configuration_periodic_dimensions',
        np.array([True for _ in range(0, periodicity)] + [False for _ in range(periodicity, 3)]))

    backend.closeSection('section_system', 0)
    backend.closeSection('section_run', 0)

    backend = run_normalize(backend)
73
    calc_id = 'test_calc_id_%d' % id
74
    calc = CalcWithMetadata(
75
        upload_id='test_uload_id', calc_id=calc_id, mainfile='test_mainfile',
76
        published=True, with_embargo=False)
77
    calc.apply_domain_metadata(backend)
78
79
80
81

    if without_optimade:
        calc.optimade = None

82
    proc.Calc.from_calc_with_metadata(calc).save()
83
84
    search.Entry.from_calc_with_metadata(calc).save()

85
86
    assert proc.Calc.objects(calc_id__in=[calc_id]).count() == 1

87

88
89
90
91
92
93
94
95
96
97
98
99
def test_no_optimade(meta_info, elastic, api):
    create_test_structure(meta_info, 1, 2, 1, [], 0)
    create_test_structure(meta_info, 2, 2, 1, [], 0, without_optimade=True)
    search.refresh()

    rv = api.get('/calculations')
    assert rv.status_code == 200
    data = json.loads(rv.data)

    assert data['meta']['data_returned'] == 1


100
@pytest.fixture(scope='module')
101
def example_structures(meta_info, elastic_infra, mongo_infra):
102
    clear_elastic(elastic_infra)
103
104
    mongo_infra.drop_database('test_db')

105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
    create_test_structure(meta_info, 1, 2, 1, [], 0)
    create_test_structure(meta_info, 2, 2, 1, ['C'], 0)
    create_test_structure(meta_info, 3, 2, 1, [], 1)
    create_test_structure(meta_info, 4, 1, 1, [], 0)
    search.refresh()

    yield
    clear_elastic(elastic_infra)


@pytest.mark.parametrize('query, results', [
    ('nelements > 1', 4),
    ('nelements >= 2', 4),
    ('nelements > 2', 1),
    ('nelements < 4', 4),
    ('nelements < 3', 3),
    ('nelements <= 3', 4),
    ('nelements != 2', 1),
    ('1 < nelements', 4),
    ('elements HAS "H"', 4),
    ('elements HAS ALL "H", "O"', 4),
    ('elements HAS ALL "H", "C"', 1),
    ('elements HAS ANY "H", "C"', 4),
    ('elements HAS ANY "C"', 1),
    ('elements HAS ONLY "C"', 0),
    ('elements HAS ONLY "H", "O"', 3),
    ('elements:elements_ratios HAS "H":>0.66', 2),
    ('elements:elements_ratios HAS ALL "O":>0.33', 3),
    ('elements:elements_ratios HAS ALL "O":>0.33,"O":<0.34', 2),
    ('elements IS KNOWN', 4),
    ('elements IS UNKNOWN', 0),
    ('chemical_formula_reduced = "H2O"', 2),
    ('chemical_formula_reduced CONTAINS "H2"', 3),
    ('chemical_formula_reduced CONTAINS "H"', 4),
    ('chemical_formula_reduced CONTAINS "C"', 1),
    ('chemical_formula_reduced STARTS "H2"', 3),
    ('chemical_formula_reduced STARTS WITH "H2"', 3),
    ('chemical_formula_reduced ENDS WITH "C"', 1),
    ('chemical_formula_reduced ENDS "C"', 1),
    ('LENGTH elements = 2', 3),
    ('LENGTH elements = 3', 1),
    ('LENGTH dimension_types = 0', 3),
    ('LENGTH dimension_types = 1', 1),
    ('nelements = 2 AND LENGTH dimension_types = 1', 1),
    ('nelements = 3 AND LENGTH dimension_types = 1', 0),
    ('nelements = 3 OR LENGTH dimension_types = 1', 2),
    ('nelements > 1 OR LENGTH dimension_types = 1 AND nelements = 2', 4),
    ('(nelements > 1 OR LENGTH dimension_types = 1) AND nelements = 2', 3),
    ('NOT LENGTH dimension_types = 1', 3)
])
def test_optimade_parser(example_structures, query, results):
    query = parse_filter(query)
    result = search.SearchRequest(query=query).execute_paginated()
    assert result['pagination']['total'] == results
159
160
161
162
163
164
165
166
167
168
169


def test_url():
    assert url('endpoint', param='value').endswith('/optimade/endpoint?param=value')


def test_list_endpoint(api, example_structures):
    rv = api.get('/calculations')
    assert rv.status_code == 200
    data = json.loads(rv.data)
    # TODO replace with real assertions
170
    # print(json.dumps(data, indent=2))
171
172
173
174
175
176
177


def test_list_endpoint_request_fields(api, example_structures):
    rv = api.get('/calculations?request_fields=nelements,elements')
    assert rv.status_code == 200
    data = json.loads(rv.data)
    # TODO replace with real assertions
178
    # print(json.dumps(data, indent=2))
179
180
181
182
183
184
185


def test_single_endpoint(api, example_structures):
    rv = api.get('/calculations/%s' % 'test_calc_id_1')
    assert rv.status_code == 200
    data = json.loads(rv.data)
    # TODO replace with real assertions
186
    # print(json.dumps(data, indent=2))
187
188
189
190
191
192
193


def test_base_info_endpoint(api):
    rv = api.get('/info')
    assert rv.status_code == 200
    data = json.loads(rv.data)
    # TODO replace with real assertions
194
    # print(json.dumps(data, indent=2))
195
196
197
198
199
200
201


def test_calculation_info_endpoint(api):
    rv = api.get('/info/calculation')
    assert rv.status_code == 200
    data = json.loads(rv.data)
    # TODO replace with real assertions
202
    # print(json.dumps(data, indent=2))
203
204
205
206
207


# TODO test single with request_fields
# TODO test errors
# TODO test response format (deny everything but json)