archive.py 13.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
'''
The archive API of the nomad@FAIRDI APIs. This API is about serving processed
(parsed and normalized) calculation data in nomad's *meta-info* format.
'''
19

20
from io import BytesIO
from typing import Dict, Any
import json
import urllib.parse

from flask import request, g
from flask_restplus import abort, Resource, fields, inputs
import orjson

from nomad.files import UploadFiles, Restricted
from nomad.archive import query_archive, ArchiveQueryError
from nomad import search, config
from nomad.app import common

from .auth import authenticate, create_authorization_predicate
from .api import api
from .common import calc_route, streamed_zipfile, search_model, add_search_parameters, apply_search_parameters, query_model

37
38

# API namespace on which the archive resources in this module are registered.
ns = api.namespace(
    'archive', description='Access archive data and archive processing logs.')
41
42
43
44


@calc_route(ns, '/logs')
class ArchiveCalcLogResource(Resource):
    @api.doc('get_archive_logs')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send')
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation processing log.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Upload %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                # The list is fully materialized here, before the archive and the
                # upload files are released again.
                return [entry.to_dict() for entry in archive[calc_id]['processing_logs']]

        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)
        finally:
            # Release the upload files on every path (success, 401, 404), as the
            # other archive resources in this module do.
            upload_files.close()
72
73
74
75


@calc_route(ns)
class ArchiveCalcResource(Resource):
    @api.doc('get_archive_calc')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send', headers={'Content-Type': 'application/json'})
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation data in archive form.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            # It is the upload that could not be found here, so say so (this
            # matches the message of the processing-log resource).
            abort(404, message='Upload %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                # Processing logs are served by the separate '/logs' resource and
                # are therefore stripped from the archive data.
                return {
                    key: value
                    for key, value in archive[calc_id].to_dict().items()
                    if key != 'processing_logs'}

        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)
        finally:
            # Release the upload files on every path (success, 401, 404).
            upload_files.close()
106
107


108
109
110
# Request parser for the archive download endpoint: the shared search parameters
# plus an optional ``compress`` flag.
_archive_download_parser = api.parser()
add_search_parameters(_archive_download_parser)
_archive_download_parser.add_argument(
    # ``type=bool`` would call ``bool()`` on the raw query string, so any non-empty
    # value -- including 'false' and '0' -- would enable compression.
    # ``inputs.boolean`` parses 'true'/'false'/'1'/'0' properly.
    name='compress', type=inputs.boolean,
    help='Use compression on .zip files, default is not.',
    location='args')

114
115
116

