archive.py 15.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
'''
The archive API of the nomad@FAIRDI APIs. This API is about serving processed
(parsed and normalized) calculation data in nomad's *meta-info* format.
'''
19

20
from typing import Dict, Any, Optional
from io import BytesIO
import importlib
import json
import os.path
import urllib.parse

from flask import request, g
from flask_restplus import abort, Resource, fields
import orjson

import metainfo

from nomad import search, config
from nomad.archive import query_archive
from nomad.files import UploadFiles, Restricted
from nomad.app import common

from .auth import authenticate, create_authorization_predicate
from .api import api
from .common import calc_route, streamed_zipfile, search_model, add_search_parameters, apply_search_parameters, query_model

41
42

# API namespace for all archive related endpoints.
ns = api.namespace(
    'archive',
    description='Access archive data and archive processing logs.')
45
46
47
48


@calc_route(ns, '/logs')
class ArchiveCalcLogResource(Resource):
    @api.doc('get_archive_logs')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send', headers={'Content-Type': 'application/plain'})
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation processing log.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        # Authorization is delegated to the upload files layer via the predicate;
        # a None result means the upload itself is unknown.
        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Upload %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                data = archive[calc_id]['processing_logs']
                # Serialize as one JSON document per line (JSON lines format).
                return '\n'.join([json.dumps(entry.to_dict()) for entry in data])
        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            # The upload exists, but the calc is not part of its archive.
            abort(404, message='Calculation %s does not exist.' % archive_id)
76
77
78
79


@calc_route(ns)
class ArchiveCalcResource(Resource):
    @api.doc('get_archive_calc')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send', headers={'Content-Type': 'application/json'})
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation data in archive form.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        # Authorization is delegated to the upload files layer via the predicate.
        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Archive %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                # Return the full archive entry; flask serializes the dict as JSON.
                return archive[calc_id].to_dict()
        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)
106
107


108
109
110
# Request parser for /archive/download: all common search parameters plus an
# optional `compress` flag controlling zip compression.
_archive_download_parser = api.parser()
add_search_parameters(_archive_download_parser)
_archive_download_parser.add_argument(
    name='compress', type=bool, help='Use compression on .zip files, default is not.',
    location='args')

114
115
116

@ns.route('/download')
class ArchiveDownloadResource(Resource):
    # Repository metadata keys copied into the zip's manifest.json per calc.
    manifest_quantities = ['upload_id', 'calc_id', 'external_id', 'raw_id', 'pid', 'calc_hash']

    @api.doc('archive_download')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_archive_download_parser, validate=True)
    @api.response(200, 'File(s) send', headers={'Content-Type': 'application/zip'})
    @authenticate(signature_token=True)
    def get(self):
        '''
        Get calculation data in archive form from all query results.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        Zip files are streamed; instead of 401 errors, the zip file will just not contain
        any files that the user is not authorized to access.

        The zip file will contain a ``manifest.json`` with the repository meta data.
        '''
        try:
            args = _archive_download_parser.parse_args()
            compress = args.get('compress', False)
        except Exception:
            abort(400, message='bad parameter types')

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        search_request.include('calc_id', 'upload_id', 'mainfile')

        # Scan ordered by upload_id so that each upload's files are opened once.
        calcs = search_request.execute_scan(
            order_by='upload_id',
            size=config.services.download_scan_size,
            scroll=config.services.download_scan_timeout)

        def generator():
            try:
                manifest = {}
                upload_files = None

                for entry in calcs:
                    upload_id = entry['upload_id']
                    calc_id = entry['calc_id']
                    if upload_files is None or upload_files.upload_id != upload_id:
                        if upload_files is not None:
                            upload_files.close()

                        upload_files = UploadFiles.get(
                            upload_id, create_authorization_predicate(upload_id))

                        if upload_files is None:
                            common.logger.error('upload files do not exist', upload_id=upload_id)
                            continue

                    with upload_files.read_archive(calc_id) as archive:
                        f = BytesIO(orjson.dumps(
                            archive[calc_id].to_dict(),
                            option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS))

                        # BUG FIX: bind `f` as a default argument. The original
                        # lambdas captured the loop variable by reference (late
                        # binding), so a consumer that defers the calls would
                        # read a later iteration's buffer.
                        yield (
                            '%s.%s' % (calc_id, 'json'), calc_id,
                            lambda calc_id, f=f: f,
                            lambda calc_id, f=f: f.getbuffer().nbytes)

                    manifest[calc_id] = {
                        key: entry[key]
                        for key in ArchiveDownloadResource.manifest_quantities
                        if entry.get(key) is not None
                    }

                if upload_files is not None:
                    upload_files.close()

                try:
                    manifest_contents = json.dumps(manifest).encode('utf-8')
                except Exception as e:
                    # Best effort: ship an error manifest rather than aborting
                    # the already-streaming zip.
                    manifest_contents = json.dumps(
                        dict(error='Could not create the manifest: %s' % (e))).encode('utf-8')
                    common.logger.error(
                        'could not create raw query manifest', exc_info=e)

                yield (
                    'manifest.json', 'manifest',
                    lambda *args: BytesIO(manifest_contents),
                    lambda *args: len(manifest_contents))

            except Exception as e:
                # The response is already streaming; all we can do is log.
                common.logger.warning(
                    'unexpected error while streaming raw data from query',
                    exc_info=e,
                    query=urllib.parse.urlencode(request.args, doseq=True))

        return streamed_zipfile(
            generator(), zipfile_name='nomad_archive.zip', compress=compress)


