Commit 431daf50 authored by Markus Scheidgen

Added further error handling to re-process. Extended the mirror functionality.

parent 14140a65
......@@ -40,7 +40,7 @@ class MirrorUploadResource(Resource):
@api.response(400, 'Not available for the given upload, e.g. upload not published.')
@api.response(404, 'The upload does not exist')
@api.marshal_with(mirror_upload_model, skip_none=True, code=200, description='Upload exported')
@api.doc('get_mirror_upload')
@api.doc('get_upload_mirror')
@admin_login_required
def get(self, upload_id):
"""
......
......@@ -85,16 +85,25 @@ repo_calcs_model = api.model('RepoCalculations', {
', '.join(search.metrics_names)))
})
def add_common_parameters(request_parser):
request_parser.add_argument(
'owner', type=str,
help='Specify which calcs to return: ``all``, ``public``, ``user``, ``staging``, default is ``all``')
request_parser.add_argument(
'from_time', type=lambda x: rfc3339DateTime.parse(x),
help='A yyyy-MM-ddTHH:mm:ss (RFC3339) minimum entry time (e.g. upload time)')
request_parser.add_argument(
'until_time', type=lambda x: rfc3339DateTime.parse(x),
help='A yyyy-MM-ddTHH:mm:ss (RFC3339) maximum entry time (e.g. upload time)')
for search_quantity in search.search_quantities.keys():
_, _, description = search.search_quantities[search_quantity]
request_parser.add_argument(search_quantity, type=str, help=description)
repo_request_parser = pagination_request_parser.copy()
repo_request_parser.add_argument(
'owner', type=str,
help='Specify which calcs to return: ``all``, ``public``, ``user``, ``staging``, default is ``all``')
repo_request_parser.add_argument(
'from_time', type=lambda x: rfc3339DateTime.parse(x),
help='A yyyy-MM-ddTHH:mm:ss (RFC3339) minimum entry time (e.g. upload time)')
repo_request_parser.add_argument(
'until_time', type=lambda x: rfc3339DateTime.parse(x),
help='A yyyy-MM-ddTHH:mm:ss (RFC3339) maximum entry time (e.g. upload time)')
add_common_parameters(repo_request_parser)
repo_request_parser.add_argument(
'scroll', type=bool, help='Enable scrolling')
repo_request_parser.add_argument(
......@@ -104,21 +113,70 @@ repo_request_parser.add_argument(
'Metrics to aggregate over all quantities and their values as comma separated list. '
'Possible values are %s.' % ', '.join(search.metrics_names)))
for search_quantity in search.search_quantities.keys():
_, _, description = search.search_quantities[search_quantity]
repo_request_parser.add_argument(search_quantity, type=str, help=description)
def create_owner_query():
owner = request.args.get('owner', 'all')
# TODO this should be removed after migration
# if owner == 'migrated':
# q = Q('term', published=True) & Q('term', with_embargo=False)
# if g.user is not None:
# q = q | Q('term', owners__user_id=g.user.user_id)
# q = q & ~Q('term', **{'uploader.user_id': 1}) # pylint: disable=invalid-unary-operand-type
if owner == 'all':
q = Q('term', published=True) & Q('term', with_embargo=False)
if g.user is not None:
q = q | Q('term', owners__user_id=g.user.user_id)
elif owner == 'public':
q = Q('term', published=True) & Q('term', with_embargo=False)
elif owner == 'user':
if g.user is None:
abort(401, message='Authentication required for owner value user.')
q = Q('term', owners__user_id=g.user.user_id)
elif owner == 'staging':
if g.user is None:
abort(401, message='Authentication required for owner value staging.')
q = Q('term', published=False) & Q('term', owners__user_id=g.user.user_id)
elif owner == 'admin':
if g.user is None or not g.user.is_admin:
abort(401, message='This can only be used by the admin user.')
q = None
else:
abort(400, message='Invalid owner value. Valid values are all|public|user|staging|admin, default is all')
return q
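For illustration only (not part of the commit): a minimal sketch of how the ``owner=all`` filter built above composes for an authenticated user and how such a query is typically attached to an elasticsearch_dsl search. The index name and user id are hypothetical assumptions.

from elasticsearch_dsl import Q, Search

user_id = 'some-user-id'  # hypothetical
# public entries, plus everything owned by the authenticated user
q = (Q('term', published=True) & Q('term', with_embargo=False)) | Q('term', owners__user_id=user_id)
search = Search(index='calcs').query(q)  # the index name is an assumption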
def create_search_parameters():
""" Helper that creates a request.args dict with isolated search parameters """
search_parameters = dict(**request.args)
search_parameters.pop('owner', None)
search_parameters.pop('scroll', None)
search_parameters.pop('scroll_id', None)
search_parameters.pop('per_page', None)
search_parameters.pop('page', None)
search_parameters.pop('order', None)
search_parameters.pop('order_by', None)
search_parameters.pop('metrics', None)
search_parameters.pop('from_time', None)
search_parameters.pop('until_time', None)
search_parameters.pop('size', None)
search_parameters.pop('after', None)
return search_parameters
@ns.route('/')
class RepoCalcsResource(Resource):
@api.doc('search')
@api.response(400, 'Invalid requests, e.g. wrong owner type or bad quantities')
@api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
@api.expect(repo_request_parser, validate=True)
@api.marshal_with(repo_calcs_model, skip_none=True, code=200, description='Metadata send')
@api.marshal_with(repo_calcs_model, skip_none=True, code=200, description='Search results sent')
@login_if_available
def get(self):
"""
Search for calculations in the repository from, paginated.
Search for calculations in the repository, paginated.
The ``owner`` parameter determines the overall entries to search through.
Possible values are: ``all`` (show all entries visible to the current user), ``public``
......@@ -167,7 +225,6 @@ class RepoCalcsResource(Resource):
except Exception:
abort(400, message='bad parameter types')
owner = request.args.get('owner', 'all')
order_by = request.args.get('order_by', 'formula')
try:
......@@ -179,49 +236,13 @@ class RepoCalcsResource(Resource):
if order not in [-1, 1]:
abort(400, message='invalid pagination')
# TODO this should be removed after migration
# if owner == 'migrated':
# q = Q('term', published=True) & Q('term', with_embargo=False)
# if g.user is not None:
# q = q | Q('term', owners__user_id=g.user.user_id)
# q = q & ~Q('term', **{'uploader.user_id': 1}) # pylint: disable=invalid-unary-operand-type
if owner == 'all':
q = Q('term', published=True) & Q('term', with_embargo=False)
if g.user is not None:
q = q | Q('term', owners__user_id=g.user.user_id)
elif owner == 'public':
q = Q('term', published=True) & Q('term', with_embargo=False)
elif owner == 'user':
if g.user is None:
abort(401, message='Authentication required for owner value user.')
q = Q('term', owners__user_id=g.user.user_id)
elif owner == 'staging':
if g.user is None:
abort(401, message='Authentication required for owner value user.')
q = Q('term', published=False) & Q('term', owners__user_id=g.user.user_id)
elif owner == 'admin':
if g.user is None or not g.user.is_admin:
abort(401, message='This can only be used by the admin user.')
q = None
else:
abort(400, message='Invalid owner value. Valid values are all|user|staging, default is all')
q = create_owner_query()
# TODO this should be removed after migration
without_currupted_mainfile = ~Q('term', code_name='currupted mainfile') # pylint: disable=invalid-unary-operand-type
q = q & without_currupted_mainfile if q is not None else without_currupted_mainfile
search_parameters = dict(**request.args)
search_parameters.pop('owner', None)
search_parameters.pop('scroll', None)
search_parameters.pop('scroll_id', None)
search_parameters.pop('per_page', None)
search_parameters.pop('page', None)
search_parameters.pop('order', None)
search_parameters.pop('order_by', None)
search_parameters.pop('metrics', None)
search_parameters.pop('from_time', None)
search_parameters.pop('until_time', None)
search_parameters = create_search_parameters()
try:
if scroll:
......@@ -243,3 +264,76 @@ class RepoCalcsResource(Resource):
abort(400, 'The given scroll_id does not exist.')
except KeyError as e:
abort(400, str(e))
repo_quantity_values_model = api.model('RepoQuantityValues', {
'quantities': fields.Raw(description='''
A dict with the requested quantity as its single key.
The value is a dictionary with 'after' and 'values' keys.
The 'values' key holds a dict with actual values as keys and their entry count
as values (i.e. number of entries with that value). ''')
})
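As an illustration of the model described above (hypothetical values, not taken from the commit), a response could look like this:

example_response = {
    'quantities': {
        'atoms': {
            # value -> number of entries with that value
            'values': {'Br': 1, 'Fe': 1},
            # pass this back as the 'after' parameter to continue scrolling
            'after': 'Fe'
        }
    }
}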
repo_quantity_search_request_parser = api.parser()
add_common_parameters(repo_quantity_search_request_parser)
repo_quantity_search_request_parser.add_argument(
'after', type=str, help='The after value to use for "scrolling".')
repo_quantity_search_request_parser.add_argument(
'size', type=int, help='The max size of the returned values.')
@ns.route('/<string:quantity>')
class RepoQuantityResource(Resource):
@api.doc('quantity_search')
@api.response(400, 'Invalid requests, e.g. wrong owner type, bad quantity, bad search parameters')
@api.expect(repo_quantity_search_request_parser, validate=True)
@api.marshal_with(repo_quantity_values_model, skip_none=True, code=200, description='Search results sent')
@login_if_available
def get(self, quantity: str):
"""
Retrieve quantity values from entries matching the search.
You can use the various quantities to search/filter with. For some of the
indexed quantities this endpoint returns aggregation information. This means
you will be given a list of all possible values and the number of entries
that have that value. You can also use these aggregations on an empty
search to determine the possible values.
There is no ordering and no pagination. Instead, scrolling is based on an
'after' key. The result will contain an 'after' value that can be specified
in the next request. Use the 'size' and 'after' parameters accordingly.
The result will contain a 'quantities' key with the given quantity and the
respective values (up to 'size' many). For the remaining values, use the
'after' parameter accordingly.
"""
try:
after = request.args.get('after', None)
size = int(request.args.get('size', 100))
from_time = rfc3339DateTime.parse(request.args.get('from_time', '2000-01-01'))
until_time_str = request.args.get('until_time', None)
until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.now()
time_range = (from_time, until_time)
except Exception:
abort(400, message='bad parameter types')
try:
assert size >= 0
except AssertionError:
abort(400, message='invalid size')
q = create_owner_query()
search_parameters = create_search_parameters()
try:
results = search.quantity_search(
q=q, time_range=time_range, search_parameters=search_parameters,
quantities={quantity: after}, size=size, with_entries=False)
return results, 200
except KeyError as e:
import traceback
traceback.print_exc()
abort(400, 'Given quantity does not exist: %s' % str(e))
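To make the 'after'/'size' scrolling concrete, a minimal client-side sketch (the base URL is a hypothetical assumption and authentication is omitted) that collects all values of the 'atoms' quantity:

import requests

base_url = 'http://localhost/api'  # hypothetical deployment URL

after, all_values = None, {}
while True:
    params = {'size': 100}
    if after is not None:
        params['after'] = after
    quantity = requests.get('%s/repo/atoms' % base_url, params=params).json()['quantities']['atoms']
    all_values.update(quantity['values'])
    after = quantity.get('after')
    if after is None:
        break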
......@@ -183,6 +183,10 @@ def re_process(ctx, uploads, parallel: int):
else:
upload.re_process_upload()
upload.block_until_complete(interval=.5)
if upload.tasks_status == proc.FAILURE:
logger.info('re-processing with failure', upload_id=upload.upload_id)
completed = True
logger.info('re-processing complete', upload_id=upload.upload_id)
......
......@@ -13,6 +13,6 @@
# limitations under the License.
from . import local, migration, upload, integrationtests
from . import local, migration, upload, integrationtests, mirror
from .client import create_client
from .upload import stream_upload_with_client
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import click
import sys
import json
from nomad import utils, processing as proc, search
from .client import client
__in_test = False
""" Will be monkeypatched by tests to alter behavior for testing. """
@client.command(
help='''
Mirror data from another nomad deployment.
It uses its 'client' nature to export data from the other nomad (source) via
REST API. I.e., use the -n (--url) client parameter to specify the source deployment.
The data will be added directly to the databases, filesystem, etc. of
'this' nomad deployment (target), i.e. it behaves like an 'admin' command.
This means you either run it in the environment of the target deployment
or use the --config nomad parameter.''')
@click.argument('QUERY', nargs=1, required=False)
@click.option(
'--move', is_flag=True, default=False,
help='Instead of copying the underlying upload files, move them and replace them with a symlink.')
@click.option(
'--dry', is_flag=True, default=False,
help='Do not actually mirror data, just fetch data and report.')
def mirror(query, move: bool, dry: bool):
if query is not None:
try:
query = json.loads(query)
except Exception as e:
print('Cannot parse the given query %s: %s' % (query, str(e)))
sys.exit(1)
else:
query = {}
query.update(owner='admin')
utils.configure_logging()
from nomad.cli.client import create_client
client = create_client()
while True:
query_results = client.repo.quantity_search(quantity='upload_id', **query).response().result
upload_ids = query_results.quantities['upload_id']
for upload_id in upload_ids['values'].keys():
upload_data = client.mirror.get_upload_mirror(upload_id=upload_id).response().result
try:
upload = proc.Upload.get(upload_id)
if __in_test:
proc.Calc.objects(upload_id=upload_id).delete()
proc.Upload.objects(upload_id=upload_id).delete()
search.delete_upload(upload_id)
raise KeyError()
print(
'Upload %s already exists, updating existing uploads is not implemented yet. '
'Skipping upload.' % upload_id)
continue
except KeyError:
pass
if dry:
print(
'Need to mirror %s with %d calcs at %s' %
(upload_data.upload_id, upload_ids['values'][upload_id], upload_data.upload_files_path))
continue
# create mongo
upload = proc.Upload.from_json(upload_data.upload, created=True).save()
for calc_data in upload_data.calcs:
proc.Calc.from_json(calc_data, created=True).save()
# index es
search.index_all(upload.to_upload_with_metadata().calcs)
# copy/mv file (the actual file transfer is still a stub in this commit)
if __in_test:
pass  # nothing to transfer in tests
if move:
pass  # placeholder: move the upload files and leave a symlink behind
else:
pass  # placeholder: copy the upload files
print(
'Mirrored %s with %d calcs at %s' %
(upload_data.upload_id, upload_ids['values'][upload_id], upload_data.upload_files_path))
if 'after' not in upload_ids:
break
query.update(after=upload_ids['after'])
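For orientation, a sketch of how the command could be driven programmatically, mirroring the style of the tests below; the source URL and the query value are hypothetical assumptions:

import json
import click.testing

from nomad import utils
from nomad.cli import cli

result = click.testing.CliRunner().invoke(
    cli,
    ['client', '-n', 'https://source-nomad.example.org/api', 'mirror',
     json.dumps({'upload_id': 'some_upload_id'}), '--dry'],
    catch_exceptions=False, obj=utils.POPO())
print(result.output)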
......@@ -189,6 +189,8 @@ class Proc(Document, metaclass=ProcMetaclass):
def reset(self):
""" Resets the task chain. Assumes there no current running process. """
assert not self.process_running
self.current_task = None
self.tasks_status = PENDING
self.errors = []
......
......@@ -633,9 +633,25 @@ class Upload(Proc):
public_upload_files.to_staging_upload_files(create=True)
self._continue_with('parse_all')
for calc in Calc.objects(upload_id=self.upload_id):
calc.reset()
calc.re_process_calc()
try:
for calc in Calc.objects(upload_id=self.upload_id):
if calc.process_running:
if calc.current_process == 're_process_calc':
logger.warn('re_process_calc is already running', calc_id=calc.calc_id)
else:
logger.warn('a process is already running on calc', calc_id=calc.calc_id)
continue
calc.reset()
calc.re_process_calc()
except Exception as e:
# try to remove the staging copy in failure case
staging_upload_files = self.upload_files.to_staging_upload_files()
if staging_upload_files.exist():
staging_upload_files.delete()
raise e
# the packing and removal of the staging upload files will be triggered by
# the 'cleanup' task after processing all calcs
......
......@@ -175,6 +175,19 @@ def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
refresh()
def index_all(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
""" Adds all given calcs with their metadata to the index. """
def elastic_updates():
for calc in calcs:
entry = Entry.from_calc_with_metadata(calc)
entry = entry.to_dict(include_meta=True)
entry['_op_type'] = 'index'
yield entry
elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates())
refresh()
def refresh():
infrastructure.elastic_client.indices.refresh(config.elastic.index_name)
......@@ -436,20 +449,25 @@ def quantity_search(
composite = dict(sources={quantity: terms}, size=size)
if after is not None:
composite['after'] = after
composite['after'] = {quantity: after}
search.aggs.bucket(quantity, 'composite', **composite)
response, entry_results = _execute_paginated_search(search, **kwargs)
def create_quantity_result(quantity):
values = getattr(response.aggregations, quantity)
result = dict(values={
getattr(bucket.key, quantity): bucket.doc_count
for bucket in values.buckets})
if hasattr(values, 'after_key'):
result.update(after=getattr(values.after_key, quantity))
return result
quantity_results = {
quantity: {
'after': getattr(getattr(response.aggregations, quantity).after_key, quantity),
'values': {
getattr(bucket.key, quantity): bucket.doc_count
for bucket in getattr(response.aggregations, quantity).buckets
}
}
quantity: create_quantity_result(quantity)
for quantity in quantities.keys()
}
......
......@@ -328,6 +328,15 @@ def bravado(client, postgres, test_user_auth):
return SwaggerClient.from_url('/swagger.json', http_client=http_client)
@pytest.fixture(scope='function')
def admin_user_bravado_client(client, admin_user_auth, monkeypatch):
def create_client():
http_client = FlaskTestHttpClient(client, headers=admin_user_auth)
return SwaggerClient.from_url('/swagger.json', http_client=http_client)
monkeypatch.setattr('nomad.cli.client.create_client', create_client)
@pytest.fixture(scope='function')
def no_warn(caplog):
yield caplog
......
......@@ -856,6 +856,53 @@ class TestRepo():
rv = client.get('/repo/?owner=user')
assert rv.status_code == 401
@pytest.mark.parametrize('calcs, quantity, value', [
(2, 'system', 'bulk'),
(0, 'system', 'atom'),
(1, 'atoms', 'Br'),
(1, 'atoms', 'Fe'),
(1, 'authors', 'Hofstadter, Leonard'),
(2, 'files', 'test/mainfile.txt'),
(0, 'quantities', 'dos')
])
def test_quantity_search(self, client, example_elastic_calcs, no_warn, test_user_auth, calcs, quantity, value):
rv = client.get('/repo/%s' % quantity, headers=test_user_auth)
assert rv.status_code == 200
data = json.loads(rv.data)
quantities = data['quantities']
assert quantity in quantities
values = quantities[quantity]['values']
assert (value in values) == (calcs > 0)
assert values.get(value, 0) == calcs
def test_quantity_search_after(self, client, example_elastic_calcs, no_warn, test_user_auth):
rv = client.get('/repo/atoms?size=1')
assert rv.status_code == 200
data = json.loads(rv.data)
quantity = data['quantities']['atoms']
assert 'after' in quantity
after = quantity['after']
assert len(quantity['values']) == 1
value = list(quantity['values'].keys())[0]
while True:
rv = client.get('/repo/atoms?size=1&after=%s' % after)
assert rv.status_code == 200
data = json.loads(rv.data)
quantity = data['quantities']['atoms']
if 'after' not in quantity:
assert len(quantity['values']) == 0
break
assert len(quantity['values']) == 1
assert value != list(quantity['values'].keys())[0]
assert after != quantity['after']
after = quantity['after']
class TestRaw(UploadFilesBasedTests):
......
......@@ -14,8 +14,9 @@
# limitations under the License.
import click.testing
import json
from nomad import utils, search
from nomad import utils, search, processing as proc
from nomad.cli import cli
from nomad.processing import Upload, Calc
......@@ -75,3 +76,37 @@ class TestAdminUploads:
assert 're-processing' in result.stdout
calc.reload()
assert calc.metadata['nomad_version'] == 'test_version'
class TestClient:
def test_mirror_dry(self, published, admin_user_bravado_client):
result = click.testing.CliRunner().invoke(
cli, ['client', 'mirror', '--dry'], catch_exceptions=False, obj=utils.POPO())
assert result.exit_code == 0
assert published.upload_id in result.output
assert published.upload_files.os_path in result.output
def test_mirror(self, published, admin_user_bravado_client, monkeypatch):
ref_search_results = search.entry_search(
search_parameters=dict(upload_id=published.upload_id))['results'][0]
monkeypatch.setattr('nomad.cli.client.mirror.__in_test', True)
result = click.testing.CliRunner().invoke(
cli, ['client', 'mirror'], catch_exceptions=False, obj=utils.POPO())
assert result.exit_code == 0
assert proc.Upload.objects(upload_id=published.upload_id).count() == 1
assert proc.Calc.objects(upload_id=published.upload_id).count() == 1
new_search = search.entry_search(search_parameters=dict(upload_id=published.upload_id))
calcs_in_search = new_search['pagination']['total']
assert calcs_in_search == 1
new_search_results = new_search['results'][0]
for key in new_search_results.keys():