repo.py 42.3 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
'''
Markus Scheidgen's avatar
Markus Scheidgen committed
16
17
The repository API of the nomad@FAIRDI APIs. It currently allows resolving repository
metadata.
18
'''
Markus Scheidgen's avatar
Markus Scheidgen committed
19

20
from typing import List, Dict, Any
Markus Scheidgen's avatar
Markus Scheidgen committed
21
from flask_restplus import Resource, abort, fields
22
from flask import request, g
23
from elasticsearch_dsl import Q
24
from elasticsearch.exceptions import NotFoundError
25
import elasticsearch.helpers
Markus Scheidgen's avatar
Markus Scheidgen committed
26
from datetime import datetime
Markus Scheidgen's avatar
Markus Scheidgen committed
27

28
from nomad import search, utils, datamodel, processing as proc, infrastructure, files
29
from nomad.metainfo import search_extension
30
from nomad.datamodel import Dataset, User, EditableUserMetadata
31
from nomad.app import common
32
from nomad.app.common import RFC3339DateTime, DotKeyNested
Markus Scheidgen's avatar
Markus Scheidgen committed
33

Markus Scheidgen's avatar
Markus Scheidgen committed
34
from .api import api
35
from .auth import authenticate
36
37
from .common import search_model, calc_route, add_pagination_parameters,\
    add_scroll_parameters, add_search_parameters, apply_search_parameters,\
38
    query_api_python, query_api_curl, query_api_clientlib, _search_quantities
Markus Scheidgen's avatar
Markus Scheidgen committed
39

40
# Namespace under which all repository (entry metadata) endpoints below are registered.
ns = api.namespace('repo', description='Access repository metadata.')
Markus Scheidgen's avatar
Markus Scheidgen committed
41
42
43
44
45


@calc_route(ns)
class RepoCalcResource(Resource):
    ''' Resource for the repository metadata of a single calculation. '''

    @api.response(404, 'The upload or calculation does not exist')
    @api.response(401, 'Not authorized to access the calculation')
    @api.response(200, 'Metadata send', fields.Raw)
    @api.doc('get_repo_calc')
    @authenticate()
    def get(self, upload_id, calc_id):
        '''
        Get calculation metadata in repository form.

        Repository metadata only entails the quantities shown in the repository.
        Calcs are referenced via *upload_id*, *calc_id* pairs.
        '''
        # The entry is looked up in elasticsearch by calc_id only; upload_id is
        # used for error messages and the code snippets below.
        try:
            calc = search.entry_document.get(calc_id)
        except NotFoundError:
            abort(404, message='There is no calculation %s/%s' % (upload_id, calc_id))

        # Embargoed or unpublished entries are only visible to their owners or admins.
        if calc.with_embargo or not calc.published:
            if g.user is None:
                abort(401, message='Not logged in to access %s/%s.' % (upload_id, calc_id))

            if not (any(g.user.user_id == user.user_id for user in calc.owners) or g.user.is_admin):
                abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))

        result = calc.to_dict()
        # Attach ready-made API usage snippets for this single entry.
        result['code'] = {
            'python': query_api_python(dict(upload_id=upload_id, calc_id=calc_id)),
            'curl': query_api_curl(dict(upload_id=upload_id, calc_id=calc_id)),
            'clientlib': query_api_clientlib(upload_id=[upload_id], calc_id=[calc_id])
        }

        return result, 200
Markus Scheidgen's avatar
Markus Scheidgen committed
77

78

79
80
81
82
83
# Request parser for the GET search endpoint: generic pagination/scroll/search
# parameters plus the repo-specific aggregation arguments below.
_search_request_parser = api.parser()
add_pagination_parameters(_search_request_parser)
add_scroll_parameters(_search_request_parser)
add_search_parameters(_search_request_parser)
_search_request_parser.add_argument(
    'date_histogram', type=bool, help='Add an additional aggregation over the upload time')
_search_request_parser.add_argument(
    'interval', type=str, help='Interval to use for upload time aggregation.')
_search_request_parser.add_argument(
    'metrics', type=str, action='append', help=(
        'Metrics to aggregate over all quantities and their values as comma separated list. '
        'Possible values are %s.' % ', '.join(search_extension.metrics.keys())))
_search_request_parser.add_argument(
    'statistics', type=str, action='append', help=(
        'Quantities for which to aggregate values and their metrics.'))
_search_request_parser.add_argument(
    'exclude', type=str, action='split', help='Excludes the given keys in the returned data.')
# One boolean flag and one `<group>_after` scroll cursor per search group.
for group_name in search_extension.groups:
    _search_request_parser.add_argument(
        group_name, type=bool, help=('Return %s group data.' % group_name))
    _search_request_parser.add_argument(
        '%s_after' % group_name, type=str,
        help='The last %s id of the last scroll window for the %s group' % (group_name, group_name))

103
104
105
106
107
# Response/request model fields for the search endpoints; starts with the
# statistics field and is extended with group and quantity fields below.
_repo_calcs_model_fields = {
    'statistics': fields.Raw(description=(
        'A dict with all statistics. Each statistic is dictionary with a metrics dict as '
        'value and quantity value as key. The possible metrics are code runs(calcs), %s. '
        'There is a pseudo quantity "total" with a single value "all" that contains the '
        ' metrics over all results. ' % ', '.join(search_extension.metrics.keys())))}

