Commit 335008d7 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Merge branch 'v0.10.4' into 'master'

Merge for release

See merge request !357
parents 0bde4980 6919ccf6
Pipeline #103569 passed with stage
in 4 minutes and 55 seconds
......@@ -27,6 +27,7 @@ from nomad import search, processing as proc, files
from nomad.cli import cli
from nomad.cli.cli import POPO
from nomad.processing import Upload, Calc
from nomad.processing.base import SUCCESS
from import BlueprintClient
from import ( # pylint: disable=unused-import
......@@ -253,6 +254,9 @@ class TestAdminUploads:
with upload_files.read_archive(calc.calc_id) as archive:
assert calc.calc_id in archive
assert published.tasks_status == SUCCESS
def test_chown(self, published, test_user, other_test_user):
upload_id = published.upload_id
calc = Calc.objects(upload_id=upload_id).first()
......@@ -270,19 +274,72 @@ class TestAdminUploads:
assert upload.user_id == test_user.user_id
assert calc.metadata['uploader'] == test_user.user_id
def test_reset(self, non_empty_processed):
def test_edit(self, published):
upload_id = published.upload_id
def assert_calcs(publish, with_embargo):
calcs = Calc.objects(upload_id=upload_id)
for calc in calcs:
assert calc.metadata['published'] == publish
assert calc.metadata['with_embargo'] == with_embargo
for calc in, query=dict(upload_id=upload_id)).data:
assert calc['published'] == publish
assert calc['with_embargo'] == with_embargo
assert_calcs(True, True)
def perform_test(publish, with_embargo):
if publish:
params = ['--publish', 'with-embargo' if with_embargo else 'no-embargo']
assert not with_embargo
params = ['--unpublish']
result = click.testing.CliRunner().invoke(
cli, ['admin', 'uploads', 'edit'] + params, catch_exceptions=False)
assert result.exit_code == 0
assert 'editing' in result.stdout
assert_calcs(publish, with_embargo)
perform_test(False, False)
perform_test(True, False)
perform_test(True, True)
@pytest.mark.parametrize('with_calcs,success,failure', [
    (True, False, False),
    (False, False, False),
    (True, True, False),
    (False, False, True)])
def test_reset(self, non_empty_processed, with_calcs, success, failure):
    '''
    Check that `admin uploads reset` moves upload (and optionally calc) task
    state to PENDING by default, or to SUCCESS/FAILURE when the respective
    flag is given.
    '''
    upload_id = non_empty_processed.upload_id

    # Sanity check: the fixture upload and its calcs start out processed.
    upload = Upload.objects(upload_id=upload_id).first()
    calc = Calc.objects(upload_id=upload_id).first()
    assert upload.tasks_status == proc.SUCCESS
    assert calc.tasks_status == proc.SUCCESS

    args = ['admin', 'uploads', 'reset']
    if with_calcs: args.append('--with-calcs')
    if success: args.append('--success')
    if failure: args.append('--failure')
    # Target only the fixture upload, like the pre-parametrized version did.
    args.append(upload_id)
    result = click.testing.CliRunner().invoke(cli, args, catch_exceptions=False)

    assert result.exit_code == 0
    assert 'reset' in result.stdout

    # Re-fetch: the CLI mutated the documents in mongo.
    upload = Upload.objects(upload_id=upload_id).first()
    calc = Calc.objects(upload_id=upload_id).first()

    expected_state = proc.PENDING
    if success: expected_state = proc.SUCCESS
    if failure: expected_state = proc.FAILURE
    assert upload.tasks_status == expected_state
    if not with_calcs:
        # Without --with-calcs the calcs keep their processed state.
        assert calc.tasks_status == proc.SUCCESS
    else:
        assert calc.tasks_status == expected_state
......@@ -47,6 +47,7 @@ example_file_contents = [
example_file_mainfile = 'examples_template/template.json'
example_file_vasp_with_binary = 'tests/data/proc/'
empty_file = 'tests/data/proc/'
example_archive_contents = {
"section_run": [],
......@@ -571,6 +572,15 @@ def create_test_upload_files(
return upload_files
def append_raw_files(upload_id: str, path_source: str, path_in_archive: str, access='public'):
    '''
    Used to append published zip files, for testing purposes.

    Arguments:
        upload_id: The id of an existing (published) upload.
        path_source: Local path of the file to add.
        path_in_archive: Target path of the file inside the raw-file zip.
        access: Which raw-file object to append to ('public' by default).
    '''
    upload_files = UploadFiles.get(upload_id)
    zip_path = upload_files._raw_file_object(access).os_path  # type: ignore
    # Context manager ensures the appended entry is flushed and the zip is
    # closed (the original leaked the open ZipFile handle).
    with zipfile.ZipFile(zip_path, 'a') as zf:
        zf.write(path_source, path_in_archive)
def test_test_upload_files(raw_files_infra):
upload_id = utils.create_uuid()
archives: datamodel.EntryArchive = []
......@@ -18,9 +18,13 @@
''' Methods to help with testing of nomad@FAIRDI.'''
from typing import List, Union
import urllib.parse
import json
from logging import LogRecord
from typing import Dict, Any
import zipfile
import os.path
def assert_log(caplog, level: str, event_part: str) -> LogRecord:
......@@ -82,3 +86,51 @@ def assert_url_query_args(url: str, **kwargs):
assert k not in query_dict
assert query_dict[k][0] == str(v)
def build_url(base_url: str, query_args: Dict[str, Any]) -> str:
    '''
    Takes a base_url and a dictionary, and combines to a url with query arguments.
    Arguments with value None are ignored.
    '''
    # Remove args with value None
    query_args_clean = {k: v for k, v in query_args.items() if v is not None}
    if not query_args_clean:
        return base_url
    # doseq=True expands sequence values into repeated query parameters.
    return base_url + '?' + urllib.parse.urlencode(query_args_clean, doseq=True)
def create_template_upload_file(
tmp, mainfiles: Union[str, List[str]] = None, auxfiles: int = 4,
directory: str = 'examples_template', name: str = '',
more_files: Union[str, List[str]] = None):
Creates a temporary file based on template.json (for the artificial test
parser) that can be used for test processings.
if mainfiles is None:
mainfiles = 'tests/data/proc/templates/template.json'
if isinstance(mainfiles, str):
mainfiles = [mainfiles]
if more_files is None:
more_files = []
if isinstance(more_files, str):
more_files = [more_files]
upload_path = os.path.join(tmp, name)
with zipfile.ZipFile(upload_path, 'w') as zf:
for i in range(0, auxfiles):
with'{directory}/{i}.aux', 'w') as f:
for mainfile in mainfiles:
zf.write(mainfile, f'{directory}/{os.path.basename(mainfile)}')
for additional_file in more_files:
zf.write(additional_file, f'{directory}/{os.path.basename(additional_file)}')
return upload_path
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment