repo.py 32.1 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
'''
Markus Scheidgen's avatar
Markus Scheidgen committed
16
17
The repository API of the nomad@FAIRDI APIs. Currently allows resolving repository
metadata.
18
'''
Markus Scheidgen's avatar
Markus Scheidgen committed
19

20
from typing import List, Dict, Any
Markus Scheidgen's avatar
Markus Scheidgen committed
21
from flask_restplus import Resource, abort, fields
22
from flask import request, g
23
from elasticsearch_dsl import Q
24
from elasticsearch.exceptions import NotFoundError
25
import elasticsearch.helpers
Markus Scheidgen's avatar
Markus Scheidgen committed
26
from datetime import datetime
Markus Scheidgen's avatar
Markus Scheidgen committed
27

28
from nomad import search, utils, datamodel, processing as proc, infrastructure, files
29
from nomad.metainfo import search_extension
30
from nomad.datamodel import Dataset, User, EditableUserMetadata
31
from nomad.app import common
32
from nomad.app.common import RFC3339DateTime, DotKeyNested
Markus Scheidgen's avatar
Markus Scheidgen committed
33

Markus Scheidgen's avatar
Markus Scheidgen committed
34
from .api import api
35
from .auth import authenticate
36
37
from .common import search_model, calc_route, add_pagination_parameters,\
    add_scroll_parameters, add_search_parameters, apply_search_parameters,\
38
    query_api_python, query_api_curl, _search_quantities
Markus Scheidgen's avatar
Markus Scheidgen committed
39

40
ns = api.namespace('repo', description='Access repository metadata.')
Markus Scheidgen's avatar
Markus Scheidgen committed
41
42
43
44
45


@calc_route(ns)
class RepoCalcResource(Resource):
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the calculation')
    @api.response(200, 'Metadata send', fields.Raw)
    @api.doc('get_repo_calc')
    @authenticate()
    def get(self, upload_id, calc_id):
        '''
        Get calculation metadata in repository form.

        Repository metadata only entails the quantities shown in the repository.
        Calcs are references via *upload_id*, *calc_id* pairs.
        '''
        # Resolve the entry in the elastic search index; a missing id maps to 404.
        try:
            calc = search.entry_document.get(calc_id)
        except NotFoundError:
            abort(404, message='There is no calculation %s/%s' % (upload_id, calc_id))

        # Embargoed or unpublished entries are only visible to their owners
        # (or an admin); anonymous access is rejected outright.
        restricted = calc.with_embargo or not calc.published
        if restricted:
            if g.user is None:
                abort(401, message='Not logged in to access %s/%s.' % (upload_id, calc_id))

            is_owner = any(owner.user_id == g.user.user_id for owner in calc.owners)
            if not (is_owner or g.user.is_admin):
                abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))

        # Enrich the raw metadata with ready-to-use API snippets for this entry.
        result = calc.to_dict()
        result['python'] = query_api_python('archive', upload_id, calc_id)
        result['curl'] = query_api_curl('archive', upload_id, calc_id)

        return result, 200
Markus Scheidgen's avatar
Markus Scheidgen committed
74

75

76
77
78
79
80
# Request parser for the main search endpoint: standard pagination, scroll and
# search parameters plus the repo-specific aggregation controls below.
_search_request_parser = api.parser()
add_pagination_parameters(_search_request_parser)
add_scroll_parameters(_search_request_parser)
add_search_parameters(_search_request_parser)
_search_request_parser.add_argument(
    'date_histogram', type=bool, help='Add an additional aggregation over the upload time')
_search_request_parser.add_argument(
    'interval', type=str, help='Interval to use for upload time aggregation.')
_search_request_parser.add_argument(
    'metrics', type=str, action='append', help=(
        'Metrics to aggregate over all quantities and their values as comma separated list. '
        'Possible values are %s.' % ', '.join(search_extension.metrics.keys())))
_search_request_parser.add_argument(
    'statistics', type=bool, help=('Return statistics.'))
_search_request_parser.add_argument(
    'exclude', type=str, action='split', help='Excludes the given keys in the returned data.')
