# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
The archive API of the nomad@FAIRDI APIs. This API is about serving processed
(parsed and normalized) calculation data in nomad's *meta-info* format.
'''

from typing import Dict, Any
from io import BytesIO
import os.path
from flask import request, g
from flask_restplus import abort, Resource, fields
import json
import orjson
import importlib
import urllib.parse

import metainfo

from nomad.files import UploadFiles, Restricted
from nomad.archive import query_archive
from nomad import search, config
from nomad.app import common

from .auth import authenticate, create_authorization_predicate
from .api import api
from .common import calc_route, streamed_zipfile, search_model, add_search_parameters, apply_search_parameters, query_model

ns = api.namespace(
    'archive',
    description='Access archive data and archive processing logs.')


@calc_route(ns, '/logs')
class ArchiveCalcLogResource(Resource):
    @api.doc('get_archive_logs')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data sent')
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation processing log.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
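        Assuming the usual ``/api`` prefix and the route layout of ``calc_route``,
        a request looks like (the ids are placeholders):

            GET /api/archive/logs/<upload_id>/<calc_id>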
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Upload %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                return [entry.to_dict() for entry in archive[calc_id]['processing_logs']]

        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)


@calc_route(ns)
class ArchiveCalcResource(Resource):
    @api.doc('get_archive_calc')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data sent', headers={'Content-Type': 'application/json'})
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation data in archive form.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
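        Assuming the usual ``/api`` prefix and the route layout of ``calc_route``,
        a request looks like (the ids are placeholders):

            GET /api/archive/<upload_id>/<calc_id>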
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Archive %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                return {
                    key: value
                    for key, value in archive[calc_id].to_dict().items()
                    if key != 'processing_logs'}

        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)


_archive_download_parser = api.parser()
add_search_parameters(_archive_download_parser)
_archive_download_parser.add_argument(
    name='compress', type=bool, help='Use compression on .zip files, default is no compression.',
    location='args')


@ns.route('/download')
class ArchiveDownloadResource(Resource):
    manifest_quantities = ['upload_id', 'calc_id', 'external_id', 'raw_id', 'pid', 'calc_hash']

    @api.doc('archive_download')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_archive_download_parser, validate=True)
    @api.response(200, 'File(s) sent', headers={'Content-Type': 'application/zip'})
    @authenticate(signature_token=True)
    def get(self):
        '''
        Get calculation data in archive form from all query results.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        Zip files are streamed; instead of 401 errors, the zip file will just not contain
        any files that the user is not authorized to access.

        The zip file will contain a ``manifest.json`` with the repository metadata.
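        An illustrative request (``upload_id`` stands for any of the search
        parameters added by ``add_search_parameters``):

            GET /api/archive/download?upload_id=<upload_id>&compress=true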
        '''
        try:
            args = _archive_download_parser.parse_args()
            compress = args.get('compress', False)
        except Exception:
            abort(400, message='bad parameter types')

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        search_request.include('calc_id', 'upload_id', 'mainfile')

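        # Scan instead of paginating so the results arrive grouped by upload_id;
        # the generator below can then open each upload's files only once.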
        calcs = search_request.execute_scan(
            order_by='upload_id',
            size=config.services.download_scan_size,
            scroll=config.services.download_scan_timeout)

        def generator():
            try:
                manifest = {}
                upload_files = None

                for entry in calcs:
                    upload_id = entry['upload_id']
                    calc_id = entry['calc_id']
                    if upload_files is None or upload_files.upload_id != upload_id:
                        if upload_files is not None:
                            upload_files.close()

                        upload_files = UploadFiles.get(
                            upload_id, create_authorization_predicate(upload_id))

                        if upload_files is None:
                            common.logger.error('upload files do not exist', upload_id=upload_id)
                            continue

                    with upload_files.read_archive(calc_id) as archive:
                        f = BytesIO(orjson.dumps(
                            archive[calc_id].to_dict(),
                            option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS))

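                        # each yielded (filename, id, open fn, size fn) tuple is
                        # presumably consumed by streamed_zipfile before the
                        # generator resumes, so closing over f here is safe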
                        yield (
                            '%s.%s' % (calc_id, 'json'), calc_id,
                            lambda calc_id: f,
                            lambda calc_id: f.getbuffer().nbytes)

                    manifest[calc_id] = {
                        key: entry[key]
                        for key in ArchiveDownloadResource.manifest_quantities
                        if entry.get(key) is not None
                    }

                if upload_files is not None:
                    upload_files.close()

                try:
                    manifest_contents = json.dumps(manifest).encode('utf-8')
                except Exception as e:
                    manifest_contents = json.dumps(
                        dict(error='Could not create the manifest: %s' % (e))).encode('utf-8')
                    common.logger.error(
                        'could not create raw query manifest', exc_info=e)

                yield (
                    'manifest.json', 'manifest',
                    lambda *args: BytesIO(manifest_contents),
                    lambda *args: len(manifest_contents))

            except Exception as e:
                common.logger.warning(
                    'unexpected error while streaming raw data from query',
                    exc_info=e,
                    query=urllib.parse.urlencode(request.args, doseq=True))

        return streamed_zipfile(
            generator(), zipfile_name='nomad_archive.zip', compress=compress)


_archive_query_model = api.inherit('ArchiveSearch', search_model, {
    'query': fields.Nested(query_model, description='The query used to find the requested entries.', skip_none=True),
    'query_schema': fields.Raw(description='The query schema that defines what archive data to retrieve.')
})


@ns.route('/query')
class ArchiveQueryResource(Resource):
    @api.doc('post_archive_query')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(404, 'The upload or calculation does not exist')
    @api.expect(_archive_query_model)
    @api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Archive search results sent')
    @authenticate()
    def post(self):
        '''
        Post a query schema and return it filled with archive data.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        The actual data are returned under ``results``. Supplementary Python
        code and a curl command that execute the same search are returned
        under ``python`` and ``curl``.
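        An illustrative request body (the values are hypothetical; the
        query_schema uses the nested-dict format understood by ``query_archive``)::

            {
                "query": {"atoms": ["Si"]},
                "pagination": {"page": 1, "per_page": 10},
                "query_schema": {"section_run": "*"}
            }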
        '''
        try:
            data_in = request.get_json()
            scroll = data_in.get('scroll', None)
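            # 'scroll' in the body may be an object carrying the scroll_id of a
            # previous response; reduce it to a boolean flag plus that id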
            if scroll:
                scroll_id = scroll.get('scroll_id')
                scroll = True

            pagination = data_in.get('pagination', {})
            page = pagination.get('page', 1)
            per_page = pagination.get('per_page', 10 if not scroll else 1000)

            query = data_in.get('query', {})
            query_schema = data_in.get('query_schema', '*')

        except Exception:
            abort(400, message='bad parameter types')

        if not (page >= 1 and per_page > 0):
            abort(400, message='invalid pagination')

        search_request = search.SearchRequest()
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        apply_search_parameters(search_request, query)
        search_request.include('calc_id', 'upload_id', 'with_embargo', 'parser_name')

        try:
            if scroll:
                results = search_request.execute_scrolled(
                    scroll_id=scroll_id, size=per_page, order_by='upload_id')
                results['scroll']['scroll'] = True

            else:
                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order_by='upload_id')

        except search.ScrollIdNotFound:
            abort(400, 'The given scroll_id does not exist.')
        except KeyError as e:
            import traceback
            traceback.print_exc()
            abort(400, str(e))

        data = []
        calcs = results['results']
        upload_files = None
        current_upload_id = None
        for entry in calcs:
            upload_id = entry['upload_id']
            calc_id = entry['calc_id']
            if upload_files is None or current_upload_id != upload_id:
                if upload_files is not None:
                    upload_files.close()

                upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))

                if upload_files is None:
                    return []

                current_upload_id = upload_id

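            # with_embargo decides whether the archive has to be read from the
            # restricted or the public part of the upload files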
            if entry['with_embargo']:
                access = 'restricted'
            else:
                access = 'public'

            try:
                with upload_files.read_archive(calc_id, access) as archive:
                    data.append({
                        'calc_id': calc_id,
                        'parser_name': entry['parser_name'],
                        'archive': query_archive(
                            archive, {calc_id: query_schema})[calc_id]
                    })

            except Restricted:
                # optimization: do not try to access restricted data of the same upload again
                pass

        if upload_files is not None:
            upload_files.close()

        # assign archive data to results
        results['results'] = data

        return results, 200


@ns.route('/metainfo/<string:metainfo_package_name>')
@api.doc(params=dict(metainfo_package_name='The name of the metainfo package.'))
class MetainfoResource(Resource):
    @api.doc('get_metainfo')
    @api.response(404, 'The metainfo does not exist')
    @api.response(200, 'Metainfo data sent')
    def get(self, metainfo_package_name):
        '''
        Get a metainfo definition file.
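        For example (the package name is illustrative):

            GET /api/archive/metainfo/common.nomadmetainfo.json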
        '''
        try:
            return load_metainfo(metainfo_package_name), 200
        except FileNotFoundError:
            parser_prefix = metainfo_package_name[:-len('.nomadmetainfo.json')]

            try:
                return load_metainfo(dict(
                    parser='%sparser' % parser_prefix,
                    path='%s.nomadmetainfo.json' % parser_prefix)), 200
            except FileNotFoundError:
                abort(404, message='The metainfo %s does not exist.' % metainfo_package_name)


metainfo_main_path = os.path.dirname(os.path.abspath(metainfo.__file__))


def load_metainfo(
        package_name_or_dependency: str, dependency_source: str = None,
        loaded_packages: Dict[str, Any] = None) -> Dict[str, Any]:
    '''
    Loads the given metainfo package and all its dependencies. Returns a dict
    mapping the names of all loaded packages to the respective package contents.

    Arguments:
        package_name_or_dependency: The name of the package, or a nomadmetainfo dependency object.
        dependency_source: The path of the metainfo that uses this function to load a relative dependency.
        loaded_packages: Give a dict and the function will add freshly loaded packages
            to it and return it.
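    For example (the package name is illustrative)::

        packages = load_metainfo('common.nomadmetainfo.json')
        # maps the package name and all of its dependencies to the parsed
        # JSON contents of the respective .nomadmetainfo.json files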
    '''
    if loaded_packages is None:
        loaded_packages = {}

    if isinstance(package_name_or_dependency, str):
        package_name = package_name_or_dependency
        metainfo_path = os.path.join(metainfo_main_path, package_name)
    else:
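        # A dependency object comes from a package's ``dependencies`` list; the
        # shapes handled below are, schematically (the values are illustrative):
        #   {"relativePath": "common.nomadmetainfo.json"}
        #   {"metainfoPath": "common.nomadmetainfo.json"}
        #   {"parser": "vaspparser", "path": "vasp.nomadmetainfo.json"}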
        dependency = package_name_or_dependency
        if 'relativePath' in dependency:
            if dependency_source is None:
                raise Exception(
                    'Can only load relative dependency from within another metainfo package')

            metainfo_path = os.path.join(
                os.path.dirname(dependency_source), dependency['relativePath'])

        elif 'metainfoPath' in dependency:
            metainfo_path = os.path.join(metainfo_main_path, dependency['metainfoPath'])

        elif 'parser' in dependency:
            parser = dependency['parser']
            path = dependency['path']
            try:
                parser_module = importlib.import_module(parser).__file__
            except Exception:
                raise Exception('Parser %s not installed for metainfo path %s' % (parser, path))

            parser_directory = os.path.dirname(parser_module)
            metainfo_path = os.path.join(parser_directory, path)

        else:
            raise Exception('Invalid dependency type in metainfo package %s' % dependency_source)

        package_name = os.path.basename(metainfo_path)

    package_name = os.path.basename(package_name)

    if package_name in loaded_packages:
        return loaded_packages

    with open(metainfo_path, 'rt') as f:
        metainfo_json = json.load(f)

    loaded_packages[package_name] = metainfo_json

    for dependency in metainfo_json.get('dependencies', []):
        load_metainfo(dependency, dependency_source=metainfo_path, loaded_packages=loaded_packages)

    return loaded_packages