repo.py 32.8 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The repository API of the nomad@FAIRDI APIs. Currently allows to resolve repository
meta-data.
"""

20
from typing import List, Dict, Any
Markus Scheidgen's avatar
Markus Scheidgen committed
21
from flask_restplus import Resource, abort, fields
22
from flask import request, g
23
from elasticsearch_dsl import Q
24
from elasticsearch.exceptions import NotFoundError
25
import elasticsearch.helpers
Markus Scheidgen's avatar
Markus Scheidgen committed
26
from datetime import datetime
27
import os.path
Markus Scheidgen's avatar
Markus Scheidgen committed
28

29
30
from nomad import search, utils, datamodel, processing as proc, infrastructure
from nomad.app.utils import rfc3339DateTime, RFC3339DateTime, with_logger
31
from nomad.app.optimade import filterparser
32
from nomad.datamodel import UserMetadata, Dataset, User
Markus Scheidgen's avatar
Markus Scheidgen committed
33

Markus Scheidgen's avatar
Markus Scheidgen committed
34
from .api import api
35
from .auth import authenticate
36
from .common import pagination_model, pagination_request_parser, calc_route, build_snippet
Markus Scheidgen's avatar
Markus Scheidgen committed
37

38
# Flask-RESTPlus namespace under which all repository endpoints below are registered.
ns = api.namespace('repo', description='Access repository metadata.')
Markus Scheidgen's avatar
Markus Scheidgen committed
39
40
41
42
43


@calc_route(ns)
class RepoCalcResource(Resource):
    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the calculation')
    @api.response(200, 'Metadata send', fields.Raw)
    @api.doc('get_repo_calc')
    @authenticate()
    def get(self, upload_id, calc_id):
        """
        Get calculation metadata in repository form.

        Repository metadata only entails the quantities shown in the repository.
        Calcs are referenced via *upload_id*, *calc_id* pairs.
        """
        try:
            calc = search.Entry.get(calc_id)
        except NotFoundError:
            abort(404, message='There is no calculation %s/%s' % (upload_id, calc_id))

        # embargoed or unpublished entries are only visible to owners and admins
        restricted = calc.with_embargo or not calc.published
        if restricted:
            if g.user is None:
                abort(401, message='Not logged in to access %s/%s.' % (upload_id, calc_id))

            is_owner = any(g.user.user_id == owner.user_id for owner in calc.owners)
            if not (is_owner or g.user.is_admin):
                abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))

        return calc.to_dict(), 200
Markus Scheidgen's avatar
Markus Scheidgen committed
68
69


Markus Scheidgen's avatar
Markus Scheidgen committed
70
# Response model for the paginated search endpoint: pagination/scroll info,
# the actual hits, aggregated statistics, and a reproducibility code snippet.
repo_calcs_model_fields = {
    'pagination': fields.Nested(pagination_model, skip_none=True),
    'scroll': fields.Nested(allow_null=True, skip_none=True, model=api.model('Scroll', {
        'total': fields.Integer(description='The total amount of hits for the search.'),
        'scroll_id': fields.String(allow_null=True, description='The scroll_id that can be used to retrieve the next page.'),
        'size': fields.Integer(help='The size of the returned scroll page.')})),
    'results': fields.List(fields.Raw, description=(
        'A list of search results. Each result is a dict with quantitie names as key and '
        'values as values')),
    'statistics': fields.Raw(description=(
        'A dict with all statistics. Each statistic is dictionary with a metrics dict as '
        'value and quantity value as key. The possible metrics are code runs(calcs), %s. '
        'There is a pseudo quantity "total" with a single value "all" that contains the '
        ' metrics over all results. ' % ', '.join(datamodel.Domain.instance.metrics_names))),
    'code_snippet': fields.String(description=(
        'A string of python code snippet which can be executed to reproduce the api result.')),
}
# One nested group model (after-key for scrolling plus the grouped values) per
# search group defined by the search module.
for group_name, (group_quantity, _) in search.groups.items():
    repo_calcs_model_fields[group_name] = fields.Nested(api.model('RepoDatasets', {
        'after': fields.String(description='The after value that can be used to retrieve the next %s.' % group_name),
        'values': fields.Raw(description='A dict with %s as key. The values are dicts with "total" and "examples" keys.' % group_quantity)
    }), skip_none=True)
repo_calcs_model = api.model('RepoCalculations', repo_calcs_model_fields)
Markus Scheidgen's avatar
Markus Scheidgen committed
93

94

95
96
# Model for a single entry reference (upload_id/calc_id pair).
repo_calc_id_model = api.model('RepoCalculationId', {
    'upload_id': fields.String(), 'calc_id': fields.String()
})

99
100
101
102
103
104
105
106
107
108
109
110

def add_common_parameters(request_parser):
    """
    Register the search arguments shared by all repo endpoints on the given
    parser: the ``owner`` selector, the ``from_time``/``until_time`` range, and
    one argument per searchable quantity.
    """
    request_parser.add_argument(
        'owner', type=str,
        help='Specify which calcs to return: ``all``, ``public``, ``user``, ``staging``, default is ``all``')
    request_parser.add_argument(
        'from_time', type=rfc3339DateTime.parse,
        help='A yyyy-MM-ddTHH:mm:ss (RFC3339) minimum entry time (e.g. upload time)')
    request_parser.add_argument(
        'until_time', type=rfc3339DateTime.parse,
        help='A yyyy-MM-ddTHH:mm:ss (RFC3339) maximum entry time (e.g. upload time)')

    # multi-valued quantities get their quantity-specific argparse action
    for search_quantity in search.quantities.values():
        request_parser.add_argument(
            search_quantity.name, help=search_quantity.description,
            action=search_quantity.argparse_action if search_quantity.multi else None)
115
116


Markus Scheidgen's avatar
Markus Scheidgen committed
117
# Request parser for the paginated search endpoint: pagination args plus the
# common search parameters, scrolling, and statistics/grouping toggles.
repo_request_parser = pagination_request_parser.copy()
add_common_parameters(repo_request_parser)
repo_request_parser.add_argument(
    'scroll', type=bool, help='Enable scrolling')
repo_request_parser.add_argument(
    'scroll_id', type=str, help='The id of the current scrolling window to use.')
repo_request_parser.add_argument(
    'date_histogram', type=bool, help='Add an additional aggregation over the upload time')
repo_request_parser.add_argument(
    'metrics', type=str, action='append', help=(
        'Metrics to aggregate over all quantities and their values as comma separated list. '
        'Possible values are %s.' % ', '.join(datamodel.Domain.instance.metrics_names)))
repo_request_parser.add_argument(
    'statistics', type=bool, help=('Return statistics.'))

# For every search group (e.g. datasets) add a boolean toggle and an
# "after" key for group-wise scrolling.
for group_name in search.groups:
    repo_request_parser.add_argument(
        group_name, type=bool, help=('Return %s group data.' % group_name))
    repo_request_parser.add_argument(
        '%s_after' % group_name, type=str,
        help='The last %s id of the last scroll window for the %s group' % (group_name, group_name))

# Parser with only the common search parameters (no pagination/scroll).
search_request_parser = api.parser()
add_common_parameters(search_request_parser)


144
def add_query(search_request: search.SearchRequest, args: Dict[str, Any]):
    """
    Helper that applies the query-relevant request args to the given SearchRequest.
    """
    # ignore arguments that were not given
    args = {key: args[key] for key in args if args[key] is not None}

    # owner restriction
    owner = args.get('owner', 'all')
    try:
        search_request.owner(
            owner,
            g.user.user_id if g.user is not None else None)
    except ValueError as e:
        abort(401, getattr(e, 'message', 'Invalid owner parameter: %s' % owner))
    except Exception as e:
        abort(400, getattr(e, 'message', 'Invalid owner parameter'))

    # time range restriction
    from_time_str = args.get('from_time', None)
    until_time_str = args.get('until_time', None)
    try:
        from_time = None if from_time_str is None else rfc3339DateTime.parse(from_time_str)
        until_time = None if until_time_str is None else rfc3339DateTime.parse(until_time_str)
        search_request.time_range(start=from_time, end=until_time)
    except Exception:
        abort(400, message='bad datetime format')

    # optimade filter expression
    optimade = args.get('optimade', None)
    try:
        if optimade is not None:
            search_request.query(filterparser.parse_filter(optimade))
    except filterparser.FilterException:
        abort(400, message='could not parse optimade query')

    # all remaining args that name searchable quantities
    search_request.search_parameters(**{
        key: value for key, value in args.items()
        if key != 'optimade' and key in search.quantities})
185
186


Markus Scheidgen's avatar
Markus Scheidgen committed
187
188
@ns.route('/')
class RepoCalcsResource(Resource):
    @api.doc('search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(repo_request_parser, validate=True)
    @api.marshal_with(repo_calcs_model, skip_none=True, code=200, description='Search results send')
    @authenticate()
    def get(self):
        """
        Search for calculations in the repository form, paginated.

        The ``owner`` parameter determines the overall entries to search through.
        Possible values are: ``all`` (show all entries visible to the current user), ``public``
        (show all publically visible entries), ``user`` (show all user entries, requires login),
        ``staging`` (show all user entries in staging area, requires login).

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have the certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        The pagination parameters allows determine which page to return via the
        ``page`` and ``per_page`` parameters. Pagination however, is limited to the first
        100k (depending on ES configuration) hits.

        An alternative to pagination is to use ``scroll`` and ``scroll_id``. With ``scroll``
        you will get a ``scroll_id`` on the first request. Each call with ``scroll`` and
        the respective ``scroll_id`` will return the next ``per_page`` (here the default is 1000)
        results. Scroll however, ignores ordering and does not return aggregations.
        The scroll view used in the background will stay alive for 1 minute between requests.
        If the given ``scroll_id`` is not available anymore, a HTTP 400 is raised.

        The search will return aggregations on a predefined set of quantities. Aggregations
        will tell you what quantity values exist and how many entries match those values.

        Ordering is determined by ``order_by`` and ``order`` parameters.
        """

        # parse all request arguments; any parsing problem is reported as 400
        try:
            # only keep args that were actually given
            args = {
                key: value for key, value in repo_request_parser.parse_args().items()
                if value is not None}

            scroll = args.get('scroll', False)
            scroll_id = args.get('scroll_id', None)
            page = args.get('page', 1)
            # scrolled requests default to larger pages
            per_page = args.get('per_page', 10 if not scroll else 1000)
            order = args.get('order', -1)
            order_by = args.get('order_by', 'formula')

            date_histogram = args.get('date_histogram', False)
            # taken from the raw request args to allow repeated 'metrics' params
            metrics: List[str] = request.args.getlist('metrics')

            # statistics are also needed when any group data was requested
            with_statistics = args.get('statistics', False) or \
                any(args.get(group_name, False) for group_name in search.groups)
        except Exception as e:
            abort(400, message='bad parameters: %s' % str(e))

        # build the elasticsearch request from the parsed args
        search_request = search.SearchRequest()
        add_query(search_request, args)
        if date_histogram:
            search_request.date_histogram()

        # validate pagination/ordering parameters
        try:
            assert page >= 1
            assert per_page >= 0
        except AssertionError:
            abort(400, message='invalid pagination')

        if order not in [-1, 1]:
            abort(400, message='invalid pagination')

        for metric in metrics:
            if metric not in search.metrics_names:
                abort(400, message='there is no metric %s' % metric)

        if with_statistics:
            search_request.default_statistics(metrics_to_use=metrics)

            # groups that were requested contribute their own metric to totals
            additional_metrics = [
                metric
                for group_name, (_, metric) in search.groups.items()
                if args.get(group_name, False)]

            total_metrics = metrics + additional_metrics

            search_request.totals(metrics_to_use=total_metrics)
            search_request.statistic('authors', 1000)
        elif len(metrics) > 0:
            search_request.totals(metrics_to_use=metrics)

        # execute the search either scrolled or paginated
        try:
            if scroll:
                results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)

            else:
                # add after-key based aggregations for all requested groups
                for group_name, (group_quantity, _) in search.groups.items():
                    if args.get(group_name, False):
                        search_request.quantity(
                            group_quantity, size=per_page, examples=1,
                            after=request.args.get('%s_after' % group_name, None))

                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order=order, order_by=order_by)

                # TODO just a work around to make things prettier
                if with_statistics:
                    statistics = results['statistics']
                    if 'code_name' in statistics and 'currupted mainfile' in statistics['code_name']:
                        del(statistics['code_name']['currupted mainfile'])

                # move group aggregations from 'quantities' to top-level keys
                # NOTE(review): `quantities` is only bound when the results contain
                # a 'quantities' key; requesting a group presumably guarantees this
                # — confirm against search.SearchRequest.execute_paginated
                if 'quantities' in results:
                    quantities = results.pop('quantities')

                for group_name, (group_quantity, _) in search.groups.items():
                    if args.get(group_name, False):
                        results[group_name] = quantities[group_quantity]

            # build python code snippet
            snippet = build_snippet(args, os.path.join(api.base_url, ns.name, ''))
            results['code_snippet'] = snippet

            return results, 200
        except search.ScrollIdNotFound:
            abort(400, 'The given scroll_id does not exist.')
        except KeyError as e:
            import traceback
            traceback.print_exc()
            abort(400, str(e))
317

318
319
320
321
322
323
324
325

# JSON-body variant of the common search parameters (used by the edit endpoint).
query_model_parameters = {
    'owner': fields.String(description='Specify which calcs to return: ``all``, ``public``, ``user``, ``staging``, default is ``all``'),
    'from_time': RFC3339DateTime(description='A yyyy-MM-ddTHH:mm:ss (RFC3339) minimum entry time (e.g. upload time)'),
    'until_time': RFC3339DateTime(description='A yyyy-MM-ddTHH:mm:ss (RFC3339) maximum entry time (e.g. upload time)')
}

# Mirror every searchable quantity: multi quantities without a special argparse
# action become lists of strings, everything else a plain string field.
for quantity in search.quantities.values():
    if quantity.multi and quantity.argparse_action is None:
        def field(**kwargs):
            # factory so the List wraps a String carrying the same kwargs
            return fields.List(fields.String(**kwargs))
    else:
        field = fields.String
    query_model_parameters[quantity.name] = field(description=quantity.description)

repo_query_model = api.model('RepoQuery', query_model_parameters, skip_none=True)


def repo_edit_action_field(quantity):
    """
    Return the RepoEditAction field for the given user-metadata quantity:
    a single nested action for scalar quantities, a list of them otherwise.
    """
    if not quantity.is_scalar:
        return fields.List(
            fields.Nested(repo_edit_action_model, skip_none=True), description=quantity.description)
    return fields.Nested(repo_edit_action_model, description=quantity.description, skip_none=True)


# Model for one edit action on a single quantity value.
repo_edit_action_model = api.model('RepoEditAction', {
    'value': fields.String(description='The value/values that is set as a string.'),
    'success': fields.Boolean(description='If this can/could be done. Only in API response.'),
    # fix: the keyword was misspelled 'descriptin', so the message field
    # carried no description in the generated API documentation
    'message': fields.String(description='A message that details the action result. Only in API response.')
})

# Request/response model for the edit endpoint: a query that selects the
# entries, a dict of actions, and result flags filled in by the server.
repo_edit_model = api.model('RepoEdit', {
    'verify': fields.Boolean(description='If true, no action is performed.'),
    'query': fields.Nested(repo_query_model, skip_none=True, description='New metadata will be applied to query results.'),
    'actions': fields.Nested(
        api.model('RepoEditActions', {
            quantity.name: repo_edit_action_field(quantity)
            for quantity in UserMetadata.m_def.all_quantities.values()
        }), skip_none=True,
        description='Each action specifies a single value (even for multi valued quantities).'),
    'success': fields.Boolean(description='If the overall edit can/could be done. Only in API response.'),
    'message': fields.String(description='A message that details the overall edit result. Only in API response.')
})


364
def edit(parsed_query: Dict[str, Any], logger, mongo_update: Dict[str, Any] = None, re_index=True) -> List[str]:
    """
    Apply a metadata edit to all entries that match the given (already parsed) query.

    Arguments:
        parsed_query: Search parameters selecting the entries to change.
        logger: Structured logger used for timing and error reporting.
        mongo_update: The mongoengine update kwargs to apply; nothing is
            written to mongo when None.
        re_index: If True, re-index the affected entries in elastic search.

    Returns: The ids of all uploads that contain affected entries.
    """
    # get all calculations that have to change
    with utils.timer(logger, 'edit query executed'):
        search_request = search.SearchRequest()
        add_query(search_request, parsed_query)
        upload_ids = set()
        calc_ids = []
        for hit in search_request.execute_scan():
            calc_ids.append(hit['calc_id'])
            upload_ids.add(hit['upload_id'])

    # perform the update on the mongo db
    with utils.timer(logger, 'edit mongo update executed', size=len(calc_ids)):
        if mongo_update is not None:
            n_updated = proc.Calc.objects(calc_id__in=calc_ids).update(multi=True, **mongo_update)
            if n_updated != len(calc_ids):
                logger.error('edit repo did not update all entries', payload=mongo_update)

    # re-index the affected entries in elastic search
    with utils.timer(logger, 'edit elastic update executed', size=len(calc_ids)):
        if re_index:
            def elastic_updates():
                # yield one bulk 'index' action per changed entry
                for calc in proc.Calc.objects(calc_id__in=calc_ids):
                    entry = search.Entry.from_calc_with_metadata(
                        datamodel.CalcWithMetadata(**calc['metadata']))
                    entry = entry.to_dict(include_meta=True)
                    entry['_op_type'] = 'index'
                    yield entry

            _, failed = elasticsearch.helpers.bulk(
                infrastructure.elastic_client, elastic_updates(), stats_only=True)
            search.refresh()
            if failed > 0:
                # fix: with stats_only=True, bulk() returns counts (ints), so
                # the previous len(failed) raised TypeError on the error path
                logger.error(
                    'edit repo with failed elastic updates',
                    payload=mongo_update, nfailed=failed)

    return list(upload_ids)

403

404
405
406
407
408
409
410
411
def get_uploader_ids(query):
    """
    Return the uploader ids of all entries matching the query; used to check
    coauthors and shared_with against existing uploaders.
    """
    uploader_request = search.SearchRequest()
    add_query(uploader_request, query)
    uploader_request.quantity(name='uploader_id')
    result = uploader_request.execute()
    return result['quantities']['uploader_id']['values']


412
413
@ns.route('/edit')
class EditRepoCalcsResource(Resource):
    @api.doc('edit_repo')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(repo_edit_model)
    @api.marshal_with(repo_edit_model, skip_none=True, code=200, description='Edit verified/performed')
    @authenticate()
    @with_logger
    def post(self, logger):
        """
        Edit repository metadata.

        The request body contains a ``query`` (which entries to edit), a dict of
        ``actions`` (which metadata quantities to set), and an optional ``verify``
        flag. With ``verify`` the actions are only checked and reported back,
        nothing is changed.
        """

        # basic body parsing and some semantic checks
        json_data = request.get_json()
        if json_data is None:
            json_data = {}
        query = json_data.get('query', {})

        # edits are only allowed on the user's own or staging entries
        owner = query.get('owner', 'user')
        if owner not in ['user', 'staging']:
            abort(400, 'Not a valid owner for edit %s. Edit can only be performed in user or staging' % owner)
        query['owner'] = owner

        if 'actions' not in json_data:
            abort(400, 'Missing key actions in edit data')
        actions = json_data['actions']
        verify = json_data.get('verify', False)

        # preparing the query of entries that are edited
        parsed_query = {}
        for quantity_name, quantity in search.quantities.items():
            if quantity_name in query:
                value = query[quantity_name]
                # comma separated strings become lists for 'split' quantities
                if quantity.multi and quantity.argparse_action == 'split' and not isinstance(value, list):
                    value = value.split(',')
                parsed_query[quantity_name] = value
        parsed_query['owner'] = owner

        # checking the edit actions and preparing a mongo update on the fly
        json_data['success'] = True
        mongo_update = {}
        uploader_ids = None
        lift_embargo = False
        removed_datasets = None
        # fix: initialize once before the loop — previously this was reset per
        # quantity (dropping earlier errors), and an empty actions dict caused
        # a NameError at the `if has_error:` check below
        has_error = False

        with utils.timer(logger, 'edit verified'):
            for action_quantity_name, quantity_actions in actions.items():
                quantity = UserMetadata.m_def.all_quantities.get(action_quantity_name)
                if quantity is None:
                    abort(400, 'Unknown quantity %s' % action_quantity_name)

                quantity_flask = quantity.m_x('flask', {})
                if quantity_flask.get('admin_only', False):
                    # fix: is_admin is used as an attribute elsewhere in this
                    # module (see RepoCalcResource.get), not called as a method
                    if not g.user.is_admin:
                        abort(404, 'Only the admin user can set %s' % quantity.name)

                # scalar quantities take a single action, shaped ones a list
                if isinstance(quantity_actions, list) == quantity.is_scalar:
                    abort(400, 'Wrong shape for quantity %s' % action_quantity_name)

                if not isinstance(quantity_actions, list):
                    quantity_actions = [quantity_actions]

                flask_verify = quantity_flask.get('verify', None)
                mongo_key = 'metadata__%s' % quantity.name
                for action in quantity_actions:
                    action['success'] = True
                    action['message'] = None
                    action_value = action.get('value')
                    action_value = action_value if action_value is None else action_value.strip()

                    if action_value is None:
                        mongo_value = None

                    elif action_value == '':
                        mongo_value = None

                    elif flask_verify == datamodel.User:
                        # value must be an existing user that is not already an
                        # uploader of one of the edited entries
                        try:
                            mongo_value = User.get(user_id=action_value).user_id
                        except KeyError:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'User does not exist'
                            continue

                        if uploader_ids is None:
                            uploader_ids = get_uploader_ids(parsed_query)
                        if action_value in uploader_ids:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'This user is already an uploader of one entry in the query'
                            continue

                    elif flask_verify == datamodel.Dataset:
                        # datasets are resolved by name; missing ones are
                        # created, unless this is only a verify run
                        try:
                            mongo_value = Dataset.m_def.m_x('me').get(
                                user_id=g.user.user_id, name=action_value).dataset_id
                        except KeyError:
                            action['message'] = 'Dataset does not exist and will be created'
                            mongo_value = None
                            if not verify:
                                dataset = Dataset(
                                    dataset_id=utils.create_uuid(), user_id=g.user.user_id,
                                    name=action_value, created=datetime.utcnow())
                                dataset.m_x('me').create()
                                mongo_value = dataset.dataset_id

                    elif action_quantity_name == 'with_embargo':
                        # ignore the actual value ... just lift the embargo
                        mongo_value = False
                        lift_embargo = True

                        # check if necessary
                        search_request = search.SearchRequest()
                        add_query(search_request, parsed_query)
                        search_request.q = search_request.q & Q('term', with_embargo=True)
                        if search_request.execute()['total'] == 0:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'There is no embargo to lift'
                            continue
                    else:
                        mongo_value = action_value

                    if len(quantity.shape) == 0:
                        mongo_update[mongo_key] = mongo_value
                    else:
                        mongo_values = mongo_update.setdefault(mongo_key, [])
                        if mongo_value is not None:
                            if mongo_value in mongo_values:
                                action['success'] = False
                                has_error = True
                                action['message'] = 'Duplicate values are not allowed'
                                continue
                            mongo_values.append(mongo_value)

                # an empty action list clears a shaped quantity
                if len(quantity_actions) == 0 and len(quantity.shape) > 0:
                    mongo_update[mongo_key] = []

                if action_quantity_name == 'datasets':
                    # check if datasets edit is allowed and if datasets have to be removed
                    search_request = search.SearchRequest()
                    add_query(search_request, parsed_query)
                    search_request.quantity(name='dataset_id')
                    old_datasets = list(
                        search_request.execute()['quantities']['dataset_id']['values'].keys())

                    removed_datasets = []
                    for dataset_id in old_datasets:
                        if dataset_id not in mongo_update.get(mongo_key, []):
                            removed_datasets.append(dataset_id)

                    # datasets with a DOI must not lose entries
                    doi_ds = Dataset.m_def.m_x('me').objects(
                        dataset_id__in=removed_datasets, doi__ne=None).first()
                    if doi_ds is not None:
                        json_data['success'] = False
                        json_data['message'] = json_data.get('message', '') + \
                            'Edit would remove entries from a dataset with DOI (%s) ' % doi_ds.name
                        has_error = True

        # stop here, if client just wants to verify its actions
        if verify:
            return json_data, 200

        # stop if the action were not ok
        if has_error:
            return json_data, 400

        # perform the change
        mongo_update['metadata__last_edit'] = datetime.utcnow()
        upload_ids = edit(parsed_query, logger, mongo_update, True)

        # lift embargo
        if lift_embargo:
            for upload_id in upload_ids:
                upload = proc.Upload.get(upload_id)
                upload.re_pack()

        # remove potentially empty old datasets
        if removed_datasets is not None:
            for dataset in removed_datasets:
                if proc.Calc.objects(metadata__dataset_id=dataset).first() is None:
                    Dataset.m_def.m_x('me').objects(dataset_id=dataset).delete()

        return json_data, 200
597

598

599
600
601
602
603
# Model for one quantity aggregation: after-key for scrolling plus the values.
repo_quantity_model = api.model('RepoQuantity', {
    'after': fields.String(description='The after value that can be used to retrieve the next set of values.'),
    'values': fields.Raw(description='A dict with values as key. Values are dicts with "total" and "examples" keys.')
})

# Response model for the single-quantity endpoint.
repo_quantity_values_model = api.model('RepoQuantityValues', {
    'quantity': fields.Nested(repo_quantity_model, allow_null=True)
})

# Response model for requests over multiple quantities.
repo_quantities_model = api.model('RepoQuantities', {
    'quantities': fields.List(fields.Nested(repo_quantity_model))
})

# Parser for the quantity endpoints: common search args plus
# 'after'/'size' for after-key based scrolling.
repo_quantity_search_request_parser = api.parser()
add_common_parameters(repo_quantity_search_request_parser)
repo_quantity_search_request_parser.add_argument(
    'after', type=str, help='The after value to use for "scrolling".')
repo_quantity_search_request_parser.add_argument(
    'size', type=int, help='The max size of the returned values.')


620
@ns.route('/quantity/<string:quantity>')
621
622
623
624
625
class RepoQuantityResource(Resource):
    @api.doc('quantity_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
    @api.expect(repo_quantity_search_request_parser, validate=True)
    @api.marshal_with(repo_quantity_values_model, skip_none=True, code=200, description='Search results send')
626
    @authenticate()
627
628
629
630
631
632
633
634
635
636
637
638
639
640
    def get(self, quantity: str):
        """
        Retrieve quantity values from entries matching the search.

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have the certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        There is no ordering and no pagination. Instead there is an 'after' key based
        scrolling. The result will contain an 'after' value, that can be specified
        for the next request. You can use the 'size' and 'after' parameters accordingly.