# One nested group model per search group. DotKeyNested is used when the group
# name contains a '.' (dotted keys cannot use the plain Nested field).
for group_name in search_extension.groups:
    _repo_calcs_model_fields[group_name] = (DotKeyNested if '.' in group_name else fields.Nested)(api.model('RepoGroup', {
        'after': fields.String(description='The after value that can be used to retrieve the next %s.' % group_name),
        'values': fields.Raw(description='A dict with %s as key. The values are dicts with "total" and "examples" keys.' % group_name)
    }), skip_none=True)

# Expose every searchable quantity as a raw field.
for qualified_name, quantity in search_extension.search_quantities.items():
    _repo_calcs_model_fields[qualified_name] = fields.Raw(
        description=quantity.description, allow_null=True, skip_none=True)

# Aggregation-control fields mirroring the GET request parser arguments
# (used by the POST body variant of the search endpoint).
_repo_calcs_model_fields.update(**{
    'date_histogram': fields.Boolean(default=False, description='Add an additional aggregation over the upload time', allow_null=True, skip_none=True),
    'interval': fields.String(description='Interval to use for upload time aggregation.', allow_null=True, skip_none=True),
    'metrics': fields.List(fields.String, description=(
        'Metrics to aggregate over all quantities and their values as comma separated list. '
        'Possible values are %s.' % ', '.join(search_extension.metrics.keys())), allow_null=True, skip_none=True),
    'statistics_required': fields.List(fields.String, description='Quantities for which to aggregate values and their metrics.', allow_null=True, skip_none=True),
    'exclude': fields.List(fields.String, description='Excludes the given keys in the returned data.', allow_null=True, skip_none=True)
})

# Full search result model: the common search model plus the fields above.
_repo_calcs_model = api.inherit('RepoCalculations', search_model, _repo_calcs_model_fields)
131
132


Markus Scheidgen's avatar
Markus Scheidgen committed
133
134
@ns.route('/')
class RepoCalcsResource(Resource):
    ''' Search resource over all repository entries. '''

    @api.doc('search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_search_request_parser, validate=True)
    @api.marshal_with(_repo_calcs_model, skip_none=True, code=200, description='Search results send')
    @authenticate()
    def get(self):
        '''
        Search for calculations in the repository form, paginated.

        The ``owner`` parameter determines the overall entries to search through.
        Possible values are: ``all`` (show all entries visible to the current user), ``public``
        (show all publicly visible entries), ``user`` (show all user entries, requires login),
        ``staging`` (show all user entries in staging area, requires login).

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have the certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        The pagination parameters allow to determine which page to return via the
        ``page`` and ``per_page`` parameters. Pagination however, is limited to the first
        100k (depending on ES configuration) hits.

        An alternative to pagination is to use ``scroll`` and ``scroll_id``. With ``scroll``
        you will get a ``scroll_id`` on the first request. Each call with ``scroll`` and
        the respective ``scroll_id`` will return the next ``per_page`` (here the default is 1000)
        results. Scroll however, ignores ordering and does not return aggregations.
        The scroll view used in the background will stay alive for 1 minute between requests.
        If the given ``scroll_id`` is not available anymore, a HTTP 400 is raised.

        The search will return aggregations on a predefined set of quantities. Aggregations
        will tell you what quantity values exist and how many entries match those values.

        Ordering is determined by ``order_by`` and ``order`` parameters. Default is
        ``upload_time`` in descending order.
        '''
        # Parse and default all request arguments; any parse failure is a 400.
        try:
            parsed_args = _search_request_parser.parse_args()
            args = {
                key: value for key, value in parsed_args.items()
                if value is not None}

            scroll = args.get('scroll', False)
            scroll_id = args.get('scroll_id', None)
            page = args.get('page', 1)
            per_page = args.get('per_page', 10 if not scroll else 1000)
            order = args.get('order', -1)
            order_by = args.get('order_by', 'upload_time')

            date_histogram = args.get('date_histogram', False)
            interval = args.get('interval', '1M')
            # NOTE(review): metrics are read from the raw request args, not from
            # parsed_args like the other parameters — presumably to get repeated
            # query parameters as a list; verify against the parser behavior.
            metrics: List[str] = request.args.getlist('metrics')
            statistics = args.get('statistics', [])
        except Exception as e:
            abort(400, message='bad parameters: %s' % str(e))

        # Reject unknown metric names before building the ES request.
        for metric in metrics:
            if metric not in search_extension.metrics:
                abort(400, message='there is no metric %s' % metric)

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, args)
        if date_histogram:
            search_request.date_histogram(interval=interval, metrics_to_use=metrics)

        # Validate pagination values.
        try:
            assert page >= 1
            assert per_page >= 0
        except AssertionError:
            abort(400, message='invalid pagination')

        if order not in [-1, 1]:
            abort(400, message='invalid pagination')

        if len(statistics) > 0:
            search_request.statistics(statistics, metrics_to_use=metrics)

        # Metrics from explicitly requested groups are included in the totals.
        group_metrics = [
            group_quantity.metric_name
            for group_name, group_quantity in search_extension.groups.items()
            if args.get(group_name, False)]
        total_metrics = metrics + group_metrics
        if len(total_metrics) > 0:
            search_request.totals(metrics_to_use=total_metrics)

        if 'exclude' in parsed_args:
            excludes = parsed_args['exclude']
            if excludes is not None:
                search_request.exclude(*excludes)

        try:
            if scroll:
                results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)

            else:
                # Add a quantity aggregation for every requested group, with the
                # group's scroll cursor taken from the '<group>_after' argument.
                for group_name, group_quantity in search_extension.groups.items():
                    if args.get(group_name, False):
                        kwargs: Dict[str, Any] = {}
                        if group_name == 'uploads_grouped':
                            kwargs.update(order_by='upload_time', order='desc')
                        search_request.quantity(
                            group_quantity.qualified_name, size=per_page, examples=1,
                            after=request.args.get('%s_after' % group_name, None),
                            **kwargs)

                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order=order, order_by=order_by)

                # TODO just a work around to make things prettier
                if 'statistics' in results:
                    statistics = results['statistics']
                    if 'code_name' in statistics and 'currupted mainfile' in statistics['code_name']:
                        del(statistics['code_name']['currupted mainfile'])

                # Re-key group aggregation results from qualified names to
                # group names in the response.
                if 'quantities' in results:
                    quantities = results.pop('quantities')

                for group_name, group_quantity in search_extension.groups.items():
                    if args.get(group_name, False):
                        results[group_name] = quantities[group_quantity.qualified_name]

            # build python code/curl snippet
            code_args = request.args.to_dict(flat=False)
            if 'statistics' in code_args:
                del(code_args['statistics'])
            results['code'] = {
                'curl': query_api_curl(code_args),
                'python': query_api_python(code_args),
                'clientlib': query_api_clientlib(**code_args)
            }

            return results, 200
        except search.ScrollIdNotFound:
            abort(400, 'The given scroll_id does not exist.')
        except KeyError as e:
            import traceback
            traceback.print_exc()
            abort(400, str(e))
275

Alvin Noe Ladines's avatar
Alvin Noe Ladines committed
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
    @api.doc('post_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_repo_calcs_model)
    @api.marshal_with(_repo_calcs_model, skip_none=True, code=200, description='Search results send')
    @authenticate()
    def post(self):
        '''
        Search for calculations in the repository form, paginated.

        The ``owner`` parameter determines the overall entries to search through.
        Possible values are: ``all`` (show all entries visible to the current user), ``public``
        (show all publicly visible entries), ``user`` (show all user entries, requires login),
        ``staging`` (show all user entries in staging area, requires login).

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have the certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        The pagination parameters allow to determine which page to return via the
        ``page`` and ``per_page`` parameters. Pagination however, is limited to the first
        100k (depending on ES configuration) hits.

        An alternative to pagination is to use ``scroll`` and ``scroll_id``. With ``scroll``
        you will get a ``scroll_id`` on the first request. Each call with ``scroll`` and
        the respective ``scroll_id`` will return the next ``per_page`` (here the default is 1000)
        results. Scroll however, ignores ordering and does not return aggregations.
        The scroll view used in the background will stay alive for 1 minute between requests.
        If the given ``scroll_id`` is not available anymore, a HTTP 400 is raised.

        The search will return aggregations on a predefined set of quantities. Aggregations
        will tell you what quantity values exist and how many entries match those values.

        Ordering is determined by ``order_by`` and ``order`` parameters. Default is
        ``upload_time`` in descending order.
        '''
        # Same search as GET, but all parameters come from the JSON body
        # (scroll/pagination are nested objects, the query is a sub-dict).
        try:
            data_in = request.get_json()

            Scroll = data_in.get('scroll', {})
            scroll = Scroll.get('scroll', False)
            scroll_id = Scroll.get('scroll_id', None)
            pagination = data_in.get('pagination', {})
            page = pagination.get('page', 1)
            per_page = pagination.get('per_page', 10 if not scroll else 1000)
            order = pagination.get('order', -1)
            order_by = pagination.get('order_by', 'upload_time')

            date_histogram = data_in.get('date_histogram', False)
            interval = data_in.get('interval', '1M')
            metrics: List[str] = data_in.get('metrics', [])
            statistics = data_in.get('statistics_required', [])

            query = data_in.get('query', {})
            # Query keys containing '$' are treated as query-expression
            # operators and handled separately below.
            query_expression = {key: val for key, val in query.items() if '$' in key}
        except Exception as e:
            abort(400, message='bad parameters: %s' % str(e))

        # Reject unknown metric names before building the ES request.
        for metric in metrics:
            if metric not in search_extension.metrics:
                abort(400, message='there is no metric %s' % metric)

        search_request = search.SearchRequest()
        apply_search_parameters(search_request, query)
        if date_histogram:
            search_request.date_histogram(interval=interval, metrics_to_use=metrics)

        if query_expression:
            try:
                search_request.query_expression(query_expression)
            except AssertionError as e:
                abort(400, str(e))

        # Validate pagination values.
        try:
            assert page >= 1
            assert per_page >= 0
        except AssertionError:
            abort(400, message='invalid pagination')

        if order not in [-1, 1]:
            abort(400, message='invalid pagination')

        if len(statistics) > 0:
            search_request.statistics(statistics, metrics_to_use=metrics)

        # Metrics from requested groups (presence of the group key in the
        # body marks the group as requested) are included in the totals.
        group_metrics = [
            group_quantity.metric_name
            for group_name, group_quantity in search_extension.groups.items()
            if group_name in data_in]
        total_metrics = metrics + group_metrics
        if len(total_metrics) > 0:
            search_request.totals(metrics_to_use=total_metrics)

        if 'exclude' in data_in:
            excludes = data_in['exclude']
            if excludes is not None:
                search_request.exclude(*excludes)

        try:
            if scroll:
                results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)

            else:
                # Add a quantity aggregation per requested group; the group's
                # scroll cursor is the nested 'after' value in the body.
                for group_name, group_quantity in search_extension.groups.items():
                    if group_name in data_in:
                        kwargs: Dict[str, Any] = {}
                        if group_name == 'uploads_grouped':
                            kwargs.update(order_by='upload_time', order='desc')
                        search_request.quantity(
                            group_quantity.qualified_name, size=per_page, examples=1,
                            after=data_in[group_name].get('after', None),
                            **kwargs)

                results = search_request.execute_paginated(
                    per_page=per_page, page=page, order=order, order_by=order_by)

                # TODO just a work around to make things prettier
                if 'statistics' in results:
                    statistics = results['statistics']
                    if 'code_name' in statistics and 'currupted mainfile' in statistics['code_name']:
                        del(statistics['code_name']['currupted mainfile'])

                # Re-key group aggregation results from qualified names to
                # group names in the response.
                if 'quantities' in results:
                    quantities = results.pop('quantities')

                for group_name, group_quantity in search_extension.groups.items():
                    if group_name in data_in:
                        results[group_name] = quantities[group_quantity.qualified_name]

            # build python code/curl snippet
            code_args = dict(data_in)
            if 'statistics' in code_args:
                del(code_args['statistics'])
            results['code'] = {
                'curl': query_api_curl(code_args),
                'python': query_api_python(code_args),
                'clientlib': query_api_clientlib(**code_args)
            }

            return results, 200
        except search.ScrollIdNotFound:
            abort(400, 'The given scroll_id does not exist.')
        except KeyError as e:
            import traceback
            traceback.print_exc()
            abort(400, str(e))

424

425
# Model for the 'query' part of edit requests: fixed owner/time parameters
# plus one field per searchable quantity.
_query_model_parameters = {
    'owner': fields.String(description='Specify which calcs to return: ``all``, ``public``, ``user``, ``staging``, default is ``all``'),
    'from_time': RFC3339DateTime(description='A yyyy-MM-ddTHH:mm:ss (RFC3339) minimum entry time (e.g. upload time)'),
    'until_time': RFC3339DateTime(description='A yyyy-MM-ddTHH:mm:ss (RFC3339) maximum entry time (e.g. upload time)')
}

for qualified_name, quantity in search.search_quantities.items():
    # Quantities combined via 'append' accept multiple values and therefore
    # become list fields; all others are plain strings.
    if quantity.many_and == 'append' or quantity.many_or == 'append':
        def field(**kwargs):
            return fields.List(fields.String(**kwargs))
    else:
        field = fields.String
    _query_model_parameters[qualified_name] = field(description=quantity.description)

_repo_query_model = api.model('RepoQuery', _query_model_parameters, skip_none=True)
440
441
442
443


def repo_edit_action_field(quantity):
    '''
    Build the restplus model field that describes an edit action for *quantity*:
    a single nested action for scalar quantities, a list of nested actions otherwise.
    '''
    if not quantity.is_scalar:
        action_item = fields.Nested(_repo_edit_action_model, skip_none=True)
        return fields.List(action_item, description=quantity.description)
    return fields.Nested(_repo_edit_action_model, description=quantity.description, skip_none=True)
448
449


450
# Model describing a single edit action (one value set on one quantity).
# Fix: the 'message' field used the misspelled kwarg 'descriptin', so its
# description was silently dropped from the generated API documentation.
_repo_edit_action_model = api.model('RepoEditAction', {
    'value': fields.String(description='The value/values that is set as a string.'),
    'success': fields.Boolean(description='If this can/could be done. Only in API response.'),
    'message': fields.String(description='A message that details the action result. Only in API response.')
})

