#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import math
from datetime import datetime

from typing import Optional, Set, Union, Dict, Iterator, Any, List
from fastapi import (
    APIRouter, Depends, Path, status, HTTPException, Request, Query as QueryParameter,
    Body)
from fastapi.responses import StreamingResponse
from fastapi.exceptions import RequestValidationError
from pydantic import BaseModel, Field, validator
import os.path
import io
import json
import orjson
from pydantic.main import create_model
from starlette.responses import Response
from joblib import Parallel, delayed, parallel_backend

from nomad import files, config, utils, metainfo, processing as proc
from nomad import datamodel
from nomad.datamodel import EditableUserMetadata
from nomad.files import StreamedFile, create_zipstream
from nomad.utils import strip
from nomad.archive import RequiredReader, RequiredValidationError, ArchiveQueryError
from nomad.search import AuthenticationRequiredError, SearchError, update_metadata as es_update_metadata
from nomad.search import search, QueryValidationError
from nomad.metainfo.elasticsearch_extension import entry_type

from .auth import create_user_dependency
from ..utils import (
    create_download_stream_zipped, create_download_stream_raw_file, browser_download_headers,
    DownloadItem, create_responses)
from ..models import (
    Aggregation, Pagination, PaginationResponse, MetadataPagination, TermsAggregation,
    WithQuery, WithQueryAndPagination, MetadataRequired, MetadataResponse, Metadata,
    MetadataEditRequest, Files, Query, User, Owner,
    QueryParameters, metadata_required_parameters, files_parameters, metadata_pagination_parameters,
    HTTPExceptionModel)


router = APIRouter()
default_tag = 'entries'
metadata_tag = 'entries/metadata'
raw_tag = 'entries/raw'
archive_tag = 'entries/archive'

logger = utils.get_logger(__name__)

query_parameters = QueryParameters(doc_type=entry_type)

archive_required_documentation = strip('''
The `required` part allows you to specify what parts of the requested archives
should be returned. The NOMAD Archive is a hierarchical data format and
you can *require* certain branches (i.e. *sections*) in the hierarchy.
By specifying certain sections with specific contents or all contents (via
the directive `"*"`), you can determine what sections and what quantities should
be returned. The default is the whole archive, i.e., `"*"`.

For example, to specify that you are only interested in the `metadata`, use:

```json
{
    "metadata": "*"
}
```

Or, to only get the `energy` section from each individual entry, use:
```json
{
    "run": {
        "configuration": {
            "energy": "*"
        }
    }
}
```

You can also request certain parts of a list, e.g. the last calculation:
```json
{
    "run": {
        "calculation[-1]": "*"
    }
}
```

These required specifications are also very useful to get workflow results.
This works because we can use references (e.g. from the workflow to the final result
calculation) and the API will resolve these references and return the respective data.
For example, to get just the energy and the chemical composition from the resulting
calculation:
```json
{
    "workflow": {
        "calculation_result_ref": {
            "energy": "*",
            "system_ref": {
                "value": {
                    "chemical_composition": "*"
                }
            }
        }
    }
}
```

You can also resolve all references in a branch with the `include-resolved`
directive. This will resolve all references in the branch, and also all references
in referenced sections:
```json
{
    "workflow":
        "calculation_result_ref": "include-resolved"
    }
}
```

By default, the targets of "resolved" references are added to the archive at
their original hierarchy positions. This means all references are still references,
but they can be resolved within the returned data, since their targets are now part
of the data. Another option is to add `"resolve-inplace": true` to the root of
`required`. In that case, the reference targets will replace the references:
```json
{
    "resolve-inplace": true,
    "workflow":
        "calculation_result_ref": "include-resolved"
    }
}
```
''')
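# Illustrative only: such a `required` specification is sent as part of the request body of the
# archive endpoints defined below. A minimal sketch with `requests` (the base URL and the search
# quantity used in `query` are assumptions, not fixed by this module):
#
#   import requests
#   requests.post('http://localhost/api/v1/entries/archive/query', json={
#       'query': {'results.material.elements': ['Ti', 'O']},
#       'required': {'workflow': {'calculation_result_ref': {'energy': '*'}}}})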


ArchiveRequired = Union[str, Dict[str, Any]]

_archive_required_field = Body(
    '*',
    embed=True,
    description=archive_required_documentation,
    example={
        'run': {
            'calculation[-1]': {
                'energy': '*'
            },
            'system[-1]': '*'
        },
        'metadata': '*'
    })


class EntriesArchive(WithQueryAndPagination):
    required: Optional[ArchiveRequired] = _archive_required_field


class EntryArchiveRequest(BaseModel):
    required: Optional[ArchiveRequired] = _archive_required_field


class EntriesArchiveDownload(WithQuery):
    files: Optional[Files] = Body(None)


class EntriesRawDir(WithQuery):
    pagination: Optional[MetadataPagination] = Body(None)


class EntriesRaw(WithQuery):
    files: Optional[Files] = Body(
        None,
        example={
            'glob_pattern': 'vasp*.xml*'
        })


class EntryRawDirFile(BaseModel):
    path: str = Field(None)
    size: int = Field(None)


class EntryRawDir(BaseModel):
    entry_id: str = Field(None)
    upload_id: str = Field(None)
    mainfile: str = Field(None)
    mainfile_key: Optional[str] = Field(None)
    files: List[EntryRawDirFile] = Field(None)


class EntriesRawDirResponse(EntriesRawDir):
    pagination: PaginationResponse = Field(None)  # type: ignore
    data: List[EntryRawDir] = Field(None)


class EntryRawDirResponse(BaseModel):
    entry_id: str = Field(...)
    data: EntryRawDir = Field(...)


class EntryArchive(BaseModel):
    entry_id: str = Field(None)
    upload_id: str = Field(None)
    parser_name: str = Field(None)
    archive: Dict[str, Any] = Field(None)


class EntriesArchiveResponse(EntriesArchive):
    pagination: PaginationResponse = Field(None)  # type: ignore
    data: List[EntryArchive] = Field(None)


class EntryArchiveResponse(EntryArchiveRequest):
    entry_id: str = Field(...)
    data: EntryArchive = Field(None)


class EntryMetadataResponse(BaseModel):
    entry_id: str = Field(None)
    required: MetadataRequired = Field(None)
    data: Any = Field(
        None, description=strip('''The entry metadata as dictionary.'''))


class EntryMetadataEditActionField(BaseModel):
    value: str = Field(None, description='The value/values to set, given as a string.')
    success: Optional[bool] = Field(None, description='If this can/could be done. Only in API response.')
    message: Optional[str] = Field(None, description='A message that details the action result. Only in API response.')


EntryMetadataEditActions = create_model('EntryMetadataEditActions', **{  # type: ignore
    quantity.name: (
        Optional[EntryMetadataEditActionField]
        if quantity.is_scalar else Optional[List[EntryMetadataEditActionField]], None)
    for quantity in EditableUserMetadata.m_def.definitions
    if isinstance(quantity, metainfo.Quantity)
})
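# For illustration only: the dynamically created model above corresponds roughly to a pydantic
# model with one optional field per editable quantity, where scalar quantities take a single
# action and list-like quantities take a list of actions (the quantity names below are examples):
#
#   class EntryMetadataEditActions(BaseModel):
#       comment: Optional[EntryMetadataEditActionField] = None
#       references: Optional[List[EntryMetadataEditActionField]] = None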


class EntryMetadataEdit(WithQuery):
    verify: Optional[bool] = Field(False, description='If true, no action is performed.')
    actions: EntryMetadataEditActions = Field(  # type: ignore
        None,
        description='Each action specifies a single value (even for multi valued quantities).')
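    # Note: the `owner` validator below overrides whatever the client sends, so metadata edits
    # are always restricted to the authenticated user's own entries.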

    @validator('owner')
    def validate_query(cls, owner):  # pylint: disable=no-self-argument
        return Owner.user


class EntryMetadataEditResponse(EntryMetadataEdit):
    success: bool = Field(None, description='If the overall edit can/could be done. Only in API response.')
    message: str = Field(None, description='A message that details the overall edit result. Only in API response.')


_bad_owner_response = status.HTTP_401_UNAUTHORIZED, {
    'model': HTTPExceptionModel,
    'description': strip('''
        Unauthorized. The given owner requires authorization,
        but no or bad authentication credentials are given.''')}

_bad_id_response = status.HTTP_404_NOT_FOUND, {
    'model': HTTPExceptionModel,
    'description': strip('''
        Entry not found. The given id does not match any entry.''')}

_bad_path_response = status.HTTP_404_NOT_FOUND, {
    'model': HTTPExceptionModel,
    'description': strip('File or directory not found.')}

_bad_edit_request = status.HTTP_400_BAD_REQUEST, {
    'model': HTTPExceptionModel,
    'description': strip('Edit request could not be executed.')}

_bad_edit_request_authorization = status.HTTP_401_UNAUTHORIZED, {
    'model': HTTPExceptionModel,
    'description': strip('Not enough permissions to execute edit request.')}

_bad_edit_request_empty_query = status.HTTP_404_NOT_FOUND, {
    'model': HTTPExceptionModel,
    'description': strip('No matching entries found.')}

_raw_response = 200, {
    'content': {'application/zip': {}},
    'description': strip('''
        A zip file with the requested raw files. The file is streamed.
        The content length is not known in advance.
    ''')}

_raw_file_response = 200, {
    'content': {'application/octet-stream': {}},
    'description': strip('''
        A byte stream with raw file contents. The content length is not known in advance.
        If the whole file is requested, the mime-type might be more specific, depending
        on the file contents.
    ''')}

_archives_download_response = 200, {
    'content': {'application/zip': {}},
    'description': strip('''
        A zip file with the requested archive files. The file is streamed.
        The content length is not known in advance.
    ''')}

_archive_download_response = 200, {
    'content': {'application/json': {}},
    'description': strip('''
        A json body with the requested archive.
    ''')}


_bad_archive_required_response = status.HTTP_400_BAD_REQUEST, {
    'model': HTTPExceptionModel,
    'description': strip('''
        The given required specification could not be understood.''')}


_bad_metadata_edit_response = status.HTTP_400_BAD_REQUEST, {
    'model': HTTPExceptionModel,
    'description': strip('''
        The given edit actions cannot be performed by you on the given query.''')}


def perform_search(*args, **kwargs):
    try:
        search_response = search(*args, **kwargs)
        search_response.es_query = None
        return search_response
    except QueryValidationError as e:
        raise RequestValidationError(errors=e.errors)
    except AuthenticationRequiredError as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))
    except SearchError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='Elasticsearch could not process your query: %s' % str(e))


@router.post(
    '/query', tags=[metadata_tag],
    summary='Search entries and retrieve their metadata',
    response_model=MetadataResponse,
    responses=create_responses(_bad_owner_response),
    response_model_exclude_unset=True,
    response_model_exclude_none=True)
async def post_entries_metadata_query(
        request: Request,
        data: Metadata,
        user: User = Depends(create_user_dependency())):

    '''
    Executes a *query* and returns a *page* of the results with *required* result data
    as well as *statistics* and *aggregated* data.

    This is the basic search operation to retrieve metadata for entries that match
    certain search criteria (`query` and `owner`). All parameters (including `query`, `owner`)
    are optional. Look at the body schema or parameter documentation for more details.

    By default, the *empty* search (that returns everything) is performed. Only a small
    page of the search results is returned at a time; use `pagination` in subsequent
    requests to retrieve more data. Each entry has a lot of different *metadata*; use
    `required` to limit the data that is returned.

    The `statistics` and `aggregations` keys further allow you to return statistics
    and aggregated data over all search results.
    '''
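    # Illustrative request body (the search quantity used in `query` is an example, not a
    # fixed schema):
    #   {
    #       "query": {"results.material.elements": ["Si", "O"]},
    #       "pagination": {"page_size": 10},
    #       "required": {"include": ["entry_id", "upload_id"]}
    #   }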

    return perform_search(
        owner=data.owner,
        query=data.query,
        pagination=data.pagination,
        required=data.required,
        aggregations=data.aggregations,
        user_id=user.user_id if user is not None else None)


@router.get(
    '', tags=[metadata_tag],
    summary='Search entries and retrieve their metadata',
    response_model=MetadataResponse,
    responses=create_responses(_bad_owner_response),
    response_model_exclude_unset=True,
    response_model_exclude_none=True)
async def get_entries_metadata(
        request: Request,
        with_query: WithQuery = Depends(query_parameters),
        pagination: MetadataPagination = Depends(metadata_pagination_parameters),
        required: MetadataRequired = Depends(metadata_required_parameters),
        user: User = Depends(create_user_dependency())):
    '''
    Executes a *query* and returns a *page* of the results with *required* result data.
    This is the GET version of `/entries/query`. Queries work a little differently here,
    because we cannot put complex queries into URL parameters.

    In addition to the `q` parameter (see parameter documentation for details), you can use all NOMAD
    search quantities as parameters, e.g. `?atoms=H&atoms=O`. Those quantities can be
    used with additional operators attached to their names, e.g. `?n_atoms__gte=3` for
    all entries with 3 or more atoms. Operators are `all`, `any`, `none`, `gte`,
    `gt`, `lt`, `lte`.
    '''
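    # Illustrative URL queries based on the description above (quantity names are examples):
    #   GET /entries?atoms=H&atoms=O
    #   GET /entries?n_atoms__gte=3
    #   GET /entries?atoms__any=H&atoms__any=F&page_size=10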

    res = perform_search(
        owner=with_query.owner, query=with_query.query,
        pagination=pagination, required=required,
        user_id=user.user_id if user is not None else None)
    res.pagination.populate_urls(request)
    return res


def _do_exaustive_search(owner: Owner, query: Query, include: List[str], user: User) -> Iterator[Dict[str, Any]]:
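    '''
    Iterates through all entries that match the given query by paginating with
    `page_after_value` and yields one metadata dict (restricted to `include`) per entry.

    Illustrative usage:

        for entry in _do_exaustive_search(Owner.visible, query, include=['entry_id'], user=user):
            ...
    '''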
    page_after_value = None
    while True:
        response = perform_search(
            owner=owner, query=query,
            pagination=MetadataPagination(page_size=100, page_after_value=page_after_value, order_by='upload_id'),
            required=MetadataRequired(include=include),
            user_id=user.user_id if user is not None else None)

        page_after_value = response.pagination.next_page_after_value

        for result in response.data:
            yield result

        if page_after_value is None or len(response.data) == 0:
            break


class _Uploads:
    '''
    A helper class that caches access to the upload files of the same upload across subsequent calls.
    '''

    def __init__(self):
        self._upload_files = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def get_upload_files(self, upload_id: str) -> files.UploadFiles:
        if self._upload_files is not None and self._upload_files.upload_id != upload_id:
            self._upload_files.close()

        if self._upload_files is None or self._upload_files.upload_id != upload_id:
            self._upload_files = files.UploadFiles.get(upload_id)

        return self._upload_files

    def close(self):
        if self._upload_files is not None:
            self._upload_files.close()


def _create_entry_rawdir(entry_metadata: Dict[str, Any], uploads: _Uploads):
    entry_id = entry_metadata['entry_id']
    upload_id = entry_metadata['upload_id']
    mainfile = entry_metadata['mainfile']
    mainfile_key = entry_metadata.get('mainfile_key')

    upload_files = uploads.get_upload_files(upload_id)
    mainfile_dir = os.path.dirname(mainfile)

    files = []
    for path_info in upload_files.raw_directory_list(mainfile_dir, files_only=True):
        files.append(EntryRawDirFile(path=path_info.path, size=path_info.size))

    return EntryRawDir(
        entry_id=entry_id, upload_id=upload_id, mainfile=mainfile, mainfile_key=mainfile_key, files=files)


def _answer_entries_rawdir_request(
        owner: Owner, query: Query, pagination: MetadataPagination, user: User):

    if owner == Owner.all_:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strip('''
            The owner=all is not allowed for this operation as it will search for entries
            that you might not be allowed to access.
            '''))

    search_response = perform_search(
        owner=owner, query=query,
        pagination=pagination,
        required=MetadataRequired(include=['entry_id', 'upload_id', 'mainfile']),
        user_id=user.user_id if user is not None else None)

    with _Uploads() as uploads:
        response_data = [_create_entry_rawdir(
            entry_metadata, uploads) for entry_metadata in search_response.data]

    return EntriesRawDirResponse(
        owner=search_response.owner,
        query=search_response.query,
        pagination=search_response.pagination,
        data=response_data)


def _answer_entries_raw_request(owner: Owner, query: Query, files: Files, user: User):
    if owner == Owner.all_:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strip('''
            The owner=all is not allowed for this operation as it will search for entries
            that you might not be allowed to access.
            '''))

    response = perform_search(
        owner=owner, query=query,
        pagination=MetadataPagination(page_size=0),
        required=MetadataRequired(include=[]),
        user_id=user.user_id if user is not None else None)

    if response.pagination.total > config.max_entry_download:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='The limit on the maximum number of entries in a single download (%d) has been exceeded (%d).' % (
                config.max_entry_download, response.pagination.total))

    files_params = Files() if files is None else files
    search_includes = ['entry_id', 'upload_id', 'mainfile']

    try:
        # a generator of File objects to create the streamed zip from
        def download_items_generator():
            # go through all entries that match the query
            for entry_metadata in _do_exaustive_search(owner, query, include=search_includes, user=user):
                upload_id = entry_metadata['upload_id']
                mainfile = entry_metadata['mainfile']
                entry_metadata['mainfile'] = os.path.join(upload_id, mainfile)

                mainfile_dir = os.path.dirname(mainfile)
                yield DownloadItem(
                    upload_id=upload_id,
                    raw_path=mainfile_dir,
                    zip_path=os.path.join(upload_id, mainfile_dir),
                    entry_metadata=entry_metadata)

        # create the streaming response with zip file contents
        content = create_download_stream_zipped(
            download_items=download_items_generator(),
            re_pattern=files_params.re_pattern,
            recursive=False,
            create_manifest_file=True,
            compress=files_params.compress)
        return StreamingResponse(content, headers=browser_download_headers(
            filename='raw_files.zip',
            media_type='application/zip'))
    except Exception as e:
        logger.error('exception while streaming download', exc_info=e)
        raise


_entries_rawdir_query_docstring = strip('''
    Will perform a search and return a *page* of raw file metadata for entries fulfilling
    the query. This allows you to get a complete list of all raw files with their full
    path in their respective upload and their sizes. The first file returned for each
    entry is its *mainfile*.

    Each entry on NOMAD has a set of raw files. These are the files in their original form,
    i.e. as provided by the uploader. More specifically, an entry has a *mainfile*, identified as
    parseable. For CMS entries, the mainfile is usually the main output file of the code. All other
    files in the same directory are considered the entry's *auxiliary* files, no matter their
    role or whether they were actually parsed by NOMAD.

    This operation supports the usual `owner`, `query`, and `pagination` parameters.
    ''')


@router.post(
    '/rawdir/query',
    tags=[raw_tag],
    summary='Search entries and get their raw files metadata',
    description=_entries_rawdir_query_docstring,
    response_model=EntriesRawDirResponse,
    responses=create_responses(_bad_owner_response),
    response_model_exclude_unset=True,
    response_model_exclude_none=True)
async def post_entries_rawdir_query(
        request: Request, data: EntriesRawDir, user: User = Depends(create_user_dependency())):

    return _answer_entries_rawdir_request(
        owner=data.owner, query=data.query, pagination=data.pagination, user=user)


@router.get(
    '/rawdir',
    tags=[raw_tag],
    summary='Search entries and get their raw files metadata',
    description=_entries_rawdir_query_docstring,
    response_model=EntriesRawDirResponse,
    response_model_exclude_unset=True,
    response_model_exclude_none=True,
    responses=create_responses(_bad_owner_response))
async def get_entries_rawdir(
        request: Request,
        with_query: WithQuery = Depends(query_parameters),
        pagination: MetadataPagination = Depends(metadata_pagination_parameters),
        user: User = Depends(create_user_dependency())):

    res = _answer_entries_rawdir_request(
        owner=with_query.owner, query=with_query.query, pagination=pagination, user=user)
    res.pagination.populate_urls(request)
    return res


_entries_raw_query_docstring = strip('''
    This operation will perform a search and stream a .zip file with the raw files of the
    found entries.

    Each entry on NOMAD has a set of raw files. These are the files in their original form,
    i.e. as provided by the uploader. More specifically, an entry has a *mainfile*, identified as
    parseable. For CMS entries, the mainfile is usually the main output file of the code. All other
    files in the same directory are considered the entry's *auxiliary* files, no matter their
    role or whether they were actually parsed by NOMAD.

    After performing a search (that uses the same parameters as in all search operations),
    NOMAD will iterate through all results and create a .zip-file with all the entries'
    main and auxiliary files. The files will be organized in the same directory structure
    that they were uploaded in. The respective upload root directories are further prefixed
    with the `upload_id` of the respective uploads. The .zip-file will further contain
    a `manifest.json` with `upload_id`, `entry_id`, and `mainfile` of each entry.
    ''')
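# Illustrative layout of the streamed .zip described above (paths are examples):
#   <upload_id_1>/some/dir/vasprun.xml    <- mainfile of an entry
#   <upload_id_1>/some/dir/INCAR          <- auxiliary file from the same directory
#   <upload_id_2>/other/dir/output.log
#   manifest.json                         <- upload_id, entry_id and mainfile of each entry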


@router.post(
    '/raw/query',
    tags=[raw_tag],
    summary='Search entries and download their raw files',
    description=_entries_raw_query_docstring,
    response_class=StreamingResponse,
    responses=create_responses(_raw_response, _bad_owner_response))
async def post_entries_raw_query(
        data: EntriesRaw, user: User = Depends(create_user_dependency())):

    return _answer_entries_raw_request(
        owner=data.owner, query=data.query, files=data.files, user=user)


@router.get(
    '/raw',
    tags=[raw_tag],
    summary='Search entries and download their raw files',
    description=_entries_raw_query_docstring,
    response_class=StreamingResponse,
    responses=create_responses(_raw_response, _bad_owner_response))
async def get_entries_raw(
        with_query: WithQuery = Depends(query_parameters),
        files: Files = Depends(files_parameters),
        user: User = Depends(create_user_dependency(signature_token_auth_allowed=True))):

    return _answer_entries_raw_request(
        owner=with_query.owner, query=with_query.query, files=files, user=user)


def _read_archive(entry_metadata, uploads, required_reader: RequiredReader):
    entry_id = entry_metadata['entry_id']
    upload_id = entry_metadata['upload_id']
    upload_files = uploads.get_upload_files(upload_id)

    try:
        with upload_files.read_archive(entry_id) as archive:
            return {
                'entry_id': entry_id,
                'parser_name': entry_metadata['parser_name'],
                'archive': required_reader.read(archive, entry_id, upload_id)}
    except ArchiveQueryError as e:
        raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e))


def _validate_required(required: ArchiveRequired, user) -> RequiredReader:
    try:
        return RequiredReader(required, user=user)
    except RequiredValidationError as e:
        raise HTTPException(
            status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=[dict(msg=e.msg, loc=['required'] + e.loc)])


def _read_entry_from_archive(entry: dict, uploads, required_reader: RequiredReader):
    entry_id, upload_id = entry['entry_id'], entry['upload_id']

    # all other exceptions are handled by the caller `_read_entries_from_archive`
    try:
        upload_files = uploads.get_upload_files(upload_id)

        with upload_files.read_archive(entry_id, True) as archive:
            entry['archive'] = required_reader.read(archive, entry_id, upload_id)
            return entry
    except ArchiveQueryError as e:
        raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e))
    except KeyError as e:
        logger.error('missing archive', exc_info=e, entry_id=entry_id)

        return None


def _read_entries_from_archive(entries: Union[list, dict], required: ArchiveRequired, user):
    '''
    Takes pickleable arguments so that it can be offloaded to worker processes.

    It is important to ensure the return values are also pickleable.
    '''
    with _Uploads() as uploads:
        required_reader = _validate_required(required, user)

        if isinstance(entries, dict):
            return _read_entry_from_archive(entries, uploads, required_reader)

        return [_read_entry_from_archive(entry, uploads, required_reader) for entry in entries]


def _answer_entries_archive_request(
        owner: Owner, query: Query, pagination: MetadataPagination, required: ArchiveRequired,
        user: User):
    if owner == Owner.all_:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail=strip(
                '''The owner=all is not allowed for this operation as it will search for entries
                that you might not be allowed to access.'''))

    if required is None:
        required = '*'

    search_response = perform_search(
        owner=owner, query=query,
        pagination=pagination,
        required=MetadataRequired(include=['entry_id', 'upload_id', 'parser_name']),
        user_id=user.user_id if user is not None else None)

    entries: list = [{
        'entry_id': entry['entry_id'], 'upload_id': entry['upload_id'],
        'parser_name': entry['parser_name']} for entry in search_response.data]

    # fewer than config.archive.min_entries_per_process entries per process is not useful
    # more than config.archive.max_process_number processes is too much for the server
    number: int = min(
        int(math.ceil(len(entries) / config.archive.min_entries_per_process)),
        config.archive.max_process_number)

    if number <= 1:
        request_data: list = _read_entries_from_archive(entries, required, user)
    else:
        with parallel_backend('threading', n_jobs=number):
            request_data = Parallel()(delayed(
                _read_entries_from_archive)(i, required, user) for i in entries)
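        # Sketch of the joblib pattern used above (illustrative only): the 'threading' backend
        # keeps all arguments in-process, so no pickling of `entries` is needed in this branch.
        #
        #   with parallel_backend('threading', n_jobs=number):
        #       results = Parallel()(delayed(func)(item) for item in items)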

    return EntriesArchiveResponse(
        owner=search_response.owner,
        query=search_response.query,
        pagination=search_response.pagination,
        required=required,
        data=list(filter(None, request_data)))


_entries_archive_docstring = strip('''
    This operation will perform a search with the given `query` and `owner` and return
    a *page* of `required` archive data. Look at the body schema or parameter documentation
    for more details. The **GET** version of this operation can only return the
    full archives.
    ''')


@router.post(
    '/archive/query',
    tags=[archive_tag],
    summary='Search entries and access their archives',
    description=_entries_archive_docstring,
    response_model=EntriesArchiveResponse,
    response_model_exclude_unset=True,
    response_model_exclude_none=True,
    responses=create_responses(_bad_owner_response, _bad_archive_required_response))
async def post_entries_archive_query(
        request: Request, data: EntriesArchive, user: User = Depends(create_user_dependency())):

    return _answer_entries_archive_request(
        owner=data.owner, query=data.query, pagination=data.pagination,
        required=data.required, user=user)


@router.get(
    '/archive',
    tags=[archive_tag],
    summary='Search entries and access their archives',
    description=_entries_archive_docstring,
    response_model=EntriesArchiveResponse,
    response_model_exclude_unset=True,
    response_model_exclude_none=True,
    responses=create_responses(_bad_owner_response, _bad_archive_required_response))
async def get_entries_archive_query(
        request: Request,
        with_query: WithQuery = Depends(query_parameters),
        pagination: MetadataPagination = Depends(metadata_pagination_parameters),
        user: User = Depends(create_user_dependency())):

    res = _answer_entries_archive_request(
        owner=with_query.owner, query=with_query.query, pagination=pagination,
        required=None, user=user)
    res.pagination.populate_urls(request)
    return res


def _answer_entries_archive_download_request(
        owner: Owner, query: Query, files: Files, user: User):

    if owner == Owner.all_:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strip('''
            The owner=all is not allowed for this operation as it will search for entries
            that you might not be allowed to access.
            '''))

    files_params = Files() if files is None else files

    response = perform_search(
        owner=owner, query=query,
        pagination=MetadataPagination(page_size=0),
        required=MetadataRequired(include=[]),
        user_id=user.user_id if user is not None else None)

    if response.pagination.total > config.max_entry_download:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                'The limit on the maximum number of entries in a single download (%d) has been '
                'exceeded (%d).' % (config.max_entry_download, response.pagination.total)))

    manifest = []
    search_includes = ['entry_id', 'upload_id', 'parser_name']

    required_reader = RequiredReader('*', user=user)

    # a generator of StreamedFile objects to create the zipstream from
    def streamed_files():
        # go through all entries that match the query
        for entry_metadata in _do_exaustive_search(owner, query, include=search_includes, user=user):
            path = os.path.join(entry_metadata['upload_id'], '%s.json' % entry_metadata['entry_id'])
            try:
                archive_data = _read_archive(entry_metadata, uploads, required_reader)

                f = io.BytesIO(orjson.dumps(
                    archive_data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS))

                yield StreamedFile(path=path, f=f, size=f.getbuffer().nbytes)
            except KeyError as e:
                logger.error('missing archive', entry_id=entry_metadata['entry_id'], exc_info=e)

            entry_metadata['path'] = path
            manifest.append(entry_metadata)

        # add the manifest at the end
        manifest_content = json.dumps(manifest, indent=2).encode()
        yield StreamedFile(path='manifest.json', f=io.BytesIO(manifest_content), size=len(manifest_content))

    with _Uploads() as uploads:
        # create the streaming response with zip file contents
        content = create_zipstream(streamed_files(), compress=files_params.compress)
        return StreamingResponse(content, headers=browser_download_headers(
            filename='archives.zip',
            media_type='application/zip'))


_entries_archive_download_docstring = strip('''
    This operation will perform a search with the given `query` and `owner` and stream
    a .zip-file with the full archive contents for all matching entries. This is not
    paginated. Look at the body schema or parameter documentation for more details.
    ''')


@router.post(
    '/archive/download/query',
    tags=[archive_tag],
    summary='Search entries and download their archives',
    description=_entries_archive_download_docstring,
    response_class=StreamingResponse,
    responses=create_responses(
        _archives_download_response, _bad_owner_response, _bad_archive_required_response))
async def post_entries_archive_download_query(
        data: EntriesArchiveDownload, user: User = Depends(create_user_dependency())):

    return _answer_entries_archive_download_request(
        owner=data.owner, query=data.query, files=data.files, user=user)


@router.get(
    '/archive/download',
    tags=[archive_tag],
    summary='Search entries and download their archives',
    description=_entries_archive_download_docstring,
    response_class=StreamingResponse,
    responses=create_responses(
        _archives_download_response, _bad_owner_response, _bad_archive_required_response))
async def get_entries_archive_download(
        with_query: WithQuery = Depends(query_parameters),
        files: Files = Depends(files_parameters),
        user: User = Depends(create_user_dependency(signature_token_auth_allowed=True))):

    return _answer_entries_archive_download_request(
        owner=with_query.owner, query=with_query.query, files=files, user=user)


@router.get(
    '/{entry_id}', tags=[metadata_tag],
    summary='Get the metadata of an entry by its id',
    response_model=EntryMetadataResponse,
    responses=create_responses(_bad_id_response),
    response_model_exclude_unset=True,
    response_model_exclude_none=True)
async def get_entry_metadata(
        entry_id: str = Path(..., description='The unique entry id of the entry to retrieve metadata from.'),
        required: MetadataRequired = Depends(metadata_required_parameters),
        user: User = Depends(create_user_dependency())):
    '''
    Retrieves the entry metadata for the given id.
    '''

    query = {'entry_id': entry_id}
    response = perform_search(owner=Owner.all_, query=query, required=required, user_id=user.user_id if user is not None else None)

    if response.pagination.total == 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail='The entry with the given id does not exist or is not visible to you.')

    return {
        'entry_id': entry_id,
        'required': required,
        'data': response.data[0]
    }


@router.get(
    '/{entry_id}/rawdir',
    tags=[raw_tag],
    summary='Get the raw files metadata for an entry by its id',
    response_model=EntryRawDirResponse,
    responses=create_responses(_bad_id_response),
    response_model_exclude_unset=True,
    response_model_exclude_none=True)
async def get_entry_rawdir(
        entry_id: str = Path(..., description='The unique entry id of the entry to retrieve raw data from.'),
        user: User = Depends(create_user_dependency())):
    '''
    Returns the file metadata for all input and output files (including auxiliary files)
    of the given `entry_id`. The first file will be the *mainfile*.
    '''
    query = dict(entry_id=entry_id)
    response = perform_search(
        owner=Owner.visible, query=query,
        required=MetadataRequired(include=['entry_id', 'upload_id', 'mainfile']),
        user_id=user.user_id if user is not None else None)

    if response.pagination.total == 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail='The entry with the given id does not exist or is not visible to you.')

    with _Uploads() as uploads:
        return EntryRawDirResponse(
            entry_id=entry_id, data=_create_entry_rawdir(response.data[0], uploads))


@router.get(
    '/{entry_id}/raw',
    tags=[raw_tag],
    summary='Get the raw data of an entry by its id',
    response_class=StreamingResponse,
    responses=create_responses(_bad_id_response, _raw_response))
async def get_entry_raw(
        entry_id: str = Path(..., description='The unique entry id of the entry to retrieve raw data from.'),
        files: Files = Depends(files_parameters),
        user: User = Depends(create_user_dependency(signature_token_auth_allowed=True))):
    '''
    Streams a .zip file with the raw files from the requested entry.
    '''
    query = dict(entry_id=entry_id)
    response = perform_search(
        owner=Owner.visible, query=query,
        required=MetadataRequired(include=['entry_id']),
        user_id=user.user_id if user is not None else None)

    if response.pagination.total == 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail='The entry with the given id does not exist or is not visible to you.')