# Each registered group quantity gets a boolean flag to request its bucket data
# and an ``<group>_after`` parameter for after-key based scrolling within it.
for group_name in search_extension.groups:
    _search_request_parser.add_argument(
        group_name, type=bool, help=('Return %s group data.' % group_name))
    _search_request_parser.add_argument(
        '%s_after' % group_name, type=str,
        help='The last %s id of the last scroll window for the %s group' % (group_name, group_name))

99
100
101
102
103
# Response model fields for the search endpoint, assembled dynamically from the
# registered search extension quantities and groups.
_repo_calcs_model_fields = {
    'statistics': fields.Raw(description=(
        'A dict with all statistics. Each statistic is dictionary with a metrics dict as '
        'value and quantity value as key. The possible metrics are code runs(calcs), %s. '
        'There is a pseudo quantity "total" with a single value "all" that contains the '
        ' metrics over all results. ' % ', '.join(search_extension.metrics.keys())))}

# Group results carry an "after" scroll key and a values dict; group names that
# contain a dot need the DotKeyNested wrapper so marshalling does not split them.
for group_name in search_extension.groups:
    _repo_calcs_model_fields[group_name] = (DotKeyNested if '.' in group_name else fields.Nested)(api.model('RepoGroup', {
        'after': fields.String(description='The after value that can be used to retrieve the next %s.' % group_name),
        'values': fields.Raw(description='A dict with %s as key. The values are dicts with "total" and "examples" keys.' % group_name)
    }), skip_none=True)

# Every searchable quantity is also a raw passthrough field in the response.
for qualified_name, quantity in search_extension.search_quantities.items():
    _repo_calcs_model_fields[qualified_name] = fields.Raw(
        description=quantity.description, allow_null=True, skip_none=True)

_repo_calcs_model = api.inherit('RepoCalculations', search_model, _repo_calcs_model_fields)
117
118


