Commit d3b00ee2 authored by Markus Scheidgen

Many smaller fixes for migration. Basic migration test.

parent d357567c
......@@ -44,7 +44,7 @@
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/.pyenv/bin/pytest",
"args": [
"-sv", "tests/test_client.py"
"-sv", "tests/test_migration.py::test_migrate"
]
},
{
......
......@@ -35,9 +35,11 @@ app = Flask(
static_folder=os.path.abspath(os.path.join(os.path.dirname(__file__), '../../docs/.build/html')))
""" The Flask app that serves all APIs. """
app.config.setdefault('APPLICATION_ROOT', base_path)
app.config.setdefault('RESTPLUS_MASK_HEADER', False)
app.config.setdefault('RESTPLUS_MASK_SWAGGER', False)
app.config.APPLICATION_ROOT = base_path
app.config.RESTPLUS_MASK_HEADER = False
app.config.RESTPLUS_MASK_SWAGGER = False
app.config.SWAGGER_UI_OPERATION_ID = True
app.config.SWAGGER_UI_REQUEST_DURATION = True
def api_base_path_response(env, resp):
......
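Note that Flask's `Config` is a plain `dict` subclass, so the attribute assignments above store values as object attributes rather than config keys; whether Flask-RESTPlus picks them up depends on how it reads the config. A minimal sketch of the dict-style alternative with the same keys:

# sketch: standard dict-style Flask config access for the same settings
app.config['APPLICATION_ROOT'] = base_path
app.config['RESTPLUS_MASK_HEADER'] = False
app.config['RESTPLUS_MASK_SWAGGER'] = False
app.config['SWAGGER_UI_OPERATION_ID'] = True
app.config['SWAGGER_UI_REQUEST_DURATION'] = True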
......@@ -32,7 +32,7 @@ ns = api.namespace('repo', description='Access repository metadata.')
class RepoCalcResource(Resource):
@api.response(404, 'The upload or calculation does not exist')
@api.response(401, 'Not authorized to access the calculation')
@api.response(200, 'Metadata send')
@api.response(200, 'Metadata sent', fields.Raw)
@api.doc('get_repo_calc')
@login_if_available
def get(self, upload_id, calc_id):
......
......@@ -227,6 +227,7 @@ class ProxyUpload:
class UploadResource(Resource):
@api.doc('get_upload')
@api.response(404, 'Upload does not exist')
@api.response(400, 'Invalid parameters')
@api.marshal_with(upload_with_calcs_model, skip_none=True, code=200, description='Upload sent')
@api.expect(pagination_request_parser)
@login_really_required
......
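On the client side, this paginated endpoint is exercised through the generated bravado client; a usage sketch consistent with the migration code below (parameter names follow `pagination_request_parser`, and `client` is assumed to be the generated swagger client):

# sketch: fetch one page of an upload's calcs, sorted by mainfile
upload = client.uploads.get_upload(
    upload_id=upload_id, page=1, per_page=100,
    order_by='mainfile').response().result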
......@@ -56,7 +56,7 @@ def qa(skip_tests: bool):
ret_code = 0
if not skip_tests:
click.echo('Run tests ...')
ret_code += os.system('python -m pytest tests')
ret_code += os.system('python -m pytest -svx tests')
click.echo('Run code style checks ...')
ret_code += os.system('python -m pycodestyle --ignore=E501,E701 nomad tests')
click.echo('Run linter ...')
......
......@@ -706,21 +706,28 @@ class Calc(datamodel.Calc):
return self._data['section_repository_info']['repository_filepaths'][0]
def to_calc_with_metadata(self):
target = datamodel.CalcWithMetadata(upload_id=self.upload.upload_id)
target.calc_id = self.calc_id
target.basis_set_type = self.calc_data['repository_basis_set_type']
target.crystal_system = self.calc_data['repository_crystal_system']
target.XC_functional_name = self.calc_data['repository_xc_treatment']
target.system_type = self.calc_data['repository_system_type']
target.atom_species = self.calc_data['repository_atomic_elements']
target.space_group_number = self.calc_data['repository_spacegroup_nr']
target.chemical_composition = self.calc_data['repository_chemical_formula']
target.program_version = self.calc_data['repository_code_version']
target.program_name = self.calc_data['repository_program_name']
target.files = self._data['section_repository_info']['repository_filepaths']
target.mainfile = self.mainfile
return target
return repo_data_to_calc_with_metadata(
self.upload.upload_id, self.calc_id, self._data)
def repo_data_to_calc_with_metadata(upload_id, calc_id, repo_data):
calc_data = repo_data['section_repository_info']['section_repository_parserdata']
target = datamodel.CalcWithMetadata(upload_id=upload_id)
target.calc_id = calc_id
target.basis_set_type = calc_data['repository_basis_set_type']
target.crystal_system = calc_data['repository_crystal_system']
target.XC_functional_name = calc_data['repository_xc_treatment']
target.system_type = calc_data['repository_system_type']
target.atom_species = calc_data['repository_atomic_elements']
target.space_group_number = calc_data['repository_spacegroup_nr']
target.chemical_composition = calc_data['repository_chemical_formula']
target.program_version = calc_data['repository_code_version']
target.program_name = calc_data['repository_program_name']
target.files = repo_data['section_repository_info']['repository_filepaths']
target.mainfile = target.files[0]
return target
datamodel.CalcWithMetadata.register_mapping(Calc, Calc.to_calc_with_metadata)
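A usage sketch for the extracted helper; `repo_data` stands for a parsed repository archive dict containing `section_repository_info`, and the ids are hypothetical:

# sketch: build a CalcWithMetadata from raw repository data
calc = repo_data_to_calc_with_metadata('some_upload_id', 'some_calc_id', repo_data)
assert calc.mainfile == calc.files[0]  # the mainfile is taken to be the first file path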
......@@ -24,11 +24,17 @@ from typing import Generator, Tuple, List
import os.path
import json
import zipstream
import zipfile
import math
from mongoengine import Document, IntField, StringField, DictField
from passlib.hash import bcrypt
from werkzeug.contrib.iterio import IterIO
import time
from bravado.exception import HTTPNotFound
from datetime import datetime
from nomad import utils, config
from nomad.files import repo_data_to_calc_with_metadata
from nomad.coe_repo import User, Calc
from nomad.datamodel import CalcWithMetadata
from nomad.processing import FAILURE, SUCCESS
......@@ -114,30 +120,6 @@ class SourceCalc(Document):
SourceCalc.objects.insert(source_calcs)
class ZipStreamFileAdaptor:
def __init__(self, path_to_files: str) -> None:
self.zip = zipstream.ZipFile()
self.zip.write(path_to_files)
self._current_chunk = bytes()
self._current_chunk_index = 0
def read(self, n: int) -> bytes:
while (len(self._current_chunk) - self._current_chunk_index) < n:
next_chunk = next(self.zip, None)
left_over_chunk = self._current_chunk[self._current_chunk_index:]
if next_chunk is None:
self._current_chunk = bytes()
self._current_chunk_index = 0
return left_over_chunk
self._current_chunk = left_over_chunk + next_chunk
self._current_chunk_index = 0
old_index = self._current_chunk_index
self._current_chunk_index = self._current_chunk_index + n
return self._current_chunk[old_index:self._current_chunk_index]
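The hand-rolled adaptor above is superseded below by wrapping a `zipstream.ZipFile` in werkzeug's `IterIO`, which turns any byte iterator into a readable file-like object; a minimal sketch of that pattern (the directory path is hypothetical):

# sketch: stream a directory as a zip without materializing it on disk
import os
import zipfile
import zipstream
from werkzeug.contrib.iterio import IterIO

upload_dir = '/some/upload/dir'
zip_file = zipstream.ZipFile()
for root, _, files in os.walk(upload_dir):
    for file in files:
        path = os.path.join(root, file)
        arcname = os.path.relpath(path, upload_dir)
        zip_file.write(path, arcname, zipfile.ZIP_DEFLATED)
readable = IterIO(zip_file)  # supports .read(n) like a regular file object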
class NomadCOEMigration:
"""
Drives a migration from the NOMAD coe repository db to nomad@FAIRDI. It is assumed
......@@ -203,17 +185,17 @@ class NomadCOEMigration:
Uses PIDs of identified old calculations. Will create new PIDs for previously
unknown uploads. New PIDs will be chosen from a `prefix++` range of ints.
Yields: A dictionary with status and statistics for each given upload.
"""
migrated = 0
upload_specs = args
for upload_spec in upload_specs:
# identify uploads
if os.path.isabs(upload_spec):
if os.path.exists(upload_spec):
upload_path = upload_spec
else:
upload_path = None
# identify upload
upload_path = None
abs_upload_path = os.path.abspath(upload_spec)
if os.path.exists(abs_upload_path):
upload_path = upload_spec
else:
for site in self.sites:
potential_upload_path = os.path.join(site, upload_spec)
......@@ -222,55 +204,67 @@ class NomadCOEMigration:
break
if upload_path is None:
self.logger.error('upload does not exist', upload_spec=upload_spec)
error = 'upload does not exist'
self.logger.error(error, upload_spec=upload_spec)
yield dict(status=FAILURE, error=error)
continue
# prepare the upload by determining/creating an upload file, name, source upload id
if os.path.isfile(upload_path):
upload_archive_f = open(upload_path, 'rb')
upload_id = os.path.split(os.path.split(upload_path)[0])[1]
source_upload_id = os.path.split(os.path.split(upload_path)[0])[1]
upload_name = os.path.basename(upload_path)
else:
potential_upload_archive = os.path.join(upload_path, NomadCOEMigration.archive_filename)
if os.path.isfile(potential_upload_archive):
upload_archive_f = open(potential_upload_archive, 'rb')
upload_id = os.path.split(os.path.split(potential_upload_archive)[0])[1]
upload_name = '%s.tar.gz' % upload_id
source_upload_id = os.path.split(os.path.split(potential_upload_archive)[0])[1]
upload_name = '%s.tar.gz' % source_upload_id
else:
upload_id = os.path.split(upload_path)[1]
upload_archive_f = ZipStreamFileAdaptor(upload_path)
upload_name = '%s.zip' % upload_id
# process and upload
source_upload_id = os.path.split(upload_path)[1]
zip_file = zipstream.ZipFile()
path_prefix = len(upload_path) + 1
for root, _, files in os.walk(upload_path):
for file in files:
zip_file.write(
os.path.join(root, file),
os.path.join(root[path_prefix:], file),
zipfile.ZIP_DEFLATED)
upload_archive_f = IterIO(zip_file)
upload_name = '%s.zip' % source_upload_id
# upload and process the upload file
from nomad.client import create_client
client = create_client()
upload = client.uploads.upload(file=upload_archive_f, name=upload_name).response().result
upload_archive_f.close()
upload_logger = self.logger.bind(
source_upload_id=upload_id,
upload_id=upload.upload_id)
source_upload_id=source_upload_id, upload_id=upload.upload_id)
# grab old metadata
while upload.tasks_running:
upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
time.sleep(0.1)
if upload.tasks_status == FAILURE:
error = 'failed to process upload'
upload_logger.error(error, process_errors=upload.errors)
yield dict(status=FAILURE, error=error)
continue
# grab source metadata
upload_metadata_calcs = list()
metadata_dict = dict()
upload_metadata = dict(calculations=upload_metadata_calcs)
for source_calc in SourceCalc.objects(upload=upload_id):
upload_calc_metadata = dict(
mainfile=source_calc.mainfile,
_pid=source_calc.pid)
upload_calc_metadata.update(source_calc.user_metadata) # TODO to upload calc metadata
upload_metadata_calcs.append(upload_calc_metadata)
source_metadata = CalcWithMetadata(**source_calc.metadata)
for source_calc in SourceCalc.objects(upload=source_upload_id):
source_metadata = CalcWithMetadata(upload_id=upload.upload_id, **source_calc.metadata)
source_metadata.mainfile = source_calc.mainfile
source_metadata.pid = source_calc.pid
source_metadata.__migrated = None
upload_metadata_calcs.append(source_metadata)
metadata_dict[source_calc.mainfile] = source_metadata
# wait for finished processing
while upload.tasks_status not in [SUCCESS, FAILURE]:
upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
if upload.tasks_status == FAILURE:
upload_logger.info('upload could not be processed', errors=upload.errors)
# verify
# verify upload
total_calcs = upload.calcs.pagination.total
total_source_calcs = len(metadata_dict)
unprocessed_calcs = 0
......@@ -279,10 +273,10 @@ class NomadCOEMigration:
new_calcs = 0
missing_calcs = 0
for page in range(0, math.ceil(total_calcs / 100)):
for page in range(1, math.ceil(total_calcs / 100) + 1):
upload = client.uploads.get_upload(
upload_id=upload.upload_id, per_page=100, page=page,
order_by='mainfile')
order_by='mainfile').response().result
for calc_proc in upload.calcs.results:
calc_logger = upload_logger.bind(
......@@ -292,9 +286,10 @@ class NomadCOEMigration:
source_calc = metadata_dict.get(calc_proc.mainfile, None)
repo_calc = None
if calc_proc.tasks_status == SUCCESS:
repo_calc = client.uploads.get_repo(
repo_calc = client.repo.get_repo_calc(
upload_id=upload.upload_id,
calc_id=calc_proc.calc_id).response().result
else:
unprocessed_calcs += 1
calc_logger.info(
......@@ -303,6 +298,7 @@ class NomadCOEMigration:
if source_calc is not None:
source_calc.__migrated = False
continue
if source_calc is None:
......@@ -310,18 +306,27 @@ class NomadCOEMigration:
new_calcs += 1
continue
# TODO add calc metadata to index
# TODO do the comparison
migrated_calcs += 1
source_calc.__migrated = True
has_diff = False
if source_calc.mainfile != repo_calc['section_repository_info']['repository_filepaths'][0]:
has_diff = True
calc_logger.info('source target missmatch', quantity='mainfile')
target_calc = repo_data_to_calc_with_metadata(
upload.upload_id, repo_calc['calc_id'], repo_calc)
for key, target_value in target_calc.items():
if key in ['calc_id', 'upload_id']:
continue
source_value = source_calc.get(key, None)
if source_value != target_value:
has_diff = True
calc_logger.info(
'source target mismatch', quantity=key,
source_value=source_value, target_value=target_value)
if has_diff:
calcs_with_diffs += 1
source_calc.__migrated = True
for source_calc in upload_metadata_calcs:
if source_calc.__migrated is None:
missing_calcs += 1
......@@ -329,20 +334,42 @@ class NomadCOEMigration:
elif source_calc.__migrated is False:
upload_logger.info('source calc not processed', mainfile=source_calc.mainfile)
admin_keys = ['upload_time', 'uploader', 'pid']
def transform(calcWithMetadata):
result = dict()
for key, value in calcWithMetadata.items():
if key in admin_keys:
target_key = '_%s' % key
else:
target_key = key
if isinstance(value, datetime):
value = value.isoformat()
result[target_key] = value
return result
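# Illustrative effect of transform (hypothetical values): admin keys get a '_'
# prefix and datetimes are serialized, e.g. a calc with pid=1 and
# upload_time=datetime(2019, 1, 1) becomes
# {'_pid': 1, '_upload_time': '2019-01-01T00:00:00', ...}.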
upload_metadata['calculations'] = [
calc for calc in upload_metadata['calculations'] if calc.__migrated]
transform(calc) for calc in upload_metadata['calculations']
if calc.__migrated]
# commit with metadata
if total_calcs > unprocessed_calcs:
upload = client.uploads.exec_upload_command(
upload_id=upload.upload_id, payload=dict(command='commit', metadata=upload_metadata)).response().result
upload_id=upload.upload_id,
payload=dict(command='commit', metadata=upload_metadata)
).response().result
while upload.process_running:
client.uploads.get_upload(upload_id=upload.upload_id)
try:
upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
time.sleep(0.1)
except HTTPNotFound:
# the proc upload will be deleted by the commit command
break
# report
upload_logger.info(
'migrated upload',
report = dict(
total_calcs=total_calcs,
total_source_calcs=total_source_calcs,
unprocessed_calcs=unprocessed_calcs,
......@@ -350,9 +377,9 @@ class NomadCOEMigration:
calcs_with_diffs=calcs_with_diffs,
new_calcs=new_calcs,
missing_calcs=missing_calcs)
migrated += 1
return migrated
upload_logger.info('migrated upload', **report)
report.update(status=SUCCESS)
yield report
def index(self, *args, **kwargs):
""" see :func:`SourceCalc.index` """
......
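Since `migrate` now yields one report per upload instead of returning a count, callers iterate it; a driver sketch, assuming `migration` is an already configured `NomadCOEMigration` and the upload specs are hypothetical:

# sketch: run a migration and separate failed uploads from successful ones
reports = list(migration.migrate('/nomad/some_site/upload1', 'upload2'))
failed = [report for report in reports if report['status'] == FAILURE]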
......@@ -471,11 +471,8 @@ class Upload(Chord, datamodel.Upload):
@task
def cleanup(self):
# TODO issue #83
with utils.timer(
self.get_logger(), 'pack staging upload', step='cleaning',
upload_size=self.upload_files.size):
pass
# nothing to do with the current processing setup
pass
@property
def processed_calcs(self):
......
......@@ -84,12 +84,20 @@ class FlaskTestFutureAdapter:
else:
url = path
data = self._request_params.get('data')
function = getattr(self._flask_client, method.lower())
files = self._request_params.get('files', [])
if len(files) > 1:
raise NotImplementedError
if len(files) == 1:
data = dict() if data is None else data
_, (_, f) = files[0]
data.update(file=(f, 'file'))
return function(
url,
headers=self._request_params.get('headers'),
data=self._request_params.get('data'))
url, headers=self._request_params.get('headers'), data=data)
class FlaskTestResponseAdapter(IncomingResponse):
......
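For context, these adapters let bravado's `SwaggerClient` run against Flask's test client without a network; a wiring sketch based on the test imports (the constructor arguments and spec URL are assumptions):

# sketch: point a SwaggerClient at the Flask test client
from bravado.client import SwaggerClient
http_client = FlaskTestHttpClient(flask_client, headers=test_user_auth)
client = SwaggerClient.from_url('/swagger.json', http_client=http_client)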
{
"section_run": [
{
"_name": "section_run",
"_gIndex": 0,
"program_name": "VASP",
"program_version": "4.6.35 3Apr08 complex parallel LinuxIFC",
"program_basis_set_type": "plane waves",
"section_method": [
{
"_name": "section_method",
"_gIndex": 0,
"electronic_structure_method": "DFT",
"section_XC_functionals": [
{
"_name": "section_XC_functionals",
"_gIndex": 0,
"XC_functional_name": "GGA_X_PBE"
}
]
}
],
"section_system": [
{
"_name": "section_system",
"_gIndex": 0,
"simulation_cell": [
[
5.76372622e-10,
0.0,
0.0
],
[
0.0,
5.76372622e-10,
0.0
],
[
0.0,
0.0,
4.0755698899999997e-10
]
],
"configuration_periodic_dimensions": [
true,
true,
true
],
"atom_positions": [
[
2.88186311e-10,
0.0,
2.0377849449999999e-10
],
[
0.0,
2.88186311e-10,
2.0377849449999999e-10
],
[
0.0,
0.0,
0.0
],
[
2.88186311e-10,
2.88186311e-10,
0.0
]
],
"atom_labels": [
"Br",
"K",
"Si",
"Si"
]
}
],
"section_single_configuration_calculation": [
{
"_name": "section_single_configuration_calculation",
"_gIndex": 0,
"single_configuration_calculation_to_system_ref": 0,
"single_configuration_to_calculation_method_ref": 0,
"energy_free": -1.5936767191492225e-18,
"energy_total": -1.5935696296699573e-18,
"energy_total_T0": -3.2126683561907e-22
}
],
"section_sampling_method": [
{
"_name": "section_sampling_method",
"_gIndex": 0,
"sampling_method": "geometry_optimization"
}
],
"section_frame_sequence": [
{
"_name": "section_frame_sequence",
"_gIndex": 0,
"frame_sequence_to_sampling_ref": 0,
"frame_sequence_local_frames_ref": [
0
]
}
]
}
]
}
\ No newline at end of file
......@@ -14,9 +14,13 @@
import pytest
from bravado.client import SwaggerClient
import time
from .test_api import client as flask_client, test_user_auth # noqa pylint: disable=unused-import
from .bravado_flaks import FlaskTestHttpClient
from nomad.processing import SUCCESS
from tests.test_files import example_file, create_public_upload, clear_files # noqa pylint: disable=unused-import
from tests.test_api import client as flask_client, test_user_auth # noqa pylint: disable=unused-import
from tests.bravado_flaks import FlaskTestHttpClient
@pytest.fixture(scope='function')
......@@ -27,3 +31,21 @@ def client(flask_client, repository_db, test_user_auth):
def test_get_upload_command(client):
assert client.uploads.get_upload_command().response().result.upload_command is not None
def test_upload(client, worker):
with open(example_file, 'rb') as f:
upload = client.uploads.upload(file=f, name='test_upload').response().result
while upload.tasks_running:
upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
time.sleep(0.1)
assert upload.tasks_status == SUCCESS
def test_get_repo_calc(client, clear_files):
create_public_upload('test_upload', 'pp')
repo = client.repo.get_repo_calc(upload_id='test_upload', calc_id='0').response().result
assert repo is not None
assert repo['calc_id'] is not None
......@@ -20,6 +20,7 @@ from nomad import infrastructure, coe_repo
from nomad.migration import NomadCOEMigration, SourceCalc
from nomad.infrastructure import repository_db_connection
from nomad.processing import SUCCESS
from .bravado_flaks import FlaskTestHttpClient
from tests.conftest import create_repository_db
......@@ -56,8 +57,8 @@ def source_repo(monkeysession, repository_db):
"INSERT INTO public.calculations VALUES (NULL, NULL, NULL, NULL, 0, false, 2, NULL); "
"INSERT INTO public.codefamilies VALUES (1, 'test_code'); "
"INSERT INTO public.codeversions VALUES (1, 1, 'test_version'); "
"INSERT INTO public.metadata VALUES (1, NULL, NULL, NULL, NULL, 'formula1', '2019-01-01 12:00:00', NULL, decode('[\"$EXTRACTED/upload/test/mainfile.json\"]', 'escape'), 1, NULL); "
"INSERT INTO public.metadata VALUES (1, NULL, NULL, NULL, NULL, 'formula2', '2015-01-01 13:00:00', NULL, decode('[\"$EXTRACTED/upload/test/mainfile.json\"]', 'escape'), 2, NULL); "
"INSERT INTO public.metadata VALUES (1, NULL, NULL, NULL, NULL, 'formula1', '2019-01-01 12:00:00', NULL, decode('[\"$EXTRACTED/upload/1/template.json\"]', 'escape'), 1, NULL); "
"INSERT INTO public.metadata VALUES (1, NULL, NULL, NULL, NULL, 'formula2', '2015-01-01 13:00:00', NULL, decode('[\"$EXTRACTED/upload/2/template.json\"]', 'escape'), 2, NULL); "
"INSERT INTO public.spacegroups VALUES (1, 255); "
"INSERT INTO public.spacegroups VALUES (2, 255); "
"INSERT INTO public.user_metadata VALUES (1, 0, 'label1'); "
......@@ -94,14 +95,14 @@ def perform_index(migration, has_indexed, with_metadata, **kwargs):
has_source_calc = False
for source_calc, total in SourceCalc.index(migration.source, with_metadata=with_metadata, **<