# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
To run these tests, a celery worker must be running. The test worker provided by
the celery pytest plugin is currently not working. It results in a timeout when
reading from the redis result backend, even though all tasks apparently ended successfully.
"""
from typing import Generator
import pytest
from datetime import datetime
import shutil
import os.path
import json
from nomad import utils
from nomad.files import ArchiveBasedStagingUploadFiles, UploadFiles, StagingUploadFiles
from nomad.processing import Upload, Calc
from nomad.processing.base import task as task_decorator
from tests.test_files import example_file, empty_file
# import fixtures
from tests.test_files import clear_files # pylint: disable=unused-import
example_files = [empty_file, example_file]
@pytest.fixture(scope='function', autouse=True)
def mocks_forall(mockmongo):
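    """Autouse fixture that activates the mongo mock for every test in this module."""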
pass
@pytest.fixture(scope='function', params=example_files)
def uploaded_id(request, clear_files) -> Generator[str, None, None]:
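    """Creates a staging upload from one of the example files and yields its upload id."""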
example_file = request.param
example_upload_id = os.path.basename(example_file).replace('.zip', '')
upload_files = ArchiveBasedStagingUploadFiles(example_upload_id, create=True)
shutil.copyfile(example_file, upload_files.upload_file_os_path)
yield example_upload_id
@pytest.fixture
def uploaded_id_with_warning(request, clear_files) -> Generator[str, None, None]:
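    """Creates a staging upload from an example that is expected to produce processing warnings and yields its upload id."""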
example_file = 'tests/data/proc/examples_with_warning_template.zip'
example_upload_id = os.path.basename(example_file).replace('.zip', '')
upload_files = ArchiveBasedStagingUploadFiles(example_upload_id, create=True)
shutil.copyfile(example_file, upload_files.upload_file_os_path)
yield example_upload_id
def run_processing(uploaded_id: str, test_user) -> Upload:
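    """Creates an upload for the given id, starts processing, and blocks until all tasks are complete."""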
upload = Upload.create(upload_id=uploaded_id, user=test_user)
upload.upload_time = datetime.now()
assert upload.tasks_status == 'RUNNING'
assert upload.current_task == 'uploading'
upload.process_upload() # pylint: disable=E1101
upload.block_until_complete(interval=.1)
return upload
@pytest.fixture
def processed_upload(uploaded_id, test_user, worker, no_warn) -> Upload:
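    """An example upload that has been run through the complete processing chain."""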
return run_processing(uploaded_id, test_user)
def assert_processing(upload: Upload):
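    """Asserts that an upload and all of its calcs were processed successfully and that the expected staging, archive, and log files exist."""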
assert not upload.tasks_running
assert upload.current_task == 'cleanup'
assert upload.upload_id is not None
assert len(upload.errors) == 0
assert upload.tasks_status == 'SUCCESS'
upload_files = UploadFiles.get(upload.upload_id, is_authorized=lambda: True)
assert isinstance(upload_files, StagingUploadFiles)
for calc in Calc.objects(upload_id=upload.upload_id):
assert calc.parser is not None
assert calc.mainfile is not None
assert calc.tasks_status == 'SUCCESS'
with upload_files.archive_file(calc.calc_id) as archive_json:
archive = json.load(archive_json)
assert 'section_run' in archive
assert 'section_calculation_info' in archive
with upload_files.archive_log_file(calc.calc_id) as f:
assert 'a test' in f.read()
assert len(calc.errors) == 0
with upload_files.raw_file(calc.mainfile) as f:
f.read()
assert upload_files.metadata.get(calc.calc_id) is not None
# @pytest.mark.timeout(30)
def test_processing(uploaded_id, worker, test_user, no_warn):
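    """Runs the complete processing chain on an example upload and checks the results."""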
upload = run_processing(uploaded_id, test_user)
assert_processing(upload)
@pytest.mark.timeout(30)
def test_processing_with_warning(uploaded_id_with_warning, worker, test_user):
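    """An upload that produces parser warnings should still process successfully."""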
upload = run_processing(uploaded_id_with_warning, test_user)
assert_processing(upload)
@pytest.mark.timeout(30)
def test_process_non_existing(worker, test_user, with_error):
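    """Processing a non-existing upload id should fail in the extracting task."""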
upload = run_processing('__does_not_exist', test_user)
assert not upload.tasks_running
assert upload.current_task == 'extracting'
assert upload.tasks_status == 'FAILURE'
assert len(upload.errors) > 0
@pytest.mark.parametrize('task', ['extracting', 'parse_all', 'cleanup', 'parsing'])
@pytest.mark.timeout(30)
def test_task_failure(monkeypatch, uploaded_id, worker, task, test_user, with_error):
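    """Simulates an exception in an individual processing task and checks the resulting failure state."""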
    # mock the task method to raise an exception
if hasattr(Upload, task):
cls = Upload
elif hasattr(Calc, task):
cls = Calc
else:
        assert False, 'unknown task: %s' % task
def mock(self):
raise Exception('fail for test')
mock.__name__ = task
mock = task_decorator(mock)
monkeypatch.setattr('nomad.processing.data.%s.%s' % (cls.__name__, task), mock)
# run the test
upload = run_processing(uploaded_id, test_user)
assert not upload.tasks_running
if task != 'parsing':
assert upload.tasks_status == 'FAILURE'
assert upload.current_task == task
assert len(upload.errors) > 0
else:
        # the empty example upload has no calcs and thus gets past the parse_all task
utils.get_logger(__name__).error('fake')
if upload.total_calcs > 0: # pylint: disable=E1101
assert upload.tasks_status == 'SUCCESS'
assert upload.current_task == 'cleanup'
assert len(upload.errors) == 0
for calc in upload.all_calcs(0, 100): # pylint: disable=E1101
assert calc.tasks_status == 'FAILURE'
assert calc.current_task == 'parsing'
assert len(calc.errors) > 0