Markus Scheidgen's avatar
Markus Scheidgen committed
119
120
@ns.route('/')
class RepoCalcsResource(Resource):
    @api.doc('search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_search_request_parser, validate=True)
    @api.marshal_with(_repo_calcs_model, skip_none=True, code=200, description='Search results send')
    @authenticate()
    def get(self):
        '''
        Search for calculations in the repository form, paginated.

        The ``owner`` parameter determines the overall entries to search through.
        Possible values are: ``all`` (show all entries visible to the current user), ``public``
        (show all publically visible entries), ``user`` (show all user entries, requires login),
        ``staging`` (show all user entries in staging area, requires login).

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have the certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        The pagination parameters allows determine which page to return via the
        ``page`` and ``per_page`` parameters. Pagination however, is limited to the first
        100k (depending on ES configuration) hits.

        An alternative to pagination is to use ``scroll`` and ``scroll_id``. With ``scroll``
        you will get a ``scroll_id`` on the first request. Each call with ``scroll`` and
        the respective ``scroll_id`` will return the next ``per_page`` (here the default is 1000)
        results. Scroll however, ignores ordering and does not return aggregations.
        The scroll view used in the background will stay alive for 1 minute between requests.
        If the given ``scroll_id`` is not available anymore, a HTTP 400 is raised.

        The search will return aggregations on a predefined set of quantities. Aggregations
        will tell you what quantity values exist and how many entries match those values.

        Ordering is determined by ``order_by`` and ``order`` parameters. Default is
        ``upload_time`` in decending order.
        '''

        # Parse and normalize all request arguments; any parsing problem is
        # reported as a 400 rather than a 500.
        try:
            parsed_args = _search_request_parser.parse_args()
            # parse_args returns None for absent arguments; drop those so that
            # args.get(...) defaults below take effect.
            args = {
                key: value for key, value in parsed_args.items()
                if value is not None}

            scroll = args.get('scroll', False)
            scroll_id = args.get('scroll_id', None)
            page = args.get('page', 1)
            # scroll mode uses a larger default window than pagination
            per_page = args.get('per_page', 10 if not scroll else 1000)
            order = args.get('order', -1)
            order_by = args.get('order_by', 'upload_time')

            date_histogram = args.get('date_histogram', False)
            interval = args.get('interval', '1M')
            # metrics is read from the raw flask request to get the repeated
            # query parameter as a list
            metrics: List[str] = request.args.getlist('metrics')

            # statistics are computed if explicitly requested or if any group
            # data was requested (groups need the totals anyway)
            with_statistics = args.get('statistics', False) or \
                any(args.get(group_name, False) for group_name in search_extension.groups)
        except Exception as e:
            abort(400, message='bad parameters: %s' % str(e))

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        if date_histogram:
            search_request.date_histogram(interval=interval)

        # validate pagination and ordering parameters
        try:
            assert page >= 1
            assert per_page >= 0
        except AssertionError:
            abort(400, message='invalid pagination')

        if order not in [-1, 1]:
            abort(400, message='invalid pagination')

        for metric in metrics:
            if metric not in search_extension.metrics:
                abort(400, message='there is no metric %s' % metric)

        if with_statistics:
            search_request.default_statistics(metrics_to_use=metrics)

            # groups that were requested contribute their own metric to the totals
            additional_metrics = [
                group_quantity.metric_name
                for group_name, group_quantity in search_extension.groups.items()
                if args.get(group_name, False)]

            total_metrics = metrics + additional_metrics

            search_request.totals(metrics_to_use=total_metrics)
            search_request.statistic('authors', 1000)

        elif len(metrics) > 0:
            search_request.totals(metrics_to_use=metrics)

        if 'exclude' in parsed_args:
            excludes = parsed_args['exclude']
            if excludes is not None:
                search_request.exclude(*excludes)

        try:
            if scroll:
                # scroll mode: no aggregations, no ordering, just the next window
                results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)

            else:
                # add a bucket aggregation for every requested group, with its
                # own after-key for scrolling through the buckets
                for group_name, group_quantity in search_extension.groups.items():
                    if args.get(group_name, False):
                        kwargs: Dict[str, Any] = {}
                        if group_name == 'group_uploads':
                            kwargs.update(order_by='upload_time', order='desc')
                        search_request.quantity(
                            group_quantity.qualified_name, size=per_page, examples=1,
                            after=request.args.get('%s_after' % group_name, None),
                            **kwargs)

                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order=order, order_by=order_by)

                # TODO just a work around to make things prettier
                # (drops the placeholder code_name bucket created for entries
                # whose mainfile could not be parsed; sic on the spelling, it
                # must match the value stored in the index)
                if with_statistics:
                    statistics = results['statistics']
                    if 'code_name' in statistics and 'currupted mainfile' in statistics['code_name']:
                        del(statistics['code_name']['currupted mainfile'])

                # move the raw quantity buckets into per-group top-level keys
                # NOTE(review): if a group was requested but 'quantities' is
                # missing from the results, the loop below would raise a
                # NameError on ``quantities`` — presumably ES always returns
                # the aggregation here; verify.
                if 'quantities' in results:
                    quantities = results.pop('quantities')

                for group_name, group_quantity in search_extension.groups.items():
                    if args.get(group_name, False):
                        results[group_name] = quantities[group_quantity.qualified_name]

            # build python code/curl snippet
            code_args = dict(request.args)
            if 'statistics' in code_args:
                del(code_args['statistics'])
            results['curl'] = query_api_curl('archive', 'query', query_string=code_args)
            results['python'] = query_api_python('archive', 'query', query_string=code_args)

            return results, 200
        except search.ScrollIdNotFound:
            abort(400, 'The given scroll_id does not exist.')
        except KeyError as e:
            import traceback
            traceback.print_exc()
            abort(400, str(e))
264

265

266
# Body model for queries in POST requests (e.g. the edit endpoint): fixed
# owner/time-window parameters plus one field per searchable quantity.
_query_model_parameters = {
    'owner': fields.String(description='Specify which calcs to return: ``all``, ``public``, ``user``, ``staging``, default is ``all``'),
    'from_time': RFC3339DateTime(description='A yyyy-MM-ddTHH:mm:ss (RFC3339) minimum entry time (e.g. upload time)'),
    'until_time': RFC3339DateTime(description='A yyyy-MM-ddTHH:mm:ss (RFC3339) maximum entry time (e.g. upload time)')
}

