archive.py 15.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
'''
The archive API of the nomad@FAIRDI APIs. This API is about serving processed
(parsed and normalized) calculation data in nomad's *meta-info* format.
'''
19

20
from typing import Dict, Any
from io import BytesIO
import json
import urllib.parse

from flask import request, g
from flask_restplus import abort, Resource, fields, inputs
import orjson

from nomad.files import UploadFiles, Restricted
from nomad.archive import (
    query_archive, ArchiveQueryError, filter_archive, read_partial_archives_from_mongo,
    compute_required_with_referenced)
from nomad import search, config
from nomad.app import common

from .auth import authenticate, create_authorization_predicate
from .api import api
from .common import calc_route, streamed_zipfile, search_model, add_search_parameters,\
    apply_search_parameters
39

40
41

# All endpoints of this module are registered below this 'archive' namespace.
ns = api.namespace('archive', description='Access archive data and archive processing logs.')
44
45
46
47


@calc_route(ns, '/logs')
class ArchiveCalcLogResource(Resource):
    # Serves only the 'processing_logs' section of a single calculation's archive.

    @api.doc('get_archive_logs')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send')
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation processing log.

        Calcs are references via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        # The predicate is attached to the upload files and decides per access
        # whether the current user may read this calculation.
        is_authorized = create_authorization_predicate(upload_id, calc_id)
        upload_files = UploadFiles.get(upload_id, is_authorized=is_authorized)
        if upload_files is None:
            abort(404, message='Upload %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                log_entries = archive[calc_id]['processing_logs']
                return [log_entry.to_dict() for log_entry in log_entries]
        except Restricted:
            abort(401, message='Not authorized to access %s.' % archive_id)
        except KeyError:
            # either the calc or its 'processing_logs' section is missing
            abort(404, message='Calculation %s does not exist.' % archive_id)
75
76
77
78


@calc_route(ns)
class ArchiveCalcResource(Resource):
    # Serves the archive of a single calculation without its processing logs.

    @api.doc('get_archive_calc')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send', headers={'Content-Type': 'application/json'})
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation data in archive form.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            # It is the upload that is missing, so report it as such (consistent with
            # the /logs endpoint) instead of labelling the upload id as an archive.
            abort(404, message='Upload %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                # Processing logs are served separately by the /logs endpoint.
                return {
                    key: value
                    for key, value in archive[calc_id].to_dict().items()
                    if key != 'processing_logs'}
        except Restricted:
            abort(401, message='Not authorized to access %s.' % archive_id)
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)
109
110


111
112
113
# Query-string parser for /archive/download: all search parameters plus 'compress'.
_archive_download_parser = api.parser()
add_search_parameters(_archive_download_parser)
# ``type=bool`` would turn any non-empty string (including "false") into True;
# ``inputs.boolean`` parses "true"/"false"/"1"/"0" properly.
_archive_download_parser.add_argument(
    name='compress', type=inputs.boolean, default=False,
    help='Use compression on .zip files, default is not.',
    location='args')

117
118
119

@ns.route('/download')
class ArchiveDownloadResource(Resource):
    # Search quantities that are copied (if present) from each search entry into
    # the manifest.json of the streamed zip file.
    manifest_quantities = ['upload_id', 'calc_id', 'external_id', 'raw_id', 'pid', 'calc_hash']

    @api.doc('archive_download')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_archive_download_parser, validate=True)
    @api.response(200, 'File(s) send', headers={'Content-Type': 'application/zip'})
    @authenticate(signature_token=True)
    def get(self):
        '''
        Get calculation data in archive form from all query results.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        Zip files are streamed; instead of 401 errors, the zip file will just not contain
        any files that the user is not authorized to access.

        The zip file will contain a ``manifest.json`` with the repository meta data.
        '''
        try:
            args = _archive_download_parser.parse_args()
            compress = args.get('compress', False)
        except Exception:
            abort(400, message='bad parameter types')

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        search_request.include('calc_id', 'upload_id', 'mainfile')

        calcs = search_request.execute_scan(
            order_by='upload_id',
            size=config.services.download_scan_size,
            scroll=config.services.download_scan_timeout)

        def generator():
            # Yields (zip path, name, open function, size function) tuples for
            # streamed_zipfile: one JSON file per entry, then the manifest.
            manifest = {}
            upload_files = None
            try:
                for entry in calcs:
                    upload_id = entry['upload_id']
                    calc_id = entry['calc_id']
                    # Results are ordered by upload_id, so upload files are only
                    # (re)opened when the upload changes.
                    if upload_files is None or upload_files.upload_id != upload_id:
                        if upload_files is not None:
                            upload_files.close()

                        upload_files = UploadFiles.get(
                            upload_id, create_authorization_predicate(upload_id))

                        if upload_files is None:
                            common.logger.error('upload files do not exist', upload_id=upload_id)
                            continue

                    upload_files._is_authorized = create_authorization_predicate(
                        upload_id=upload_id, calc_id=calc_id)

                    try:
                        with upload_files.read_archive(calc_id) as archive:
                            contents = orjson.dumps(
                                archive[calc_id].to_dict(),
                                option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
                    except Restricted:
                        # As documented: unauthorized entries are simply left out of
                        # the zip instead of aborting the whole stream.
                        continue
                    except KeyError:
                        common.logger.error(
                            'archive does not exist', upload_id=upload_id, calc_id=calc_id)
                        continue

                    f = BytesIO(contents)
                    # Bind this entry's buffer/size as defaults; a plain closure would
                    # see whatever ``f`` the generator has bound when it is called.
                    yield (
                        '%s.%s' % (calc_id, 'json'), calc_id,
                        lambda calc_id, f=f: f,
                        lambda calc_id, size=len(contents): size)

                    manifest[calc_id] = {
                        key: entry[key]
                        for key in ArchiveDownloadResource.manifest_quantities
                        if entry.get(key) is not None
                    }

                if upload_files is not None:
                    upload_files.close()
                    upload_files = None

                try:
                    manifest_contents = json.dumps(manifest).encode('utf-8')
                except Exception as e:
                    manifest_contents = json.dumps(
                        dict(error='Could not create the manifest: %s' % (e))).encode('utf-8')
                    common.logger.error(
                        'could not create raw query manifest', exc_info=e)

                yield (
                    'manifest.json', 'manifest',
                    lambda *args: BytesIO(manifest_contents),
                    lambda *args: len(manifest_contents))

            except Exception as e:
                common.logger.warning(
                    'unexpected error while streaming raw data from query',
                    exc_info=e,
                    query=urllib.parse.urlencode(request.args, doseq=True))
            finally:
                # Release the files also when streaming failed or the consumer closed
                # the generator early.
                if upload_files is not None:
                    upload_files.close()

        return streamed_zipfile(
            generator(), zipfile_name='nomad_archive.zip', compress=compress)


217
# Request/response model of /archive/query: the search model plus archive-specific
# fields that select which archive data is returned.
_archive_query_model = api.inherit('ArchiveSearch', search_model, {
    'required': fields.Raw(description='A dictionary that defines what archive data to retrieve.'),
    'query_schema': fields.Raw(description='Deprecated, use required instead.'),
    'raise_errors': fields.Boolean(
        description='Return 404 on missing archives or 500 on other errors instead of skipping the entry.')
})
223
224


225
226
@ns.route('/query')
class ArchiveQueryResource(Resource):
    @api.doc('post_archive_query')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(404, 'The upload or calculation does not exist')
    @api.expect(_archive_query_model)
    @api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Archive search results sent')
    @authenticate()
    def post(self):
        '''
        Post a query schema and return it filled with archive data.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        This endpoint uses pagination (see /repo) or id aggregation to handle large result
        sets over multiple requests.
        Use aggregation.after and aggregation.per_page to request a
        certain page with id aggregation.

        The actual data are in results and a supplementary python code (curl) to
        execute search is in python (curl).
        '''
        try:
            data_in = request.get_json()
            aggregation = data_in.get('aggregation', None)

            pagination = data_in.get('pagination', {})
            page = pagination.get('page', 1)
            per_page = pagination.get('per_page', 10)

            query = data_in.get('query', {})

            # query keys containing '$' are passed on as query expressions
            query_expression = {key: val for key, val in query.items() if '$' in key}

            required: Dict[str, Any] = None
            if 'required' in data_in:
                required = data_in.get('required')
            else:
                # 'query_schema' is the deprecated name of 'required'
                required = data_in.get('query_schema', '*')

            raise_errors = data_in.get('raise_errors', False)

        except Exception:
            abort(400, message='bad parameter types')

        try:
            pagination_is_valid = page >= 1 and per_page > 0
        except TypeError:
            # e.g. page/per_page sent as strings or null; report 400 instead of a 500
            pagination_is_valid = False
        if not pagination_is_valid:
            abort(400, message='invalid pagination')

        search_request = search.SearchRequest()
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        apply_search_parameters(search_request, query)
        if not aggregation:
            search_request.include(
                'calc_id', 'upload_id', 'with_embargo', 'published', 'parser_name')

        if query_expression:
            try:
                search_request.query_expression(query_expression)
            except AssertionError as e:
                abort(400, str(e))

        try:
            if aggregation:
                results = search_request.execute_aggregated(
                    after=aggregation.get('after'), per_page=aggregation.get('per_page', 1000),
                    includes=['with_embargo', 'published', 'parser_name'])
            else:
                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order_by='upload_id')
        except KeyError as e:
            abort(400, 'The query part contained an unknown quantity or section: %s' % str(e))

        try:
            required_with_references = compute_required_with_referenced(required)
        except KeyError as e:
            abort(
                400,
                message='The required part contained an unknown quantity or section: %s' % str(e))

        # When compute_required_with_referenced yields a result, the partial archives
        # from mongo are used first (see below); presumably it returns None when they
        # cannot satisfy the request.
        archive_is_complete = required_with_references is not None
        if archive_is_complete:
            required = required_with_references

        data = []
        calcs = results['results']
        upload_files = None
        current_upload_id = None
        upload_partial_archives: Dict[str, Any] = {}

        # try/finally: abort()/raise inside the loop must not leak the open upload files
        try:
            for entry in calcs:
                with_embargo = entry['with_embargo']
                upload_id = entry['upload_id']
                calc_id = entry['calc_id']

                if upload_id != current_upload_id:
                    if upload_files is not None:
                        upload_files.close()

                    upload_files = UploadFiles.get(
                        upload_id, create_authorization_predicate(upload_id))
                    current_upload_id = upload_id

                    if upload_files is None:
                        common.logger.error('upload files do not exist', upload_id=upload_id)
                    elif archive_is_complete:
                        upload_calc_ids = [
                            calc['calc_id'] for calc in calcs if calc['upload_id'] == upload_id]
                        upload_partial_archives = read_partial_archives_from_mongo(
                            upload_calc_ids, as_dict=True)

                if upload_files is None:
                    # Skip entries of missing uploads instead of discarding every result.
                    if raise_errors:
                        abort(404, 'Upload %s does not exist' % upload_id)
                    continue

                # TODO we are either just use the partial from mongo or read the whole required
                # from the mgs-pack archive on top.
                # Ideally, we would only read whats left from the msg-pack archive and merge.
                if archive_is_complete:
                    try:
                        partial_archive = upload_partial_archives[calc_id]
                        partial_archive = filter_archive(
                            required, partial_archive, transform=lambda e: e)

                        data.append({
                            'calc_id': calc_id,
                            'parser_name': entry['parser_name'],
                            'archive': partial_archive})
                        continue
                    except KeyError:
                        # no partial archive for this calc; fall back to the msgpack archive
                        pass
                    except ArchiveQueryError as e:
                        abort(400, str(e))
                    except Exception as e:
                        common.logger.error(
                            str(e), upload_id=upload_id, calc_id=calc_id, exc_info=e)

                if with_embargo:
                    access = 'restricted'
                    upload_files._is_authorized = create_authorization_predicate(
                        upload_id=upload_id, calc_id=calc_id)
                else:
                    access = 'public'

                try:
                    with upload_files.read_archive(calc_id, access) as archive:
                        data.append({
                            'calc_id': calc_id,
                            'parser_name': entry['parser_name'],
                            'archive': query_archive(archive, {calc_id: required})[calc_id]
                        })
                except ArchiveQueryError as e:
                    abort(400, str(e))
                except KeyError:
                    if raise_errors:
                        abort(404, 'Archive for entry %s does not exist' % calc_id)
                    # We simply skip this entry
                except Restricted:
                    # this should not happen
                    common.logger.error(
                        'supposedly unreachable code', upload_id=upload_id, calc_id=calc_id)
                except Exception as e:
                    if raise_errors:
                        raise
                    common.logger.error(
                        str(e), upload_id=upload_id, calc_id=calc_id, exc_info=e)
        finally:
            if upload_files is not None:
                upload_files.close()

        # assign archive data to results
        results['results'] = data

        return results, 200