212
# Request/response model for /archive/query: the common search model extended
# with a query and an archive query schema.
_archive_query_model = api.inherit('ArchiveSearch', search_model, {
    'query': fields.Nested(query_model, description='The query used to find the requested entries.'),
    # typo fix: "retrive" -> "retrieve" in the user-facing description
    'query_schema': fields.Raw(description='The query schema that defines what archive data to retrieve.')
})
216
217


218
219
@ns.route('/query')
class ArchiveQueryResource(Resource):
    @api.doc('post_archive_query')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(200, 'Archive data send')
    @api.expect(_archive_query_model)
    @api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Search results sent')
    @authenticate()
    def post(self):
        '''
        Post a query schema and return it filled with archive data.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        The actual data are in results and a supplementary python code (curl) to
        execute search is in python (curl).
        '''
        try:
            data_in = request.get_json()
            scroll = data_in.get('scroll', None)
            if scroll:
                scroll_id = scroll.get('scroll_id')
                scroll = True

            pagination = data_in.get('pagination', {})
            page = pagination.get('page', 1)
            # Scrolled requests use a larger default page size.
            per_page = pagination.get('per_page', 10 if not scroll else 1000)

            query = data_in.get('query', {})
            # '*' requests the full archive for each entry.
            query_schema = data_in.get('query_schema', '*')
        except Exception:
            abort(400, message='bad parameter types')

        if not (page >= 1 and per_page > 0):
            abort(400, message='invalid pagination')

        search_request = search.SearchRequest()
        # Authenticated users may see their own non-public entries.
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        apply_search_parameters(search_request, query)
        search_request.include('calc_id', 'upload_id', 'with_embargo', 'parser_name')

        try:
            if scroll:
                results = search_request.execute_scrolled(
                    scroll_id=scroll_id, size=per_page, order_by='upload_id')
                results['scroll']['scroll'] = True
            else:
                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order_by='upload_id')
        except search.ScrollIdNotFound:
            abort(400, 'The given scroll_id does not exist.')
        except KeyError as e:
            # NOTE(review): debug aid kept from the original implementation.
            import traceback
            traceback.print_exc()
            abort(400, str(e))

        data = []
        calcs = results['results']
        upload_files = None
        current_upload_id = None
        for entry in calcs:
            upload_id = entry['upload_id']
            calc_id = entry['calc_id']
            # Results are ordered by upload_id, so files are opened per upload.
            if upload_files is None or current_upload_id != upload_id:
                if upload_files is not None:
                    upload_files.close()

                upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))

                if upload_files is None:
                    return []

                current_upload_id = upload_id

            # Embargoed entries live in the restricted archive file.
            if entry['with_embargo']:
                access = 'restricted'
            else:
                access = 'public'

            try:
                with upload_files.read_archive(calc_id, access) as archive:
                    data.append({
                        'calc_id': calc_id,
                        'parser_name': entry['parser_name'],
                        'archive': query_archive(
                            archive, {calc_id: query_schema})[calc_id]
                    })
            except Restricted:
                # optimize and not access restricted for same upload again
                pass

        # BUG FIX: close the last opened upload files; the original leaked the
        # final upload's file handles.
        if upload_files is not None:
            upload_files.close()

        # assign archive data to results
        results['results'] = data

        return results, 200
