archive.py 13.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
'''
The archive API of the nomad@FAIRDI APIs. This API is about serving processed
(parsed and normalized) calculation data in nomad's *meta-info* format.
'''
19

20
from typing import Dict, Any
21
from io import BytesIO
22
from flask import request, g
23
from flask_restplus import abort, Resource, fields, inputs
24
import json
25
import orjson
26
import urllib.parse
27

28
from nomad.files import UploadFiles, Restricted
29
from nomad.archive import query_archive, ArchiveQueryError
30
from nomad import search, config
31
from nomad.app import common
32

33
from .auth import authenticate, create_authorization_predicate
Markus Scheidgen's avatar
Markus Scheidgen committed
34
from .api import api
Alvin Noe Ladines's avatar
Alvin Noe Ladines committed
35
36
from .common import calc_route, streamed_zipfile, search_model, add_search_parameters,\
    apply_search_parameters
37

38
39

# Flask-RESTPlus namespace under which every archive endpoint of this module is registered.
ns = api.namespace('archive', description='Access archive data and archive processing logs.')
42
43
44
45


@calc_route(ns, '/logs')
class ArchiveCalcLogResource(Resource):
    @api.doc('get_archive_logs')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send')
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation processing log.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Upload %s does not exist.' % upload_id)

        # Always release the upload files, whether we return the logs or abort.
        try:
            with upload_files.read_archive(calc_id) as archive:
                return [entry.to_dict() for entry in archive[calc_id]['processing_logs']]

        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))

        except KeyError:
            # the archive lookup failed: calc_id (or its processing logs) is not in the archive
            abort(404, message='Calculation %s does not exist.' % archive_id)

        finally:
            upload_files.close()
73
74
75
76


@calc_route(ns)
class ArchiveCalcResource(Resource):
    @api.doc('get_archive_calc')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send', headers={'Content-Type': 'application/json'})
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation data in archive form.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Archive %s does not exist.' % upload_id)

        # Always release the upload files, whether we return the archive or abort.
        try:
            with upload_files.read_archive(calc_id) as archive:
                # processing logs are left out here; the '/logs' endpoint serves them
                return {
                    key: value
                    for key, value in archive[calc_id].to_dict().items()
                    if key != 'processing_logs'}

        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))

        except KeyError:
            # the archive has no entry for calc_id
            abort(404, message='Calculation %s does not exist.' % archive_id)

        finally:
            upload_files.close()
107
108


109
110
111
# Query-string parser for the archive download: all common search parameters plus an
# optional ``compress`` flag. ``inputs.boolean`` is used instead of ``bool`` because
# ``bool('false')`` is True, i.e. ``?compress=false`` would otherwise enable compression.
_archive_download_parser = api.parser()
add_search_parameters(_archive_download_parser)
_archive_download_parser.add_argument(
    name='compress', type=inputs.boolean, help='Use compression on .zip files, default is not.',
    location='args')

115
116
117

