From c8b3592a7b9d9f9eec00b5a082c1560ea2654b40 Mon Sep 17 00:00:00 2001
From: Markus Scheidgen <markus.scheidgen@gmail.com>
Date: Tue, 28 Jul 2020 18:24:01 +0200
Subject: [PATCH] Added fingerprint archive extraction as an example.

---
 examples/archive/bulk_read.py          | 215 -------------------------
 examples/fingerprints/bulk_read.py     | 122 ++++++++++++++
 examples/{ => fingerprints}/elastic.py |  18 ++-
 3 files changed, 133 insertions(+), 222 deletions(-)
 delete mode 100644 examples/archive/bulk_read.py
 create mode 100644 examples/fingerprints/bulk_read.py
 rename examples/{ => fingerprints}/elastic.py (90%)

diff --git a/examples/archive/bulk_read.py b/examples/archive/bulk_read.py
deleted file mode 100644
index 78b4c81627..0000000000
--- a/examples/archive/bulk_read.py
+++ /dev/null
@@ -1,215 +0,0 @@
-'''
-In this example, we select calculations/upload by query and iterate through all
-uploads in parallel, reading the uploads in full to extract some information for each
-calculation.
-
-The motivation behind this is that the selective access of sections in an archvie might
-be slow. If something is read from almost all calculations, it might be faster to
-sequentially read all of the upload's archive file.
-
-This is not an API example, but directly accesses archive files.
-'''
-from typing import Iterable, Tuple, Set, Any
-from concurrent.futures import ThreadPoolExecutor
-from threading import Event, Lock
-from queue import Queue, Empty
-import json
-
-from nomad import search, infrastructure, files, config
-
-infrastructure.setup_files()
-infrastructure.setup_elastic()
-
-
-def query() -> Iterable[Tuple[str, Set[str]]]:
-    after = None
-    while True:
-        res = infrastructure.elastic_client.search(index=config.elastic.index_name, body={
-            "query": {
-                "bool": {
-                    "must": [
-                        {
-                            "match": {
-                                "dft.quantities": "section_dos_fingerprint"
-                            },
-                        },
-                        {
-                            "match": {
-                                "published": True
-                            },
-                        },
-                        {
-                            "match": {
-                                "with_embargo": False
-                            }
-                        }
-                    ]
-                }
-            },
-            "size": 0,
-            "aggs": {
-                "results": {
-                    "composite": {
-                        "sources": [
-                            {
-                                "uploads": {
-                                    "terms": {
-                                        "field": "upload_id"
-                                    },
-                                }
-                            },
-                            {
-                                "materials": {
-                                    "terms": {
-                                        "field": "encyclopedia.material.material_id"
-                                    }
-                                }
-                            }
-                        ],
-                        "size": 100
-                    },
-                    "aggs": {
-                        "calcs": {
-                            "top_hits": {
-                                "sort": {
-                                    "_script": {
-                                        "type": "number",
-                                        "script": {
-                                            "lang": "painless",
-                                            "source": '''
-                                                int result = 0;
-                                                String code = doc['dft.code_name'].value;
-                                                String functional = doc['dft.xc_functional'].value;
-                                                if (functional == 'GGA') result += 100;
-                                                if (code == 'VASP')
-                                                    result += 1;
-                                                else if (code == 'FHI-aims')
-                                                    result += 2;
-                                                return result;
-                                            '''
-                                        },
-                                        "order": "asc"
-                                    },
-                                },
-                                "_source": {
-                                    "includes": ['upload_id', 'calc_id', 'dft.code_name', 'dft.xc_functional']
-                                },
-                                "size": 1
-                            }
-                        }
-                    }
-                }
-            }
-        })
-        print(json.dumps(res, indent=2))
-        raise
-        # searchRequest = search.SearchRequest()
-        # searchRequest.quantity(
-        #     name='encyclopedia.material.material_id',
-        #     examples=2,
-        #     examples_source=['upload_id', 'calc_id', 'dft.code_name', 'dft.xc_functional'],
-        #     order_by='upload_id',
-        #     after=after)
-
-        # result = searchRequest.execute()['quantities']['encylcopedia.material.material_id']
-        # after = result['after']
-        # if len(result) == 0:
-        #     break
-
-        # for material_id, calcs in result.items():
-        # print(json.dumps(result, indent=2))
-        # raise
-        # calc_ids: Set[str] = set()
-        # upload_id = None
-        # for entry in searchRequest.execute_scan(order_by='upload_id'):
-        #     entry_upload_id, entry_calc_id = entry['upload_id'], entry['calc_id']
-        #     if upload_id is not None and upload_id != entry_upload_id:
-        #         yield upload_id, calc_ids
-        #         upload_id = entry_calc_id
-        #         calc_ids = set()
-
-        #     upload_id = entry_upload_id
-        #     calc_ids.add(entry_calc_id)
-
-        # if upload_id is not None:
-        #     yield upload_id, calc_ids
-
-
-for _ in query():
-    pass
-
-def read_archive(upload_id, calc_ids):
-    try:
-        upload_files = files.UploadFiles.get(upload_id, lambda *args: True)
-        for calc_id in calc_ids:
-            with upload_files.read_archive(calc_id) as archive:
-                material_id = archive[calc_id]['section_metadata']['encyclopedia']['material']['material_id']
-                for run in archive[calc_id].get('section_run', []):
-                    for calc in run.get('section_single_configuration_calculation', []):
-                        for dos in calc.get('section_dos', []):
-                            fingerprint = dos.get('section_dos_fingerprint')
-                            if fingerprint:
-                                yield {
-                                    'upload_id': upload_id,
-                                    'calc_id': calc_id,
-                                    'material_id': material_id,
-                                    'fingerprint': fingerprint}
-    except Exception:
-        import traceback
-        traceback.print_exc()
-
-
-nworker = 10
-nended_worker = 1
-upload_queue: Any = Queue(maxsize=100)
-result_queue: Any = Queue(maxsize=100)
-producer_end = Event()
-result_end = Event()
-ended_worker_lock = Lock()
-
-
-def worker():
-    global nended_worker
-    while not (producer_end.is_set() and upload_queue.empty()):
-        try:
-            upload_id, calc_ids = upload_queue.get(block=True, timeout=0.1)
-        except Empty:
-            continue
-
-        for result in read_archive(upload_id, calc_ids):
-            result_queue.put(result)
-
-    with ended_worker_lock:
-        nended_worker += 1
-        if nended_worker == nworker:
-            print('result end')
-            result_end.set()
-
-    print('end worker')
-
-
-def writer():
-    while not (result_end.is_set() and result_queue.empty()):
-        try:
-            result = result_queue.get(block=True, timeout=0.1)
-        except Empty:
-            continue
-
-        print(json.dumps(result, indent=2))
-
-    print('end writer')
-
-
-def producer():
-    for upload_id, calc_ids in query():
-        upload_queue.put((upload_id, calc_ids), block=True)
-
-    producer_end.set()
-    print('end producer')
-
-
-with ThreadPoolExecutor(max_workers=nworker + 2) as executor:
-    for _ in range(nworker):
-        executor.submit(worker)
-    executor.submit(producer)
-    executor.submit(writer)
diff --git a/examples/fingerprints/bulk_read.py b/examples/fingerprints/bulk_read.py
new file mode 100644
index 0000000000..c189791942
--- /dev/null
+++ b/examples/fingerprints/bulk_read.py
@@ -0,0 +1,122 @@
+'''
+In this example, we go through many uploads in parallel to extract information from
+certain calculations.
+
+The motivation behind this is that selectively accessing individual sections of an
+archive can be slow. If something is read from almost all calculations, it can be
+faster to read each upload's archive file sequentially.
+
+This is not an API example; it accesses archive files directly. Specifically, we
+read all DOS fingerprints for the upload/calc/material combinations listed in an
+input file. The fingerprint data is written to an output file.
+'''
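+# Expected input: 'local/materials.json' (written by elastic.py in this directory),
+# a JSON list of {"material_id": ..., "upload_id": ..., "calc_id": ...} records
+# sorted by upload_id. Output: 'local/fingerprints.json', a JSON list of
+# fingerprint records.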
+from typing import Any
+from multiprocessing import Pool, Queue, Event
+from queue import Empty
+import json
+import traceback
+
+from nomad import files
+
+
+def read_archive(entries):
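+    '''
+    Yield one result dict per DOS fingerprint found in the given entries.
+    All entries must belong to the same upload.
+    '''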
+    try:
+        upload_id = entries[0]['upload_id']
+        upload_files = files.UploadFiles.get(upload_id, lambda *args: True)
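+        # the lambda bypasses the access check; the input is expected to list
+        # only published, non-embargoed data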
+        assert upload_files is not None
+        for entry in entries:
+            calc_id = entry['calc_id']
+            material_id = entry['material_id']
+            with upload_files.read_archive(calc_id) as archive:
+                entry_archive = archive[calc_id].to_dict()
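+                # walk the archive: runs -> calculations -> DOS -> fingerprint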
+                for run in entry_archive.get('section_run', []):
+                    for calc in run.get('section_single_configuration_calculation', []):
+                        for dos in calc.get('section_dos', []):
+                            fingerprint = dos.get('section_dos_fingerprint')
+                            if fingerprint:
+                                yield {
+                                    'upload_id': upload_id,
+                                    'calc_id': calc_id,
+                                    'material_id': material_id,
+                                    'fingerprint': fingerprint}
+    except Exception:
+        traceback.print_exc()
+
+
+nworker = 24
+entry_queue: Any = Queue(maxsize=100)
+result_queue: Any = Queue(maxsize=100)
+producer_end = Event()
+worker_sentinel = 'end'
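+# Each worker puts the sentinel on the result queue when it is done; the writer
+# counts sentinels to know when all workers have finished.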
+
+
+def worker():
+    while not (producer_end.is_set() and entry_queue.empty()):
+        try:
+            entries = entry_queue.get(block=True, timeout=0.1)
+        except Empty:
+            continue
+
+        for result in read_archive(entries):
+            result_queue.put(result)
+
+    result_queue.put(worker_sentinel)
+    print('end worker')
+
+
+def writer():
+    ended_worker = 0
+    count = 0
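+    # Stream the results into a JSON array by hand, so the full result set never
+    # has to be held in memory.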
+    f = open('local/fingerprints.json', 'wt')
+    f.write('[')
+    while not (ended_worker == nworker and result_queue.empty()):
+        try:
+            result = result_queue.get(block=True, timeout=0.1)
+        except Empty:
+            continue
+
+        if result == worker_sentinel:
+            ended_worker += 1
+            continue
+
+        if count > 0:
+            f.write(',\n')
+        json.dump(result, f, indent=2)
+        count += 1
+        if count % 1000 == 0:
+            print(count)
+
+    f.write(']')
+    f.close()
+    print('end writer')
+
+
+def producer():
+    with open('local/materials.json', 'r') as f:
+        data = json.load(f)
+
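+    # Group consecutive entries of the same upload into one work item. This relies
+    # on the input being sorted by upload_id (elastic.py sorts its output).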
+    upload_id = None
+    entries = []
+    for entry in data:
+        if upload_id is not None and upload_id != entry['upload_id']:
+            entry_queue.put(entries, block=True)
+            entries = []
+
+        upload_id = entry['upload_id']
+        entries.append(entry)
+
+    if entries:
+        entry_queue.put(entries, block=True)
+
+    producer_end.set()
+    print('end producer')
+
+
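+# Note: sharing the module-level queues and event with the pool processes relies
+# on the 'fork' start method (the default on Linux); under 'spawn' they would
+# have to be passed to the worker processes explicitly.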
+with Pool(processes=nworker + 2) as pool:
+    for _ in range(nworker):
+        pool.apply_async(worker)
+    pool.apply_async(producer)
+    pool.apply_async(writer)
+
+    pool.close()
+    pool.join()
diff --git a/examples/elastic.py b/examples/fingerprints/elastic.py
similarity index 90%
rename from examples/elastic.py
rename to examples/fingerprints/elastic.py
index f20391705f..aa1ae4204c 100644
--- a/examples/elastic.py
+++ b/examples/fingerprints/elastic.py
@@ -1,13 +1,14 @@
 '''
-This examplefies how to send raw queries to elasticsearch.
+This exemplifies how to send raw queries to Elasticsearch.
+
+Specifically, this reads all materials with fingerprints from the search engine
+and stores them in a local file. We use a composite aggregation with
+'after'-based pagination.
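+
+The composite aggregation returns an 'after_key' with every page of buckets;
+passing it back as the 'after' parameter requests the next page, until a page
+comes back empty.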
 '''
-from typing import Iterable, Tuple, Set, Any
-from concurrent.futures import ThreadPoolExecutor
-from threading import Event, Lock
-from queue import Queue, Empty
+
 import json
 
-from nomad import search, infrastructure, files, config
+from nomad import infrastructure, config
 
 infrastructure.setup_files()
 infrastructure.setup_elastic()
@@ -15,6 +16,7 @@ infrastructure.setup_elastic()
 
 results = []
 after = None
+count = 0
 while True:
     request = {
         "query": {
@@ -99,8 +101,10 @@ while True:
         upload_id = entry['upload_id']
         calc_id = entry['calc_id']
         results.append(dict(material_id=material_id, upload_id=upload_id, calc_id=calc_id))
+        count += 1
 
-    print('.')
+    print(count)
 
 results.sort(key=lambda item: item['upload_id'])
-print(json.dumps(results, indent=2))
+with open('local/materials.json', 'wt') as f:
+    f.write(json.dumps(results, indent=2))
-- 
GitLab