Commit ad80c1c2 authored by Theodore Chang

Rewrite ArchiveQuery with asyncio

parent 916890ec
# Archive Query
The asynchronous archive query class `ArchiveQuery` provides a downloader that uses a different mechanism than the
synchronous version. All configuration is done through the constructor of the query object.
## Argument List
The following arguments are accepted; a sketch that combines most of them follows the list.
- `owner` : `str` The scope of data to access. Default: `'visible'`
- `query` : `dict` The API query. The class performs no validation of the query; users must make sure
the provided query is valid, otherwise the server will return an error message.
- `required` : `dict` The required quantities.
- `url` : `str` The database URL. It can point to your local database. The official NOMAD database is used by
default if no valid URL is defined. Default: `http://nomad-lab.eu/prod/v1/api`
- `after` : `str` The data can be thought of as a sequential list of uploads, each with a unique ID. If `after` is
not provided, the query always starts from the first upload. One can query the uploads in the middle of the storage
by assigning a proper value to `after`.
- `results_max` : `int` Determines how many entries to download at most. Note that each upload may have multiple entries.
- `page_size` : `int` Page size, i.e., how many uploads are fetched per page.
- `username` : `str` Username for authentication.
- `password` : `str` Password for authentication.
- `retry` : `int` In the case of server errors, the fetch process is automatically retried every `sleep_time` seconds.
This argument limits the maximum number of retries.
- `sleep_time` : `float` The interval (in seconds) between fetch retries.
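To illustrate how these arguments fit together, here is a sketch of a constructor call that sets most of them explicitly. All concrete values, including the credentials, are placeholders for illustration only; authentication is only needed for non-public data.
```python
from nomad.client.archive import ArchiveQuery

# Placeholder values for illustration; adjust them to your own use case.
query = ArchiveQuery(
    owner='visible',
    query={'results.method.simulation.program_name': 'VASP'},
    required={'workflow': '*'},
    url='http://nomad-lab.eu/prod/v1/api',  # the default official NOMAD API
    results_max=1000,
    page_size=10,
    username='your-username',  # only needed for non-public data
    password='your-password',
    retry=4,  # give up after 4 failed fetch attempts
    sleep_time=4.0,  # wait 4 seconds between retries
)
```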
## Basic Usage
To define a query, one can, for example, write
```python
from nomad.client.archive import ArchiveQuery
query = ArchiveQuery(query={}, required={}, page_size=10, results_max=10000)
```
The above example uses an empty query, and the query object is only constructed at this point.
To access the desired data, users need to perform two operations manually.
### Fetch
The fetch process is carried out **synchronously**. Users can call
```python
number_of_entries = query.fetch()
```
to fetch up to `results_max` entries. An indicative number $$n$$ can be provided via `fetch(n)`. Since each upload
may contain a different number of entries, the fetch process guarantees that at least $$n$$ entries are fetched,
subject to the `results_max` limit. The exact number also depends on `page_size`, which determines how many uploads
are requested per page. The method returns the exact number of qualified entries. Meanwhile, the list of qualified
uploads is populated with their IDs. To check all qualified upload IDs, one can call the `upload_list()` method to
return the full list.
```python
print(query.upload_list())
```
If applicable, it is possible to fetch a large number of entries first and then perform a second fetch that uses an
upload ID from the first result as the `after` argument, so that a middle segment of the storage can be downloaded, as in the sketch below.
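A minimal sketch of this two-stage pattern, assuming that dictionaries `query` and `required` are defined as in the complete rundown below:
```python
from nomad.client.archive import ArchiveQuery

# First stage: fetch a batch starting from the beginning of the storage.
first = ArchiveQuery(query=query, required=required, page_size=10, results_max=10000)
first.fetch(1000)
uploads = first.upload_list()  # list of (upload_id, entry_count) tuples

# Second stage: resume from one of the upload IDs seen above so that a
# middle segment of the storage is fetched.
second = ArchiveQuery(
    query=query, required=required, page_size=10, results_max=10000,
    after=uploads[-1][0])
second.fetch(1000)
```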
### Download
After fetching the qualified uploads, the desired data can be downloaded **asynchronously**. One can call
```python
results = query.download()
```
to download up to `results_max` entries. The downloaded results are returned as a list. Alternatively, it is possible to
download only a portion of the previously fetched entries at a time. For example,
```python
# previously fetched for example 1000 entries
# but only download the first 100 (approx.) entries
results = query.download(100)
```
The same `download(n)` method can be called repeatedly. If there are not enough fetched entries remaining, new entries
will be fetched automatically. If there are no more entries, the returned result list is empty. For example,
```python
total_results = []
while True:
    result = query.download(100)
    if len(result) == 0:
        break
    total_results.extend(result)
```
There is no retry mechanism in the download process. Uploads that fail to download due to server errors are kept in the
list; successfully downloaded uploads are removed from it.
## A Complete Rundown
Here we show a valid query and acquire data from the server.
We first define the desired query and construct the object, limiting the maximum number of entries to 10000 with 10
uploads per page.
```python
from nomad.client.archive import ArchiveQuery
required = {
    'workflow': {
        'calculation_result_ref': {
            'energy': '*',
            'system_ref': {
                'chemical_composition_reduced': '*'
            }
        }
    }
}
query = {
    'results.method.simulation.program_name': 'VASP',
    'results.material.elements': ['Ti']
}
query = ArchiveQuery(query=query, required=required, page_size=10, results_max=10000)
```
Let's fetch some entries.
```python
query.fetch(1000)
print(query.upload_list())
```
If we print the upload list, it would be
```text
[('-19NlAwxTCCXb6YT9Plifw', 526), ('-2ewONNGTZ68zuTQ6zrRZw', 4), ('-3LrFBvFQtCtmEp3Hy15EA', 12), ('-3ofEqLvSZiqo59vtf-TAQ', 4), ('-4W-jogwReafpdva4ELdrw', 32), ('-BLVfvlJRWawtHyuUWvP_g', 68), ('-Dm30DqRQX6pZUbJYwHUmw', 320), ('-Jfjp-lZSjqyaph2chqZfw', 6), ('-K2QS7s4QiqRg6nMPqzaTw', 82), ('-Li36ZXhQPucJvkd8yzYoA', 10)]
```
So upload `-19NlAwxTCCXb6YT9Plifw` has 526 qualified entries, upload `-2ewONNGTZ68zuTQ6zrRZw` has 4 qualified entries,
and so on. Summing the above counts gives 1064 entries in total, as shown in the terminal message.
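The same total can be computed directly from the upload list, since each item is an `(upload_id, entry_count)` tuple:
```python
total = sum(count for _, count in query.upload_list())
print(total)  # 1064
```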
Now data can be downloaded.
```python
result = query.download(100)
print(f'Downloaded {len(result)} entries.') # Downloaded 526 entries.
```
Since the first upload has 526 entries, all of them are downloaded in this call, even though only 100 were requested.
The upload list now has the first upload removed, as it has been downloaded.
```text
[('-2ewONNGTZ68zuTQ6zrRZw', 4), ('-3LrFBvFQtCtmEp3Hy15EA', 12), ('-3ofEqLvSZiqo59vtf-TAQ', 4), ('-4W-jogwReafpdva4ELdrw', 32), ('-BLVfvlJRWawtHyuUWvP_g', 68), ('-Dm30DqRQX6pZUbJYwHUmw', 320), ('-Jfjp-lZSjqyaph2chqZfw', 6), ('-K2QS7s4QiqRg6nMPqzaTw', 82), ('-Li36ZXhQPucJvkd8yzYoA', 10)]
```
It is possible to download more data.
```python
result = query.download(300)
print(f'Downloaded {len(result)} entries.') # Downloaded 440 entries.
```
The first six remaining uploads are downloaded to meet the requested (at least) 300 entries, which total 440 entries.
What is left in the list is
```text
[('-Jfjp-lZSjqyaph2chqZfw', 6), ('-K2QS7s4QiqRg6nMPqzaTw', 82), ('-Li36ZXhQPucJvkd8yzYoA', 10)]
```
We perform one more download call to illustrate that the fetch process is performed automatically when needed.
```python
result = query.download(100)
print(f'Downloaded {len(result)} entries.') # Downloaded 102 entries.
```
In the above, we request an additional 100 entries; however, the list contains only $$6+82+10=98$$ entries, so the fetch
process is called to fetch new entries from the server. You will see the following message in the terminal.
```text
Fetching remote uploads...
787 entries are qualified and added to the download list.
Downloading required data...
Downloaded 102 entries.
[('-NiRWNGjS--JtFoEnYrCfg', 8), ('-OcPUKZtS6u3lXlkWBM4qg', 129), ('-PA35e2ZRsq4AdDfBU4M_g', 14), ('-TG77dGiSTyrDAFNqTKa6Q', 366), ('-VzlPYtnS4q1tSl3NOmlCw', 178), ('-XeqzVqwSMCJFwhvDqWs8A', 14), ('-Y7gwnleQI6Q61jp024fXQ', 16), ('-Zf1RO1MQXegYVTbFybtQQ', 8), ('-Zm4S9VGRdOX-kbF1J5lOA', 50)]
```
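As a closing sketch, the downloaded entries are metainfo objects, and the requested quantities can be read back by walking the same path that was declared in `required` (adapt the attribute path to your own `required` specification):
```python
for entry in result:
    # Follow the structure of the `required` dict defined above.
    calc = entry.workflow[0].calculation_result_ref
    formula = calc.system_ref.chemical_composition_reduced
    total_energy = calc.energy.total.value
    print(f'{formula}: {total_energy}')
```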
@@ -49,118 +49,7 @@ The `ArchiveQuery` allows you to search for entries and access their parsed *archives*
at the same time. Furthermore, all data is accessible through a convenient Python interface
based on the [NOMAD metainfo](archive.md) rather than plain JSON.
Here is an example:
```py
query = ArchiveQuery(
    query={
        'results.method.simulation.program_name': 'VASP',
        'results.material.elements': ['Ti', 'O'],
        'results.method.simulation.geometry_optimization': {
            'convergence_tolerance_energy_difference:lt': 1e-22
        }
    },
    required={
        'workflow': {
            'calculation_result_ref': {
                'energy': '*',
                'system_ref': {
                    'chemical_composition_reduced': '*'
                }
            }
        }
    },
    parallel=10,
    max=100)
```
This instantiates an `ArchiveQuery`. You can print some details about the query:
```py
print(query)
```
This gives you a general overview of the query. For example which search was performed on
the NOMAD API, how many entries were found or what has already been downloaded, etc.
```py
Query: {
    "and": [
        {
            "results.method.simulation.program_name": "VASP",
            "results.material.elements": [
                "Ti",
                "O"
            ],
            "results.method.simulation.geometry_optimization": {
                "convergence_tolerance_energy_difference:lt": 1e-22
            }
        },
        {
            "quantities": [
                "run.system.chemical_composition_reduced",
                "run.calculation.system_ref",
                "run.calculation.energy",
                "workflow",
                "workflow.calculation_result_ref"
            ]
        }
    ]
}
Total number of entries that fulfil the query: 252
Number queried entries: 252
Number of entries loaded in the last api call: 70
Bytes loaded in the last api call: 53388
Bytes loaded from this query: 53388
Number of downloaded entries: 70
Number of made api calls: 1
```
This `ArchiveQuery` does not download all archive data immediately. More and more data will be
downloaded as you iterate through the query:
```py
for result in query:
    calc = result.workflow[0].calculation_result_ref
    formula = calc.system_ref.chemical_composition_reduced
    total_energy = calc.energy.total.value.to(units.eV)
    print(f'{formula}: {total_energy}')
```
The resulting output can look like this:
```
O10K2Ti3La2: -136.76387842 electron_volt
Li2O10Ti3La2: -139.15455203 electron_volt
O8Ti4: -107.30373862 electron_volt
O8Ca2Ti4: -116.52240913000001 electron_volt
...
```
Let's discuss the used `ArchiveQuery` parameters:
- `query`, this is an arbitrary API query as discussed under the [Queries in the API section](api.md#queries).
- `required`, this optional parameter allows you to specify which parts of an archive you require. This is also
described under [Access archives in API section](api.md#access-archives).
- `per_page`, this optional parameter allows you to specify how many results should be downloaded at once. For mass download of many results, we recommend ~100. If you are only interested in the first results a lower number may increase performance.
- `max`, with this optional parameter, we limit the maximum number of entries that are downloaded to avoid accidentally iterating through a result set of unknown and potentially large size.
- `owner` and `auth`, allow you to access private data or to specify that you only want to
query your own data. See also [owner](api.md#owner) and [auth](api.md#authentication) in the API section. Here is an example with authentication:
```py
from nomad.client import ArchiveQuery, Auth
query = ArchiveQuery(
    owner='user',
    required={
        'run': {
            'system[-1]': '*'
        }
    },
    authentication=Auth(user='yourusername', password='yourpassword'))
```
The archive query object can be treated like a Python list. You can use indices and ranges to select results. Each result is a Python object whose attributes are
determined by NOMAD's schema, [the metainfo](archive.md).
This energy value is a number with an attached unit (Joule), which can be converted to something else (e.g. eV). {{ metainfo_data() }}
The created query object keeps all results in memory. Keep this in mind when you are accessing a large number of query results.
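For instance, a sketch under the assumption that indexing triggers the necessary download (with `units` available as in the iteration example above):
```py
first = query[0]         # a single result, a metainfo object
first_ten = query[0:10]  # a range of results
calc = first.workflow[0].calculation_result_ref
print(calc.energy.total.value.to(units.eV))
```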
Please check [this page](async_archive.md) for details.
## Use NOMAD parser locally
@@ -16,7 +16,7 @@
# limitations under the License.
#
from .archive import ArchiveQuery, query_archive
from .archive import ArchiveQuery
from .api import Auth
from .upload import upload_file
from .processing import LocalEntryProcessing, parse, normalize, normalize_all
@@ -22,7 +22,7 @@ python-keycloak==0.26.1
elasticsearch-dsl==6.4.0
pydantic==1.8.2
jmespath==0.10.0
httpx==0.22.0
# [parsing]
netCDF4==1.5.4
@@ -15,32 +15,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import List, Tuple
from httpx import AsyncClient
import pytest
from nomad.client import query_archive, Auth
from nomad.metainfo import MSection, SubSection
from nomad.app.main import app
from nomad.client.archive import ArchiveQuery
from nomad.datamodel import EntryArchive, User
from nomad.datamodel.metainfo.simulation.run import Run
from tests.app.conftest import other_test_user_auth, test_user_auth # pylint: disable=unused-import
from nomad.metainfo import MSection, SubSection
from tests.conftest import test_users
from tests.processing import test_data as test_processing
# TODO most nomad.client functionality is only tested indirectly via its use in nomad.cli
def test_requests_auth(api_v1):
rv = api_v1.get('users/me', auth=Auth(from_api=True))
assert rv.status_code == 200
# TODO: more tests
# TODO with the existing published_wo_user_metadata fixture there is only one entry
# that does not allow to properly test pagination and scrolling
def assert_results(
results: List[MSection],
sub_section_defs: List[SubSection] = None,
total=1):
def assert_results(results: List[MSection], sub_section_defs: List[SubSection] = None, total=1):
assert len(results) == total
for result in results:
assert result.m_def == EntryArchive.m_def
@@ -56,31 +48,6 @@ def assert_results(
current = sub_sections[0]
def test_query(api_v1, published_wo_user_metadata):
assert_results(query_archive())
def test_query_query(api_v1, published_wo_user_metadata):
assert_results(query_archive(query=dict(upload_id=[published_wo_user_metadata.upload_id])))
@pytest.mark.parametrize('q_schema,sub_sections', [
({'run': '*'}, [EntryArchive.run]),
({'run': {'system': '*'}}, [EntryArchive.run, Run.system]),
({'run[0]': {'system': '*'}}, [EntryArchive.run, Run.system])
])
def test_query_required(api_v1, published_wo_user_metadata, q_schema, sub_sections):
assert_results(query_archive(required=q_schema), sub_section_defs=sub_sections)
def test_query_authentication(api_v1, published, other_test_user, test_user):
# The published test uploads uploader in entry and upload's user id do not match
# due to testing the uploader change via publish metadata.
assert_results(query_archive(authentication=Auth(other_test_user.username, 'password', from_api=True)), total=0)
assert_results(query_archive(authentication=Auth(test_user.username, 'password', from_api=True)), total=1)
@pytest.fixture(scope='function')
def many_uploads(non_empty_uploaded: Tuple[str, str], test_user: User, proc_infra):
_, upload_file = non_empty_uploaded
@@ -93,36 +60,101 @@ def many_uploads(non_empty_uploaded: Tuple[str, str], test_user: User, proc_infr
pass
@pytest.fixture(scope='function', autouse=True)
def patch_multiprocessing_and_api(monkeypatch):
class TestPool:
''' A fake multiprocessing pool, because multiprocessing does not work well in pytest. '''
def __init__(self, n):
pass
@pytest.fixture(scope='session')
def async_api_v1(monkeysession):
'''
This fixture provides an HTTP client with AsyncClient that accesses
the fast api. The patch will redirect all requests to the fast api under test.
'''
test_client = AsyncClient(app=app)
def map(self, f, args):
return [f(arg) for arg in args]
monkeysession.setattr(
'nomad.client.archive.ArchiveQuery._fetch_url',
'http://testserver/api/v1/entries/query')
monkeysession.setattr(
'nomad.client.archive.ArchiveQuery._download_url',
'http://testserver/api/v1/entries/archive/query')
monkeysession.setattr(
'nomad.client.archive.ArchiveQuery._auth_url',
'http://testserver/api/v1/auth/token')
def __enter__(self, *args, **kwargs):
return self
monkeysession.setattr('httpx.AsyncClient.get', getattr(test_client, 'get'))
monkeysession.setattr('httpx.AsyncClient.put', getattr(test_client, 'put'))
monkeysession.setattr('httpx.AsyncClient.post', getattr(test_client, 'post'))
monkeysession.setattr('httpx.AsyncClient.delete', getattr(test_client, 'delete'))
def mocked_auth(self) -> dict:
for user in test_users.values():
if user['username'] == self._username or user['email'] == self._username:
return dict(Authorization=f'Bearer {user["user_id"]}')
return {}
monkeysession.setattr('nomad.client.archive.ArchiveQuery._auth', mocked_auth)
return test_client
def test_async_query():
required = {
'workflow': {
'calculation_result_ref': {
'energy': '*',
'system_ref': {
'chemical_composition_reduced': '*'
}
}
}
}
query = {
'results.method.simulation.program_name': 'VASP',
'results.material.elements': ['Ti', 'O']
}
async_query = ArchiveQuery(query=query, required=required, page_size=100, results_max=10000)
num_entry = async_query.fetch(1000)
num_entry -= len(async_query.download(100))
num_entry -= len(async_query.download(100))
assert num_entry == sum([count for _, count in async_query.upload_list()])
def test_async_query_basic(async_api_v1, published_wo_user_metadata):
async_query = ArchiveQuery()
assert_results(async_query.download())
async_query = ArchiveQuery(query=dict(upload_id=[published_wo_user_metadata.upload_id]))
assert_results(async_query.download())
@pytest.mark.parametrize(
'q_required,sub_sections',
[({'run': '*'}, [EntryArchive.run]), ({'run': {'system': '*'}}, [EntryArchive.run, Run.system]),
({'run[0]': {'system': '*'}}, [EntryArchive.run, Run.system])])
def test_async_query_required(async_api_v1, published_wo_user_metadata, q_required, sub_sections):
async_query = ArchiveQuery(required=q_required)
assert_results(async_query.download(), sub_section_defs=sub_sections)
def test_async_query_auth(async_api_v1, published, other_test_user, test_user):
async_query = ArchiveQuery(username=other_test_user.username, password='password')
assert_results(async_query.download(), total=0)
async_query = ArchiveQuery(username=test_user.username, password='password')
assert_results(async_query.download(), total=1)
def __exit__(self, *args, **kwargs):
pass
monkeypatch.setattr('multiprocessing.Pool', TestPool)
def test_async_query_parallel(async_api_v1, many_uploads, monkeypatch):
async_query = ArchiveQuery(required=dict(run='*'))
assert_results(async_query.download(), total=4)
def test_parallel_query(api_v1, many_uploads, monkeypatch):
result = query_archive(required=dict(run='*'), parallel=2)
assert_results(result, total=4)
assert result._statistics.nentries == 4
assert result._statistics.loaded_nentries == 4
assert result._statistics.last_response_nentries == 4
assert result._statistics.napi_calls == 1
async_query = ArchiveQuery(required=dict(run='*'), page_size=1)
result = query_archive(required=dict(run='*'), parallel=2, per_page=1)
assert_results(result, total=4)
assert result._statistics.nentries == 4
assert result._statistics.loaded_nentries == 4
assert result._statistics.last_response_nentries == 2
assert result._statistics.napi_calls == 2
assert_results(async_query.download(), total=4)