324
325


326
327
@ns.route('/metainfo/<string:metainfo_package_name>')
@api.doc(params=dict(metainfo_package_name='The name of the metainfo package.'))
class MetainfoResource(Resource):
    @api.doc('get_metainfo')
    @api.response(404, 'The metainfo does not exist')
    @api.response(200, 'Metainfo data send')
    def get(self, metainfo_package_name):
        '''
        Get a metainfo definition file.
        '''
        try:
            return load_metainfo(metainfo_package_name), 200
        except FileNotFoundError:
            # Fall back to resolving the package through its parser, assuming
            # the `<prefix>parser` naming convention for parser modules.
            parser_prefix = metainfo_package_name[:-len('.nomadmetainfo.json')]

            try:
                return load_metainfo(dict(
                    parser='%sparser' % parser_prefix,
                    path='%s.nomadmetainfo.json' % parser_prefix)), 200
            except FileNotFoundError:
                abort(404, message='The metainfo %s does not exist.' % metainfo_package_name)
347
348


349
# Base directory of the installed `metainfo` package; package files given by
# plain name are resolved relative to this path in `load_metainfo`.
metainfo_main_path = os.path.dirname(os.path.abspath(metainfo.__file__))
350
351


352
353
354
def load_metainfo(
        package_name_or_dependency: str, dependency_source: Optional[str] = None,
        loaded_packages: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    '''
    Loads the given metainfo package and all its dependencies. Returns a dict with
    all loaded package_names and respective packages.

    Arguments:
        package_name_or_dependency: The name of the package, or a nomadmetainfo dependency object.
        dependency_source: The path of the metainfo that uses this function to load a relative dependency.
        loaded_packages: Give a dict and the function will add freshly loaded packages
            to it and return it.
    '''
    if loaded_packages is None:
        loaded_packages = {}

    if isinstance(package_name_or_dependency, str):
        # Plain package name: resolve relative to the installed metainfo package.
        package_name = package_name_or_dependency
        metainfo_path = os.path.join(metainfo_main_path, package_name)
    else:
        dependency = package_name_or_dependency
        if 'relativePath' in dependency:
            if dependency_source is None:
                raise Exception(
                    'Can only load relative dependency from within another metainfo package')

            metainfo_path = os.path.join(
                os.path.dirname(dependency_source), dependency['relativePath'])

        elif 'metainfoPath' in dependency:
            metainfo_path = os.path.join(metainfo_main_path, dependency['metainfoPath'])

        elif 'parser' in dependency:
            parser = dependency['parser']
            path = dependency['path']
            try:
                parser_module = importlib.import_module(parser).__file__
            except Exception:
                # BUG FIX: the original message formatted `metainfo_path`, which
                # is not assigned yet in this branch and raised a NameError
                # instead of the intended error.
                raise Exception('Parser not installed %s for metainfo path %s' % (parser, path))

            parser_directory = os.path.dirname(parser_module)
            metainfo_path = os.path.join(parser_directory, path)

        else:
            # BUG FIX: the original referenced the unbound `metainfo_path` here
            # as well; report the source package path instead.
            raise Exception('Invalid dependency type in metainfo package %s' % dependency_source)

        package_name = os.path.basename(metainfo_path)

    package_name = os.path.basename(package_name)

    # Cycle/duplicate guard: each package is loaded at most once.
    if package_name in loaded_packages:
        return loaded_packages

    with open(metainfo_path, 'rt') as f:
        metainfo_json = json.load(f)

    loaded_packages[package_name] = metainfo_json

    # Recursively load everything this package depends on.
    for dependency in metainfo_json.get('dependencies', []):
        load_metainfo(dependency, dependency_source=metainfo_path, loaded_packages=loaded_packages)

    return loaded_packages