641
642
643
        The result will contain a 'quantity' key with quantity values and the "after"
        value. There will be upto 'size' many values. For the rest of the values use the
        "after" parameter in another request.
644
645
        """

646
        search_request = search.SearchRequest()
647
648
649
650
        args = {
            key: value
            for key, value in repo_quantity_search_request_parser.parse_args().items()
            if value is not None}
651

652
653
654
        add_query(search_request, args)
        after = args.get('after', None)
        size = args.get('size', 100)
655
656
657
658
659
660

        try:
            assert size >= 0
        except AssertionError:
            abort(400, message='invalid size')

661
        search_request.quantity(quantity, size=size, after=after)
662
663

        try:
664
665
666
            results = search_request.execute()
            quantities = results.pop('quantities')
            results['quantity'] = quantities[quantity]
667
668
669
670
671
672

            return results, 200
        except KeyError as e:
            import traceback
            traceback.print_exc()
            abort(400, 'Given quantity does not exist: %s' % str(e))
673
674


675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
repo_quantities_search_request_parser = api.parser()
add_common_parameters(repo_quantities_search_request_parser)
repo_quantities_search_request_parser.add_argument(
    'quantities', type=str, action='append',
    help='The quantities to retrieve values from')
repo_quantities_search_request_parser.add_argument(
    'size', type=int, help='The max size of the returned values.')


@ns.route('/quantities')
class RepoQuantitiesResource(Resource):
    @api.doc('quantities_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
    @api.expect(repo_quantities_search_request_parser, validate=True)
    @api.marshal_with(repo_quantities_model, skip_none=True, code=200, description='Search results send')
    @authenticate()
    def get(self):
        """
        Retrieve quantity values for multiple quantities at once.

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have the certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        There is no ordering and no pagination and not after key based scrolling. Instead
        there is an 'after' key based scrolling.

        The result will contain a 'quantities' key with a dict of quantity names and the
        retrieved values as values.
        """

        search_request = search.SearchRequest()
        args = {
            key: value
            for key, value in repo_quantities_search_request_parser.parse_args().items()
            if value is not None}

        add_query(search_request, args)
        quantities = args.get('quantities', [])
        size = args.get('size', 5)

        try:
            assert size >= 0
        except AssertionError:
            abort(400, message='invalid size')

        for quantity in quantities:
            try:
                search_request.quantity(quantity, size=size)
            except KeyError as e:
                import traceback
                traceback.print_exc()
                abort(400, 'Given quantity does not exist: %s' % str(e))

        return search_request.execute(), 200


734
@ns.route('/pid/<path:pid>')
735
736
737
738
class RepoPidResource(Resource):
    @api.doc('resolve_pid')
    @api.response(404, 'Entry with PID does not exist')
    @api.marshal_with(repo_calc_id_model, skip_none=True, code=200, description='Entry resolved')
Markus Scheidgen's avatar
Markus Scheidgen committed
739
    @authenticate()
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
    def get(self, pid: str):
        if '/' in pid:
            prefix, pid = pid.split('/')
            if prefix != '21.11132':
                abort(400, 'Wrong PID format')
            try:
                pid_int = utils.decode_handle_id(pid)
            except ValueError:
                abort(400, 'Wrong PID format')
        else:
            try:
                pid_int = int(pid)
            except ValueError:
                abort(400, 'Wrong PID format')

755
756
757
758
        search_request = search.SearchRequest()

        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
759
        else:
760
761
            search_request.owner('all')

762
        search_request.search_parameter('pid', pid_int)
763
764
765
766
767

        results = list(search_request.execute_scan())
        total = len(results)

        if total == 0:
768
            abort(404, 'Entry with PID %s does not exist' % pid)
769
770

        if total > 1:
771
            utils.get_logger(__name__).error('Two entries for the same pid', pid=pid_int)
772
773
774
775
776

        result = results[0]
        return dict(
            upload_id=result['upload_id'],
            calc_id=result['calc_id'])