archive.py 14.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
16
17
18
19
"""
The archive API of the nomad@FAIRDI APIs. This API is about serving processed
(parsed and normalized) calculation data in nomad's *meta-info* format.
"""

20
from typing import Dict, Any
21
from io import BytesIO
22
import os.path
23
24
from flask import send_file, request, g
from flask_restplus import abort, Resource, fields
25
import json
26
import importlib
27
import urllib.parse
28

29
30
import nomad_meta_info

31
from nomad.files import UploadFiles, Restricted
32
33
from nomad import search, config
from nomad.app import common
34
from nomad.archive import query_archive
35

36
from .auth import authenticate, create_authorization_predicate
Markus Scheidgen's avatar
Markus Scheidgen committed
37
from .api import api
38
39
from .common import calc_route, streamed_zipfile, search_model,\
    add_search_parameters, apply_search_parameters, query_model
40
41

# Namespace that all archive endpoints of this module are registered on.
ns = api.namespace(
    'archive', description='Access archive data and archive processing logs.')


@calc_route(ns, '/logs')
class ArchiveCalcLogResource(Resource):
    @api.doc('get_archive_logs')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send', headers={'Content-Type': 'application/plain'})
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        """
        Get the processing log of a calculation as a plain text attachment.

        The calculation is identified by the pair *upload_id*, *calc_id*.
        """
        is_authorized = create_authorization_predicate(upload_id, calc_id)
        upload_files = UploadFiles.get(upload_id, is_authorized=is_authorized)
        if upload_files is None:
            abort(404, message='Upload %s does not exist.' % upload_id)

        archive_id = '%s/%s' % (upload_id, calc_id)
        try:
            # Restricted -> 401 and KeyError (unknown calc) -> 404 are mapped below.
            log_file = upload_files.archive_log_file(calc_id, 'rb')
            return send_file(
                log_file, mimetype='text/plain', as_attachment=True, cache_timeout=0,
                attachment_filename='%s.log' % archive_id)
        except Restricted:
            abort(401, message='Not authorized to access %s.' % archive_id)
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)


@calc_route(ns)
class ArchiveCalcResource(Resource):
    @api.doc('get_archive_calc')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(200, 'Archive data send')
    @authenticate(signature_token=True)
    def get(self, upload_id, calc_id):
        """
        Get calculation data in archive form.

        Calcs are referenced via *upload_id*, *calc_id* pairs. The archive is sent as a
        ``<upload_id>/<calc_id>.json`` attachment.
        """
        archive_id = '%s/%s' % (upload_id, calc_id)

        upload_files = UploadFiles.get(
            upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))

        if upload_files is None:
            # It is the upload that is missing, not an "archive"; report it the same
            # way the logs endpoint does.
            abort(404, message='Upload %s does not exist.' % upload_id)

        try:
            return send_file(
                upload_files.archive_file(calc_id, 'rb'),
                mimetype='application/json',
                as_attachment=True,
                cache_timeout=0,
                attachment_filename='%s.json' % archive_id)
        except Restricted:
            abort(401, message='Not authorized to access %s.' % archive_id)
        except KeyError:
            abort(404, message='Calculation %s does not exist.' % archive_id)


# Request parser of the archive download endpoint: all common search parameters
# plus an optional ``compress`` flag.
_archive_download_parser = api.parser()
add_search_parameters(_archive_download_parser)
_archive_download_parser.add_argument(
    name='compress', type=bool, location='args',
    help='Use compression on .zip files, default is not.')

@ns.route('/download')
class ArchiveDownloadResource(Resource):
    # Search quantities that are copied (when present) into each calc's manifest entry.
    manifest_quantities = ['upload_id', 'calc_id', 'external_id', 'raw_id', 'pid', 'calc_hash']

    @api.doc('archive_download')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_archive_download_parser, validate=True)
    @api.response(200, 'File(s) send', headers={'Content-Type': 'application/zip'})
    @authenticate(signature_token=True)
    def get(self):
        """
        Get calculation data in archive form from all query results.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        Zip files are streamed; instead of 401 errors, the zip file will just not contain
        any files that the user is not authorized to access.

        The zip file will contain a ``manifest.json`` with the repository meta data.
        """
        try:
            args = _archive_download_parser.parse_args()
            compress = args.get('compress', False)
        except Exception:
            abort(400, message='bad parameter types')

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        # All manifest quantities must be part of the returned search entries,
        # otherwise the manifest could only ever contain upload_id and calc_id.
        search_request.include(*ArchiveDownloadResource.manifest_quantities, 'mainfile')

        # Ordered by upload_id, so each upload's files are opened only once below.
        calcs = search_request.execute_scan(
            order_by='upload_id',
            size=config.services.download_scan_size,
            scroll=config.services.download_scan_timeout)

        def generator():
            """
            Yields ``(zip name, id, open function, size function)`` tuples for all
            archives and finally for the ``manifest.json``.
            """
            manifest = {}
            # The UploadFiles whose zip file cache is currently open, if any.
            upload_files = None
            try:
                for entry in calcs:
                    upload_id = entry['upload_id']
                    calc_id = entry['calc_id']
                    if upload_files is None or upload_files.upload_id != upload_id:
                        if upload_files is not None:
                            upload_files.close_zipfile_cache()

                        upload_files = UploadFiles.get(
                            upload_id, create_authorization_predicate(upload_id))

                        if upload_files is None:
                            common.logger.error('upload files do not exist', upload_id=upload_id)
                            continue

                        upload_files.open_zipfile_cache()

                    # Bind the current upload_files as a default argument so the
                    # callbacks do not depend on when the consumer invokes them.
                    yield (
                        '%s.%s' % (calc_id, upload_files._archive_ext), calc_id,
                        lambda calc_id, files=upload_files: files.archive_file(calc_id, 'rb'),
                        lambda calc_id, files=upload_files: files.archive_file_size(calc_id))

                    manifest[calc_id] = {
                        key: entry[key]
                        for key in ArchiveDownloadResource.manifest_quantities
                        if entry.get(key) is not None
                    }

                if upload_files is not None:
                    upload_files.close_zipfile_cache()
                    upload_files = None

                try:
                    manifest_contents = json.dumps(manifest).encode('utf-8')
                except Exception as e:
                    manifest_contents = json.dumps(
                        dict(error='Could not create the manifest: %s' % (e))).encode('utf-8')
                    common.logger.error(
                        'could not create raw query manifest', exc_info=e)

                yield (
                    'manifest.json', 'manifest',
                    lambda *args: BytesIO(manifest_contents),
                    lambda *args: len(manifest_contents))

            except Exception as e:
                common.logger.warning(
                    'unexpected error while streaming raw data from query',
                    exc_info=e,
                    query=urllib.parse.urlencode(request.args, doseq=True))

            finally:
                # Release the cache also when streaming fails or is stopped early
                # (closing a suspended generator raises GeneratorExit at the yield).
                if upload_files is not None:
                    upload_files.close_zipfile_cache()

        return streamed_zipfile(
            generator(), zipfile_name='nomad_archive.zip', compress=compress)


# Request and response model of the archive query endpoint: the common search model
# extended by the search ``query`` and the archive ``query_schema``.
_archive_query_model = api.inherit('ArchiveCalculations', search_model, {
    'query': fields.Nested(query_model, description='The query used to find the requested entries.'),
    'query_schema': fields.Raw(description='The query schema that defines what archive data to retrieve.')
})


@ns.route('/query')
class ArchiveQueryResource(Resource):
    @api.doc('post_archive_query')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.response(401, 'Not authorized to access the data.')
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(200, 'Archive data send')
    @api.expect(_archive_query_model)
    @api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Search results sent')
    @authenticate()
    def post(self):
        """
        Post a query schema and return it filled with archive data.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        The JSON body may contain ``query`` (search parameters), ``query_schema``
        (which archive data to return, default ``'*'``) and either ``pagination``
        (``page``, ``per_page``) or ``scroll`` (``scroll_id``). In the response, the
        search ``results`` are replaced by the queried archive data.
        """
        try:
            data_in = request.get_json()
            scroll = data_in.get('scroll', None)
            scroll_id = None
            if scroll:
                scroll_id = scroll.get('scroll_id')
                scroll = True

            pagination = data_in.get('pagination', {})
            page = pagination.get('page', 1)
            per_page = pagination.get('per_page', 10 if not scroll else 1000)

            query = data_in.get('query', {})
            query_schema = data_in.get('query_schema', '*')

        except Exception:
            abort(400, message='bad parameter types')

        try:
            valid_pagination = page >= 1 and per_page > 0
        except TypeError:
            # e.g. page or per_page given as strings or null
            valid_pagination = False

        if not valid_pagination:
            abort(400, message='invalid pagination')

        search_request = search.SearchRequest()
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        apply_search_parameters(search_request, query)
        search_request.include('calc_id', 'upload_id', 'with_embargo')

        try:
            if scroll:
                results = search_request.execute_scrolled(
                    scroll_id=scroll_id, size=per_page, order_by='upload_id')
                results['scroll']['scroll'] = True

            else:
                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order_by='upload_id')

        except search.ScrollIdNotFound:
            abort(400, 'The given scroll_id does not exist.')
        except KeyError as e:
            # Log through the application logger instead of printing to stderr.
            common.logger.warning('could not execute archive query', exc_info=e)
            abort(400, str(e))

        data = []
        archive_files = None
        cur_upload_id = None
        for entry in results['results']:
            upload_id = entry['upload_id']
            calc_id = entry['calc_id']
            if upload_id != cur_upload_id:
                # Results are ordered by upload_id, so each upload is opened only once.
                cur_upload_id = upload_id
                upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))
                if upload_files is None:
                    # Skip this upload's entries instead of dropping the whole response.
                    common.logger.error('upload files do not exist', upload_id=upload_id)
                    archive_files = None
                else:
                    archive_files = upload_files.archive_file_msgs()

            if archive_files is None:
                continue

            # index 1 holds embargoed data, index 0 the public data
            archive_file = archive_files[1] if entry['with_embargo'] else archive_files[0]
            if archive_file is None:
                continue

            data.append(query_archive(archive_file, {calc_id: query_schema}))

        # assign archive data to results
        results['results'] = data

        return results, 200


@ns.route('/metainfo/<string:metainfo_package_name>')
@api.doc(params=dict(metainfo_package_name='The name of the metainfo package.'))
class MetainfoResource(Resource):
    @api.doc('get_metainfo')
    @api.response(404, 'The metainfo does not exist')
    @api.response(200, 'Metainfo data send')
    def get(self, metainfo_package_name):
        """
        Get a metainfo definition file.

        The package is first looked up by name via ``load_metainfo``. If that file does
        not exist and the name has the form ``<prefix>.nomadmetainfo.json``, it is
        loaded from the ``<prefix>parser`` module instead.
        """
        try:
            return load_metainfo(metainfo_package_name), 200
        except FileNotFoundError:
            pass

        suffix = '.nomadmetainfo.json'
        # Only names with the suffix can map to a parser package. Slicing the suffix off
        # other names would produce a bogus parser name.
        parser_prefix = None
        if metainfo_package_name.endswith(suffix):
            parser_prefix = metainfo_package_name[:-len(suffix)]

        if parser_prefix:
            # NOTE(review): if the parser module is not installed, load_metainfo raises
            # an error that is not handled here (resulting in a 500).
            try:
                return load_metainfo(dict(
                    parser='%sparser' % parser_prefix,
                    path='%s.nomadmetainfo.json' % parser_prefix)), 200
            except FileNotFoundError:
                pass

        abort(404, message='The metainfo %s does not exist.' % metainfo_package_name)


# Directory of the installed ``nomad_meta_info`` package. Plain metainfo package
# names and ``metainfoPath`` dependencies are resolved relative to it.
metainfo_main_path = os.path.dirname(
    os.path.abspath(nomad_meta_info.__file__))


def load_metainfo(
        package_name_or_dependency: str, dependency_source: str = None,
        loaded_packages: Dict[str, Any] = None) -> Dict[str, Any]:
    """
    Loads the given metainfo package and all its dependencies. Returns a dict with
    all loaded package_names and respective packages.

    Arguments:
        package_name_or_dependency: The name of the package, or a nomadmetainfo dependency
            object (a dict with ``relativePath``, ``metainfoPath``, or ``parser`` and ``path``).
        dependency_source: The path of the metainfo that uses this function to load a relative dependency.
        loaded_packages: Give a dict and the function will add freshly loaded packages
            to it and return it.

    Raises:
        FileNotFoundError: If the metainfo file (or one of its dependencies) does not exist.
        Exception: If a relative dependency is given without ``dependency_source``, if a
            parser module cannot be imported, or if the dependency object has an unknown type.
    """
    if loaded_packages is None:
        loaded_packages = {}

    if isinstance(package_name_or_dependency, str):
        package_name = package_name_or_dependency
        metainfo_path = os.path.join(metainfo_main_path, package_name)

    else:
        dependency = package_name_or_dependency
        if 'relativePath' in dependency:
            if dependency_source is None:
                raise Exception(
                    'Can only load relative dependency from within another metainfo package')

            metainfo_path = os.path.join(
                os.path.dirname(dependency_source), dependency['relativePath'])

        elif 'metainfoPath' in dependency:
            metainfo_path = os.path.join(metainfo_main_path, dependency['metainfoPath'])

        elif 'parser' in dependency:
            parser = dependency['parser']
            path = dependency['path']
            try:
                parser_module = importlib.import_module(parser).__file__
            except Exception as e:
                # metainfo_path is not known yet here; report the requested path instead
                raise Exception(
                    'Parser not installed %s for metainfo path %s' % (parser, path)) from e

            parser_directory = os.path.dirname(parser_module)
            metainfo_path = os.path.join(parser_directory, path)

        else:
            # metainfo_path is not known yet here; report the declaring package instead
            raise Exception(
                'Invalid dependency type in metainfo package %s' % dependency_source)

        package_name = os.path.basename(metainfo_path)

    package_name = os.path.basename(package_name)

    # Already loaded packages are skipped; this also stops cyclic dependencies.
    if package_name in loaded_packages:
        return loaded_packages

    # JSON is UTF-8; do not depend on the platform's default encoding.
    with open(metainfo_path, 'rt', encoding='utf-8') as f:
        metainfo_json = json.load(f)

    loaded_packages[package_name] = metainfo_json

    for dependency in metainfo_json.get('dependencies', []):
        load_metainfo(dependency, dependency_source=metainfo_path, loaded_packages=loaded_packages)

    return loaded_packages