# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
The archive API of the nomad@FAIRDI APIs. This API is about serving processed
(parsed and normalized) calculation data in nomad's *meta-info* format.
'''

from typing import Dict, Any
from io import BytesIO
import os.path
from flask import request, g
from flask_restplus import abort, Resource, fields
import json
import orjson
import importlib
import urllib.parse

import metainfo

from nomad.files import UploadFiles, Restricted
from nomad.archive import query_archive, ArchiveQueryError
from nomad import search, config
from nomad.app import common

from .auth import authenticate, create_authorization_predicate
from .api import api
from .common import calc_route, streamed_zipfile, search_model, add_search_parameters, apply_search_parameters, query_model


ns = api.namespace(
    'archive',
    description='Access archive data and archive processing logs.')


@calc_route(ns, '/logs')
class ArchiveCalcLogResource(Resource):
    @api.doc('get_archive_logs')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data sent')
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get the calculation processing log.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Upload %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                return [entry.to_dict() for entry in archive[calc_id]['processing_logs']]
        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)
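
# A hypothetical client-side sketch (not part of this module) for the logs
# endpoint above, assuming a NOMAD installation serving its API under
# http://localhost/api; the ids are placeholders:
#
#     import requests
#
#     logs = requests.get(
#         'http://localhost/api/archive/logs/<upload_id>/<calc_id>').json()
#     for log_entry in logs:
#         print(log_entry)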


@calc_route(ns)
class ArchiveCalcResource(Resource):
    @api.doc('get_archive_calc')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data sent', headers={'Content-Type': 'application/json'})
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation data in archive form.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Archive %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                return {
                    key: value
                    for key, value in archive[calc_id].to_dict().items()
                    if key != 'processing_logs'}
        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)
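
# A hypothetical client-side sketch for the archive endpoint above, under the
# same assumed base URL; the result is the archive without 'processing_logs':
#
#     import requests
#
#     archive = requests.get(
#         'http://localhost/api/archive/<upload_id>/<calc_id>').json()
#     print(sorted(archive.keys()))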


_archive_download_parser = api.parser()
add_search_parameters(_archive_download_parser)
_archive_download_parser.add_argument(
    name='compress', type=bool, help='Use compression on .zip files; default is no compression.',
    location='args')


@ns.route('/download')
class ArchiveDownloadResource(Resource):
    manifest_quantities = ['upload_id', 'calc_id', 'external_id', 'raw_id', 'pid', 'calc_hash']

    @api.doc('archive_download')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_archive_download_parser, validate=True)
    @api.response(200, 'File(s) sent', headers={'Content-Type': 'application/zip'})
    @authenticate(signature_token=True)
    def get(self):
        '''
        Get calculation data in archive form from all query results.

        See the ``/repo`` endpoint for documentation on the search parameters.

        Zip files are streamed; instead of 401 errors, the zip file will simply not
        contain any files that the user is not authorized to access.

        The zip file will contain a ``manifest.json`` with the repository metadata.
        '''
        try:
            args = _archive_download_parser.parse_args()
            compress = args.get('compress', False)
        except Exception:
            abort(400, message='bad parameter types')

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        search_request.include('calc_id', 'upload_id', 'mainfile')

        calcs = search_request.execute_scan(
            order_by='upload_id',
            size=config.services.download_scan_size,
            scroll=config.services.download_scan_timeout)

        def generator():
            try:
                manifest = {}
                upload_files = None

                for entry in calcs:
                    upload_id = entry['upload_id']
                    calc_id = entry['calc_id']
                    if upload_files is None or upload_files.upload_id != upload_id:
                        if upload_files is not None:
                            upload_files.close()

                        upload_files = UploadFiles.get(
                            upload_id, create_authorization_predicate(upload_id))

                        if upload_files is None:
                            common.logger.error('upload files do not exist', upload_id=upload_id)
                            continue

                    with upload_files.read_archive(calc_id) as archive:
                        f = BytesIO(orjson.dumps(
                            archive[calc_id].to_dict(),
                            option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS))

                        yield (
                            '%s.%s' % (calc_id, 'json'), calc_id,
                            lambda calc_id: f,
                            lambda calc_id: f.getbuffer().nbytes)

                    manifest[calc_id] = {
                        key: entry[key]
                        for key in ArchiveDownloadResource.manifest_quantities
                        if entry.get(key) is not None
                    }

                if upload_files is not None:
                    upload_files.close()

                try:
                    manifest_contents = json.dumps(manifest).encode('utf-8')
                except Exception as e:
                    manifest_contents = json.dumps(
                        dict(error='Could not create the manifest: %s' % (e))).encode('utf-8')
                    common.logger.error(
                        'could not create raw query manifest', exc_info=e)

                yield (
                    'manifest.json', 'manifest',
                    lambda *args: BytesIO(manifest_contents),
                    lambda *args: len(manifest_contents))

            except Exception as e:
                common.logger.warning(
                    'unexpected error while streaming raw data from query',
                    exc_info=e,
                    query=urllib.parse.urlencode(request.args, doseq=True))

        return streamed_zipfile(
            generator(), zipfile_name='nomad_archive.zip', compress=compress)
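
# A hypothetical client-side sketch for streaming the zip produced above; the
# base URL and the search parameter are assumptions:
#
#     import requests
#
#     with requests.get(
#             'http://localhost/api/archive/download',
#             params=dict(upload_id='<upload_id>', compress=True),
#             stream=True) as rsp:
#         with open('nomad_archive.zip', 'wb') as f:
#             for chunk in rsp.iter_content(chunk_size=1024 * 1024):
#                 f.write(chunk)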


_archive_query_model = api.inherit('ArchiveSearch', search_model, {
    'query': fields.Nested(query_model, description='The query used to find the requested entries.', skip_none=True),
    'required': fields.Raw(description='A dictionary that defines what archive data to retrieve.'),
    'query_schema': fields.Raw(description='Deprecated, use required instead.'),
    'raise_errors': fields.Boolean(description='Return 401 on missing archives or 500 on other errors instead of skipping the entry.')
})
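
# A hypothetical request body for this model; the section names in 'required'
# are assumptions, and '*' requests all data below a key:
#
#     {
#         "query": {"upload_id": ["<upload_id>"]},
#         "pagination": {"page": 1, "per_page": 10},
#         "required": {"section_run": "*"}
#     }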


@ns.route('/query')
class ArchiveQueryResource(Resource):
    @api.doc('post_archive_query')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(404, 'The upload or calculation does not exist')
    @api.expect(_archive_query_model)
    @api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Archive search results sent')
    @authenticate()
    def post(self):
        '''
        Post a query schema and return it filled with archive data.

        See the ``/repo`` endpoint for documentation on the search parameters.

        The actual data are in ``results``; supplementary python and curl code to
        execute the same search is in ``python`` and ``curl``.
        '''
        try:
            data_in = request.get_json()
            scroll = data_in.get('scroll', None)
            if scroll:
                scroll_id = scroll.get('scroll_id')
                scroll = True

            pagination = data_in.get('pagination', {})
            page = pagination.get('page', 1)
            per_page = pagination.get('per_page', 10 if not scroll else 1000)

            query = data_in.get('query', {})

            required: Any = None
            if 'required' in data_in:
                required = data_in.get('required')
            else:
                required = data_in.get('query_schema', '*')

            raise_error = data_in.get('raise_errors', True)
        except Exception:
            abort(400, message='bad parameter types')

        if not (page >= 1 and per_page > 0):
            abort(400, message='invalid pagination')

        search_request = search.SearchRequest()
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        apply_search_parameters(search_request, query)
        search_request.include('calc_id', 'upload_id', 'with_embargo', 'published', 'parser_name')

        try:
            if scroll:
                results = search_request.execute_scrolled(
                    scroll_id=scroll_id, size=per_page, order_by='upload_id')
                results['scroll']['scroll'] = True
            else:
                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order_by='upload_id')
        except search.ScrollIdNotFound:
            abort(400, 'The given scroll_id does not exist.')
        except KeyError as e:
            abort(400, str(e))

        data = []
        calcs = results['results']
        upload_files = None
        current_upload_id = None
        for entry in calcs:
            with_embargo = entry['with_embargo']
            upload_id = entry['upload_id']
            calc_id = entry['calc_id']

            if upload_files is None or current_upload_id != upload_id:
                if upload_files is not None:
                    upload_files.close()

                upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))

                if upload_files is None:
                    return []

                current_upload_id = upload_id

            if with_embargo:
                access = 'restricted'
            else:
                access = 'public'

            try:
                with upload_files.read_archive(calc_id, access) as archive:
                    data.append({
                        'calc_id': calc_id,
                        'parser_name': entry['parser_name'],
                        'archive': query_archive(
                            archive, {calc_id: required})[calc_id]
                    })
            except ArchiveQueryError as e:
                abort(400, str(e))
            except KeyError:
                if raise_error:
                    abort(401, 'Archive for entry %s does not exist' % calc_id)
                # we simply skip this entry
            except Restricted:
                # TODO in reality this should not happen
                pass
            except Exception as e:
                if raise_error:
                    raise e
                common.logger.error(str(e), exc_info=e)

        if upload_files is not None:
            upload_files.close()

        # assign archive data to results
        results['results'] = data

        return results, 200
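
# A hypothetical client-side sketch that pages through this endpoint with
# scrolling; the base URL and the shape of the scroll response are assumptions
# based on the code above:
#
#     import requests
#
#     body = {'scroll': {'scroll_id': None}, 'query': {}, 'required': '*'}
#     while True:
#         results = requests.post(
#             'http://localhost/api/archive/query', json=body).json()
#         if len(results['results']) == 0:
#             break
#         for entry in results['results']:
#             print(entry['calc_id'])
#         body['scroll']['scroll_id'] = results['scroll']['scroll_id']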


@ns.route('/metainfo/<string:metainfo_package_name>')
@api.doc(params=dict(metainfo_package_name='The name of the metainfo package.'))
class MetainfoResource(Resource):
    @api.doc('get_metainfo')
    @api.response(404, 'The metainfo does not exist')
    @api.response(200, 'Metainfo data sent')
    def get(self, metainfo_package_name):
        '''
        Get a metainfo definition file.
        '''
        try:
            return load_metainfo(metainfo_package_name), 200
        except FileNotFoundError:
            parser_prefix = metainfo_package_name[:-len('.nomadmetainfo.json')]

            try:
                return load_metainfo(dict(
                    parser='%sparser' % parser_prefix,
                    path='%s.nomadmetainfo.json' % parser_prefix)), 200
            except FileNotFoundError:
                abort(404, message='The metainfo %s does not exist.' % metainfo_package_name)
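
# A hypothetical client-side sketch for the metainfo endpoint above; the
# package name is an assumption:
#
#     import requests
#
#     packages = requests.get(
#         'http://localhost/api/archive/metainfo/common.nomadmetainfo.json').json()
#     print(list(packages.keys()))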


metainfo_main_path = os.path.dirname(os.path.abspath(metainfo.__file__))


def load_metainfo(
        package_name_or_dependency: str, dependency_source: str = None,
        loaded_packages: Dict[str, Any] = None) -> Dict[str, Any]:
    '''
    Loads the given metainfo package and all its dependencies. Returns a dict with
    all loaded package_names and respective packages.

    Arguments:
        package_name_or_dependency: The name of the package, or a nomadmetainfo dependency object.
        dependency_source: The path of the metainfo that uses this function to load a relative dependency.
        loaded_packages: Give a dict and the function will add freshly loaded packages
            to it and return it.
    '''
    if loaded_packages is None:
        loaded_packages = {}

    if isinstance(package_name_or_dependency, str):
        package_name = package_name_or_dependency
        metainfo_path = os.path.join(metainfo_main_path, package_name)
    else:
        dependency = package_name_or_dependency
        if 'relativePath' in dependency:
            if dependency_source is None:
                raise Exception(
                    'Can only load relative dependency from within another metainfo package')

            metainfo_path = os.path.join(
                os.path.dirname(dependency_source), dependency['relativePath'])

        elif 'metainfoPath' in dependency:
            metainfo_path = os.path.join(metainfo_main_path, dependency['metainfoPath'])

        elif 'parser' in dependency:
            parser = dependency['parser']
            path = dependency['path']
            try:
                parser_module = importlib.import_module(parser).__file__
            except Exception:
                raise Exception('Parser %s not installed for metainfo path %s' % (parser, path))

            parser_directory = os.path.dirname(parser_module)
            metainfo_path = os.path.join(parser_directory, path)

        else:
            raise Exception('Invalid dependency type in metainfo package %s' % dependency_source)

        package_name = os.path.basename(metainfo_path)

    package_name = os.path.basename(package_name)

    if package_name in loaded_packages:
        return loaded_packages

    with open(metainfo_path, 'rt') as f:
        metainfo_json = json.load(f)

    loaded_packages[package_name] = metainfo_json

    for dependency in metainfo_json.get('dependencies', []):
        load_metainfo(dependency, dependency_source=metainfo_path, loaded_packages=loaded_packages)

    return loaded_packages
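
# A minimal usage sketch for load_metainfo; the package name is an assumption:
#
#     packages = load_metainfo('common.nomadmetainfo.json')
#     for name, package in packages.items():
#         print(name, list(package.keys()))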