@ns.route('/download')
class ArchiveDownloadResource(Resource):
    # Keys that are copied from each search hit into its manifest.json entry.
    manifest_quantities = ['upload_id', 'calc_id', 'external_id', 'raw_id', 'pid', 'calc_hash']

    @api.doc('archive_download')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_archive_download_parser, validate=True)
    @api.response(200, 'File(s) send', headers={'Content-Type': 'application/zip'})
    @authenticate(signature_token=True)
    def get(self):
        '''
        Get calculation data in archive form from all query results.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        Zip files are streamed; instead of 401 errors, the zip file will just not contain
        any files that the user is not authorized to access.

        The zip file will contain a ``manifest.json`` with the repository meta data.
        '''
        try:
            args = _archive_download_parser.parse_args()
            compress = args.get('compress', False)
        except Exception:
            abort(400, message='bad parameter types')

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        search_request.include('calc_id', 'upload_id', 'mainfile')

        # Results are ordered by upload so that each upload's files are opened once.
        calcs = search_request.execute_scan(
            order_by='upload_id',
            size=config.services.download_scan_size,
            scroll=config.services.download_scan_timeout)

        def generator():
            # Yields (file name, id, open function, size function) tuples for
            # streamed_zipfile: one per accessible archive, then the manifest.
            manifest = {}
            upload_files = None
            try:
                for entry in calcs:
                    upload_id = entry['upload_id']
                    calc_id = entry['calc_id']
                    if upload_files is None or upload_files.upload_id != upload_id:
                        if upload_files is not None:
                            upload_files.close()
                            # Forget the closed object so that it is never closed twice.
                            upload_files = None

                        upload_files = UploadFiles.get(
                            upload_id, create_authorization_predicate(upload_id))

                        if upload_files is None:
                            common.logger.error('upload files do not exist', upload_id=upload_id)
                            continue

                    upload_files._is_authorized = create_authorization_predicate(
                        upload_id=upload_id, calc_id=calc_id)

                    try:
                        with upload_files.read_archive(calc_id) as archive:
                            contents = orjson.dumps(
                                archive[calc_id].to_dict(),
                                option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
                    except Restricted:
                        # As documented: entries the user may not access are left
                        # out of the zip file instead of failing the download.
                        continue
                    except KeyError:
                        common.logger.error(
                            'archive does not exist', upload_id=upload_id, calc_id=calc_id)
                        continue

                    # Bind the serialized data as a default argument: a plain closure
                    # would see whatever a later loop iteration assigned.
                    yield (
                        '%s.%s' % (calc_id, 'json'), calc_id,
                        lambda calc_id, contents=contents: BytesIO(contents),
                        lambda calc_id, contents=contents: len(contents))

                    manifest[calc_id] = {
                        key: entry[key]
                        for key in ArchiveDownloadResource.manifest_quantities
                        if entry.get(key) is not None
                    }

                try:
                    manifest_contents = json.dumps(manifest).encode('utf-8')
                except Exception as e:
                    manifest_contents = json.dumps(
                        dict(error='Could not create the manifest: %s' % (e))).encode('utf-8')
                    common.logger.error(
                        'could not create raw query manifest', exc_info=e)

                yield (
                    'manifest.json', 'manifest',
                    lambda *args: BytesIO(manifest_contents),
                    lambda *args: len(manifest_contents))

            except Exception as e:
                # The response is already being streamed, so the error can only be logged.
                common.logger.warning(
                    'unexpected error while streaming raw data from query',
                    exc_info=e,
                    query=urllib.parse.urlencode(request.args, doseq=True))

            finally:
                # Also runs when an error occurs or the generator is closed early.
                if upload_files is not None:
                    upload_files.close()

        return streamed_zipfile(
            generator(), zipfile_name='nomad_archive.zip', compress=compress)


214
# Model for the body and the response of the archive query endpoint; it inherits
# from the shared search model.
_archive_query_model = api.inherit('ArchiveSearch', search_model, {
    'query': fields.Nested(
        query_model,
        description='The query used to find the requested entries.',
        skip_none=True),
    'required': fields.Raw(
        description='A dictionary that defines what archive data to retrive.'),
    'query_schema': fields.Raw(
        description='Deprecated, use required instead.'),
    'raise_errors': fields.Boolean(
        description='Return 404 on missing archives or 500 on other errors instead of skipping the entry.'),
})
220
221


222
223
@ns.route('/query')
class ArchiveQueryResource(Resource):
    @api.doc('post_archive_query')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(404, 'The upload or calculation does not exist')
    @api.expect(_archive_query_model)
    @api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Archive search results sent')
    @authenticate()
    def post(self):
        '''
        Post a query schema and return it filled with archive data.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        This endpoint uses pagination (see /repo) or id aggregation to handle large result
        sets over multiple requests.
        Use aggregation.after and aggregation.per_page to request a
        certain page with id aggregation.

        The actual data are in results and a supplementary python code (curl) to
        execute search is in python (curl).
        '''
        try:
            data_in = request.get_json()
            aggregation = data_in.get('aggregation', None)

            pagination = data_in.get('pagination', {})
            page = pagination.get('page', 1)
            per_page = pagination.get('per_page', 10)

            query = data_in.get('query', {})

            # 'required' takes precedence over the deprecated 'query_schema'.
            required: Any = None
            if 'required' in data_in:
                required = data_in.get('required')
            else:
                required = data_in.get('query_schema', '*')

            raise_errors = data_in.get('raise_errors', False)

        except Exception:
            abort(400, message='bad parameter types')

        try:
            valid_pagination = page >= 1 and per_page > 0
        except TypeError:
            # A string or null page/per_page is a client error, not a server error.
            abort(400, message='bad parameter types')

        if not valid_pagination:
            abort(400, message='invalid pagination')

        search_request = search.SearchRequest()
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        apply_search_parameters(search_request, query)
        if not aggregation:
            search_request.include('calc_id', 'upload_id', 'with_embargo', 'published', 'parser_name')

        try:
            if aggregation:
                results = search_request.execute_aggregated(
                    after=aggregation.get('after'), per_page=aggregation.get('per_page', 1000),
                    includes=['with_embargo', 'published', 'parser_name'])

            else:
                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order_by='upload_id')

        except KeyError as e:
            abort(400, str(e))

        data = []
        calcs = results['results']
        upload_files = None
        current_upload_id = None
        try:
            for entry in calcs:
                with_embargo = entry['with_embargo']

                upload_id = entry['upload_id']
                calc_id = entry['calc_id']

                if upload_files is None or current_upload_id != upload_id:
                    if upload_files is not None:
                        upload_files.close()
                        # Forget the closed object so that it is never closed twice.
                        upload_files = None

                    upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))

                    if upload_files is None:
                        # Handled like any other missing archive: 404 if the client
                        # asked for errors, otherwise skip the entry and keep the
                        # results collected so far.
                        if raise_errors:
                            abort(404, 'Upload %s does not exist.' % upload_id)
                        common.logger.error('upload files do not exist', upload_id=upload_id)
                        continue

                    current_upload_id = upload_id

                if with_embargo:
                    access = 'restricted'
                    upload_files._is_authorized = create_authorization_predicate(
                        upload_id=upload_id, calc_id=calc_id)
                else:
                    access = 'public'

                try:
                    with upload_files.read_archive(calc_id, access) as archive:
                        data.append({
                            'calc_id': calc_id,
                            'parser_name': entry['parser_name'],
                            'archive': query_archive(
                                archive, {calc_id: required})[calc_id]
                        })
                except ArchiveQueryError as e:
                    abort(400, str(e))
                except KeyError:
                    if raise_errors:
                        abort(404, 'Archive for entry %s does not exist' % calc_id)
                    # We simply skip this entry
                    pass
                except Restricted:
                    # this should not happen
                    common.logger.error('supposedly unreachable code', upload_id=upload_id, calc_id=calc_id)
                except Exception as e:
                    if raise_errors:
                        raise
                    common.logger.error(str(e), upload_id=upload_id, calc_id=calc_id, exc_info=e)
        finally:
            # abort() and the re-raise above leave the loop via an exception; make
            # sure the currently open upload files are released in that case too.
            if upload_files is not None:
                upload_files.close()

        # assign archive data to results
        results['results'] = data

        return results, 200