456
# Model for the edit endpoint request/response body: a query selecting the
# entries to edit, plus one action field per editable metadata quantity.
_repo_edit_model = api.model('RepoEdit', {
    'verify': fields.Boolean(description='If true, no action is performed.'),
    'query': fields.Nested(_repo_query_model, skip_none=True, description='New metadata will be applied to query results.'),
    'actions': fields.Nested(
        api.model('RepoEditActions', {
            quantity.name: repo_edit_action_field(quantity)
            for quantity in EditableUserMetadata.m_def.definitions
        }), skip_none=True,
        description='Each action specifies a single value (even for multi valued quantities).'),
    'success': fields.Boolean(description='If the overall edit can/could be done. Only in API response.'),
    'message': fields.String(description='A message that details the overall edit result. Only in API response.')
})

469
470
471
# Lookup of editable metadata quantity definitions by name.
_editable_quantities = {
    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}

472

473
def edit(parsed_query: Dict[str, Any], mongo_update: Dict[str, Any] = None, re_index=True) -> List[str]:
    '''
    Apply a metadata edit to all entries matching *parsed_query*.

    Arguments:
        parsed_query: Search parameters selecting the entries to edit.
        mongo_update: Mongoengine update kwargs applied to the matching calcs,
            or None to skip the mongo update.
        re_index: If True, the affected entries are re-indexed in elasticsearch.

    Returns: The ids of all uploads that contain affected entries.
    '''
    # get all calculations that have to change
    with utils.timer(common.logger, 'edit query executed'):
        search_request = search.SearchRequest().include('calc_id', 'upload_id')
        apply_search_parameters(search_request, parsed_query)
        upload_ids = set()
        calc_ids = []

        for hit in search_request.execute_scan():
            calc_ids.append(hit['calc_id'])
            upload_ids.add(hit['upload_id'])

    # perform the update on the mongo db
    with utils.timer(common.logger, 'edit mongo update executed', size=len(calc_ids)):
        if mongo_update is not None:
            n_updated = proc.Calc.objects(calc_id__in=calc_ids).update(multi=True, **mongo_update)
            if n_updated != len(calc_ids):
                common.logger.error('edit repo did not update all entries', payload=mongo_update)

    # re-index the affected entries in elastic search
    with utils.timer(common.logger, 'edit elastic update executed', size=len(calc_ids)):
        if re_index:
            def elastic_updates():
                # Generator of bulk index operations; upload files are cached
                # and kept open across entries of the same upload.
                upload_files_cache: Dict[str, files.UploadFiles] = dict()

                for calc in proc.Calc.objects(calc_id__in=calc_ids):
                    upload_id = calc.upload_id
                    upload_files = upload_files_cache.get(upload_id)
                    if upload_files is None:
                        upload_files = files.UploadFiles.get(upload_id, is_authorized=lambda: True)
                        upload_files_cache[upload_id] = upload_files

                    entry_metadata = calc.entry_metadata(upload_files)
                    entry = entry_metadata.a_elastic.create_index_entry().to_dict(include_meta=True)
                    entry['_op_type'] = 'index'

                    yield entry

                for upload_files in upload_files_cache.values():
                    upload_files.close()

            # With stats_only=True, bulk() returns (success_count, failed_count)
            # as ints. The previous code called len(failed) here, which raised a
            # TypeError exactly when failures should have been logged.
            _, failed = elasticsearch.helpers.bulk(
                infrastructure.elastic_client, elastic_updates(), stats_only=True)
            search.refresh()
            if failed > 0:
                common.logger.error(
                    'edit repo with failed elastic updates',
                    payload=mongo_update, nfailed=failed)

    return list(upload_ids)

524

525
def get_uploader_ids(query):
    ''' Get all the uploaders from the query, to check coauthors and shared_with for uploaders. '''
    search_request = search.SearchRequest()
    apply_search_parameters(search_request, query)
    # Aggregate over uploader_id and return the distinct values.
    search_request.quantity(name='uploader_id')
    return search_request.execute()['quantities']['uploader_id']['values']


