Commit ec38997d authored by Markus Scheidgen

Temporarily reverted back to the client without multiprocessing.

parent 49b5fa3e
Pipeline #80673 passed with stages in 31 minutes and 46 seconds
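Since this revert changes how the client is driven (the parallel argument disappears from ArchiveQuery), here is a minimal usage sketch of the single-process client. It assumes that query_archive from nomad.client forwards its keyword arguments to ArchiveQuery; the upload id and the required layout are placeholders for illustration only.

from nomad.client import query_archive

# Placeholder filter and 'required' spec; replace with a real upload id and the
# archive sections you actually need.
results = query_archive(
    query={'upload_id': ['some_upload_id']},
    required={'section_run': '*'},
    per_page=10, max=100)

for entry in results:
    # Entries are EntryArchive objects and are fetched lazily, one page at a time.
    print(entry.section_metadata.upload_id)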
@@ -143,17 +143,14 @@ your queries. To authenticate simply provide your NOMAD username and password to
from typing import Dict, Union, Any, List
import collections.abc
import requests
from urllib.parse import urlparse, urlencode
from urllib.parse import urlparse
from bravado import requests_client as bravado_requests_client
import time
from keycloak import KeycloakOpenID
from io import StringIO
import numpy as np
import multiprocessing
from nomad import config
from nomad import metainfo as mi
from nomad.units import ureg
from nomad.datamodel import EntryArchive
# TODO this import is necessary to load all metainfo definitions that the parsers are using
@@ -202,11 +199,11 @@ class ApiStatistics(mi.MSection):
description='Number of entries loaded in the last api call')
last_response_data_size = mi.Quantity(
type=int, unit=ureg.bytes, default=0,
type=int, unit=mi.units.bytes, default=0,
description='Bytes loaded in the last api call')
loaded_data_size = mi.Quantity(
type=int, unit=ureg.bytes, default=0,
type=int, unit=mi.units.bytes, default=0,
description='Bytes loaded from this query')
loaded_nentries = mi.Quantity(
@@ -225,18 +222,6 @@ class ApiStatistics(mi.MSection):
return out.getvalue()
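# For orientation, the quantity definitions above follow the usual metainfo
# pattern. A minimal sketch, assuming the nomad.metainfo API exactly as imported
# in this file (the DownloadStats name is made up for illustration):

from nomad import metainfo as mi

class DownloadStats(mi.MSection):
    # Hypothetical section mirroring ApiStatistics above.
    loaded_data_size = mi.Quantity(
        type=int, unit=mi.units.bytes, default=0,
        description='Bytes loaded from this query')

stats = DownloadStats()
stats.loaded_data_size = 4096  # assumed to be interpreted in the declared unit (bytes)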
class QProcess(multiprocessing.Process):
def __init__(self, queue, **kwargs):
multiprocessing.Process.__init__(self, **kwargs)
self.queue = queue
def run(self):
result = self._target(*self._args)
self.queue.put(result)
time.sleep(0.2)
exit(0)
class ArchiveQuery(collections.abc.Sequence):
'''
Objects of this class represent a query on the NOMAD Archive. It is solely configured
@@ -267,11 +252,11 @@ class ArchiveQuery(collections.abc.Sequence):
self,
query: dict = None, required: dict = None,
url: str = None, username: str = None, password: str = None,
per_page: int = 10, max: int = None, parallel: int = None,
per_page: int = 10, max: int = None,
raise_errors: bool = False,
authentication: Union[Dict[str, str], KeycloakAuthenticator] = None):
self._afters = None
self._after = None
self.page = 1
self.per_page = per_page
self.max = max
@@ -307,13 +292,11 @@ class ArchiveQuery(collections.abc.Sequence):
self.username = username
self.url = config.client.url if url is None else url
self._authentication = authentication
self.parallel = parallel
self._total = -1
self._capped_total = -1
self._results: List[dict] = []
self._statistics = ApiStatistics()
self._upload_ids = self.get_upload_ids()
@property
def authentication(self):
@@ -337,73 +320,18 @@ class ArchiveQuery(collections.abc.Sequence):
else:
return self._authentication
def get_upload_ids(self):
query = self.query['query']
url = '%s/repo/quantity/upload_id?%s' % (self.url, urlencode(query, doseq=True))
response = requests.get(url, headers=self.authentication)
data = response.json
if not isinstance(data, dict):
data = data()
quantity = data['quantity']
after = quantity['after']
values = quantity['values']
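# Follow the aggregation's 'after' cursor page by page until the server no longer returns one.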
while True:
response = requests.get('%s&after=%s' % (url, after), headers=self.authentication)
data = response.json
if not isinstance(data, dict):
data = data()
quantity = data['quantity']
if quantity.get('after') is None:
break
values.update(quantity['values'])
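# Chunk the upload ids: by default a chunk is closed once it covers roughly
# ids_per_proc (100) entries; an explicit self.parallel instead splits the ids
# evenly across that many processes.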
if self.parallel is None:
ids_per_proc = 100
upload_ids = []
count = 0
ids = []
keys = list(values.keys())
for key in keys:
count += values[key]['total']
ids.append(key)
if count >= ids_per_proc or key == keys[-1]:
upload_ids.append(ids)
count = 0
ids = []
elif isinstance(self.parallel, int):
upload_ids = np.array_split(list(values.keys()), self.parallel)
self._afters = [None] * len(upload_ids)
return upload_ids
def _api_proc(self, proc_id):
def call_api(self):
'''
Calls the API to retrieve the next set of results. It is automatically called when
entries that have not been downloaded yet are accessed.
'''
url = '%s/%s/%s' % (self.url, 'archive', 'query')
upload_id = self._upload_ids[proc_id]
if upload_id is None:
return
after = self._afters[proc_id]
if after is None and self._results:
return
query = dict(self.query)
query['query']['upload_id'] = list(upload_id)
aggregation = self.query.setdefault('aggregation', {'per_page': self.per_page})
if self._after is not None:
aggregation['after'] = self._after
aggregation = query.setdefault('aggregation', {'per_page': self.per_page})
if after is not None:
aggregation['after'] = after
response = requests.post(url, headers=self.authentication, json=query)
response = requests.post(url, headers=self.authentication, json=self.query)
if response.status_code != 200:
if response.status_code == 400:
message = response.json().get('message')
@@ -413,73 +341,41 @@ class ArchiveQuery(collections.abc.Sequence):
raise QueryError('The query is invalid for unknown reasons.')
try:
raise response.raise_for_status()
except Exception:
pass
raise response.raise_for_status()
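# With the real requests library response.json is a method; in the tests it can be
# mocked as the already-parsed data, hence the isinstance check below.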
data = response.json
if not isinstance(data, dict):
data = data()
if not data:
return
result = data.get('results', [])
try:
data_size = len(response.content)
nresult = len(result)
except Exception:
data_size = 0
nresult = 0
after = data['aggregation'].get('after', None)
total = data['aggregation'].get('total', 0)
return (proc_id, [result, data_size, nresult, after, total])
def call_api(self):
'''
Calls the API to retrieve the next set of results. It is automatically called when
entries that have not been downloaded yet are accessed.
'''
procs = []
queue = multiprocessing.Manager().Queue()
for idx in range(len(self._upload_ids)):
proc = QProcess(target=self._api_proc, args=(idx,), queue=queue)
proc.start()
procs.append(proc)
for p in procs:
p.join()
data_size = 0
nresults = 0
self._total = 0
while not queue.empty():
pid, output = queue.get()
self._results += [EntryArchive.m_from_dict(calc['archive']) for calc in output[0]]
data_size += output[1]
nresults += output[2]
self._afters[pid] = output[3]
self._total += output[4]
aggregation = data['aggregation']
self._after = aggregation.get('after')
self._total = aggregation['total']
if self.max is not None:
self._capped_total = min(self.max, self._total)
else:
self._capped_total = self._total
self._statistics.last_response_data_size = data_size
self._statistics.loaded_data_size += data_size
self._statistics.nentries = self._total
self._statistics.last_response_nentries = nresults
self._statistics.loaded_nentries = len(self._results)
self._statistics.napi_calls += len(procs)
results = data.get('results', [])
for result in results:
archive = EntryArchive.m_from_dict(result['archive'])
self._results.append(archive)
try:
data_size = len(response.content)
self._statistics.last_response_data_size = data_size
self._statistics.loaded_data_size += data_size
self._statistics.nentries = self._total
self._statistics.last_response_nentries = len(results)
self._statistics.loaded_nentries = len(self._results)
self._statistics.napi_calls += 1
except Exception:
# fails in test due to mocked requests library
pass
if self._afters.count(None) == len(self._upload_ids):
if self._after is None:
# there are no more search results, we need to avoid further calls
self._capped_total = len(self._results)
self._total = len(self._results)
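# For reference, the cursor-based pagination that call_api implements after this
# revert boils down to the following standalone sketch. The endpoint path and the
# response fields (results, aggregation.after, aggregation.total) are taken from
# the code above; fetch_all and its arguments are illustrative.

import requests

def fetch_all(base_url, query, headers=None, per_page=10):
    '''Collect all results from <base_url>/archive/query by following the aggregation 'after' cursor.'''
    results, after = [], None
    while True:
        body = dict(query)
        body['aggregation'] = {'per_page': per_page}
        if after is not None:
            body['aggregation']['after'] = after
        response = requests.post('%s/archive/query' % base_url, headers=headers, json=body)
        response.raise_for_status()
        data = response.json()
        results.extend(data.get('results', []))
        after = data['aggregation'].get('after')
        if after is None:
            # No cursor means the last page was reached.
            return results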
@@ -535,6 +431,7 @@ class ArchiveQuery(collections.abc.Sequence):
remove all results.
'''
for i, _ in enumerate(self._results[:index]):
self._results[i] = None
......
from typing import List
import pytest
import os
from nomad.client import query_archive
from nomad.metainfo import MSection, SubSection
from nomad.datamodel import EntryArchive
from nomad.datamodel.metainfo.public import section_run
from nomad import config
from tests.app.test_app import BlueprintClient
from tests.processing import test_data as test_processing_data
from tests.test_files import example_file
# TODO with the existing published_wo_user_metadata fixture there is only one entry
@@ -24,30 +20,6 @@ def api(client, monkeypatch):
return api
@pytest.mark.timeout(config.tests.default_timeout)
@pytest.fixture(scope='function')
def example_multiple_upload(test_user, proc_infra, mongo, elastic):
def _example(n_uploads):
upload_ids = []
uploads = []
for i in range(n_uploads):
upload_id = '%s_%d' % (os.path.basename(example_file).replace('.zip', ''), i)
processed = test_processing_data.run_processing(
(upload_id, example_file), test_user)
# processed.publish_upload()
# try:
# processed.block_until_complete(interval=.01)
# except Exception:
# pass
upload_ids.append(upload_id)
uploads.append(processed)
return upload_ids
return _example
def assert_results(
results: List[MSection],
sub_section_defs: List[SubSection] = None,
@@ -67,20 +39,6 @@ def assert_results(
current = sub_sections[0]
@pytest.mark.parametrize('n_uploads, parallel', [
(2, 1), (8, 4), (4, 4), (8, 1), (2, None), (8, None)
])
def test_query_parallel(api, example_multiple_upload, test_user_auth, n_uploads, parallel):
upload_ids = example_multiple_upload(n_uploads)
upload_ids.sort()
results = query_archive(authentication=test_user_auth, parallel=parallel)
upload_ids_query = []
for result in results:
upload_ids_query.append(result.section_metadata.upload_id)
upload_ids_query.sort()
assert upload_ids == upload_ids_query
def test_query(api, published_wo_user_metadata):
assert_results(query_archive())
......