# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
The repository API of the nomad@FAIRDI APIs. Currently allows one to resolve
repository metadata.
'''

from typing import List, Dict, Any
from flask_restplus import Resource, abort, fields
from flask import request, g
from elasticsearch_dsl import Q
from elasticsearch.exceptions import NotFoundError
import elasticsearch.helpers
from datetime import datetime

from nomad import search, utils, datamodel, processing as proc, infrastructure, files
from nomad.metainfo import search_extension
from nomad.datamodel import Dataset, User, EditableUserMetadata
from nomad.app import common
from nomad.app.common import RFC3339DateTime, DotKeyNested, rfc3339DateTime

from .api import api
from .auth import authenticate
from .common import search_model, calc_route, add_pagination_parameters,\
    add_scroll_parameters, add_search_parameters, apply_search_parameters,\
    query_api_python, query_api_curl, query_api_clientlib, _search_quantities

ns = api.namespace('repo', description='Access repository metadata.')


@calc_route(ns)
class RepoCalcResource(Resource):
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the calculation')
    @api.response(200, 'Metadata sent', fields.Raw)
    @api.doc('get_repo_calc')
    @authenticate()
    def get(self, upload_id, calc_id):
        '''
        Get calculation metadata in repository form.

        Repository metadata only entails the quantities shown in the repository.
        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        try:
            calc = search.entry_document.get(calc_id)
        except NotFoundError:
            abort(404, message='There is no calculation %s/%s' % (upload_id, calc_id))

        if calc.with_embargo or not calc.published:
            if g.user is None:
                abort(401, message='Not logged in to access %s/%s.' % (upload_id, calc_id))

            if not (any(g.user.user_id == user.user_id for user in calc.owners) or g.user.is_admin):
                abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))

        result = calc.to_dict()
        result['code'] = {
            'python': query_api_python('archive', upload_id, calc_id),
            'curl': query_api_curl('archive', upload_id, calc_id),
            'clientlib': query_api_clientlib(upload_id=[upload_id], calc_id=[calc_id])
        }

        return result, 200

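# Illustrative request for the endpoint above (an assumption: calc_route is
# expected to register the usual /<upload_id>/<calc_id> route in this namespace;
# the IDs are hypothetical):
#   curl "$API_URL/repo/some_upload_id/some_calc_id"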

def resolve_interval(from_time, until_time):
    if from_time is None:
        from_time = datetime.fromtimestamp(0)
    if until_time is None:
        until_time = datetime.utcnow()
    dt = rfc3339DateTime.parse(until_time) - rfc3339DateTime.parse(from_time)

    if dt.days >= 1826:
        return '1y'
    elif dt.days >= 731:
        return '1q'
    elif dt.days >= 121:
        return '1M'
    elif dt.days >= 28:
        return '1w'
    elif dt.days >= 4:
        return '1d'
    elif dt.total_seconds() >= 14400:
        return '1h'
    else:
        return '1m'

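# Illustrative examples (assuming RFC3339 time strings as passed by the search API):
#   resolve_interval('2019-01-01T00:00:00Z', '2020-01-01T00:00:00Z')  # -> '1M' (365 days)
#   resolve_interval('2020-01-01T00:00:00Z', '2020-01-08T00:00:00Z')  # -> '1d' (7 days)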

_search_request_parser = api.parser()
add_pagination_parameters(_search_request_parser)
add_scroll_parameters(_search_request_parser)
add_search_parameters(_search_request_parser)
_search_request_parser.add_argument(
    'date_histogram', type=bool, help='Add an additional aggregation over the upload time')
_search_request_parser.add_argument(
    'interval', type=str, help='Interval to use for upload time aggregation.')
_search_request_parser.add_argument(
    'metrics', type=str, action='append', help=(
        'Metrics to aggregate over all quantities and their values as a comma-separated list. '
        'Possible values are %s.' % ', '.join(search_extension.metrics.keys())))
_search_request_parser.add_argument(
    'statistics', type=bool, help=('Return statistics.'))
_search_request_parser.add_argument(
    'statistics_order', type=str, help='Statistics order (can be _key or _count)')
_search_request_parser.add_argument(
    'exclude', type=str, action='split', help='Excludes the given keys in the returned data.')
for group_name in search_extension.groups:
    _search_request_parser.add_argument(
        group_name, type=bool, help=('Return %s group data.' % group_name))
    _search_request_parser.add_argument(
        '%s_after' % group_name, type=str,
        help='The last %s id of the last scroll window for the %s group' % (group_name, group_name))


_repo_calcs_model_fields = {
    'statistics': fields.Raw(description=(
        'A dict with all statistics. Each statistic is a dictionary with a metrics dict as '
        'value and quantity value as key. The possible metrics are code runs (calcs), %s. '
        'There is a pseudo quantity "total" with a single value "all" that contains the '
        'metrics over all results. ' % ', '.join(search_extension.metrics.keys())))}

for group_name in search_extension.groups:
    _repo_calcs_model_fields[group_name] = (DotKeyNested if '.' in group_name else fields.Nested)(api.model('RepoGroup', {
        'after': fields.String(description='The after value that can be used to retrieve the next %s.' % group_name),
        'values': fields.Raw(description='A dict with %s as key. The values are dicts with "total" and "examples" keys.' % group_name)
    }), skip_none=True)

for qualified_name, quantity in search_extension.search_quantities.items():
    _repo_calcs_model_fields[qualified_name] = fields.Raw(
        description=quantity.description, allow_null=True, skip_none=True)

_repo_calcs_model = api.inherit('RepoCalculations', search_model, _repo_calcs_model_fields)


@ns.route('/')
class RepoCalcsResource(Resource):
    @api.doc('search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_search_request_parser, validate=True)
    @api.marshal_with(_repo_calcs_model, skip_none=True, code=200, description='Search results sent')
    @authenticate()
    def get(self):
        '''
        Search for calculations in the repository form, paginated.

        The ``owner`` parameter determines the overall entries to search through.
        Possible values are: ``all`` (show all entries visible to the current user), ``public``
        (show all publicly visible entries), ``user`` (show all user entries, requires login),
        ``staging`` (show all user entries in staging area, requires login).

        You can use the various quantities to search and filter with. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have a certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        The pagination parameters allow you to choose which page to return via the
        ``page`` and ``per_page`` parameters. Pagination, however, is limited to the
        first 100k (depending on ES configuration) hits.

        An alternative to pagination is to use ``scroll`` and ``scroll_id``. With ``scroll``
        you will get a ``scroll_id`` on the first request. Each call with ``scroll`` and
        the respective ``scroll_id`` will return the next ``per_page`` (here the default is 1000)
        results. Scroll, however, ignores ordering and does not return aggregations.
        The scroll view used in the background will stay alive for 1 minute between requests.
        If the given ``scroll_id`` is no longer available, an HTTP 400 is raised.

        The search will return aggregations on a predefined set of quantities. Aggregations
        will tell you what quantity values exist and how many entries match those values.

        Ordering is determined by the ``order_by`` and ``order`` parameters. Default is
        ``upload_time`` in descending order.
        '''

        try:
            parsed_args = _search_request_parser.parse_args()
            args = {
                key: value for key, value in parsed_args.items()
                if value is not None}

            scroll = args.get('scroll', False)
            scroll_id = args.get('scroll_id', None)
            page = args.get('page', 1)
            per_page = args.get('per_page', 10 if not scroll else 1000)
            order = args.get('order', -1)
            order_by = args.get('order_by', 'upload_time')

            date_histogram = args.get('date_histogram', False)
            interval = args.get('interval', 'auto')
            metrics: List[str] = request.args.getlist('metrics')

            with_statistics = args.get('statistics', False) or \
                any(args.get(group_name, False) for group_name in search_extension.groups)
            statistics_order = args.get('statistics_order', '_key')
        except Exception as e:
            abort(400, message='bad parameters: %s' % str(e))

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        if date_histogram:
            if interval == 'auto':
                try:
                    from_time = args.get('from_time', None)
                    until_time = args.get('until_time', None)
                    interval = resolve_interval(from_time, until_time)
                except Exception:
                    abort(400, message='encountered error resolving time interval')
            search_request.date_histogram(interval=interval)

        try:
            assert page >= 1
            assert per_page >= 0
        except AssertionError:
            abort(400, message='invalid pagination')

        if order not in [-1, 1]:
            abort(400, message='invalid pagination')

        for metric in metrics:
            if metric not in search_extension.metrics:
                abort(400, message='there is no metric %s' % metric)

        if with_statistics:
            search_request.default_statistics(metrics_to_use=metrics, statistics_order=statistics_order)

            additional_metrics = [
                group_quantity.metric_name
                for group_name, group_quantity in search_extension.groups.items()
                if args.get(group_name, False)]

            total_metrics = metrics + additional_metrics

            search_request.totals(metrics_to_use=total_metrics)
            search_request.statistic('authors', 1000)

        elif len(metrics) > 0:
            search_request.totals(metrics_to_use=metrics)

        if 'exclude' in parsed_args:
            excludes = parsed_args['exclude']
            if excludes is not None:
                search_request.exclude(*excludes)

        try:
            if scroll:
                results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)

            else:
                for group_name, group_quantity in search_extension.groups.items():
                    if args.get(group_name, False):
                        kwargs: Dict[str, Any] = {}
                        if group_name == 'group_uploads':
                            kwargs.update(order_by='upload_time', order='desc')
                        search_request.quantity(
                            group_quantity.qualified_name, size=per_page, examples=1,
                            after=request.args.get('%s_after' % group_name, None),
                            **kwargs)

                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order=order, order_by=order_by)

                # TODO just a workaround to make things prettier
                if with_statistics:
                    statistics = results['statistics']
                    # NOTE: 'currupted mainfile' is the literal (misspelled) key used in the index
                    if 'code_name' in statistics and 'currupted mainfile' in statistics['code_name']:
                        del statistics['code_name']['currupted mainfile']

                if 'quantities' in results:
                    quantities = results.pop('quantities')

                for group_name, group_quantity in search_extension.groups.items():
                    if args.get(group_name, False):
                        results[group_name] = quantities[group_quantity.qualified_name]

            # build python code/curl snippet
            code_args = dict(request.args)
            if 'statistics' in code_args:
                del code_args['statistics']

            results['code'] = {
                'curl': query_api_curl('archive', 'query', query_string=code_args),
                'python': query_api_python('archive', 'query', query_string=code_args),
                'clientlib': query_api_clientlib(**code_args)
            }

            return results, 200
        except search.ScrollIdNotFound:
            abort(400, 'The given scroll_id does not exist.')
        except KeyError as e:
            import traceback
            traceback.print_exc()
            abort(400, str(e))

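# Illustrative queries against the endpoint above (parameter names as defined
# by _search_request_parser; the scroll_id value is hypothetical):
#   GET /repo/?page=2&per_page=25&order_by=upload_time&order=-1   # paginated
#   GET /repo/?scroll=true                                        # start scrolling
#   GET /repo/?scroll=true&scroll_id=<id from previous response>  # continue scrolling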

_query_model_parameters = {
    'owner': fields.String(description='Specify which calcs to return: ``all``, ``public``, ``user``, ``staging``, default is ``all``'),
    'from_time': RFC3339DateTime(description='A yyyy-MM-ddTHH:mm:ss (RFC3339) minimum entry time (e.g. upload time)'),
    'until_time': RFC3339DateTime(description='A yyyy-MM-ddTHH:mm:ss (RFC3339) maximum entry time (e.g. upload time)')
}

for qualified_name, quantity in search.search_quantities.items():
    if quantity.many_and:
        def field(**kwargs):
            return fields.List(fields.String(**kwargs))
    else:
        field = fields.String
    _query_model_parameters[qualified_name] = field(description=quantity.description)

_repo_query_model = api.model('RepoQuery', _query_model_parameters, skip_none=True)


def repo_edit_action_field(quantity):
    if quantity.is_scalar:
        return fields.Nested(_repo_edit_action_model, description=quantity.description, skip_none=True)
    else:
        return fields.List(
            fields.Nested(_repo_edit_action_model, skip_none=True), description=quantity.description)


_repo_edit_action_model = api.model('RepoEditAction', {
    'value': fields.String(description='The value or values that are set, as a string.'),
    'success': fields.Boolean(description='If this can/could be done. Only in API response.'),
    'message': fields.String(description='A message that details the action result. Only in API response.')
})

_repo_edit_model = api.model('RepoEdit', {
    'verify': fields.Boolean(description='If true, no action is performed.'),
    'query': fields.Nested(_repo_query_model, skip_none=True, description='New metadata will be applied to query results.'),
    'actions': fields.Nested(
        api.model('RepoEditActions', {
            quantity.name: repo_edit_action_field(quantity)
            for quantity in EditableUserMetadata.m_def.definitions
        }), skip_none=True,
        description='Each action specifies a single value (even for multi valued quantities).'),
    'success': fields.Boolean(description='If the overall edit can/could be done. Only in API response.'),
    'message': fields.String(description='A message that details the overall edit result. Only in API response.')
})
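# Illustrative request body for the /repo/edit endpoint below ('comment' stands
# in for any scalar quantity from EditableUserMetadata; the values are hypothetical):
# {
#     "verify": true,
#     "query": {"upload_id": "some_upload_id"},
#     "actions": {"comment": {"value": "a new comment"}}
# }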

_editable_quantities = {
    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}


def edit(parsed_query: Dict[str, Any], mongo_update: Dict[str, Any] = None, re_index=True) -> List[str]:
    # get all calculations that have to change
    with utils.timer(common.logger, 'edit query executed'):
        search_request = search.SearchRequest().include('calc_id', 'upload_id')
        apply_search_parameters(search_request, parsed_query)
        upload_ids = set()
        calc_ids = []

        for hit in search_request.execute_scan():
            calc_ids.append(hit['calc_id'])
            upload_ids.add(hit['upload_id'])

    # perform the update on the mongo db
    with utils.timer(common.logger, 'edit mongo update executed', size=len(calc_ids)):
        if mongo_update is not None:
            n_updated = proc.Calc.objects(calc_id__in=calc_ids).update(multi=True, **mongo_update)
            if n_updated != len(calc_ids):
                common.logger.error('edit repo did not update all entries', payload=mongo_update)

    # re-index the affected entries in elastic search
    with utils.timer(common.logger, 'edit elastic update executed', size=len(calc_ids)):
        if re_index:
            def elastic_updates():
                upload_files_cache: Dict[str, files.UploadFiles] = dict()

                for calc in proc.Calc.objects(calc_id__in=calc_ids):
                    upload_id = calc.upload_id
                    upload_files = upload_files_cache.get(upload_id)
                    if upload_files is None:
                        upload_files = files.UploadFiles.get(upload_id, is_authorized=lambda: True)
                        upload_files_cache[upload_id] = upload_files

                    entry_metadata = calc.entry_metadata(upload_files)
                    entry = entry_metadata.a_elastic.create_index_entry().to_dict(include_meta=True)
                    entry['_op_type'] = 'index'

                    yield entry

                for upload_files in upload_files_cache.values():
                    upload_files.close()

            # with stats_only=True, bulk returns counts, so failed is an int
            _, failed = elasticsearch.helpers.bulk(
                infrastructure.elastic_client, elastic_updates(), stats_only=True)
            search.refresh()
            if failed > 0:
                common.logger.error(
                    'edit repo with failed elastic updates',
                    payload=mongo_update, nfailed=failed)
    return list(upload_ids)

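# Minimal usage sketch for edit() (hypothetical values; the parsed query has the
# same shape as the one built in EditRepoCalcsResource.post below):
#   upload_ids = edit(
#       {'owner': 'user', 'upload_id': ['some_upload_id']},
#       {'metadata__comment': 'a new comment'})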

def get_uploader_ids(query):
    ''' Get all uploaders from the query, to check coauthors and shared_with for uploaders. '''
    search_request = search.SearchRequest()
    apply_search_parameters(search_request, query)
    search_request.quantity(name='uploader_id')
    return search_request.execute()['quantities']['uploader_id']['values']


@ns.route('/edit')
class EditRepoCalcsResource(Resource):
    @api.doc('edit_repo')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_repo_edit_model)
    @api.marshal_with(_repo_edit_model, skip_none=True, code=200, description='Edit verified/performed')
    @authenticate()
    def post(self):
        ''' Edit repository metadata. '''

        # basic body parsing and some semantic checks
        json_data = request.get_json()
        if json_data is None:
            json_data = {}
        query = json_data.get('query', {})

        owner = query.get('owner', 'user')
        if owner not in ['user', 'staging']:
            abort(400, 'Not a valid owner for edit: %s. Edit can only be performed in user or staging.' % owner)
        query['owner'] = owner
        if 'actions' not in json_data:
            abort(400, 'Missing key actions in edit data')
        actions = json_data['actions']
        verify = json_data.get('verify', False)

        # preparing the query of entries that are edited
        parsed_query = {}
        for quantity_name, value in query.items():
            if quantity_name in _search_quantities:
                quantity = search.search_quantities[quantity_name]
                if quantity.many:
                    if not isinstance(value, list):
                        value = value.split(',')
                parsed_query[quantity_name] = value
        parsed_query['owner'] = owner
        parsed_query['domain'] = query.get('domain')

        # checking the edit actions and preparing a mongo update on the fly
        json_data['success'] = True
        mongo_update = {}
        uploader_ids = None
        lift_embargo = False
        removed_datasets = None
        has_error = False

        with utils.timer(common.logger, 'edit verified'):
            for action_quantity_name, quantity_actions in actions.items():
                quantity = _editable_quantities.get(action_quantity_name)
                if quantity is None:
                    abort(400, 'Unknown quantity %s' % action_quantity_name)

                quantity_flask = quantity.m_get_annotations('flask', {})
                if quantity_flask.get('admin_only', False):
                    if not g.user.is_admin:
                        abort(404, 'Only the admin user can set %s' % quantity.name)

                if isinstance(quantity_actions, list) == quantity.is_scalar:
                    abort(400, 'Wrong shape for quantity %s' % action_quantity_name)

                if not isinstance(quantity_actions, list):
                    quantity_actions = [quantity_actions]

                flask_verify = quantity_flask.get('verify', None)
                mongo_key = 'metadata__%s' % quantity.name
                for action in quantity_actions:
                    action['success'] = True
                    action['message'] = None
                    action_value = action.get('value')
                    action_value = action_value if action_value is None else action_value.strip()

                    if action_value is None:
                        mongo_value = None

                    elif action_value == '':
                        mongo_value = None

                    elif flask_verify == datamodel.User:
                        try:
                            mongo_value = User.get(user_id=action_value).user_id
                        except KeyError:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'User does not exist'
                            continue

                        if uploader_ids is None:
                            uploader_ids = get_uploader_ids(parsed_query)
                        if action_value in uploader_ids:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'This user is already an uploader of one entry in the query'
                            continue

                    elif flask_verify == datamodel.Dataset:
                        try:
                            mongo_value = Dataset.m_def.a_mongo.get(
                                user_id=g.user.user_id, name=action_value).dataset_id
                        except KeyError:
                            action['message'] = 'Dataset does not exist and will be created'
                            mongo_value = None
                            if not verify:
                                dataset = Dataset(
                                    dataset_id=utils.create_uuid(), user_id=g.user.user_id,
                                    name=action_value, created=datetime.utcnow())
                                dataset.a_mongo.create()
                                mongo_value = dataset.dataset_id

                    elif action_quantity_name == 'with_embargo':
                        # ignore the actual value ... just lift the embargo
                        mongo_value = False
                        lift_embargo = True

                        # check if necessary
                        search_request = search.SearchRequest()
                        apply_search_parameters(search_request, parsed_query)
                        search_request.q = search_request.q & Q('term', with_embargo=True)
                        if search_request.execute()['total'] == 0:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'There is no embargo to lift'
                            continue
                    else:
                        mongo_value = action_value

                    if len(quantity.shape) == 0:
                        mongo_update[mongo_key] = mongo_value
                    else:
                        mongo_values = mongo_update.setdefault(mongo_key, [])
                        if mongo_value is not None:
                            if mongo_value in mongo_values:
                                action['success'] = False
                                has_error = True
                                action['message'] = 'Duplicate values are not allowed'
                                continue
                            mongo_values.append(mongo_value)

                if len(quantity_actions) == 0 and len(quantity.shape) > 0:
                    mongo_update[mongo_key] = []

                if action_quantity_name == 'datasets':
                    # check if datasets edit is allowed and if datasets have to be removed
                    search_request = search.SearchRequest()
                    apply_search_parameters(search_request, parsed_query)
                    search_request.quantity(name='dataset_id')
                    old_datasets = list(
                        search_request.execute()['quantities']['dataset_id']['values'].keys())

                    removed_datasets = []
                    for dataset_id in old_datasets:
                        if dataset_id not in mongo_update.get(mongo_key, []):
                            removed_datasets.append(dataset_id)

                    doi_ds = Dataset.m_def.a_mongo.objects(
                        dataset_id__in=removed_datasets, doi__ne=None).first()
                    if doi_ds is not None:
                        json_data['success'] = False
                        json_data['message'] = json_data.get('message', '') + \
                            'Edit would remove entries from a dataset with DOI (%s) ' % doi_ds.name
                        has_error = True

        # stop here, if client just wants to verify its actions
        if verify:
            return json_data, 200

        # stop if the actions were not ok
        if has_error:
            return json_data, 400

        # perform the change
        mongo_update['metadata__last_edit'] = datetime.utcnow()
        upload_ids = edit(parsed_query, mongo_update, True)

        # lift embargo
        if lift_embargo:
            for upload_id in upload_ids:
                upload = proc.Upload.get(upload_id)
                upload.re_pack()

        # remove potentially empty old datasets
        if removed_datasets is not None:
            for dataset in removed_datasets:
                if proc.Calc.objects(metadata__dataset_id=dataset).first() is None:
                    Dataset.m_def.a_mongo.objects(dataset_id=dataset).delete()

        return json_data, 200


_repo_quantity_search_request_parser = api.parser()
add_search_parameters(_repo_quantity_search_request_parser)
_repo_quantity_search_request_parser.add_argument(
    'after', type=str, help='The after value to use for "scrolling".')
_repo_quantity_search_request_parser.add_argument(
    'size', type=int, help='The max size of the returned values.')

_repo_quantity_model = api.model('RepoQuantity', {
    'after': fields.String(description='The after value that can be used to retrieve the next set of values.'),
    'values': fields.Raw(description='A dict with values as key. Values are dicts with "total" and "examples" keys.')
})

_repo_quantity_values_model = api.model('RepoQuantityValues', {
    'quantity': fields.Nested(_repo_quantity_model, allow_null=True)
})


@ns.route('/quantity/<string:quantity>')
class RepoQuantityResource(Resource):
    @api.doc('quantity_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
    @api.expect(_repo_quantity_search_request_parser, validate=True)
    @api.marshal_with(_repo_quantity_values_model, skip_none=True, code=200, description='Search results sent')
    @authenticate()
    def get(self, quantity: str):
        '''
        Retrieve quantity values from entries matching the search.

        You can use the various quantities to search and filter with. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have a certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        There is no ordering and no pagination. Instead there is 'after'-key based
        scrolling. The result will contain an 'after' value that can be specified
        for the next request. You can use the 'size' and 'after' parameters accordingly.

        The result will contain a 'quantity' key with quantity values and the "after"
        value. There will be up to 'size' many values. For the rest of the values use the
        "after" parameter in another request.
        '''

        search_request = search.SearchRequest()
        args = {
            key: value
            for key, value in _repo_quantity_search_request_parser.parse_args().items()
            if value is not None}

        apply_search_parameters(search_request, args)
        after = args.get('after', None)
        size = args.get('size', 100)

        try:
            assert size >= 0
        except AssertionError:
            abort(400, message='invalid size')

        try:
            search_request.quantity(quantity, size=size, after=after)
            results = search_request.execute()
            quantities = results.pop('quantities')
            results['quantity'] = quantities[quantity]

            return results, 200
        except KeyError as e:
            import traceback
            traceback.print_exc()
            abort(400, 'Given quantity does not exist: %s' % str(e))

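# Illustrative 'after'-based scrolling through a single quantity ('code_name'
# here is just an example quantity; the after value is hypothetical):
#   GET /repo/quantity/code_name?size=100
#   GET /repo/quantity/code_name?size=100&after=<after value from the previous response>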

_repo_quantities_search_request_parser = api.parser()
add_search_parameters(_repo_quantities_search_request_parser)
_repo_quantities_search_request_parser.add_argument(
    'quantities', type=str, action='append',
    help='The quantities to retrieve values from')
_repo_quantities_search_request_parser.add_argument(
    'size', type=int, help='The max size of the returned values.')

_repo_quantities_model = api.model('RepoQuantitiesResponse', {
    'quantities': fields.Nested(api.model('RepoQuantities', {
        quantity: fields.List(fields.Nested(_repo_quantity_model))
        for quantity in search_extension.search_quantities
    }))
})


@ns.route('/quantities')
class RepoQuantitiesResource(Resource):
    @api.doc('quantities_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
    @api.expect(_repo_quantities_search_request_parser, validate=True)
    @api.marshal_with(_repo_quantities_model, skip_none=True, code=200, description='Search results sent')
    @authenticate()
    def get(self):
        '''
        Retrieve quantity values for multiple quantities at once.

        You can use the various quantities to search and filter with. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have a certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        There is no ordering, no pagination, and no 'after'-key based scrolling.
        Instead, a fixed number of values (given by 'size') is returned for each quantity.

        The result will contain a 'quantities' key with a dict of quantity names and the
        retrieved values as values.
        '''

        search_request = search.SearchRequest()
        args = {
            key: value
            for key, value in _repo_quantities_search_request_parser.parse_args().items()
            if value is not None}

        apply_search_parameters(search_request, args)
        quantities = args.get('quantities', [])
        size = args.get('size', 5)

        try:
            assert size >= 0
        except AssertionError:
            abort(400, message='invalid size')

        for quantity in quantities:
            try:
                search_request.quantity(quantity, size=size)
            except KeyError as e:
                import traceback
                traceback.print_exc()
                abort(400, 'Given quantity does not exist: %s' % str(e))

        return search_request.execute(), 200

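# Illustrative request for several quantities at once (quantity names are
# examples; 'quantities' is a repeated parameter):
#   GET /repo/quantities?quantities=code_name&quantities=authors&size=5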

_repo_calc_id_model = api.model('RepoCalculationId', {
    'upload_id': fields.String(), 'calc_id': fields.String()
})

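# A PID can be given either as a plain integer or in handle form with the fixed
# '21.11132' prefix (illustrative examples; the suffix encodes the integer PID):
#   GET /repo/pid/123456
#   GET /repo/pid/21.11132/<handle suffix>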

@ns.route('/pid/<path:pid>')
class RepoPidResource(Resource):
    @api.doc('resolve_pid')
    @api.response(404, 'Entry with PID does not exist')
    @api.marshal_with(_repo_calc_id_model, skip_none=True, code=200, description='Entry resolved')
    @authenticate()
    def get(self, pid: str):
        if '/' in pid:
            prefix, pid = pid.split('/', 1)
            if prefix != '21.11132':
                abort(400, 'Wrong PID format')
            try:
                pid_int = utils.decode_handle_id(pid)
            except ValueError:
                abort(400, 'Wrong PID format')
        else:
            try:
                pid_int = int(pid)
            except ValueError:
                abort(400, 'Wrong PID format')

        search_request = search.SearchRequest().include('upload_id', 'calc_id')

        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        search_request.search_parameter('pid', pid_int)

        results = list(search_request.execute_scan())
        total = len(results)

        if total == 0:
            abort(404, 'Entry with PID %s does not exist' % pid)

        if total > 1:
            common.logger.error('Multiple entries for the same pid', pid=pid_int)

        result = results[0]
        return dict(
            upload_id=result['upload_id'],
            calc_id=result['calc_id'])