533
534
@ns.route('/edit')
class EditRepoCalcsResource(Resource):
    @api.doc('edit_repo')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(_repo_edit_model)
    @api.marshal_with(_repo_edit_model, skip_none=True, code=200, description='Edit verified/performed')
    @authenticate()
    def post(self):
        '''
        Edit repository metadata.

        The request body contains a ``query`` that selects the entries to edit and
        ``actions`` with the quantity values to set. If ``verify`` is set, the
        actions are only checked and the (annotated) request data is returned
        without applying any changes.
        '''

        # basic body parsing and some semantic checks
        json_data = request.get_json()
        if json_data is None:
            json_data = {}
        query = json_data.get('query', {})

        owner = query.get('owner', 'user')
        if owner not in ['user', 'staging']:
            abort(400, 'Not a valid owner for edit %s. Edit can only be performed in user or staging' % owner)
        query['owner'] = owner

        if 'actions' not in json_data:
            abort(400, 'Missing key actions in edit data')
        actions = json_data['actions']
        verify = json_data.get('verify', False)

        # preparing the query of entries that are edited
        parsed_query = {}
        for quantity_name, value in query.items():
            if quantity_name in _search_quantities:
                quantity = search.search_quantities[quantity_name]
                if quantity.many:
                    if not isinstance(value, list):
                        value = value.split(',')
                parsed_query[quantity_name] = value
        parsed_query['owner'] = owner
        parsed_query['domain'] = query.get('domain')

        # checking the edit actions and preparing a mongo update on the fly
        json_data['success'] = True
        mongo_update = {}
        uploader_ids = None
        lift_embargo = False
        removed_datasets = None
        # Initialize once and never reset inside the loop: previously has_error
        # was re-assigned False for every quantity, silently discarding errors
        # of earlier quantities, and was undefined when 'actions' was empty.
        has_error = False

        with utils.timer(common.logger, 'edit verified'):
            for action_quantity_name, quantity_actions in actions.items():
                quantity = _editable_quantities.get(action_quantity_name)
                if quantity is None:
                    abort(400, 'Unknown quantity %s' % action_quantity_name)

                quantity_flask = quantity.m_get_annotations('flask', {})
                if quantity_flask.get('admin_only', False):
                    if not g.user.is_admin():
                        abort(404, 'Only the admin user can set %s' % quantity.name)

                # scalar quantities take a single action, shaped quantities a list
                if isinstance(quantity_actions, list) == quantity.is_scalar:
                    abort(400, 'Wrong shape for quantity %s' % action_quantity_name)

                if not isinstance(quantity_actions, list):
                    quantity_actions = [quantity_actions]

                flask_verify = quantity_flask.get('verify', None)
                mongo_key = 'metadata__%s' % quantity.name
                for action in quantity_actions:
                    action['success'] = True
                    action['message'] = None
                    action_value = action.get('value')
                    action_value = action_value if action_value is None else action_value.strip()

                    if action_value is None:
                        mongo_value = None

                    elif action_value == '':
                        mongo_value = None

                    elif flask_verify == datamodel.User:
                        try:
                            mongo_value = User.get(user_id=action_value).user_id
                        except KeyError:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'User does not exist'
                            continue

                        # a user must not be coauthor/shared_with and uploader at once
                        if uploader_ids is None:
                            uploader_ids = get_uploader_ids(parsed_query)
                        if action_value in uploader_ids:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'This user is already an uploader of one entry in the query'
                            continue

                    elif flask_verify == datamodel.Dataset:
                        try:
                            mongo_value = Dataset.m_def.a_mongo.get(
                                user_id=g.user.user_id, name=action_value).dataset_id
                        except KeyError:
                            # dataset is created on the fly, but only when actually editing
                            action['message'] = 'Dataset does not exist and will be created'
                            mongo_value = None
                            if not verify:
                                dataset = Dataset(
                                    dataset_id=utils.create_uuid(), user_id=g.user.user_id,
                                    name=action_value, created=datetime.utcnow())
                                dataset.a_mongo.create()
                                mongo_value = dataset.dataset_id

                    elif action_quantity_name == 'with_embargo':
                        # ignore the actual value ... just lift the embargo
                        mongo_value = False
                        lift_embargo = True

                        # check if necessary
                        search_request = search.SearchRequest()
                        apply_search_parameters(search_request, parsed_query)
                        search_request.q = search_request.q & Q('term', with_embargo=True)
                        if search_request.execute()['total'] == 0:
                            action['success'] = False
                            has_error = True
                            action['message'] = 'There is no embargo to lift'
                            continue
                    else:
                        mongo_value = action_value

                    if len(quantity.shape) == 0:
                        mongo_update[mongo_key] = mongo_value
                    else:
                        mongo_values = mongo_update.setdefault(mongo_key, [])
                        if mongo_value is not None:
                            if mongo_value in mongo_values:
                                action['success'] = False
                                has_error = True
                                action['message'] = 'Duplicate values are not allowed'
                                continue
                            mongo_values.append(mongo_value)

                # an empty action list on a shaped quantity clears all values
                if len(quantity_actions) == 0 and len(quantity.shape) > 0:
                    mongo_update[mongo_key] = []

                if action_quantity_name == 'datasets':
                    # check if datasets edit is allowed and if datasets have to be removed
                    search_request = search.SearchRequest()
                    apply_search_parameters(search_request, parsed_query)
                    search_request.quantity(name='dataset_id')
                    old_datasets = list(
                        search_request.execute()['quantities']['dataset_id']['values'].keys())

                    removed_datasets = []
                    for dataset_id in old_datasets:
                        if dataset_id not in mongo_update.get(mongo_key, []):
                            removed_datasets.append(dataset_id)

                    # entries must not be removed from datasets that carry a DOI
                    doi_ds = Dataset.m_def.a_mongo.objects(
                        dataset_id__in=removed_datasets, doi__ne=None).first()
                    if doi_ds is not None:
                        json_data['success'] = False
                        json_data['message'] = json_data.get('message', '') + \
                            'Edit would remove entries from a dataset with DOI (%s) ' % doi_ds.name
                        has_error = True

        # stop here, if client just wants to verify its actions
        if verify:
            return json_data, 200

        # stop if the action were not ok
        if has_error:
            return json_data, 400

        # perform the change
        mongo_update['metadata__last_edit'] = datetime.utcnow()
        upload_ids = edit(parsed_query, mongo_update, True)

        # lift embargo: re-pack the raw files of all affected uploads
        if lift_embargo:
            for upload_id in upload_ids:
                upload = proc.Upload.get(upload_id)
                upload.re_pack()

        # remove potentially empty old datasets
        if removed_datasets is not None:
            for dataset in removed_datasets:
                if proc.Calc.objects(metadata__datasets=dataset).first() is None:
                    Dataset.m_def.a_mongo.objects(dataset_id=dataset).delete()

        return json_data, 200