@ns.route('/download')
class ArchiveDownloadResource(Resource):
    # search entry quantities that are copied into the manifest.json of the zip file
    manifest_quantities = ['upload_id', 'calc_id', 'external_id', 'raw_id', 'pid', 'calc_hash']

    @api.doc('archive_download')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_archive_download_parser, validate=True)
    @api.response(200, 'File(s) send', headers={'Content-Type': 'application/zip'})
    @authenticate(signature_token=True)
    def get(self):
        '''
        Get calculation data in archive form from all query results.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        Zip files are streamed; instead of 401 errors, the zip file will just not contain
        any files that the user is not authorized to access.

        The zip file will contain a ``manifest.json`` with the repository meta data.
        '''
        try:
            args = _archive_download_parser.parse_args()
            compress = args.get('compress', False)
        except Exception:
            abort(400, message='bad parameter types')

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        search_request.include('calc_id', 'upload_id', 'mainfile')

        # ordered by upload_id so that each upload's files are opened only once in a row
        calcs = search_request.execute_scan(
            order_by='upload_id',
            size=config.services.download_scan_size,
            scroll=config.services.download_scan_timeout)

        def generator():
            # Yields (zip path, file id, open function, size function) tuples for
            # streamed_zipfile: one JSON file per entry, followed by manifest.json.
            manifest = {}
            upload_files = None
            try:
                for entry in calcs:
                    upload_id = entry['upload_id']
                    calc_id = entry['calc_id']
                    if upload_files is None or upload_files.upload_id != upload_id:
                        if upload_files is not None:
                            upload_files.close()

                        upload_files = UploadFiles.get(
                            upload_id, create_authorization_predicate(upload_id))

                        if upload_files is None:
                            common.logger.error('upload files do not exist', upload_id=upload_id)
                            continue

                    upload_files._is_authorized = create_authorization_predicate(
                        upload_id=upload_id, calc_id=calc_id)

                    # Per-entry failures skip only this entry instead of aborting the
                    # whole stream (and losing every later file and the manifest).
                    try:
                        with upload_files.read_archive(calc_id) as archive:
                            contents = orjson.dumps(
                                archive[calc_id].to_dict(),
                                option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
                    except Restricted:
                        # as documented: unauthorized entries are simply left out of the zip
                        continue
                    except KeyError:
                        common.logger.error(
                            'archive for entry does not exist',
                            upload_id=upload_id, calc_id=calc_id)
                        continue

                    # Bind ``contents`` as a default argument: a plain closure would see
                    # later loop iterations' values if the callables are invoked late.
                    yield (
                        '%s.%s' % (calc_id, 'json'), calc_id,
                        lambda *args, contents=contents: BytesIO(contents),
                        lambda *args, contents=contents: len(contents))

                    manifest[calc_id] = {
                        key: entry[key]
                        for key in ArchiveDownloadResource.manifest_quantities
                        if entry.get(key) is not None
                    }

                try:
                    manifest_contents = json.dumps(manifest).encode('utf-8')
                except Exception as e:
                    manifest_contents = json.dumps(
                        dict(error='Could not create the manifest: %s' % (e))).encode('utf-8')
                    common.logger.error(
                        'could not create raw query manifest', exc_info=e)

                yield (
                    'manifest.json', 'manifest',
                    lambda *args: BytesIO(manifest_contents),
                    lambda *args: len(manifest_contents))

            except Exception as e:
                # NOTE(review): this runs while the response is streamed; it relies on the
                # request context still being available (e.g. via stream_with_context) — confirm.
                common.logger.warning(
                    'unexpected error while streaming raw data from query',
                    exc_info=e,
                    query=urllib.parse.urlencode(request.args, doseq=True))

            finally:
                # also runs when the consumer stops iterating early (generator close)
                if upload_files is not None:
                    upload_files.close()

        return streamed_zipfile(
            generator(), zipfile_name='nomad_archive.zip', compress=compress)


215
# Request/response model of the archive query endpoint: the common search model plus the
# archive-specific request fields.
_archive_query_model = api.inherit('ArchiveSearch', search_model, {
    'required': fields.Raw(description='A dictionary that defines what archive data to retrieve.'),
    'query_schema': fields.Raw(description='Deprecated, use required instead.'),
    'raise_errors': fields.Boolean(description='Return 404 on missing archives or 500 on other errors instead of skipping the entry.')
})
220
221


222
223
@ns.route('/query')
class ArchiveQueryResource(Resource):
    @api.doc('post_archive_query')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(404, 'The upload or calculation does not exist')
    @api.expect(_archive_query_model)
    @api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Archive search results sent')
    @authenticate()
    def post(self):
        '''
        Post a query schema and return it filled with archive data.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        This endpoint uses pagination (see /repo) or id aggregation to handle large result
        sets over multiple requests.
        Use aggregation.after and aggregation.per_page to request a
        certain page with id aggregation.

        The actual data are in results and a supplementary python code (curl) to
        execute search is in python (curl).
        '''
        try:
            data_in = request.get_json()
            aggregation = data_in.get('aggregation', None)

            pagination = data_in.get('pagination', {})
            page = pagination.get('page', 1)
            per_page = pagination.get('per_page', 10)

            query = data_in.get('query', {})

            # keys containing '$' are passed on as a query expression
            query_expression = {key: val for key, val in query.items() if '$' in key}

            # 'query_schema' is the deprecated name of 'required'; '*' is the fallback
            required: Dict[str, Any] = None
            if 'required' in data_in:
                required = data_in.get('required')
            else:
                required = data_in.get('query_schema', '*')

            raise_errors = data_in.get('raise_errors', False)

            # compared here so that non-numeric values become a 400, not a TypeError/500
            valid_pagination = page >= 1 and per_page > 0

        except Exception:
            abort(400, message='bad parameter types')

        if not valid_pagination:
            abort(400, message='invalid pagination')

        search_request = search.SearchRequest()
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        apply_search_parameters(search_request, query)
        if not aggregation:
            search_request.include('calc_id', 'upload_id', 'with_embargo', 'published', 'parser_name')

        if query_expression:
            search_request.query_expression(query_expression)

        try:
            if aggregation:
                results = search_request.execute_aggregated(
                    after=aggregation.get('after'), per_page=aggregation.get('per_page', 1000),
                    includes=['with_embargo', 'published', 'parser_name'])

            else:
                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order_by='upload_id')

        except KeyError as e:
            abort(400, str(e))

        data = []
        upload_files = None
        current_upload_id = None
        # try/finally so the currently open upload files are also closed when an
        # abort (400/404) or a re-raised error leaves the loop early
        try:
            for entry in results['results']:
                with_embargo = entry['with_embargo']
                upload_id = entry['upload_id']
                calc_id = entry['calc_id']

                if current_upload_id != upload_id:
                    if upload_files is not None:
                        upload_files.close()

                    upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))
                    current_upload_id = upload_id

                    if upload_files is None:
                        common.logger.error('upload files do not exist', upload_id=upload_id)

                if upload_files is None:
                    # Missing upload files are treated like a missing archive: only the
                    # affected entries are skipped, the other results are still returned.
                    if raise_errors:
                        abort(404, 'Archive for entry %s does not exist' % calc_id)
                    continue

                if with_embargo:
                    access = 'restricted'
                    upload_files._is_authorized = create_authorization_predicate(
                        upload_id=upload_id, calc_id=calc_id)
                else:
                    access = 'public'

                try:
                    with upload_files.read_archive(calc_id, access) as archive:
                        data.append({
                            'calc_id': calc_id,
                            'parser_name': entry['parser_name'],
                            'archive': query_archive(
                                archive, {calc_id: required})[calc_id]
                        })

                except ArchiveQueryError as e:
                    abort(400, str(e))

                except KeyError:
                    if raise_errors:
                        abort(404, 'Archive for entry %s does not exist' % calc_id)
                    # We simply skip this entry

                except Restricted:
                    # this should not happen
                    common.logger.error('supposedly unreachable code', upload_id=upload_id, calc_id=calc_id)

                except Exception as e:
                    if raise_errors:
                        raise
                    common.logger.error(str(e), upload_id=upload_id, calc_id=calc_id, exc_info=e)

        finally:
            if upload_files is not None:
                upload_files.close()

        # assign archive data to results
        results['results'] = data

        return results, 200