# Many-valued quantities are modelled as lists of strings, scalar ones as
# plain strings. ``field`` is re-bound each iteration and used immediately,
# so the loop-variable capture is safe here.
for qualified_name, quantity in search.search_quantities.items():
    if quantity.many_and:
        def field(**kwargs):
            return fields.List(fields.String(**kwargs))
    else:
        field = fields.String
    _query_model_parameters[qualified_name] = field(description=quantity.description)

_repo_query_model = api.model('RepoQuery', _query_model_parameters, skip_none=True)
281
282
283
284


def repo_edit_action_field(quantity):
    '''
    Build the restplus response/request field for edit actions on the given
    metainfo quantity: a list of nested action models for shaped quantities,
    a single nested action model for scalar ones.
    '''
    if not quantity.is_scalar:
        return fields.List(
            fields.Nested(_repo_edit_action_model, skip_none=True),
            description=quantity.description)
    return fields.Nested(
        _repo_edit_action_model, description=quantity.description, skip_none=True)
289
290


291
# Model for a single edit action; ``success`` and ``message`` are only filled
# in by the server when answering.
_repo_edit_action_model = api.model('RepoEditAction', {
    'value': fields.String(description='The value/values that is set as a string.'),
    'success': fields.Boolean(description='If this can/could be done. Only in API response.'),
    # fixed: the keyword was misspelled ``descriptin`` and silently ignored by
    # restplus, leaving this field undocumented in the generated API docs
    'message': fields.String(description='A message that details the action result. Only in API response.')
})

# Overall edit request/response body: the query selecting the entries, one
# action (or action list) per editable quantity, and the overall outcome.
_repo_edit_model = api.model('RepoEdit', {
    'verify': fields.Boolean(description='If true, no action is performed.'),
    'query': fields.Nested(_repo_query_model, skip_none=True, description='New metadata will be applied to query results.'),
    'actions': fields.Nested(
        api.model('RepoEditActions', {
            quantity.name: repo_edit_action_field(quantity)
            for quantity in EditableUserMetadata.m_def.definitions
        }), skip_none=True,
        description='Each action specifies a single value (even for multi valued quantities).'),
    'success': fields.Boolean(description='If the overall edit can/could be done. Only in API response.'),
    'message': fields.String(description='A message that details the overall edit result. Only in API response.')
})

# Lookup of editable metadata quantities by name, used to validate actions.
_editable_quantities = {
    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}

313

