#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
The archive API of the nomad@FAIRDI APIs. This API is about serving processed
(parsed and normalized) calculation data in nomad's *meta-info* format.
'''
24
from typing import Dict, Any
25
from io import BytesIO
26
from flask import request, g
27
from flask_restplus import abort, Resource, fields
28
import json
29
import orjson
30
import urllib.parse
31

32
from nomad.files import UploadFiles, Restricted
33
34
35
from nomad.archive import (
    query_archive, ArchiveQueryError, filter_archive, read_partial_archives_from_mongo,
    compute_required_with_referenced)
36
from nomad import search, config
37
from nomad.app import common
38

39
from .auth import authenticate, create_authorization_predicate
Markus Scheidgen's avatar
Markus Scheidgen committed
40
from .api import api
Alvin Noe Ladines's avatar
Alvin Noe Ladines committed
41
42
from .common import calc_route, streamed_zipfile, search_model, add_search_parameters,\
    apply_search_parameters
43

44
45

# Flask-RESTPlus namespace that groups all archive-related endpoints under
# the /archive URL prefix.
ns = api.namespace(
    'archive',
    description='Access archive data and archive processing logs.')
48
49
50
51


@calc_route(ns, '/logs')
class ArchiveCalcLogResource(Resource):
    @api.doc('get_archive_logs')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send')
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation processing log.

        Calcs are references via *upload_id*, *calc_id* pairs.
        '''
        # Combined id, used only for the 404 message below.
        archive_id = '%s/%s' % (upload_id, calc_id)

        is_authorized = create_authorization_predicate(upload_id, calc_id)
        upload_files = UploadFiles.get(upload_id, is_authorized=is_authorized)
        if upload_files is None:
            abort(404, message='Upload %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                logs = archive[calc_id]['processing_logs']
                return [log_entry.to_dict() for log_entry in logs]
        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)
79
80
81
82


