Commit c8b3592a authored by Markus Scheidgen

Added fingerprint archive extraction as an example.

parent af03e528
Pipeline #79615 passed with stages
in 22 minutes and 40 seconds
'''
In this example, we select calculations/uploads by query and iterate through all
uploads in parallel, reading each upload in full to extract some information for each
calculation.
The motivation behind this is that selective access to individual sections of an
archive might be slow. If data is needed from almost all calculations, it can be
faster to read each upload's archive file sequentially from front to back.
This is not an API example, but accesses archive files directly.
'''
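# A minimal sketch of the two access patterns this example weighs against each other
# (upload_id/calc_id are hypothetical placeholders; the calls are the same ones used
# in the scripts below):
#
#     upload_files = files.UploadFiles.get(upload_id, lambda *args: True)
#     with upload_files.read_archive(calc_id) as archive:
#         # selective access: navigate to single sections of one entry
#         material = archive[calc_id]['section_metadata']['encyclopedia']['material']
#         # full read: load the whole entry at once and work on plain dicts
#         entry_archive = archive[calc_id].to_dict()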
from typing import Iterable, Tuple, Set, Any
from concurrent.futures import ThreadPoolExecutor
from threading import Event, Lock
from queue import Queue, Empty
import json
from nomad import search, infrastructure, files, config
infrastructure.setup_files()
infrastructure.setup_elastic()
def query() -> Iterable[Tuple[str, Set[str]]]:
after = None
while True:
res = infrastructure.elastic_client.search(index=config.elastic.index_name, body={
"query": {
"bool": {
"must": [
{
"match": {
"dft.quantities": "section_dos_fingerprint"
},
},
{
"match": {
"published": True
},
},
{
"match": {
"with_embargo": False
}
}
]
}
},
"size": 0,
"aggs": {
"results": {
"composite": {
"sources": [
{
"uploads": {
"terms": {
"field": "upload_id"
},
}
},
{
"materials": {
"terms": {
"field": "encyclopedia.material.material_id"
}
}
}
],
"size": 100
},
"aggs": {
"calcs": {
"top_hits": {
"sort": {
"_script": {
"type": "number",
"script": {
"lang": "painless",
"source": '''
int result = 0;
String code = doc['dft.code_name'].value;
String functional = doc['dft.xc_functional'].value;
if (functional == 'GGA') result += 100;
if (code == 'VASP')
result += 1;
else if (code == 'FHI-aims')
result += 2;
return result;
'''
},
"order": "asc"
},
},
"_source": {
"includes": ['upload_id', 'calc_id', 'dft.code_name', 'dft.xc_functional']
},
"size": 1
}
}
}
}
}
})
# for now, only inspect the raw response and stop here
print(json.dumps(res, indent=2))
raise
# searchRequest = search.SearchRequest()
# searchRequest.quantity(
# name='encyclopedia.material.material_id',
# examples=2,
# examples_source=['upload_id', 'calc_id', 'dft.code_name', 'dft.xc_functional'],
# order_by='upload_id',
# after=after)
# result = searchRequest.execute()['quantities']['encylcopedia.material.material_id']
# after = result['after']
# if len(result) == 0:
# break
# for material_id, calcs in result.items():
# print(json.dumps(result, indent=2))
# raise
# calc_ids: Set[str] = set()
# upload_id = None
# for entry in searchRequest.execute_scan(order_by='upload_id'):
# entry_upload_id, entry_calc_id = entry['upload_id'], entry['calc_id']
# if upload_id is not None and upload_id != entry_upload_id:
# yield upload_id, calc_ids
# upload_id = entry_calc_id
# calc_ids = set()
# upload_id = entry_upload_id
# calc_ids.add(entry_calc_id)
# if upload_id is not None:
# yield upload_id, calc_ids
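# A minimal sketch of how the intended 'after'-based pagination (see `after = None` and
# the commented code above) could be completed for the composite aggregation.
# `paginate_materials` is a hypothetical helper name; it reuses the elastic_client and
# index configured above and only illustrates Elasticsearch's `after_key` mechanism
# (the filters from the query above are omitted for brevity).
def paginate_materials(page_size=100):
    after = None
    while True:
        composite = {
            "sources": [
                {"uploads": {"terms": {"field": "upload_id"}}},
                {"materials": {"terms": {"field": "encyclopedia.material.material_id"}}}
            ],
            "size": page_size
        }
        if after is not None:
            composite["after"] = after
        res = infrastructure.elastic_client.search(index=config.elastic.index_name, body={
            "size": 0,
            "aggs": {"results": {"composite": composite}}
        })
        buckets = res["aggregations"]["results"]["buckets"]
        if len(buckets) == 0:
            break
        for bucket in buckets:
            yield bucket["key"]["uploads"], bucket["key"]["materials"]
        # continue the next page after the last bucket returned
        after = res["aggregations"]["results"].get("after_key")
        if after is None:
            break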
for _ in query():
pass
def read_archive(upload_id, calc_ids):
try:
upload_files = files.UploadFiles.get(upload_id, lambda *args: True)
for calc_id in calc_ids:
with upload_files.read_archive(calc_id) as archive:
material_id = archive[calc_id]['section_metadata']['encyclopedia']['material']['material_id']
for run in archive[calc_id].get('section_run', []):
for calc in run.get('section_single_configuration_calculation', []):
for dos in calc.get('section_dos', []):
fingerprint = dos.get('section_dos_fingerprint')
if fingerprint:
yield {
'upload_id': upload_id,
'calc_id': calc_id,
'material_id': material_id,
'fingerprint': fingerprint}
except Exception:
import traceback
traceback.print_exc()
nworker = 10
nended_worker = 0  # incremented once per finished worker; compared against nworker below
upload_queue: Any = Queue(maxsize=100)
result_queue: Any = Queue(maxsize=100)
producer_end = Event()
result_end = Event()
ended_worker_lock = Lock()
def worker():
global nended_worker
while not (producer_end.is_set() and upload_queue.empty()):
try:
upload_id, calc_ids = upload_queue.get(block=True, timeout=0.1)
except Empty:
continue
for result in read_archive(upload_id, calc_ids):
result_queue.put(result)
with ended_worker_lock:
nended_worker += 1
if nended_worker == nworker:
print('result end')
result_end.set()
print('end worker')
def writer():
while not (result_end.is_set() and result_queue.empty()):
try:
result = result_queue.get(block=True, timeout=0.1)
except Empty:
continue
print(json.dumps(result, indent=2))
print('end writer')
def producer():
for upload_id, calc_ids in query():
upload_queue.put((upload_id, calc_ids), block=True)
producer_end.set()
print('end producer')
with ThreadPoolExecutor(max_workers=nworker + 2) as executor:
for _ in range(nworker):
executor.submit(worker)
executor.submit(producer)
executor.submit(writer)
'''
In this example, we go through many uploads in parallel to extract information from
certain calculations.
The motivation behind this is that selective access to individual sections of an
archive might be slow. If data is needed from almost all calculations, it can be
faster to read each upload's archive file sequentially from front to back.
This is not an API example, but accesses archive files directly. Specifically, we
read all fingerprints for the upload/calc/material combinations listed in an
input file. The fingerprint data is written to an output file.
'''
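# The input file (local/materials.json, produced by the raw Elasticsearch query script
# below) is assumed to be a JSON list of entries sorted by upload_id, e.g.:
#
#     [
#       {"material_id": "...", "upload_id": "...", "calc_id": "..."},
#       ...
#     ]
#
# The producer below relies on this sort order to group consecutive entries per upload.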
from typing import Any
from multiprocessing import Pool, Queue, Event
from queue import Empty
import json
import traceback
from nomad import files
def read_archive(entries):
try:
upload_id = entries[0]['upload_id']
upload_files = files.UploadFiles.get(upload_id, lambda *args: True)
assert upload_files is not None
for entry in entries:
calc_id = entry['calc_id']
material_id = entry['material_id']
with upload_files.read_archive(calc_id) as archive:
entry_archive = archive[calc_id].to_dict()
for run in entry_archive.get('section_run', []):
for calc in run.get('section_single_configuration_calculation', []):
for dos in calc.get('section_dos', []):
fingerprint = dos.get('section_dos_fingerprint')
if fingerprint:
yield {
'upload_id': upload_id,
'calc_id': calc_id,
'material_id': material_id,
'fingerprint': fingerprint}
except Exception:
traceback.print_exc()
nworker = 24
entry_queue: Any = Queue(maxsize=100)
result_queue: Any = Queue(maxsize=100)
producer_end = Event()
worker_sentinel = 'end'
def worker():
entries = []
while not (producer_end.is_set() and entry_queue.empty()):
try:
entries = entry_queue.get(block=True, timeout=0.1)
except Empty:
continue
for result in read_archive(entries):
result_queue.put(result)
result_queue.put(worker_sentinel)
print('end worker')
def writer():
ended_worker = 0
count = 0
f = open('local/fingerprints.json', 'wt')
f.write('[')
while not (ended_worker == nworker and result_queue.empty()):
try:
result = result_queue.get(block=True, timeout=0.1)
except Empty:
continue
if result == worker_sentinel:
ended_worker += 1
continue
if count > 0:
f.write(',\n')
json.dump(result, f, indent=2)
count += 1
if count % 1000 == 0:
print(count)
f.write(']')
f.close()
print('end writer')
def producer():
with open('local/materials.json', 'r') as f:
data = json.load(f)
upload_id = None
entries = []
for entry in data:
if upload_id is not None and upload_id != entry['upload_id']:
entry_queue.put(entries, block=True)
entries = []
upload_id = entry['upload_id']
entries.append(entry)
entry_queue.put(entries, block=True)
producer_end.set()
print('end producer')
with Pool(processes=nworker + 2) as pool:
for _ in range(nworker):
pool.apply_async(worker)
pool.apply_async(producer)
pool.apply_async(writer)
pool.close()
pool.join()
'''
This exemplifies how to send raw queries to elasticsearch.
Specifically this will read all materials with fingerprints from the search engine
and store them in a local file. We use composite aggregations with after-based
pagination.
'''
from typing import Iterable, Tuple, Set, Any
from concurrent.futures import ThreadPoolExecutor
from threading import Event, Lock
from queue import Queue, Empty
import json
from nomad import infrastructure, config
infrastructure.setup_files()
infrastructure.setup_elastic()
@@ -15,6 +16,7 @@ infrastructure.setup_elastic()
results = []
after = None
count = 0
while True:
request = {
"query": {
@@ -99,8 +101,10 @@ while True:
upload_id = entry['upload_id']
calc_id = entry['calc_id']
results.append(dict(material_id=material_id, upload_id=upload_id, calc_id=calc_id))
count += 1
print(count)
results.sort(key=lambda item: item['upload_id'])
with open('local/materials.json', 'wt') as f:
f.write(json.dumps(results, indent=2))
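# The scripts chain via files: this query script writes local/materials.json, which the
# multiprocessing extraction script above reads to produce local/fingerprints.json.
# A quick, hypothetical sanity check of the query output:
#
#     import json
#     with open('local/materials.json') as f:
#         entries = json.load(f)
#     print(len(entries), 'entries; first:', entries[0] if entries else None)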