314
def edit(parsed_query: Dict[str, Any], mongo_update: Dict[str, Any] = None, re_index=True) -> List[str]:
    '''
    Apply the given mongo update to all entries matching the given parsed query
    and (optionally) re-index the affected entries in elastic search.

    Arguments:
        parsed_query: already validated search parameters selecting the entries
        mongo_update: kwargs for mongoengine ``update``; if None, no db update
        re_index: whether to re-index the changed entries in elastic search

    Returns: the list of upload ids that contain affected entries.
    '''
    # get all calculations that have to change
    with utils.timer(common.logger, 'edit query executed'):
        search_request = search.SearchRequest().include('calc_id', 'upload_id')
        apply_search_parameters(search_request, parsed_query)
        upload_ids = set()
        calc_ids = []

        for hit in search_request.execute_scan():
            calc_ids.append(hit['calc_id'])
            upload_ids.add(hit['upload_id'])

    # perform the update on the mongo db
    with utils.timer(common.logger, 'edit mongo update executed', size=len(calc_ids)):
        if mongo_update is not None:
            n_updated = proc.Calc.objects(calc_id__in=calc_ids).update(multi=True, **mongo_update)
            if n_updated != len(calc_ids):
                common.logger.error('edit repo did not update all entries', payload=mongo_update)

    # re-index the affected entries in elastic search
    with utils.timer(common.logger, 'edit elastic update executed', size=len(calc_ids)):
        if re_index:
            def elastic_updates():
                # cache upload files per upload so each archive is opened once
                upload_files_cache: Dict[str, files.UploadFiles] = dict()

                for calc in proc.Calc.objects(calc_id__in=calc_ids):
                    upload_id = calc.upload_id
                    upload_files = upload_files_cache.get(upload_id)
                    if upload_files is None:
                        upload_files = files.UploadFiles.get(upload_id, is_authorized=lambda: True)
                        upload_files_cache[upload_id] = upload_files

                    entry_metadata = calc.entry_metadata(upload_files)
                    entry = entry_metadata.a_elastic.create_index_entry().to_dict(include_meta=True)
                    entry['_op_type'] = 'index'

                    yield entry

                for upload_files in upload_files_cache.values():
                    upload_files.close()

            # with stats_only=True, bulk returns (success_count, failed_count)
            # as plain ints
            _, failed = elasticsearch.helpers.bulk(
                infrastructure.elastic_client, elastic_updates(), stats_only=True)
            search.refresh()
            if failed > 0:
                # fixed: was ``nfailed=len(failed)`` which raises TypeError on
                # the int returned by bulk(..., stats_only=True), crashing the
                # very error path it was meant to log
                common.logger.error(
                    'edit repo with failed elastic updates',
                    payload=mongo_update, nfailed=failed)

    return list(upload_ids)

365

366
def get_uploader_ids(query):
    '''
    Get all uploader ids of entries matching the query; used to check
    coauthor and shared_with edits against existing uploaders.
    '''
    uploader_search = search.SearchRequest()
    apply_search_parameters(uploader_search, query)
    uploader_search.quantity(name='uploader_id')
    result = uploader_search.execute()
    return result['quantities']['uploader_id']['values']