719

720

721
722
723
724
725
726
# Request parser for the single-quantity aggregation endpoint: the usual search
# parameters plus 'after'-key based scrolling and a partial-value filter.
_repo_quantity_search_request_parser = api.parser()
add_search_parameters(_repo_quantity_search_request_parser)
_repo_quantity_search_request_parser.add_argument(
    'after', type=str, help='The after value to use for "scrolling".')
_repo_quantity_search_request_parser.add_argument(
    'size', type=int, help='The max size of the returned values.')
_repo_quantity_search_request_parser.add_argument(
    'value', type=str, help='A partial value. Only values that include this will be returned')

# Response model for one aggregated quantity: the scroll key plus the values.
_repo_quantity_model = api.model('RepoQuantity', {
    'after': fields.String(description='The after value that can be used to retrieve the next set of values.'),
    'values': fields.Raw(description='A dict with values as key. Values are dicts with "total" and "examples" keys.')
})

# Wrapper model used by the /quantity/<quantity> endpoint response.
_repo_quantity_values_model = api.model('RepoQuantityValues', {
    'quantity': fields.Nested(_repo_quantity_model, allow_null=True)
})

739

740
@ns.route('/quantity/<string:quantity>')
class RepoQuantityResource(Resource):
    @api.doc('quantity_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
    @api.expect(_repo_quantity_search_request_parser, validate=True)
    @api.marshal_with(_repo_quantity_values_model, skip_none=True, code=200, description='Search results send')
    @authenticate()
    def get(self, quantity: str):
        '''
        Retrieve quantity values from entries matching the search.

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have the certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        There is no ordering and no pagination. Instead there is an 'after' key based
        scrolling. The result will contain an 'after' value, that can be specified
        for the next request. You can use the 'size' and 'after' parameters accordingly.

        The result will contain a 'quantity' key with quantity values and the "after"
        value. There will be upto 'size' many values. For the rest of the values use the
        "after" parameter in another request.
        '''

        search_request = search.SearchRequest()
        # drop unset parser arguments so defaults below apply
        args = {
            key: value
            for key, value in _repo_quantity_search_request_parser.parse_args().items()
            if value is not None}

        apply_search_parameters(search_request, args)
        after = args.get('after', None)
        size = args.get('size', 100)

        # validate explicitly; 'assert' must not be used for input validation
        # because it is stripped when Python runs with -O
        if size < 0:
            abort(400, message='invalid size')

        try:
            search_request.quantity(quantity, size=size, after=after)
            results = search_request.execute()
            quantities = results.pop('quantities')
            results['quantity'] = quantities[quantity]

            return results, 200
        except KeyError as e:
            # log instead of a stray traceback.print_exc() debugging leftover
            common.logger.error('quantity does not exist', quantity=quantity, exc_info=e)
            abort(400, 'Given quantity does not exist: %s' % str(e))
792
793


794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
# Request parser for the suggestions endpoint: search parameters plus a result
# size limit and an 'include' substring filter for the suggested values.
_repo_suggestions_search_request_parser = api.parser()
add_search_parameters(_repo_suggestions_search_request_parser)
_repo_suggestions_search_request_parser.add_argument(
    'size', type=int, help='The max size of the returned values.')
_repo_suggestions_search_request_parser.add_argument(
    'include', type=str, help='A substring that all values need to include.')

# Response model: a flat list of suggested values.
_repo_suggestions_model = api.model('RepoSuggestionsValues', {
    'suggestions': fields.List(fields.String, description='A list with the suggested values.')
})


@ns.route('/suggestions/<string:quantity>')
class RepoSuggestionsResource(Resource):
    @api.doc('suggestions_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
    @api.expect(_repo_suggestions_search_request_parser, validate=True)
    @api.marshal_with(_repo_suggestions_model, skip_none=True, code=200, description='Suggestions send')
    @authenticate()
    def get(self, quantity: str):
        '''
        Retrieve the top values for the given quantity from entries matching the search.
        Values can be filtered by to include a given value.

        There is no ordering, no pagination, and no scroll interface.

        The result will contain a 'suggestions' key with values. There will be upto 'size' many values.
        '''

        search_request = search.SearchRequest()
        # drop unset parser arguments so defaults below apply
        args = {
            key: value
            for key, value in _repo_suggestions_search_request_parser.parse_args().items()
            if value is not None}

        apply_search_parameters(search_request, args)
        size = args.get('size', 20)
        include = args.get('include', None)

        # validate explicitly; 'assert' must not be used for input validation
        # because it is stripped when Python runs with -O
        if size < 0:
            abort(400, message='invalid size')

        try:
            search_request.statistic(quantity, size=size, include=include, order=dict(_key='desc'))
            results = search_request.execute()
            # keep only values that actually occur in matching entries
            values = {
                value: metric['code_runs']
                for value, metric in results['statistics'][quantity].items()
                if metric['code_runs'] > 0}
            # most frequent values first
            results['suggestions'] = sorted(
                values.keys(), key=lambda value: values[value], reverse=True)

            return results, 200
        except KeyError as e:
            # log instead of a stray traceback.print_exc() debugging leftover
            common.logger.error('quantity does not exist', quantity=quantity, exc_info=e)
            abort(400, 'Given quantity does not exist: %s' % str(e))


855
856
857
# Request parser for the multi-quantity aggregation endpoint: search parameters
# plus a repeatable 'quantities' argument and a per-quantity size limit.
_repo_quantities_search_request_parser = api.parser()
add_search_parameters(_repo_quantities_search_request_parser)
_repo_quantities_search_request_parser.add_argument(
    'quantities', type=str, action='append',
    help='The quantities to retrieve values from')
_repo_quantities_search_request_parser.add_argument(
    'size', type=int, help='The max size of the returned values.')

# Response model: one RepoQuantity list per indexed search quantity.
_repo_quantities_model = api.model('RepoQuantitiesResponse', {
    'quantities': fields.Nested(api.model('RepoQuantities', {
        quantity: fields.List(fields.Nested(_repo_quantity_model))
        for quantity in search_extension.search_quantities
    }))
})

870
871
872
873
874

@ns.route('/quantities')
class RepoQuantitiesResource(Resource):
    @api.doc('quantities_search')
    @api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
    @api.expect(_repo_quantities_search_request_parser, validate=True)
    @api.marshal_with(_repo_quantities_model, skip_none=True, code=200, description='Search results send')
    @authenticate()
    def get(self):
        '''
        Retrieve quantity values for multiple quantities at once.

        You can use the various quantities to search/filter for. For some of the
        indexed quantities this endpoint returns aggregation information. This means
        you will be given a list of all possible values and the number of entries
        that have the certain value. You can also use these aggregations on an empty
        search to determine the possible values.

        There is no ordering, no pagination, and no 'after' key based scrolling;
        use the single quantity endpoint for scrolling through all values.

        The result will contain a 'quantities' key with a dict of quantity names and the
        retrieved values as values.
        '''

        search_request = search.SearchRequest()
        # drop unset parser arguments so defaults below apply
        args = {
            key: value
            for key, value in _repo_quantities_search_request_parser.parse_args().items()
            if value is not None}

        apply_search_parameters(search_request, args)
        quantities = args.get('quantities', [])
        size = args.get('size', 5)

        # validate explicitly; 'assert' must not be used for input validation
        # because it is stripped when Python runs with -O
        if size < 0:
            abort(400, message='invalid size')

        for quantity in quantities:
            try:
                search_request.quantity(quantity, size=size)
            except KeyError as e:
                # log instead of a stray traceback.print_exc() debugging leftover
                common.logger.error('quantity does not exist', quantity=quantity, exc_info=e)
                abort(400, 'Given quantity does not exist: %s' % str(e))

        return search_request.execute(), 200


Markus Scheidgen's avatar
Markus Scheidgen committed
921
# Minimal response model identifying a single entry by upload and calc id,
# used by the PID resolution endpoint.
_repo_calc_id_model = api.model('RepoCalculationId', {
    'upload_id': fields.String(), 'calc_id': fields.String()
})


926
@ns.route('/pid/<path:pid>')
class RepoPidResource(Resource):
    @api.doc('resolve_pid')
    @api.response(404, 'Entry with PID does not exist')
    @api.marshal_with(_repo_calc_id_model, skip_none=True, code=200, description='Entry resolved')
    @authenticate()
    def get(self, pid: str):
        '''
        Resolve a PID (either a plain integer or a handle '21.11132/<encoded>')
        to the upload and calc id of the corresponding entry.
        '''
        if '/' in pid:
            # split only on the first '/': <path:pid> may contain further
            # slashes, and plain split() would raise an uncaught ValueError
            # (HTTP 500) instead of the intended 400
            prefix, pid = pid.split('/', 1)
            if prefix != '21.11132':
                abort(400, 'Wrong PID format')
            try:
                pid_int = utils.decode_handle_id(pid)
            except ValueError:
                abort(400, 'Wrong PID format')
        else:
            try:
                pid_int = int(pid)
            except ValueError:
                abort(400, 'Wrong PID format')

        search_request = search.SearchRequest().include('upload_id', 'calc_id')

        # anonymous users only see public entries; authenticated users also
        # see their own
        if g.user is not None:
            search_request.owner('all', user_id=g.user.user_id)
        else:
            search_request.owner('all')

        search_request.search_parameter('pid', pid_int)

        results = list(search_request.execute_scan())
        total = len(results)

        if total == 0:
            abort(404, 'Entry with PID %s does not exist' % pid)

        if total > 1:
            # pids are supposed to be unique; log the data inconsistency
            common.logger.error('Two entries for the same pid', pid=pid_int)

        result = results[0]
        return dict(
            upload_id=result['upload_id'],
            calc_id=result['calc_id'])