# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
The archive API of the nomad@FAIRDI APIs. This API is about serving processed
(parsed and normalized) calculation data in nomad's *meta-info* format.
'''

from typing import Dict, Any
from io import BytesIO
import os.path
from flask import request, g
from flask_restplus import abort, Resource, fields
import json
import orjson
import importlib
import urllib.parse

import metainfo

from nomad.files import UploadFiles, Restricted
from nomad.archive import query_archive
from nomad import search, config
from nomad.app import common

from .auth import authenticate, create_authorization_predicate
from .api import api
from .common import calc_route, streamed_zipfile, search_model, add_search_parameters, apply_search_parameters, query_model

ns = api.namespace(
    'archive',
    description='Access archive data and archive processing logs.')


@calc_route(ns, '/logs')
class ArchiveCalcLogResource(Resource):
    @api.doc('get_archive_logs')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data sent')
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation processing log.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Upload %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                return [entry.to_dict() for entry in archive[calc_id]['processing_logs']]

        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)
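

# A minimal client sketch for the logs endpoint (hypothetical: assumes the API
# is mounted at http://localhost/api and that `requests` is available; replace
# the placeholders with a real upload_id/calc_id pair):
#
#     import requests
#
#     resp = requests.get('http://localhost/api/archive/logs/<upload_id>/<calc_id>')
#     resp.raise_for_status()
#     for log_entry in resp.json():  # a list of structured log records
#         print(log_entry)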


@calc_route(ns)
class ArchiveCalcResource(Resource):
    @api.doc('get_archive_calc')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data sent', headers={'Content-Type': 'application/json'})
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        '''
        Get calculation data in archive form.

        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            abort(404, message='Archive %s does not exist.' % upload_id)

        try:
            with upload_files.read_archive(calc_id) as archive:
                return {
                    key: value
                    for key, value in archive[calc_id].to_dict().items()
                    if key != 'processing_logs'}

        except Restricted:
            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)


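# A similar sketch for the archive data endpoint (hypothetical base URL; same
# <upload_id>/<calc_id> route, without the /logs prefix):
#
#     import requests
#
#     resp = requests.get('http://localhost/api/archive/<upload_id>/<calc_id>')
#     archive_data = resp.json()  # processed calc data, 'processing_logs' excluded

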
_archive_download_parser = api.parser()
add_search_parameters(_archive_download_parser)
_archive_download_parser.add_argument(
    name='compress', type=bool, help='Use compression on .zip files; default is no compression.',
    location='args')


@ns.route('/download')
class ArchiveDownloadResource(Resource):
    manifest_quantities = ['upload_id', 'calc_id', 'external_id', 'raw_id', 'pid', 'calc_hash']

    @api.doc('archive_download')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_archive_download_parser, validate=True)
    @api.response(200, 'File(s) sent', headers={'Content-Type': 'application/zip'})
    @authenticate(signature_token=True)
    def get(self):
        '''
        Get calculation data in archive form from all query results.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        Zip files are streamed; instead of 401 errors, the zip file will just not contain
        any files that the user is not authorized to access.

        The zip file will contain a ``manifest.json`` with the repository meta data.
        '''
        try:
            args = _archive_download_parser.parse_args()
            compress = args.get('compress', False)
        except Exception:
            abort(400, message='bad parameter types')

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        search_request.include('calc_id', 'upload_id', 'mainfile')

        calcs = search_request.execute_scan(
            order_by='upload_id',
            size=config.services.download_scan_size,
            scroll=config.services.download_scan_timeout)

        def generator():
            try:
                manifest = {}
                upload_files = None

                for entry in calcs:
                    upload_id = entry['upload_id']
                    calc_id = entry['calc_id']
                    # reuse the upload files of consecutive entries from the same upload
                    if upload_files is None or upload_files.upload_id != upload_id:
                        if upload_files is not None:
                            upload_files.close()

                        upload_files = UploadFiles.get(
                            upload_id, create_authorization_predicate(upload_id))

                        if upload_files is None:
                            common.logger.error('upload files do not exist', upload_id=upload_id)
                            continue

                    with upload_files.read_archive(calc_id) as archive:
                        f = BytesIO(orjson.dumps(
                            archive[calc_id].to_dict(),
                            option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS))

                        yield (
                            '%s.%s' % (calc_id, 'json'), calc_id,
                            lambda calc_id: f,
                            lambda calc_id: f.getbuffer().nbytes)

                    manifest[calc_id] = {
                        key: entry[key]
                        for key in ArchiveDownloadResource.manifest_quantities
                        if entry.get(key) is not None
                    }

                if upload_files is not None:
                    upload_files.close()

                try:
                    manifest_contents = json.dumps(manifest).encode('utf-8')
                except Exception as e:
                    manifest_contents = json.dumps(
                        dict(error='Could not create the manifest: %s' % (e))).encode('utf-8')
                    common.logger.error(
                        'could not create raw query manifest', exc_info=e)

                yield (
                    'manifest.json', 'manifest',
                    lambda *args: BytesIO(manifest_contents),
                    lambda *args: len(manifest_contents))

            except Exception as e:
                common.logger.warning(
                    'unexpected error while streaming raw data from query',
                    exc_info=e,
                    query=urllib.parse.urlencode(request.args, doseq=True))

        return streamed_zipfile(
            generator(), zipfile_name='nomad_archive.zip', compress=compress)


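# Download sketch (hypothetical: `atoms` stands in for any of the /repo search
# parameters; streams the zip archive to a local file):
#
#     import requests, shutil
#
#     with requests.get(
#             'http://localhost/api/archive/download',
#             params={'atoms': 'Si', 'compress': True}, stream=True) as resp:
#         with open('nomad_archive.zip', 'wb') as f:
#             shutil.copyfileobj(resp.raw, f)

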
_archive_query_model = api.inherit('ArchiveSearch', search_model, {
    'query': fields.Nested(query_model, description='The query used to find the requested entries.'),
    'query_schema': fields.Raw(description='The query schema that defines what archive data to retrieve.')
})
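

# Example request body for POST /archive/query (hypothetical values; the
# quantity names in `query` follow the /repo search parameters):
#
#     body = {
#         'query': {'atoms': ['Si']},
#         'pagination': {'page': 1, 'per_page': 10},
#         'query_schema': {'section_run': '*'},
#     }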


@ns.route('/query')
class ArchiveQueryResource(Resource):
    @api.doc('post_archive_query')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(200, 'Archive data sent')
    @api.expect(_archive_query_model)
    @api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Search results sent')
    @authenticate()
    def post(self):
        '''
        Post a query schema and return it filled with archive data.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        The actual data are in ``results``; supplementary python and curl code to
        execute the search is in ``python`` and ``curl``, respectively.
        '''
        try:
            data_in = request.get_json()
            scroll = data_in.get('scroll', None)
            if scroll:
                scroll_id = scroll.get('scroll_id')
                scroll = True

            pagination = data_in.get('pagination', {})
            page = pagination.get('page', 1)
            per_page = pagination.get('per_page', 10 if not scroll else 1000)

            query = data_in.get('query', {})
            query_schema = data_in.get('query_schema', '*')
        except Exception:
            abort(400, message='bad parameter types')

        if not (page >= 1 and per_page > 0):
            abort(400, message='invalid pagination')

        search_request = search.SearchRequest()
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        apply_search_parameters(search_request, query)
        search_request.include('calc_id', 'upload_id', 'with_embargo', 'parser_name')

        try:
            if scroll:
                results = search_request.execute_scrolled(
                    scroll_id=scroll_id, size=per_page, order_by='upload_id')
                results['scroll']['scroll'] = True

            else:
                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order_by='upload_id')

        except search.ScrollIdNotFound:
            abort(400, 'The given scroll_id does not exist.')
        except KeyError as e:
            import traceback
            traceback.print_exc()
            abort(400, str(e))

        data = []
        calcs = results['results']
        upload_files = None
        current_upload_id = None
        for entry in calcs:
            upload_id = entry['upload_id']
            calc_id = entry['calc_id']
            # reuse the upload files of consecutive entries from the same upload
            if upload_files is None or current_upload_id != upload_id:
                if upload_files is not None:
                    upload_files.close()

                upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))

                if upload_files is None:
                    return []

                current_upload_id = upload_id

            if entry['with_embargo']:
                access = 'restricted'
            else:
                access = 'public'

            try:
                with upload_files.read_archive(calc_id, access) as archive:
                    data.append({
                        'calc_id': calc_id,
                        'parser_name': entry['parser_name'],
                        'archive': query_archive(
                            archive, {calc_id: query_schema})[calc_id]
                    })

            except Restricted:
                # TODO: optimize by not accessing restricted data of the same upload again
                pass

        # close the last upload's files
        if upload_files is not None:
            upload_files.close()

        # assign archive data to results
        results['results'] = data

        return results, 200
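

# Scroll usage sketch (hypothetical: assumes each response carries the next
# scroll_id under results['scroll']['scroll_id'], as filled in by
# execute_scrolled above):
#
#     import requests
#
#     body = {'scroll': {'scroll_id': None}, 'query_schema': '*'}
#     while True:
#         resp = requests.post('http://localhost/api/archive/query', json=body).json()
#         if len(resp['results']) == 0:
#             break
#         body['scroll']['scroll_id'] = resp['scroll']['scroll_id']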


@ns.route('/metainfo/<string:metainfo_package_name>')
@api.doc(params=dict(metainfo_package_name='The name of the metainfo package.'))
class MetainfoResource(Resource):
    @api.doc('get_metainfo')
    @api.response(404, 'The metainfo does not exist')
    @api.response(200, 'Metainfo data sent')
    def get(self, metainfo_package_name):
        '''
        Get a metainfo definition file.
        '''
        try:
            return load_metainfo(metainfo_package_name), 200
        except FileNotFoundError:
            parser_prefix = metainfo_package_name[:-len('.nomadmetainfo.json')]

            try:
                return load_metainfo(dict(
                    parser='%sparser' % parser_prefix,
                    path='%s.nomadmetainfo.json' % parser_prefix)), 200
            except FileNotFoundError:
                abort(404, message='The metainfo %s does not exist.' % metainfo_package_name)


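# Fetch sketch for the metainfo endpoint (hypothetical package file name; on
# FileNotFoundError the resource falls back to a parser-local definition file
# derived from the name, as implemented above):
#
#     import requests
#
#     resp = requests.get('http://localhost/api/archive/metainfo/common.nomadmetainfo.json')
#     packages = resp.json()  # dict: package name -> package definition
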
metainfo_main_path = os.path.dirname(os.path.abspath(metainfo.__file__))


def load_metainfo(
        package_name_or_dependency: str, dependency_source: str = None,
        loaded_packages: Dict[str, Any] = None) -> Dict[str, Any]:
    '''
    Loads the given metainfo package and all its dependencies. Returns a dict with
    all loaded package_names and respective packages.

    Arguments:
        package_name_or_dependency: The name of the package, or a nomadmetainfo dependency object.
        dependency_source: The path of the metainfo that uses this function to load a relative dependency.
        loaded_packages: Pass a dict and the function will add freshly loaded packages
            to it and return it.
    '''
    if loaded_packages is None:
        loaded_packages = {}

    if isinstance(package_name_or_dependency, str):
        package_name = package_name_or_dependency
        metainfo_path = os.path.join(metainfo_main_path, package_name)

    else:
        dependency = package_name_or_dependency
        if 'relativePath' in dependency:
            if dependency_source is None:
                raise Exception(
                    'Can only load relative dependency from within another metainfo package')

            metainfo_path = os.path.join(
                os.path.dirname(dependency_source), dependency['relativePath'])

        elif 'metainfoPath' in dependency:
            metainfo_path = os.path.join(metainfo_main_path, dependency['metainfoPath'])

        elif 'parser' in dependency:
            parser = dependency['parser']
            path = dependency['path']
            try:
                parser_module = importlib.import_module(parser).__file__
            except Exception:
                # metainfo_path is not assigned yet in this branch; report the
                # dependency's own path instead
                raise Exception('Parser %s not installed for metainfo path %s' % (parser, path))

            parser_directory = os.path.dirname(parser_module)
            metainfo_path = os.path.join(parser_directory, path)

        else:
            raise Exception(
                'Invalid dependency type in metainfo package %s' % dependency_source)

        package_name = os.path.basename(metainfo_path)

    package_name = os.path.basename(package_name)

    if package_name in loaded_packages:
        return loaded_packages

    with open(metainfo_path, 'rt') as f:
        metainfo_json = json.load(f)

    loaded_packages[package_name] = metainfo_json

    for dependency in metainfo_json.get('dependencies', []):
        load_metainfo(dependency, dependency_source=metainfo_path, loaded_packages=loaded_packages)

    return loaded_packages
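

# Usage sketch for load_metainfo (hypothetical package file name; returns the
# named package together with its transitively loaded dependencies):
#
#     packages = load_metainfo('common.nomadmetainfo.json')
#     for name in packages:
#         print(name)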