374
375
@ns.route('/edit')
class EditRepoCalcsResource(Resource):
    @api.doc('edit_repo')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_repo_edit_model)
    @api.marshal_with(_repo_edit_model, skip_none=True, code=200, description='Edit verified/performed')
    @authenticate()
    def post(self):
        '''
        Edit repository metadata.

        The body contains a query (selecting the entries to edit), the edit
        actions per quantity, and an optional ``verify`` flag. With ``verify``
        the actions are only checked and annotated with success/message, but
        nothing is changed.
        '''

        # basic body parsing and some semantic checks
        json_data = request.get_json()
        if json_data is None:
            json_data = {}
        query = json_data.get('query', {})

        # edits are restricted to the requesting user's own (or staging) entries
        owner = query.get('owner', 'user')
        if owner not in ['user', 'staging']:
            abort(400, 'Not a valid owner for edit %s. Edit can only be performed in user or staging' % owner)
        query['owner'] = owner

        if 'actions' not in json_data:
            abort(400, 'Missing key actions in edit data')
        actions = json_data['actions']
        verify = json_data.get('verify', False)

        # preparing the query of entries that are edited
        parsed_query = {}
        for quantity_name, value in query.items():
            if quantity_name in _search_quantities:
                quantity = search.search_quantities[quantity_name]
                # many-valued quantities accept a comma separated string
                if quantity.many:
                    if not isinstance(value, list):
                        value = value.split(',')
                parsed_query[quantity_name] = value
        parsed_query['owner'] = owner
        parsed_query['domain'] = query.get('domain')

        # checking the edit actions and preparing a mongo update on the fly
        json_data['success'] = True
        mongo_update = {}
        uploader_ids = None
        lift_embargo = False
        removed_datasets = None

        # NOTE(review): ``has_error`` is (re)initialized inside the per-quantity
        # loop below, so errors recorded for an earlier quantity are forgotten
        # when a later quantity resets it, and if ``actions`` is empty the
        # ``if has_error`` check after the loop raises NameError — verify
        # whether this is intended.
        with utils.timer(common.logger, 'edit verified'):
            for action_quantity_name, quantity_actions in actions.items():
                quantity = _editable_quantities.get(action_quantity_name)
                if quantity is None:
                    abort(400, 'Unknown quantity %s' % action_quantity_name)

                quantity_flask = quantity.m_get_annotations('flask', {})
                # NOTE(review): ``g.user.is_admin()`` is called here, while the
                # GET endpoint above reads ``g.user.is_admin`` as an attribute —
                # one of the two usages is presumably wrong; confirm against the
                # User model.
                if quantity_flask.get('admin_only', False):
                    if not g.user.is_admin():
                        abort(404, 'Only the admin user can set %s' % quantity.name)

                # scalar quantities take a single action object, shaped ones a list
                if isinstance(quantity_actions, list) == quantity.is_scalar:
                    abort(400, 'Wrong shape for quantity %s' % action_quantity_name)

                if not isinstance(quantity_actions, list):
                    quantity_actions = [quantity_actions]

                flask_verify = quantity_flask.get('verify', None)
                mongo_key = 'metadata__%s' % quantity.name
                has_error = False
                for action in quantity_actions:
                    action['success'] = True
                    action['message'] = None
                    action_value = action.get('value')
                    action_value = action_value if action_value is None else action_value.strip()

                    # empty/missing values clear the quantity
                    if action_value is None:
                        mongo_value = None

                    elif action_value == '':
                        mongo_value = None

                    # user-valued quantities: the value must be an existing user
                    # that is not already an uploader of a selected entry
                    elif flask_verify == datamodel.User:
                        try:
                            mongo_value = User.get(user_id=action_value).user_id
                        except KeyError:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'User does not exist'
                            continue

                        # uploader ids are resolved lazily, once per request
                        if uploader_ids is None:
                            uploader_ids = get_uploader_ids(parsed_query)
                        if action_value in uploader_ids:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'This user is already an uploader of one entry in the query'
                            continue

                    # dataset-valued quantities: resolve by name, creating the
                    # dataset on the fly unless this is only a verification run
                    elif flask_verify == datamodel.Dataset:
                        try:
                            mongo_value = Dataset.m_def.a_mongo.get(
                                user_id=g.user.user_id, name=action_value).dataset_id
                        except KeyError:
                            action['message'] = 'Dataset does not exist and will be created'
                            mongo_value = None
                            if not verify:
                                dataset = Dataset(
                                    dataset_id=utils.create_uuid(), user_id=g.user.user_id,
                                    name=action_value, created=datetime.utcnow())
                                dataset.a_mongo.create()
                                mongo_value = dataset.dataset_id

                    elif action_quantity_name == 'with_embargo':
                        # ignore the actual value ... just lift the embargo
                        mongo_value = False
                        lift_embargo = True

                        # check if necessary
                        search_request = search.SearchRequest()
                        apply_search_parameters(search_request, parsed_query)
                        search_request.q = search_request.q & Q('term', with_embargo=True)
                        if search_request.execute()['total'] == 0:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'There is no embargo to lift'
                            continue
                    else:
                        mongo_value = action_value

                    # scalar quantities overwrite; shaped quantities accumulate
                    # distinct non-None values
                    if len(quantity.shape) == 0:
                        mongo_update[mongo_key] = mongo_value
                    else:
                        mongo_values = mongo_update.setdefault(mongo_key, [])
                        if mongo_value is not None:
                            if mongo_value in mongo_values:
                                action['success'] = False
                                has_error = True
                                action['message'] = 'Duplicate values are not allowed'
                                continue
                            mongo_values.append(mongo_value)

                # an empty action list on a shaped quantity clears it
                if len(quantity_actions) == 0 and len(quantity.shape) > 0:
                    mongo_update[mongo_key] = []

                if action_quantity_name == 'datasets':
                    # check if datasets edit is allowed and if datasets have to be removed
                    search_request = search.SearchRequest()
                    apply_search_parameters(search_request, parsed_query)
                    search_request.quantity(name='dataset_id')
                    old_datasets = list(
                        search_request.execute()['quantities']['dataset_id']['values'].keys())

                    removed_datasets = []
                    for dataset_id in old_datasets:
                        if dataset_id not in mongo_update.get(mongo_key, []):
                            removed_datasets.append(dataset_id)

                    # datasets with a DOI must not lose entries through an edit
                    doi_ds = Dataset.m_def.a_mongo.objects(
                        dataset_id__in=removed_datasets, doi__ne=None).first()
                    if doi_ds is not None:
                        json_data['success'] = False
                        json_data['message'] = json_data.get('message', '') + \
                            'Edit would remove entries from a dataset with DOI (%s) ' % doi_ds.name
                        has_error = True

        # stop here, if client just wants to verify its actions
        if verify:
            return json_data, 200

        # stop if the action were not ok
        if has_error:
            return json_data, 400

        # perform the change
        mongo_update['metadata__last_edit'] = datetime.utcnow()
        upload_ids = edit(parsed_query, mongo_update, True)

        # lift embargo
        if lift_embargo:
            for upload_id in upload_ids:
                upload = proc.Upload.get(upload_id)
                upload.re_pack()

        # remove potentially empty old datasets
        if removed_datasets is not None:
            for dataset in removed_datasets:
                if proc.Calc.objects(metadata__dataset_id=dataset).first() is None:
                    Dataset.m_def.a_mongo.objects(dataset_id=dataset).delete()

        return json_data, 200
