Commit bae1e3d4 authored by Alvin Noe Ladines

Defined model for archive query parameters

parent 54f086da
Pipeline #68124 passed with stages in 14 minutes and 43 seconds
@@ -221,162 +221,27 @@ _archive_query_parser.add_argument(
'qschema', type=str, help='Serialized archive dict with null values as placeholder for data.')
_archive_query_model_fields = {
'results': fields.List(fields.Raw, description=(
        'A list of search results. Each result is a dict with quantity names as keys and '
'values as values')),
    'python': fields.String(allow_null=True, skip_none=True, description=(
'A string of python code snippet which can be executed to reproduce the api result.')),
    'curl': fields.String(allow_null=True, skip_none=True, description=(
'A string of curl command which can be executed to reproduce the api result.')),
}
_archive_query_model = api.inherit('ArchiveCalculations', search_model, _archive_query_model_fields)
def execute_search():
try:
args = {
key: value for key, value in _archive_query_parser.parse_args().items()
if value is not None}
scroll = args.get('scroll', False)
scroll_id = args.get('scroll_id', None)
page = args.get('page', 1)
per_page = args.get('per_page', 10 if not scroll else 1000)
order = args.get('order', -1)
order_by = 'upload_id'
db = args.get('db')
qschema = args.get('qschema', None)
except Exception:
abort(400, message='bad parameter types')
try:
assert page >= 1
assert per_page > 0
except AssertionError:
abort(400, message='invalid pagination')
if order not in [-1, 1]:
abort(400, message='invalid pagination')
search_request = search.SearchRequest()
apply_search_parameters(search_request, args)
try:
if scroll:
results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)
else:
results = search_request.execute_paginated(
per_page=per_page, page=page, order=order, order_by=order_by)
except search.ScrollIdNotFound:
abort(400, 'The given scroll_id does not exist.')
except KeyError as e:
import traceback
traceback.print_exc()
abort(400, str(e))
if qschema is None:
qschema = request.get_json()
qschema = qschema if qschema is None else qschema['results']
else:
qschema = json.loads(qschema)
data = []
calcs = results['results']
try:
upload_files = None
for entry in calcs:
upload_id = entry['upload_id']
calc_id = entry['calc_id']
if upload_files is None or upload_files.upload_id != upload_id:
if upload_files is not None:
upload_files.close_zipfile_cache()
upload_files = UploadFiles.get(
upload_id, create_authorization_predicate(upload_id))
if upload_files is None:
raise KeyError
upload_files.open_zipfile_cache()
if db == 'msg':
fos = upload_files.archive_file_msg(calc_id)
msgdbs = [ArchiveFileDB(fo) for fo in fos if fo is not None]
if db == 'zip':
fo = upload_files.archive_file(calc_id, 'rb')
data.append({calc_id: json.loads(fo.read())})
elif db == 'msg':
for msgdb in msgdbs:
data.append(msgdb.query({calc_id: qschema}))
if upload_files is not None:
upload_files.close_zipfile_cache()
except Restricted:
abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
except KeyError:
abort(404, message='Calculation %s/%s does not exist.' % (upload_id, calc_id))
results['results'] = data
# build python code and curl snippet
results['python'] = query_api_python('archive', 'query', query_string=request.args)
results['curl'] = query_api_curl('archive', 'query', query_string=request.args)
return results
_archive_query_model = api.clone('ArchiveCalculations', search_model, _archive_query_model_fields)
# scroll model should be capitalized to prevent ambiguity with scroll flag
_archive_query_model['Scroll'] = _archive_query_model.pop('scroll')
_archive_query_model['Pagination'] = _archive_query_model.pop('pagination')
@ns.route('/query')
class ArchiveQueryResource(Resource):
@api.doc('get_archive_query')
@api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
@api.response(401, 'Not authorized to access the data.')
@api.response(404, 'The upload or calculation does not exist')
    @api.response(200, 'Archive data sent', headers={'Content-Type': 'application/octet-stream'})
@api.expect(_archive_query_parser, validate=True)
@api.marshal_with(_archive_query_model, skip_none=True, code=200, description='File sent')
@authenticate()
def get(self):
"""
Get msgpack database resulting from query.
See ``/repo`` endpoint for documentation on the search
parameters.
The actual data are in results and a supplementary python code (curl) to
execute search is in python (curl).
"""
results = execute_search()
# create msgpack database from results and return database file
data = results['results']
filename = 'nomad_archive.msg'
msgdb = ArchiveFileDB(BytesIO(), max_lfragment=2)
for calc in data:
msgdb.add_data(calc)
msgdb.create_db()
data_stream = msgdb.fileobj
data_stream.seek(0)
return send_file(
data_stream,
            mimetype='application/octet-stream',
as_attachment=True,
cache_timeout=0,
attachment_filename=filename)
@api.doc('post_archive_query')
@api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
@api.response(401, 'Not authorized to access the data.')
@api.response(404, 'The upload or calculation does not exist')
    @api.response(200, 'Archive data sent')
@api.expect(_archive_query_parser, validate=True)
@api.expect(_archive_query_model)
@api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Search results sent')
@authenticate(signature_token=True)
@authenticate()
def post(self):
"""
        Post a query schema and return it filled with archive data in json format from
@@ -388,7 +253,112 @@ class ArchiveQueryResource(Resource):
        The actual data are in 'results'; supplementary python and curl
        snippets to reproduce the search are in 'python' and 'curl'.
"""
try:
data_in = request.get_json()
scroll = data_in.get('scroll', None)
scroll_id = data_in.get('scroll_id', None)
Scroll = data_in.get('Scroll', None)
if Scroll:
scroll = Scroll.get('scroll', scroll)
scroll_id = Scroll.get('scroll_id', scroll_id)
pagination = data_in.get('Pagination', None)
page = data_in.get('page', 1)
per_page = data_in.get('per_page', 10 if not scroll else 1000)
order = data_in.get('order', -1)
order_by = data_in.get('order_by', 'upload_id')
if pagination:
page = pagination.get('page', page)
per_page = pagination.get('per_page', per_page)
order = pagination.get('order', order)
order_by = pagination.get('order_by', order_by)
db = data_in.get('db')
qschema = data_in.get('results', None)
if qschema is not None:
qschema = qschema[-1]
except Exception:
abort(400, message='bad parameter types')
try:
assert page >= 1
assert per_page > 0
except AssertionError:
abort(400, message='invalid pagination')
if order not in [-1, 1]:
abort(400, message='invalid pagination')
search_request = search.SearchRequest()
apply_search_parameters(search_request, data_in)
try:
if scroll:
results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)
results['scroll']['scroll'] = True
else:
results = search_request.execute_paginated(
per_page=per_page, page=page, order=order, order_by=order_by)
except search.ScrollIdNotFound:
abort(400, 'The given scroll_id does not exist.')
except KeyError as e:
import traceback
traceback.print_exc()
abort(400, str(e))
data = []
calcs = results['results']
try:
upload_files = None
for entry in calcs:
upload_id = entry['upload_id']
calc_id = entry['calc_id']
if upload_files is None or upload_files.upload_id != upload_id:
if upload_files is not None:
upload_files.close_zipfile_cache()
upload_files = UploadFiles.get(
upload_id, create_authorization_predicate(upload_id))
if upload_files is None:
raise KeyError
upload_files.open_zipfile_cache()
if db == 'msg':
fos = upload_files.archive_file_msg(calc_id)
msgdbs = [ArchiveFileDB(fo) for fo in fos if fo is not None]
if db == 'zip':
fo = upload_files.archive_file(calc_id, 'rb')
data.append({calc_id: json.loads(fo.read())})
elif db == 'msg':
for msgdb in msgdbs:
data.append(msgdb.query({calc_id: qschema}))
if upload_files is not None:
upload_files.close_zipfile_cache()
except Restricted:
abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
except KeyError:
abort(404, message='Calculation %s/%s does not exist.' % (upload_id, calc_id))
# assign archive data to results
results['results'] = data
# build python code and curl snippet
if 'python' in data_in:
results['python'] = query_api_python('archive', 'query', query_string=request.args)
if 'curl' in data_in:
results['curl'] = query_api_curl('archive', 'query', query_string=request.args)
# for compatibility with archive model
# TODO should be changed in search
results['Scroll'] = results.pop('scroll', None)
results['Pagination'] = results.pop('pagination', None)
return results, 200
@@ -52,20 +52,31 @@ metadata_model = api.model('MetaData', {
pagination_model = api.model('Pagination', {
'total': fields.Integer(description='Number of total elements.'),
    'page': fields.Integer(description='Number of the current page, starting with 1.'),
    'per_page': fields.Integer(description='Number of elements per page.'),
'order_by': fields.String(description='Sorting criterion.'),
    'order': fields.Integer(description='Sorting order: -1 for descending, 1 for ascending.')
})
""" Model used in responses with pagination. """
scroll_model = api.model('Scroll', {
    'scroll': fields.Boolean(default=False, description='Flag indicating if scrolling is enabled.'),
    'total': fields.Integer(default=0, description='The total amount of hits for the search.'),
    'scroll_id': fields.String(default=None, allow_null=True, description='The scroll_id that can be used to retrieve the next page.'),
    'size': fields.Integer(default=0, help='The size of the returned scroll page.')})

search_model_fields = {
    'pagination': fields.Nested(pagination_model, allow_null=True, skip_none=True),
    'scroll': fields.Nested(scroll_model, allow_null=True, skip_none=True),
    'results': fields.List(fields.Raw, description=(
        'A list of search results. Each result is a dict with quantity names as keys and '
        'values as values')),
    'owner': fields.String(description='The group the calculations belong to.', allow_null=True, skip_none=True),
    'from_time': fields.Raw(description='The minimum entry time.', allow_null=True, skip_none=True),
    'until_time': fields.Raw(description='The maximum entry time.', allow_null=True, skip_none=True),
}

for quantity in search.quantities.values():
    search_model_fields[quantity.name] = fields.Raw(description=quantity.description, allow_null=True, skip_none=True)

search_model = api.model('Search', search_model_fields)
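# For illustration (assumed example values, not part of this commit): a marshalled
# search response would have the shape
# {'pagination': {'total': 1, 'page': 1, 'per_page': 10, 'order_by': 'upload_id', 'order': -1},
#  'results': [{'upload_id': '...', 'calc_id': '...'}]}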
def add_pagination_parameters(request_parser):
@@ -38,9 +38,12 @@ class ArchiveMetainfo:
Converts archive data in json format to the new nomad metainfo model
Arguments:
archive_data: the archive data in json format or msgdb filename
archive_schema: dict with the desired quantities as keys and None as placeholder
for the values which are queried from the data
"""
    def __init__(self, archive_data, archive_schema=None):
self._archive_data = archive_data
self._archive_schema = archive_schema
self.metainfo = None
self._metacls = None
self._calcs = {}
@@ -70,7 +73,48 @@ class ArchiveMetainfo:
def _init_calcs(self):
for i in range(len(self.calc_ids)):
calc_id = self.calc_ids[i]
self._calcs[calc_id] = self.base_metainfo
if self._archive_schema is None:
self._calcs[calc_id] = self.base_metainfo
else:
data = self._archive_db.query({calc_id: self._archive_schema})[calc_id]
self._calcs[calc_id] = self.base_metacls.m_from_dict(data)
self._calcs[calc_id].archive_db = self._archive_db
def __getitem__(self, key):
if isinstance(key, str):
calc = self._calcs.get(key, None)
if calc:
calc.calc_id = key
return calc
elif isinstance(key, int):
calc_id = self._calc_ids[key]
calc = self._calcs[calc_id]
calc.calc_id = calc_id
return calc
else:
calc_ids = self._calc_ids[key]
calcs = []
for calc_id in calc_ids:
calc = self._calcs[calc_id]
calc.calc_id = calc_id
calcs.append(calc)
return calcs
def __len__(self):
return len(self._calcs)
def __iter__(self):
self._n = -1
return self
def __next__(self):
self._n += 1
if self._n >= len(self):
raise StopIteration
calc = list(self._calcs.values())[self._n]
calc.calc_id = list(self._calcs.keys())[self._n]
calc.archive_db = self._archive_db
return calc
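    # Illustrative iteration (mirroring the tests below):
    #   for calc in metainfo:
    #       print(calc.calc_id)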
@staticmethod
def get_path_from_section(content):
@@ -134,6 +178,13 @@ class ArchiveMetainfo:
data = None
return data
@property
def base_data(self):
if self._base_data is None:
calc_id = self.calc_ids[0]
self._base_data = self._archive_db.query({calc_id: self._archive_schema})[calc_id]
return self._base_data
@property
def base_metacls(self):
"""
@@ -141,7 +192,7 @@
"""
if self._base_metacls is None:
name = self._prefix
            self._base_metacls = self._build_meta_cls(self.base_data, name)
return self._base_metacls
@property
@@ -150,11 +201,9 @@
The base metainfo to enable auto completion for each calc
"""
if self._base_metainfo is None:
            metacls = self.base_metacls
            base_data = self._nullify_data(self.base_data)
            self._base_metainfo = metacls.m_from_dict(base_data)
return self._base_metainfo
def get_dtype(self, data):
@@ -31,48 +31,86 @@ from nomad.archive_library.metainfo import ArchiveMetainfo
class ArchiveQuery:
    def __init__(self, *args, **kwargs):
        self._archive_path = 'archive'
        self._query_path = 'query'
        self._archive_data = []
        self._scroll_id = None
        self._page = None
        self._query_params = {}
        if args:
            self._query_params = args[0]
        if kwargs:
            self._query_params.update(kwargs)
        self._archive_schema = self._query_params.pop('archive_data', None)
        self._authentication = self._query_params.pop('authentication', None)
        self._max_n_pages = self._query_params.pop('max_n_pages', 3)
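    # Illustrative construction (assumed values, mirroring the tests):
    #   ArchiveQuery({'pagination': {'per_page': 5}}, archive_data={'section_entry_info': None})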
    def _get_value(self, name, in_dict):
        # recursively search in_dict and return the first value stored under key name
        if not isinstance(in_dict, dict):
            return None
        for key, val in in_dict.items():
            if key == name:
                return val
            res = self._get_value(name, val)
            if res is not None:
                return res
        return None
    def _set_value(self, name, value, in_dict):
        # set the first existing occurrence of key name; if the key does not
        # exist anywhere, add it at the top level
        if not self._replace_value(name, value, in_dict):
            in_dict[name] = value

    def _replace_value(self, name, value, in_dict):
        if not isinstance(in_dict, dict):
            return False
        for key, val in in_dict.items():
            if key == name:
                in_dict[name] = value
                return True
            if self._replace_value(name, value, val):
                return True
        return False
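    # Illustration (assumed values): _set_value('page', 2, {'pagination': {'page': 1}})
    # updates the nested 'page' entry in place rather than adding a top-level key.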
    def _api_query(self):
        url = query_api_url(self._archive_path, self._query_path)
        data = self._query_params
        if not isinstance(self._archive_schema, list):
            data['results'] = [self._archive_schema]
        if self._page is not None:
            # increment the page number
            self._set_value('page', self._page + 1, data)
        if self._scroll_id is not None:
            self._set_value('scroll_id', self._scroll_id, data)
        response = requests.post(
            url, headers=self._authentication, content_type='application/json', data=json.dumps(data))
if response.status_code != 200:
raise Exception('Query returned %s' % response.status_code)
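        # response.json is a method on real requests responses, but a plain
        # attribute on the Flask test client the tests monkeypatch in; handle both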
data = response.json
if not isinstance(data, dict):
data = data()
results = data.get('results', None)
scroll = data.get('scroll', None)
if scroll:
            self._scroll_id = scroll.get('scroll_id', None)
pagination = data.get('pagination', None)
if pagination:
self._page = pagination.get('page', None)
return results
    def _get_archive_data(self):
results = self._api_query()
n_page = 0
while results:
            self._archive_data += results
results = self._api_query()
if self._scroll_id is None:
n_page += 1
if n_page >= self._max_n_pages:
break
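        # note: n_page only counts paginated (non-scroll) requests, capping them
        # at max_n_pages; scrolling instead ends when no further results return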
def query(self):
        self._get_archive_data()
        if self._archive_data:
            metainfo = ArchiveMetainfo(archive_data=self._archive_data, archive_schema=self._archive_schema)
return metainfo
@@ -668,38 +668,17 @@ class TestArchive(UploadFilesBasedTests):
assert_zip_file(rv, files=1)
@pytest.mark.parametrize('db', ['zip', 'msg'])
    def test_post_archive_query(self, api, published_wo_user_metadata, db):
schema = {"section_run": {"section_single_configuration_calculation": {"energy_total": None}}}
        data = {'results': [schema], 'per_page': 5}
        uri = '/archive/query'
        rv = api.post(uri, content_type='application/json', data=json.dumps(data))
assert rv.status_code == 200
data = rv.get_json()
assert data
results = data.get('results', None)
assert results is not None
@pytest.mark.parametrize('db', ['zip', 'msg'])
def test_get_archive_query(self, api, published_wo_user_metadata, other_test_user_auth, db):
schema = {"section_run": {"section_single_configuration_calculation": {"energy_total": None}}}
q_params = {'owner': 'all', 'order': -1, 'db': db, 'qschema': json.dumps(schema)}
uri = '/archive/query?%s' % urlencode(q_params)
rv = api.get(uri, headers=other_test_user_auth)
assert rv.status_code == 200
data = rv.data
assert data
def test_get_code_from_query(self, api, processeds, test_user_auth):
query_params = {'atoms': 'Si', 'res_type': 'json', 'order': 1, 'per_page': 5}
url = '/archive/query?%s' % urlencode(query_params)
rv = api.post(url, headers=test_user_auth)
assert rv.status_code == 200
data = json.loads(rv.data)
assert isinstance(data, dict)
assert data['results'] is not None
assert data['python'] is not None
class TestRepo():
@pytest.fixture(scope='class')
@@ -119,9 +119,17 @@ class TestArchiveQuery:
@pytest.mark.parametrize('db', ['zip', 'msg'])
def test_query_from_json(self, api, published_wo_user_metadata, other_test_user_auth, db, monkeypatch):
monkeypatch.setattr('nomad.archive_library.query.requests', api)
        q_params = {'pagination': {'order': 1, 'per_page': 5}, 'db': db}
q_schema = {'section_entry_info': None}
        q = ArchiveQuery(q_params, archive_data=q_schema, authentication=other_test_user_auth)
metainfo = q.query()
        for calc in metainfo:
            assert calc.section_entry_info.calc_id is not None