#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
The archive API of the nomad@FAIRDI APIs. This API is about serving processed
(parsed and normalized) calculation data in nomad's *meta-info* format.
'''

from typing import Dict, Any
from io import BytesIO
from flask import request, g
from flask_restplus import abort, Resource, fields
import orjson
import urllib.parse

from nomad.files import UploadFiles, Restricted
from nomad.archive import (
    query_archive, ArchiveQueryError, filter_archive, read_partial_archives_from_mongo,
    compute_required_with_referenced)
from nomad import search, config

from .. import common
from .auth import authenticate, create_authorization_predicate
from .api import api
from .common import calc_route, streamed_zipfile, search_model, add_search_parameters,\
    apply_search_parameters

ns = api.namespace(
    'archive',
    description='Access archive data and archive processing logs.')


@calc_route(ns, '/logs')
class ArchiveCalcLogResource(Resource):
    @api.doc('get_archive_logs')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data sent')
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation processing log.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Upload %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                return [entry.to_dict() for entry in archive[calc_id]['processing_logs']]

        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)


@calc_route(ns)
class ArchiveCalcResource(Resource):
    @api.doc('get_archive_calc')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data sent', headers={'Content-Type': 'application/json'})
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation data in archive form.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Archive %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                return {
                    key: value
                    for key, value in archive[calc_id].to_dict().items()
                    if key != 'processing_logs'}

        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)


_archive_download_parser = api.parser()
add_search_parameters(_archive_download_parser)
_archive_download_parser.add_argument(
    name='compress', type=bool, help='Use compression on .zip files, default is no compression.',
    location='args')

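# Illustrative request against the download endpoint below (a sketch only: the '/api'
# prefix and any search parameters besides 'compress' come from the surrounding API setup
# and add_search_parameters, and are assumptions here):
#
#   GET /api/archive/download?upload_id=<upload_id>&compress=true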

@ns.route('/download')
class ArchiveDownloadResource(Resource):
    manifest_quantities = ['upload_id', 'calc_id', 'external_id', 'raw_id', 'pid', 'calc_hash']

    @api.doc('archive_download')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_archive_download_parser, validate=True)
    @api.response(200, 'File(s) sent', headers={'Content-Type': 'application/zip'})
    @authenticate(signature_token=True)
    def get(self):
        '''
        Get calculation data in archive form from all query results.

        See the ``/repo`` endpoint for documentation on the search
        parameters.

        Zip files are streamed; instead of 401 errors, the zip file will just not contain
        any files that the user is not authorized to access.

        The zip file will contain a ``manifest.json`` with the repository metadata.
        '''
        try:
            args = _archive_download_parser.parse_args()
            compress = args.get('compress', False)
        except Exception:
            abort(400, message='bad parameter types')

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        search_request.include('calc_id', 'upload_id', 'mainfile')

        calcs = search_request.execute_scan(
            order_by='upload_id',
            size=config.services.download_scan_size,
            scroll=config.services.download_scan_timeout)

        def generator():
            try:
                manifest = {}
                upload_files = None

                for entry in calcs:
                    upload_id = entry['upload_id']
                    calc_id = entry['calc_id']

                    manifest = {
                        calc_id: {
                            key: entry[key]
                            for key in ArchiveDownloadResource.manifest_quantities
                            if entry.get(key) is not None
                        }
                    }

                    if upload_files is None or upload_files.upload_id != upload_id:
                        if upload_files is not None:
                            upload_files.close()

                        upload_files = UploadFiles.get(
                            upload_id, create_authorization_predicate(upload_id))

                        if upload_files is None:
                            common.logger.error('upload files do not exist', upload_id=upload_id)
                            continue

                    upload_files._is_authorized = create_authorization_predicate(
                        upload_id=upload_id, calc_id=calc_id)
                    with upload_files.read_archive(calc_id) as archive:
                        f = BytesIO(orjson.dumps(
                            archive[calc_id].to_dict(),
                            option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS))

                        yield (
                            '%s.%s' % (calc_id, 'json'), calc_id, manifest,
                            lambda calc_id: f,
                            lambda calc_id: f.getbuffer().nbytes)

                if upload_files is not None:
                    upload_files.close()

            except Exception as e:
                common.logger.error(
                    'unexpected error while streaming archive data from query',
                    exc_info=e,
                    query=urllib.parse.urlencode(request.args, doseq=True))

        return streamed_zipfile(
            generator(), zipfile_name='nomad_archive.zip', compress=compress, manifest=dict())


_archive_query_model = api.inherit('ArchiveSearch', search_model, {
    'required': fields.Raw(description='A dictionary that defines what archive data to retrieve.'),
    'query_schema': fields.Raw(description='Deprecated, use required instead.'),
    'raise_errors': fields.Boolean(
        description='Return 404 on missing archives or 500 on other errors instead of skipping the entry.')
})


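# Illustrative request body for the query endpoint below (a sketch only; the quantity
# names used in 'query' and 'required' are assumptions, not defined in this module):
#
#   {
#       "query": {"atoms": ["Si"]},
#       "pagination": {"page": 1, "per_page": 10},
#       "required": {"section_run": "*"},
#       "raise_errors": false
#   }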
@ns.route('/query')
class ArchiveQueryResource(Resource):
    @api.doc('post_archive_query')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(404, 'The upload or calculation does not exist')
    @api.expect(_archive_query_model)
    @api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Archive search results sent')
    @authenticate()
    def post(self):
        '''
        Post a query schema and return it filled with archive data.

        See the ``/repo`` endpoint for documentation on the search
        parameters.

        This endpoint uses pagination (see /repo) or id aggregation to handle large result
        sets over multiple requests. Use aggregation.after and aggregation.per_page to
        request a certain page with id aggregation.

        The actual data are in ``results``. Supplementary python (curl) code to execute
        the search is returned in ``python`` (``curl``).
        '''
        try:
            data_in = request.get_json()
            aggregation = data_in.get('aggregation', None)

            pagination = data_in.get('pagination', {})
            page = pagination.get('page', 1)
            per_page = pagination.get('per_page', 10)

            query = data_in.get('query', {})

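            # keys containing '$' form a query expression that is additionally passed to
            # search_request.query_expression() below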
            query_expression = {key: val for key, val in query.items() if '$' in key}

            required: Dict[str, Any] = None
            if 'required' in data_in:
                required = data_in.get('required')
            else:
                required = data_in.get('query_schema', '*')

            raise_errors = data_in.get('raise_errors', False)

        except Exception:
            abort(400, message='bad parameter types')

        if not (page >= 1 and per_page > 0):
            abort(400, message='invalid pagination')

        search_request = search.SearchRequest()
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        apply_search_parameters(search_request, query)
        if not aggregation:
            search_request.include(
                'calc_id', 'upload_id', 'with_embargo', 'published', 'parser_name')

        if query_expression:
            try:
                search_request.query_expression(query_expression)
            except AssertionError as e:
                abort(400, str(e))

        try:
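            # id aggregation pages through large result sets with an 'after' cursor;
            # otherwise plain page/per_page pagination is used (see the docstring above)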
            if aggregation:
                results = search_request.execute_aggregated(
                    after=aggregation.get('after'), per_page=aggregation.get('per_page', 1000),
                    includes=['with_embargo', 'published', 'parser_name'])

            else:
                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order_by='upload_id')

        except KeyError as e:
            abort(400, 'The query part contained an unknown quantity or section: %s' % str(e))

        try:
            required_with_references = compute_required_with_referenced(required)
        except KeyError as e:
            abort(
                400,
                message='The required part contained an unknown quantity or section: %s' % str(e))

        data = []
        calcs = results['results']
        upload_files = None
        current_upload_id = None
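        # a non-None result means the required data (including referenced sections) can be
        # served entirely from the partial archives stored in mongo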
        archive_is_complete = False
        if required_with_references is not None:
            archive_is_complete = True
            required = required_with_references

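        # upload files and partial archives are only (re-)opened when the upload changes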
        for entry in calcs:
            with_embargo = entry['with_embargo']

            upload_id = entry['upload_id']
            calc_id = entry['calc_id']

            if upload_files is None or current_upload_id != upload_id:
                if upload_files is not None:
                    upload_files.close()

                upload_files = UploadFiles.get(
                    upload_id, create_authorization_predicate(upload_id))

                if upload_files is None:
                    return []

                if archive_is_complete:
                    upload_calc_ids = [
                        calc['calc_id'] for calc in calcs if calc['upload_id'] == upload_id]
                    upload_partial_archives = read_partial_archives_from_mongo(
                        upload_calc_ids, as_dict=True)

                current_upload_id = upload_id

            # TODO: we either just use the partial archive from mongo or read the whole
            # required data from the msg-pack archive on top. Ideally, we would only read
            # what's left from the msg-pack archive and merge.
            if archive_is_complete:
                try:
                    partial_archive = upload_partial_archives[calc_id]
                    partial_archive = filter_archive(
                        required, partial_archive, transform=lambda e: e)

                    data.append({
                        'calc_id': calc_id,
                        'parser_name': entry['parser_name'],
                        'archive': partial_archive})
                    continue
                except KeyError:
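                    # entry not in the partial archives; fall back to reading the
                    # msg-pack archive below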
                    pass
                except ArchiveQueryError as e:
                    abort(400, str(e))
                except Exception as e:
                    common.logger.error(
                        str(e), upload_id=upload_id, calc_id=calc_id, exc_info=e)

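            # embargoed entries are read from the restricted archive and require a per-calc
            # authorization check; public entries can be read directly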
            if with_embargo:
                access = 'restricted'
                upload_files._is_authorized = create_authorization_predicate(
                    upload_id=upload_id, calc_id=calc_id)
            else:
                access = 'public'

            try:
                with upload_files.read_archive(calc_id, access) as archive:
                    data.append({
                        'calc_id': calc_id,
                        'parser_name': entry['parser_name'],
                        'archive': query_archive(archive, {calc_id: required})[calc_id]
                    })
            except ArchiveQueryError as e:
                abort(400, str(e))
            except KeyError:
                if raise_errors:
                    abort(404, 'Archive for entry %s does not exist' % calc_id)
                # We simply skip this entry
                pass
            except Restricted:
                # this should not happen
                common.logger.error(
                    'supposedly unreachable code', upload_id=upload_id, calc_id=calc_id)
            except Exception as e:
                if raise_errors:
                    raise e
                common.logger.error(
                    str(e), upload_id=upload_id, calc_id=calc_id, exc_info=e)

        if upload_files is not None:
            upload_files.close()

        # assign archive data to results
        results['results'] = data

        return results, 200