560

561

562
563
564
565
566
567
568
569
# Request parser and response models for the per-quantity value endpoint;
# scrolling through the values uses an "after" key instead of pagination.
_repo_quantity_search_request_parser = api.parser()
add_search_parameters(_repo_quantity_search_request_parser)
_repo_quantity_search_request_parser.add_argument(
    'after', type=str, help='The after value to use for "scrolling".')
_repo_quantity_search_request_parser.add_argument(
    'size', type=int, help='The max size of the returned values.')

_repo_quantity_model = api.model('RepoQuantity', {
    'after': fields.String(description='The after value that can be used to retrieve the next set of values.'),
    'values': fields.Raw(description='A dict with values as key. Values are dicts with "total" and "examples" keys.')
})

_repo_quantity_values_model = api.model('RepoQuantityValues', {
    'quantity': fields.Nested(_repo_quantity_model, allow_null=True)
})

578

579
@ns.route('/quantity/<string:quantity>')
class RepoQuantityResource(Resource):
    @api.doc('quantity_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
    @api.expect(_repo_quantity_search_request_parser, validate=True)
    @api.marshal_with(_repo_quantity_values_model, skip_none=True, code=200, description='Search results send')
    @authenticate()
    def get(self, quantity: str):
        '''
        Retrieve quantity values from entries matching the search.

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have the certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        There is no ordering and no pagination. Instead there is an 'after' key based
        scrolling. The result will contain an 'after' value, that can be specified
        for the next request. You can use the 'size' and 'after' parameters accordingly.

        The result will contain a 'quantity' key with quantity values and the "after"
        value. There will be upto 'size' many values. For the rest of the values use the
        "after" parameter in another request.
        '''
        search_request = search.SearchRequest()
        # drop arguments the client did not provide, so defaults below apply
        args = {
            key: value
            for key, value in _repo_quantity_search_request_parser.parse_args().items()
            if value is not None}

        apply_search_parameters(search_request, args)
        after = args.get('after', None)
        size = args.get('size', 100)

        # validate explicitly; the former `assert size >= 0` would be stripped
        # when Python runs with -O, silently disabling the check
        if size < 0:
            abort(400, message='invalid size')

        try:
            search_request.quantity(quantity, size=size, after=after)
            results = search_request.execute()
            # expose only the requested quantity under the singular 'quantity' key
            quantities = results.pop('quantities')
            results['quantity'] = quantities[quantity]

            return results, 200
        except KeyError as e:
            # unknown quantity name is a client error, not a server fault
            abort(400, 'Given quantity does not exist: %s' % str(e))
631
632


633
634
635
# Request parser for the multi-quantity endpoint: all common search
# parameters, a repeatable 'quantities' argument, and 'size' limiting the
# number of values returned per quantity.
_repo_quantities_search_request_parser = api.parser()
add_search_parameters(_repo_quantities_search_request_parser)
_repo_quantities_search_request_parser.add_argument(
    'quantities', type=str, action='append',
    help='The quantities to retrieve values from')
_repo_quantities_search_request_parser.add_argument(
    'size', type=int, help='The max size of the returned values.')

# Response model: one key per known search quantity, each holding a list of
# aggregated value models.
_repo_quantities_model = api.model('RepoQuantitiesResponse', {
    'quantities': fields.Nested(api.model('RepoQuantities', {
        quantity: fields.List(fields.Nested(_repo_quantity_model))
        for quantity in search_extension.search_quantities
    }))
})

648
649
650
651
652

@ns.route('/quantities')
class RepoQuantitiesResource(Resource):
    @api.doc('quantities_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
    @api.expect(_repo_quantities_search_request_parser, validate=True)
    @api.marshal_with(_repo_quantities_model, skip_none=True, code=200, description='Search results send')
    @authenticate()
    def get(self):
        '''
        Retrieve quantity values for multiple quantities at once.

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have the certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        There is no ordering, no pagination, and no 'after' based scrolling; at most
        'size' values are returned per quantity.

        The result will contain a 'quantities' key with a dict of quantity names and the
        retrieved values as values.
        '''
        search_request = search.SearchRequest()
        # drop arguments the client did not provide, so defaults below apply
        args = {
            key: value
            for key, value in _repo_quantities_search_request_parser.parse_args().items()
            if value is not None}

        apply_search_parameters(search_request, args)
        quantities = args.get('quantities', [])
        size = args.get('size', 5)

        # validate explicitly; the former `assert size >= 0` would be stripped
        # when Python runs with -O, silently disabling the check
        if size < 0:
            abort(400, message='invalid size')

        for quantity in quantities:
            try:
                search_request.quantity(quantity, size=size)
            except KeyError as e:
                # unknown quantity name is a client error, not a server fault
                abort(400, 'Given quantity does not exist: %s' % str(e))

        return search_request.execute(), 200


Markus Scheidgen's avatar
Markus Scheidgen committed
699
# Minimal identification of a single entry (its upload and calc ids);
# used by the PID resolution endpoint below.
_repo_calc_id_model = api.model('RepoCalculationId', {
    'upload_id': fields.String(), 'calc_id': fields.String()
})


704
@ns.route('/pid/<path:pid>')
class RepoPidResource(Resource):
    @api.doc('resolve_pid')
    @api.response(404, 'Entry with PID does not exist')
    @api.marshal_with(_repo_calc_id_model, skip_none=True, code=200, description='Entry resolved')
    @authenticate()
    def get(self, pid: str):
        '''
        Resolve a PID to the upload_id and calc_id of the respective entry.

        The PID is either a plain integer, or a handle of the form
        '21.11132/<encoded id>'. Aborts with 400 on a malformed PID and with
        404 if no matching entry is visible to the current user.
        '''
        if '/' in pid:
            # split only once: the <path:pid> converter allows further slashes
            # in the id part; a plain split('/') would raise an unhandled
            # ValueError (HTTP 500) for such input
            prefix, pid = pid.split('/', 1)
            if prefix != '21.11132':
                abort(400, 'Wrong PID format')
            try:
                pid_int = utils.decode_handle_id(pid)
            except ValueError:
                abort(400, 'Wrong PID format')
        else:
            try:
                pid_int = int(pid)
            except ValueError:
                abort(400, 'Wrong PID format')

        search_request = search.SearchRequest().include('upload_id', 'calc_id')

        # authenticated users may resolve their own non-public entries
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        search_request.search_parameter('pid', pid_int)

        results = list(search_request.execute_scan())
        total = len(results)

        if total == 0:
            abort(404, 'Entry with PID %s does not exist' % pid)

        if total > 1:
            # pids should be unique; log the inconsistency but still answer
            common.logger.error('Two entries for the same pid', pid=pid_int)

        result = results[0]
        return dict(
            upload_id=result['upload_id'],
            calc_id=result['calc_id'])