@calc_route(ns)
class ArchiveCalcResource(Resource):
    @api.doc('get_archive_calc')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send', headers={'Content-Type': 'application/json'})
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation data in archive form.

        Calcs are references via *upload_id*, *calc_id* pairs.
        '''
        # Combined id, used only for the 404 message below.
        archive_id = '%s/%s' % (upload_id, calc_id)

        is_authorized = create_authorization_predicate(upload_id, calc_id)
        upload_files = UploadFiles.get(upload_id, is_authorized=is_authorized)
        if upload_files is None:
            abort(404, message='Archive %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                # Return the entry's archive data without the processing logs,
                # which have their own endpoint (/logs).
                entry_dict = archive[calc_id].to_dict()
                entry_dict.pop('processing_logs', None)
                return entry_dict
        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)
113
114


115
116
117
# Request parser for /archive/download: the common search parameters plus an
# optional flag to compress the streamed zip file.
_archive_download_parser = api.parser()
add_search_parameters(_archive_download_parser)
_archive_download_parser.add_argument(
    name='compress', type=bool, help='Use compression on .zip files, default is not.',
    location='args')

121
122
123

@ns.route('/download')
class ArchiveDownloadResource(Resource):
    # Repository metadata keys copied into the zip's manifest.json for each entry.
    manifest_quantities = ['upload_id', 'calc_id', 'external_id', 'raw_id', 'pid', 'calc_hash']

    @api.doc('archive_download')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_archive_download_parser, validate=True)
    @api.response(200, 'File(s) send', headers={'Content-Type': 'application/zip'})
    @authenticate(signature_token=True)
    def get(self):
        '''
        Get calculation data in archive form from all query results.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        Zip files are streamed; instead of 401 errors, the zip file will just not contain
        any files that the user is not authorized to access.

        The zip file will contain a ``manifest.json`` with the repository meta data.
        '''
        try:
            args = _archive_download_parser.parse_args()
            compress = args.get('compress', False)
        except Exception:
            abort(400, message='bad parameter types')

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        # Only the ids needed to locate each archive are retrieved from the index.
        search_request.include('calc_id', 'upload_id', 'mainfile')

        calcs = search_request.execute_scan(
            order_by='upload_id',
            size=config.services.download_scan_size,
            scroll=config.services.download_scan_timeout)

        def generator():
            '''Yield (filename, id, open_fn, size_fn) tuples for streamed_zipfile.'''
            try:
                manifest = {}
                upload_files = None

                for entry in calcs:
                    upload_id = entry['upload_id']
                    calc_id = entry['calc_id']
                    # Results are ordered by upload_id, so each upload's files are
                    # opened once and reused until the upload changes.
                    if upload_files is None or upload_files.upload_id != upload_id:
                        if upload_files is not None:
                            upload_files.close()

                        upload_files = UploadFiles.get(
                            upload_id, create_authorization_predicate(upload_id))

                        if upload_files is None:
                            common.logger.error('upload files do not exist', upload_id=upload_id)
                            continue

                    # Authorization must be re-evaluated per calc, not just per upload.
                    upload_files._is_authorized = create_authorization_predicate(
                        upload_id=upload_id, calc_id=calc_id)
                    with upload_files.read_archive(calc_id) as archive:
                        f = BytesIO(orjson.dumps(
                            archive[calc_id].to_dict(),
                            option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS))

                        # Bind f as a default argument: without it the lambdas would
                        # capture the loop variable late and could serve the wrong
                        # buffer if the consumer deferred calling them.
                        yield (
                            '%s.%s' % (calc_id, 'json'), calc_id,
                            lambda calc_id, f=f: f,
                            lambda calc_id, f=f: f.getbuffer().nbytes)

                    manifest[calc_id] = {
                        key: entry[key]
                        for key in ArchiveDownloadResource.manifest_quantities
                        if entry.get(key) is not None
                    }

                if upload_files is not None:
                    upload_files.close()

                try:
                    manifest_contents = json.dumps(manifest).encode('utf-8')
                except Exception as e:
                    # Still ship a manifest entry so the zip stays well-formed.
                    manifest_contents = json.dumps(
                        dict(error='Could not create the manifest: %s' % (e))).encode('utf-8')
                    common.logger.error(
                        'could not create raw query manifest', exc_info=e)

                yield (
                    'manifest.json', 'manifest',
                    lambda *args: BytesIO(manifest_contents),
                    lambda *args: len(manifest_contents))

            except Exception as e:
                # The response is already streaming at this point; all we can do is log.
                common.logger.warning(
                    'unexpected error while streaming raw data from query',
                    exc_info=e,
                    query=urllib.parse.urlencode(request.args, doseq=True))

        return streamed_zipfile(
            generator(), zipfile_name='nomad_archive.zip', compress=compress)


221
# Request/response model for /archive/query: the common search model extended
# with fields that control which archive data is returned and how errors are
# reported. Fixes the "retrive" typo in the user-facing description.
_archive_query_model = api.inherit('ArchiveSearch', search_model, {
    'required': fields.Raw(description='A dictionary that defines what archive data to retrieve.'),
    'query_schema': fields.Raw(description='Deprecated, use required instead.'),
    'raise_errors': fields.Boolean(
        description='Return 404 on missing archives or 500 on other errors instead of skipping the entry.')
})
227
228


229
230
@ns.route('/query')
class ArchiveQueryResource(Resource):
    @api.doc('post_archive_query')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(404, 'The upload or calculation does not exist')
    @api.expect(_archive_query_model)
    @api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Archive search results sent')
    @authenticate()
    def post(self):
        '''
        Post a query schema and return it filled with archive data.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        This endpoint uses pagination (see /repo) or id aggregation to handle large result
        sets over multiple requests.
        Use aggregation.after and aggregation.per_page to request a
        certain page with id aggregation.

        The actual data are in results and a supplementary python code (curl) to
        execute search is in python (curl).
        '''
        try:
            data_in = request.get_json()
            # Id aggregation (after/per_page) is an alternative to plain pagination.
            aggregation = data_in.get('aggregation', None)

            pagination = data_in.get('pagination', {})
            page = pagination.get('page', 1)
            per_page = pagination.get('per_page', 10)

            query = data_in.get('query', {})

            # Keys containing '$' are query-expression operators; they are handled
            # separately from the plain search parameters below.
            query_expression = {key: val for key, val in query.items() if '$' in key}

            # 'required' selects which parts of the archive to return;
            # 'query_schema' is the deprecated name for the same field.
            required: Dict[str, Any] = None
            if 'required' in data_in:
                required = data_in.get('required')
            else:
                required = data_in.get('query_schema', '*')

            raise_errors = data_in.get('raise_errors', False)

        except Exception:
            abort(400, message='bad parameter types')

        if not (page >= 1 and per_page > 0):
            abort(400, message='invalid pagination')

        # Logged-in users search their own plus public data; anonymous users
        # only public data.
        search_request = search.SearchRequest()
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        apply_search_parameters(search_request, query)
        if not aggregation:
            search_request.include(
                'calc_id', 'upload_id', 'with_embargo', 'published', 'parser_name')

        if query_expression:
            try:
                search_request.query_expression(query_expression)
            except AssertionError as e:
                abort(400, str(e))

        try:
            if aggregation:
                results = search_request.execute_aggregated(
                    after=aggregation.get('after'), per_page=aggregation.get('per_page', 1000),
                    includes=['with_embargo', 'published', 'parser_name'])

            else:
                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order_by='upload_id')

        except KeyError as e:
            abort(400, 'The query part contained an unknown quantity or section: %s' % str(e))

        # If all referenced sections can be resolved up front, the partial
        # archives stored in mongo may suffice and the msg-pack files need not
        # be opened at all.
        try:
            required_with_references = compute_required_with_referenced(required)
        except KeyError as e:
            abort(
                400,
                message='The required part contained an unknown quantity or section: %s' % str(e))

        data = []
        calcs = results['results']
        upload_files = None
        current_upload_id = None
        archive_is_complete = False
        if required_with_references is not None:
            archive_is_complete = True
            required = required_with_references

        for entry in calcs:
            with_embargo = entry['with_embargo']

            upload_id = entry['upload_id']
            calc_id = entry['calc_id']

            # Results are ordered by upload_id; keep the current upload's files
            # open and only switch when the upload changes.
            if upload_files is None or current_upload_id != upload_id:
                if upload_files is not None:
                    upload_files.close()

                upload_files = UploadFiles.get(
                    upload_id, create_authorization_predicate(upload_id))

                if upload_files is None:
                    return []

                if archive_is_complete:
                    # Batch-read the partial archives for all of this upload's calcs.
                    upload_calc_ids = [
                        calc['calc_id'] for calc in calcs if calc['upload_id'] == upload_id]
                    upload_partial_archives = read_partial_archives_from_mongo(
                        upload_calc_ids, as_dict=True)

                current_upload_id = upload_id

            # TODO we are either just use the partial from mongo or read the whole required
            # from the mgs-pack archive on top.
            # Ideally, we would only read whats left from the msg-pack archive and merge.
            if archive_is_complete:
                try:
                    partial_archive = upload_partial_archives[calc_id]
                    partial_archive = filter_archive(
                        required, partial_archive, transform=lambda e: e)

                    data.append({
                        'calc_id': calc_id,
                        'parser_name': entry['parser_name'],
                        'archive': partial_archive})
                    continue
                except KeyError:
                    # No partial archive for this calc; fall through to reading
                    # the msg-pack archive file below.
                    pass
                except ArchiveQueryError as e:
                    abort(400, str(e))
                except Exception as e:
                    common.logger.error(
                        str(e), upload_id=upload_id, calc_id=calc_id, exc_info=e)

            if with_embargo:
                access = 'restricted'
                # Embargoed data requires a per-calc authorization check.
                upload_files._is_authorized = create_authorization_predicate(
                    upload_id=upload_id, calc_id=calc_id)
            else:
                access = 'public'

            try:
                with upload_files.read_archive(calc_id, access) as archive:
                    data.append({
                        'calc_id': calc_id,
                        'parser_name': entry['parser_name'],
                        'archive': query_archive(archive, {calc_id: required})[calc_id]
                    })
            except ArchiveQueryError as e:
                abort(400, str(e))
            except KeyError:
                if raise_errors:
                    abort(404, 'Archive for entry %s does not exist' % calc_id)
                # We simply skip this entry
                pass
            except Restricted:
                # this should not happen
                common.logger.error(
                    'supposedly unreachable code', upload_id=upload_id, calc_id=calc_id)
            except Exception as e:
                if raise_errors:
                    raise e
                common.logger.error(
                    str(e), upload_id=upload_id, calc_id=calc_id, exc_info=e)

        if upload_files is not None:
            upload_files.close()

        # assign archive data to results
        results['results'] = data

        return results, 200