'''
In this example, we go through many uploads in parallel to extract information from
certain calculations.

The motivation behind this is that selective access to individual sections of an
archive can be slow. If something is read from almost all calculations, it can be
faster to read each upload's archive file sequentially.

This is not an API example; it accesses archive files directly. Specifically, we
read all DOS fingerprints for the upload/calc/material combinations listed in an
input file. The fingerprint data is written to an output file.
'''
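
# The input file local/materials.json is expected to be a JSON list of entries that
# each carry upload_id, calc_id, and material_id, grouped by upload (the producer
# below creates one batch per upload). A minimal sketch with placeholder ids:
#
# [
#     {"upload_id": "upload_1", "calc_id": "calc_1", "material_id": "material_1"},
#     {"upload_id": "upload_1", "calc_id": "calc_2", "material_id": "material_1"}
# ]
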
from typing import Any
from multiprocessing import Pool, Queue, Event
from queue import Empty
import json
import traceback

from nomad import files


def read_archive(entries):
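    # Open the archive of one upload and yield a result dict (upload_id, calc_id,
    # material_id, fingerprint) for every DOS fingerprint found in the given entries.
    # All entries are expected to belong to the same upload. The lambda passed to
    # UploadFiles.get acts as an access check that always grants access, since we
    # read the files directly rather than through the API.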
    try:
        upload_id = entries[0]['upload_id']
        upload_files = files.UploadFiles.get(upload_id, lambda *args: True)
        assert upload_files is not None
        for entry in entries:
            calc_id = entry['calc_id']
            material_id = entry['material_id']
            with upload_files.read_archive(calc_id) as archive:
                entry_archive = archive[calc_id].to_dict()
                for run in entry_archive.get('section_run', []):
                    for calc in run.get('section_single_configuration_calculation', []):
                        for dos in calc.get('section_dos', []):
                            fingerprint = dos.get('section_dos_fingerprint')
                            if fingerprint:
                                yield {
                                    'upload_id': upload_id,
                                    'calc_id': calc_id,
                                    'material_id': material_id,
                                    'fingerprint': fingerprint}
    except Exception:
        traceback.print_exc()


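# The queues and the event are created at module level so that the worker, producer,
# and writer processes forked by the pool below share them. This relies on the
# default 'fork' start method (as on Linux); with 'spawn', each process would get
# its own, unconnected copies.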
nworker = 24
entry_queue: Any = Queue(maxsize=100)
result_queue: Any = Queue(maxsize=100)
producer_end = Event()
worker_sentinel = 'end'


def worker():
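    # Take batches of entries from the entry queue, extract their fingerprints, and
    # put the results on the result queue. Runs until the producer has finished and
    # the entry queue is drained, then sends a sentinel to the writer.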
    while not (producer_end.is_set() and entry_queue.empty()):
        try:
            entries = entry_queue.get(block=True, timeout=0.1)
        except Empty:
            continue

        for result in read_archive(entries):
            result_queue.put(result)

    result_queue.put(worker_sentinel)
    print('end worker')


def writer():
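    # Collect results from all workers and stream them into a single JSON list in
    # local/fingerprints.json. Stops once every worker has sent its sentinel and the
    # result queue is drained.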
    ended_worker = 0
    count = 0
    f = open('local/fingerprints.json', 'wt')
    f.write('[')
    while not (ended_worker == nworker and result_queue.empty()):
        try:
            result = result_queue.get(block=True, timeout=0.1)
        except Empty:
            continue

        if result == worker_sentinel:
            ended_worker += 1
            continue

        if count > 0:
            f.write(',\n')
        json.dump(result, f, indent=2)
        count += 1
        if count % 1000 == 0:
            print(count)

    f.write(']')
    f.close()
    print('end writer')


def producer():
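    # Read the input file and put its entries on the entry queue in batches, one
    # batch per upload. This assumes the input is grouped by upload_id; otherwise an
    # upload's entries end up split across several smaller batches.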
    with open('local/materials.json', 'r') as f:
        data = json.load(f)

    upload_id = None
    entries = []
    for entry in data:
        if upload_id is not None and upload_id != entry['upload_id']:
            entry_queue.put(entries, block=True)
            entries = []

        upload_id = entry['upload_id']
        entries.append(entry)

    # queue the final batch (if the input was not empty)
    if entries:
        entry_queue.put(entries, block=True)

    producer_end.set()
    print('end producer')


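# Run the producer, the writer, and nworker workers as separate processes in a
# single pool (nworker + 2 processes in total) and wait for all of them to finish.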
with Pool(processes=nworker + 2) as pool:
    for _ in range(nworker):
        pool.apply_async(worker)
    pool.apply_async(producer)
    pool.apply_async(writer)

    pool.close()
    pool.join()