Commit 74981f38 authored by tlc@void's avatar tlc@void
Browse files

Add async interface

parent 4f875e54
Pipeline #126349 passed with stages
in 35 minutes and 31 seconds
...@@ -18,12 +18,15 @@ Although the above query object has an empty query. ...@@ -18,12 +18,15 @@ Although the above query object has an empty query.
The query object is only constructed. To access the desired data, users need to perform two operations manually.
### Fetch ### Synchronous Interface
#### Fetch
The fetch process is carried out **synchronously**. Users can call The fetch process is carried out **synchronously**. Users can call
```python ```python
number_of_entries = query.fetch() # number_of_entries = query.fetch(1000) # fetch 1000 entries
number_of_entries = query.fetch() # fetch at most results_max entries
``` ```
to perform the fetch process to fetch up to `results_max` entries. An indicative number `n` can be provided `fetch(n)` to perform the fetch process to fetch up to `results_max` entries. An indicative number `n` can be provided `fetch(n)`
...@@ -40,12 +43,13 @@ print(query.upload_list()) ...@@ -40,12 +43,13 @@ print(query.upload_list())
If applicable, it is possible to fetch a large number of entries first and then perform a second fetch by using some
upload ID from the first fetch result as the `after` argument, so that some middle segment can be downloaded.
### Download #### Download
After fetching the qualified uploads, the desired data can be downloaded **asynchronously**. One can call After fetching the qualified uploads, the desired data can be downloaded **asynchronously**. One can call
```python ```python
results = query.download() # results = query.download(1000) # download 1000 entries
results = query.download() # download all fetched entries if fetched otherwise fetch and download up to `results_max` entries
``` ```
to download up to `results_max` entries. The downloaded results are returned as a list. Alternatively, it is possible to to download up to `results_max` entries. The downloaded results are returned as a list. Alternatively, it is possible to
...@@ -72,6 +76,27 @@ while True: ...@@ -72,6 +76,27 @@ while True:
There is no retry mechanism in the download process. If any uploads fail to be downloaded due to a server error, they
are kept in the list; otherwise they are removed.
### Asynchronous Interface
Some applications, such as Jupyter Notebook, may run a global/top level event loop. To query data in those environments,
one can use the asynchronous interface.
```python
number_of_entries = await query.async_fetch() # indicative number n applies: async_fetch(n)
results = await query.async_download() # indicative number n applies: async_download(n)
```
Alternatively, if one prefers the synchronous interface in such environments, it is necessary to patch the global
event loop to allow nested loops.
To do so, add the following at the beginning of the notebook.
```python
import nest_asyncio
nest_asyncio.apply()
```
## A Complete Rundown ## A Complete Rundown
Here we show a valid query and acquire data from server. Here we show a valid query and acquire data from server.
......
...@@ -79,7 +79,7 @@ class ArchiveQuery: ...@@ -79,7 +79,7 @@ class ArchiveQuery:
required (dict): required properties required (dict): required properties
url (str): server url, if not specified, the default official NOMAD one is used url (str): server url, if not specified, the default official NOMAD one is used
after (str): specify the starting upload id to query, if users have knowledge of uploads, after (str): specify the starting upload id to query, if users have knowledge of uploads,
they may wish to start from a specific upload, default: '' they may wish to start from a specific upload, default: ''
results_max (int): maximum results to query, default: 1000 results_max (int): maximum results to query, default: 1000
page_size (int): size of page in each query, default: 100 page_size (int): size of page in each query, default: 100
username (str): username for authenticated access, default: '' username (str): username for authenticated access, default: ''
...@@ -88,10 +88,11 @@ class ArchiveQuery: ...@@ -88,10 +88,11 @@ class ArchiveQuery:
sleep_time (float): sleep time for retry, default: 1. sleep_time (float): sleep time for retry, default: 1.
''' '''
def __init__(self, owner: str = 'visible', query: dict = None, required: dict = None, def __init__(
url: str = None, after: str = None, results_max: int = 1000, page_size: int = 10, self, owner: str = 'visible', query: dict = None, required: dict = None,
username: str = None, password: str = None, retry: int = 4, sleep_time: float = 4, url: str = None, after: str = None, results_max: int = 1000, page_size: int = 10,
from_api: bool = False): username: str = None, password: str = None, retry: int = 4, sleep_time: float = 4,
from_api: bool = False):
self._owner: str = owner self._owner: str = owner
self._required = required if required else dict(run='*') self._required = required if required else dict(run='*')
self._query_list: List[dict] = [] self._query_list: List[dict] = []
...@@ -342,8 +343,9 @@ class ArchiveQuery: ...@@ -342,8 +343,9 @@ class ArchiveQuery:
semaphore = Semaphore(30) semaphore = Semaphore(30)
async with httpx.AsyncClient(timeout=Timeout(timeout=300)) as session: async with httpx.AsyncClient(timeout=Timeout(timeout=300)) as session:
tasks = [asyncio.create_task(self._acquire(upload, session, semaphore)) for upload in tasks = [asyncio.create_task(
self._uploads[:num_upload]] self._acquire(
upload, session, semaphore)) for upload in self._uploads[:num_upload]]
results = await asyncio.gather(*tasks) results = await asyncio.gather(*tasks)
# flatten 2D list # flatten 2D list
...@@ -380,8 +382,8 @@ class ArchiveQuery: ...@@ -380,8 +382,8 @@ class ArchiveQuery:
self._uploads.remove(upload) self._uploads.remove(upload)
result = [EntryArchive.m_from_dict(result['archive']) for result in result = [EntryArchive.m_from_dict(
response_json['data']] result['archive']) for result in response_json['data']]
if not result: if not result:
print(f'No result returned for id {upload[0]}, is the query proper?') print(f'No result returned for id {upload[0]}, is the query proper?')
...@@ -432,5 +434,36 @@ class ArchiveQuery: ...@@ -432,5 +434,36 @@ class ArchiveQuery:
return asyncio.run(self._download_async(number)) return asyncio.run(self._download_async(number))
async def async_fetch(self, number: int = 0) -> int:
    '''
    Asynchronous counterpart of `fetch`, for use inside an already running
    event loop (e.g. Jupyter Notebook).

    Params:
        number (int): indicative number of entries to fetch, 0 for the default behaviour

    Returns:
        int: the number of entries fetched, as reported by `_fetch_async`
    '''
    print('Fetching remote uploads...')

    fetched_count = await self._fetch_async(number)
    return fetched_count
async def async_download(self, number: int = 0) -> List[EntryArchive]:
    '''
    Asynchronous counterpart of `download`, for use inside an already running
    event loop (e.g. Jupyter Notebook).

    If `number` is 0, downloads all previously fetched entries; if nothing has
    been fetched yet, fetches first. If fewer entries have been fetched than
    requested, fetches the difference before downloading.

    Params:
        number (int): number of entries to download, 0 to download all fetched entries

    Returns:
        List[EntryArchive]: the downloaded entries
    '''
    # entries fetched but not yet downloaded; generator avoids building a throwaway list
    pending_size: int = sum(count for _, count in self._uploads)

    # download all at once
    if number == 0:
        number = pending_size

    if number == 0:
        # empty list, fetch as many as possible
        number = await self.async_fetch()
    elif pending_size < number:
        # not enough fetched entries, fetch the missing amount first
        await self.async_fetch(number - pending_size)

    print('Downloading required data...')

    return await self._download_async(number)
def upload_list(self) -> List[Tuple[str, int]]:
    '''
    Return the fetched uploads as a list of (upload id, entry count) pairs.
    '''
    return self._uploads
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment