# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The repository API of the nomad@FAIRDI APIs. Currently allows resolving repository
metadata.
"""

from typing import List, Dict, Any
from flask_restplus import Resource, abort, fields
from flask import request, g
from elasticsearch.exceptions import NotFoundError
import elasticsearch.helpers

from nomad import search, utils, datamodel, processing as proc, infrastructure
from nomad.app.utils import rfc3339DateTime, RFC3339DateTime, with_logger
from nomad.app.optimade import filterparser

from .api import api
from .auth import authenticate
from .common import pagination_model, pagination_request_parser, calc_route, metadata_model

ns = api.namespace('repo', description='Access repository metadata.')


@calc_route(ns)
class RepoCalcResource(Resource):
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the calculation')
    @api.response(200, 'Metadata sent', fields.Raw)
    @api.doc('get_repo_calc')
    @authenticate()
    def get(self, upload_id, calc_id):
        """
        Get calculation metadata in repository form.

        Repository metadata only entails the quantities shown in the repository.
        Calcs are referenced via *upload_id*, *calc_id* pairs.
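
        For example, a request of the form ``GET .../repo/<upload_id>/<calc_id>`` (the
        exact route prefix depends on the deployment and on how ``calc_route`` mounts
        this resource) returns the repository metadata of that single calculation.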
        """
        try:
            calc = search.Entry.get(calc_id)
        except NotFoundError:
            abort(404, message='There is no calculation %s/%s' % (upload_id, calc_id))

        if calc.with_embargo or not calc.published:
            if g.user is None:
                abort(401, message='Not logged in to access %s/%s.' % (upload_id, calc_id))

            if not (any(g.user.user_id == user.user_id for user in calc.owners) or g.user.is_admin):
                abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))

        return calc.to_dict(), 200


repo_calcs_model = api.model('RepoCalculations', {
    'pagination': fields.Nested(pagination_model, skip_none=True),
    'scroll': fields.Nested(allow_null=True, skip_none=True, model=api.model('Scroll', {
        'total': fields.Integer(description='The total amount of hits for the search.'),
        'scroll_id': fields.String(allow_null=True, description='The scroll_id that can be used to retrieve the next page.'),
        'size': fields.Integer(description='The size of the returned scroll page.')})),
    'results': fields.List(fields.Raw, description=(
        'A list of search results. Each result is a dict with quantity names as keys and '
        'values as values')),
    'statistics': fields.Raw(description=(
        'A dict with all statistics. Each statistic is a dictionary with a metrics dict as '
        'value and a quantity value as key. The possible metrics are code runs (calcs), %s. '
        'There is a pseudo quantity "total" with a single value "all" that contains the '
        'metrics over all results.' % ', '.join(datamodel.Domain.instance.metrics_names))),
    'datasets': fields.Nested(api.model('RepoDatasets', {
        'after': fields.String(description='The after value that can be used to retrieve the next datasets.'),
        'values': fields.Raw(description='A dict with names as keys. The values are dicts with "total" and "examples" keys.')
    }), skip_none=True)
})
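# An illustrative sketch (placeholder names in angle brackets) of the 'statistics' shape
# described above:
#   {'total': {'all': {<metric>: <count>, ...}},
#    <quantity>: {<value>: {<metric>: <count>, ...}, ...}}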


repo_calc_id_model = api.model('RepoCalculationId', {
    'upload_id': fields.String(), 'calc_id': fields.String()
})


def add_common_parameters(request_parser):
    request_parser.add_argument(
        'owner', type=str,
        help='Specify which calcs to return: ``all``, ``public``, ``user``, ``staging``, default is ``all``')
    request_parser.add_argument(
        'from_time', type=lambda x: rfc3339DateTime.parse(x),
        help='A yyyy-MM-ddTHH:mm:ss (RFC3339) minimum entry time (e.g. upload time)')
    request_parser.add_argument(
        'until_time', type=lambda x: rfc3339DateTime.parse(x),
        help='A yyyy-MM-ddTHH:mm:ss (RFC3339) maximum entry time (e.g. upload time)')

    for quantity in search.quantities.values():
        request_parser.add_argument(
            quantity.name, help=quantity.description,
            action=quantity.argparse_action if quantity.multi else None)


repo_request_parser = pagination_request_parser.copy()
add_common_parameters(repo_request_parser)
repo_request_parser.add_argument(
    'scroll', type=bool, help='Enable scrolling')
repo_request_parser.add_argument(
    'scroll_id', type=str, help='The id of the current scrolling window to use.')
repo_request_parser.add_argument(
    'date_histogram', type=bool, help='Add an additional aggregation over the upload time')
repo_request_parser.add_argument(
    'datasets_after', type=str, help='The last dataset id of the last scroll window for the dataset quantity')
repo_request_parser.add_argument(
    'metrics', type=str, action='append', help=(
        'Metrics to aggregate over all quantities and their values as comma separated list. '
        'Possible values are %s.' % ', '.join(datamodel.Domain.instance.metrics_names)))
repo_request_parser.add_argument(
    'datasets', type=bool, help=('Return dataset information.'))
repo_request_parser.add_argument(
    'statistics', type=bool, help=('Return statistics.'))


search_request_parser = api.parser()
add_common_parameters(search_request_parser)

query_model_parameters = {
    'owner': fields.String(description='Specify which calcs to return: ``all``, ``public``, ``user``, ``staging``, default is ``all``'),
    'from_time': RFC3339DateTime(description='A yyyy-MM-ddTHH:mm:ss (RFC3339) minimum entry time (e.g. upload time)'),
    'until_time': RFC3339DateTime(description='A yyyy-MM-ddTHH:mm:ss (RFC3339) maximum entry time (e.g. upload time)')
}

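# Expose each searchable quantity as a query model field; multi-valued quantities are
# modelled as lists of strings, all others as plain strings.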
for quantity in search.quantities.values():
    if quantity.multi:
        def field(**kwargs):
            return fields.List(fields.String(**kwargs))
    else:
        field = fields.String
    query_model_parameters[quantity.name] = field(description=quantity.description)


repo_query_model = api.model('RepoQuery', query_model_parameters, skip_none=True)
repo_edit_model = api.model('RepoEdit', {
    'query': fields.Nested(repo_query_model, skip_none=True, description='New metadata will be applied to query results.'),
    'metadata': fields.Nested(metadata_model, skip_none=True, description='New metadata that should be used on all query results.')
})


def add_query(search_request: search.SearchRequest, args: Dict[str, Any]):
    """
    Helper that adds query-relevant request args to the given SearchRequest.
    """
    args = {key: value for key, value in args.items() if value is not None}

    # owner
    try:
        search_request.owner(
            args.get('owner', 'all'),
            g.user.user_id if g.user is not None else None)
    except ValueError as e:
        abort(401, getattr(e, 'message', 'Invalid owner parameter'))
    except Exception as e:
        abort(400, getattr(e, 'message', 'Invalid owner parameter'))

    # time range
    from_time_str = args.get('from_time', None)
    until_time_str = args.get('until_time', None)

    try:
        from_time = rfc3339DateTime.parse(from_time_str) if from_time_str is not None else None
        until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else None
        search_request.time_range(start=from_time, end=until_time)
    except Exception:
        abort(400, message='bad datetime format')

    # optimade
    try:
        optimade = args.get('optimade', None)
        if optimade is not None:
            q = filterparser.parse_filter(optimade)
            search_request.query(q)
    except filterparser.FilterException:
        abort(400, message='could not parse optimade query')

    # search parameter
    search_request.search_parameters(**{
        key: value for key, value in args.items()
        if key not in ['optimade'] and key in search.quantities})


@ns.route('/')
class RepoCalcsResource(Resource):
    @api.doc('search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(repo_request_parser, validate=True)
    @api.marshal_with(repo_calcs_model, skip_none=True, code=200, description='Search results sent')
    @authenticate()
    def get(self):
        """
        Search for calculations in repository form, paginated.

        The ``owner`` parameter determines the overall entries to search through.
        Possible values are: ``all`` (show all entries visible to the current user), ``public``
        (show all publicly visible entries), ``user`` (show all user entries, requires login),
        ``staging`` (show all user entries in staging area, requires login).
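        For example, ``?owner=user`` (an illustrative query string) restricts the search
        to your own entries and requires you to be logged in.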

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have a certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        The pagination parameters allow determining which page to return via the
        ``page`` and ``per_page`` parameters. Pagination, however, is limited to the first
        100k (depending on ES configuration) hits.
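        For example, ``?page=2&per_page=25`` (an illustrative query string) would return
        hits 26 through 50.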

        An alternative to pagination is to use ``scroll`` and ``scroll_id``. With ``scroll``
        you will get a ``scroll_id`` on the first request. Each call with ``scroll`` and
        the respective ``scroll_id`` will return the next ``per_page`` (here the default is 1000)
        results. Scroll, however, ignores ordering and does not return aggregations.
        The scroll view used in the background will stay alive for 1 minute between requests.
        If the given ``scroll_id`` is not available anymore, an HTTP 400 is raised.
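        For example, a first request with ``?scroll=true`` (illustrative) yields a
        ``scroll_id``; repeating the request with ``scroll=true&scroll_id=<id>`` returns
        the next window of results.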

        The search will return aggregations on a predefined set of quantities. Aggregations
        will tell you what quantity values exist and how many entries match those values.

        Ordering is determined by ``order_by`` and ``order`` parameters.
        """

        try:
            args = {
                key: value for key, value in repo_request_parser.parse_args().items()
                if value is not None}

            scroll = args.get('scroll', False)
            scroll_id = args.get('scroll_id', None)
            page = args.get('page', 1)
            per_page = args.get('per_page', 10 if not scroll else 1000)
            order = args.get('order', -1)
            order_by = args.get('order_by', 'formula')

            date_histogram = args.get('date_histogram', False)
            metrics: List[str] = request.args.getlist('metrics')

            with_datasets = args.get('datasets', False)
            with_statistics = args.get('statistics', False)
        except Exception as e:
            abort(400, message='bad parameters: %s' % str(e))

        search_request = search.SearchRequest()
        add_query(search_request, args)
        if date_histogram:
            search_request.date_histogram()

        try:
            assert page >= 1
            assert per_page >= 0
        except AssertionError:
            abort(400, message='invalid pagination')

        if order not in [-1, 1]:
            abort(400, message='invalid pagination')

        for metric in metrics:
            if metric not in search.metrics_names:
                abort(400, message='there is no metric %s' % metric)

        if with_statistics:
            search_request.default_statistics(metrics_to_use=metrics)
            if 'datasets' not in metrics:
                total_metrics = metrics + ['datasets']
            else:
                total_metrics = metrics
            search_request.totals(metrics_to_use=total_metrics)
            search_request.statistic('authors', 1000)

        try:
            if scroll:
                results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)

            else:
                if with_datasets:
                    search_request.quantity(
                        'dataset_id', size=per_page, examples=1,
                        after=request.args.get('datasets_after', None))

                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order=order, order_by=order_by)

                # TODO just a workaround to make things prettier
                if with_statistics:
                    statistics = results['statistics']
                    if 'code_name' in statistics and 'currupted mainfile' in statistics['code_name']:
                        del statistics['code_name']['currupted mainfile']

                if with_datasets:
                    datasets = results.pop('quantities')['dataset_id']
                    results['datasets'] = datasets

            return results, 200
        except search.ScrollIdNotFound:
            abort(400, 'The given scroll_id does not exist.')
        except KeyError as e:
            import traceback
            traceback.print_exc()
            abort(400, str(e))

    @api.doc('edit_repo')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(repo_edit_model)
    @api.response(code=200, description='Metadata updated')
    @authenticate()
    @with_logger
    def post(self, logger):
        """ Edit repository metadata. """
        json_data = request.get_json()
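        # An illustrative payload sketch (shape follows repo_edit_model; the concrete
        # values are hypothetical):
        #   {'query': {'owner': 'user'},
        #    'metadata': {'comment': 'new comment', 'references': ['http://example.org']}}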
        if json_data is None:
            json_data = {}
        query = json_data.get('query', {})

        owner = query.get('owner', 'user')
        if owner not in ['user', 'staging']:
            abort(400, 'Not a valid owner for edit %s. Edit can only be performed in user or staging' % owner)
        query['owner'] = owner

        search_request = search.SearchRequest()
        add_query(search_request, query)

        if 'metadata' not in json_data:
            abort(400, 'Missing key metadata in edit repo payload')
        metadata = json_data['metadata']
        if metadata.get('with_embargo', False):
            abort(400, 'Cannot raise an embargo, you can only lift the embargo')

        if '_uploader' in metadata or '_upload_time' in metadata:
            if not g.user.is_admin:
                abort(400, 'Only the admin user can set uploader or upload_time.')

        mongo_update = {}
        for key in [
                'with_embargo', 'shared_with', 'coauthors', 'references', 'comment',
                'datasets', '_uploader', '_upload_time']:
            if key in metadata:
                mongo_update['metadata__%s' % key.lstrip('_')] = metadata[key]

        calc_ids = list(hit['calc_id'] for hit in search_request.execute_scan())

        n_updated = proc.Calc.objects(calc_id__in=calc_ids).update(multi=True, **mongo_update)
        if n_updated != len(calc_ids):
            logger.error('edit repo did not update all entries', payload=json_data)

        def elastic_updates():
            for calc in proc.Calc.objects(calc_id__in=calc_ids):
                entry = search.Entry.from_calc_with_metadata(
                    datamodel.CalcWithMetadata(**calc['metadata']))
                entry = entry.to_dict(include_meta=True)
                entry['_op_type'] = 'index'
                yield entry

        _, failed = elasticsearch.helpers.bulk(
            infrastructure.elastic_client, elastic_updates(), stats_only=True)
        search.refresh()
        if failed > 0:
            logger.error(
                'edit repo with failed elastic updates',
                payload=json_data, nfailed=failed)

        return 'metadata updated', 200


repo_quantity_model = api.model('RepoQuantity', {
    'after': fields.String(description='The after value that can be used to retrieve the next set of values.'),
    'values': fields.Raw(description='A dict with values as keys. Values are dicts with "total" and "examples" keys.')
})

repo_quantity_values_model = api.model('RepoQuantityValues', {
    'quantity': fields.Nested(repo_quantity_model, allow_null=True)
})

repo_quantities_model = api.model('RepoQuantities', {
    'quantities': fields.List(fields.Nested(repo_quantity_model))
})

repo_quantity_search_request_parser = api.parser()
add_common_parameters(repo_quantity_search_request_parser)
repo_quantity_search_request_parser.add_argument(
    'after', type=str, help='The after value to use for "scrolling".')
repo_quantity_search_request_parser.add_argument(
    'size', type=int, help='The max size of the returned values.')


@ns.route('/quantity/<string:quantity>')
class RepoQuantityResource(Resource):
    @api.doc('quantity_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
    @api.expect(repo_quantity_search_request_parser, validate=True)
    @api.marshal_with(repo_quantity_values_model, skip_none=True, code=200, description='Search results sent')
    @authenticate()
    def get(self, quantity: str):
        """
        Retrieve quantity values from entries matching the search.

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have a certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        There is no ordering and no pagination. Instead there is 'after'-key-based
        scrolling. The result will contain an 'after' value that can be specified
        for the next request. You can use the 'size' and 'after' parameters accordingly.
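        For example, ``?size=100`` (an illustrative query string) returns the first 100
        values; repeating the request with the returned ``after`` value yields the next
        100 values.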

        The result will contain a 'quantity' key with quantity values and the "after"
        value. There will be at most 'size' values. For the rest of the values use the
        "after" parameter in another request.
        """

        search_request = search.SearchRequest()
        args = {
            key: value
            for key, value in repo_quantity_search_request_parser.parse_args().items()
            if value is not None}

        add_query(search_request, args)
        after = args.get('after', None)
        size = args.get('size', 100)

        try:
            assert size >= 0
        except AssertionError:
            abort(400, message='invalid size')

        search_request.quantity(quantity, size=size, after=after)

        try:
            results = search_request.execute()
            quantities = results.pop('quantities')
            results['quantity'] = quantities[quantity]

            return results, 200
        except KeyError as e:
            import traceback
            traceback.print_exc()
            abort(400, 'Given quantity does not exist: %s' % str(e))


repo_quantities_search_request_parser = api.parser()
add_common_parameters(repo_quantities_search_request_parser)
repo_quantities_search_request_parser.add_argument(
    'quantities', type=str, action='append',
    help='The quantities to retrieve values from')
repo_quantities_search_request_parser.add_argument(
    'size', type=int, help='The max size of the returned values.')


@ns.route('/quantities')
class RepoQuantitiesResource(Resource):
    @api.doc('quantities_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
    @api.expect(repo_quantities_search_request_parser, validate=True)
    @api.marshal_with(repo_quantities_model, skip_none=True, code=200, description='Search results sent')
    @authenticate()
    def get(self):
        """
        Retrieve quantity values for multiple quantities at once.

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have a certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        There is no ordering, no pagination, and no 'after'-key-based scrolling. Only the
        first 'size' values are returned for each quantity.

        The result will contain a 'quantities' key with a dict of quantity names and the
        retrieved values as values.
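        For example, ``?quantities=<quantity1>&quantities=<quantity2>&size=5`` (an
        illustrative query string) returns up to 5 values for each of the two quantities.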
        """

        search_request = search.SearchRequest()
        args = {
            key: value
            for key, value in repo_quantities_search_request_parser.parse_args().items()
            if value is not None}

        add_query(search_request, args)
        quantities = args.get('quantities', [])
        size = args.get('size', 5)

        try:
            assert size >= 0
        except AssertionError:
            abort(400, message='invalid size')

        for quantity in quantities:
            try:
                search_request.quantity(quantity, size=size)
            except KeyError as e:
                import traceback
                traceback.print_exc()
                abort(400, 'Given quantity does not exist: %s' % str(e))

        return search_request.execute(), 200


@ns.route('/pid/<int:pid>')
class RepoPidResource(Resource):
    @api.doc('resolve_pid')
    @api.response(404, 'Entry with PID does not exist')
    @api.marshal_with(repo_calc_id_model, skip_none=True, code=200, description='Entry resolved')
    @authenticate()
    def get(self, pid: int):
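        """ Resolve a PID to an *upload_id*, *calc_id* pair. """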
        search_request = search.SearchRequest()

        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        search_request.search_parameter('pid', pid)

        results = list(search_request.execute_scan())
        total = len(results)

        if total == 0:
            abort(404, 'Entry with PID %d does not exist' % pid)

        if total > 1:
            utils.get_logger(__name__).error('Multiple entries for the same pid', pid=pid)

        result = results[0]
        return dict(
            upload_id=result['upload_id'],
            calc_id=result['calc_id'])