Commit 007f0b9a authored by Markus Scheidgen

Added auxfiles to processing.

parent a033bd0b
Pipeline #37555 passed with stages in 6 minutes and 7 seconds
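
This commit indexes each calculation's auxiliary files (the files uploaded next to a parseable mainfile) by calling `upload_file.get_siblings(self.mainfile)` in the indexing step. The implementation of `get_siblings` is not part of this diff; a minimal sketch, assuming "siblings" means the other entries of `UploadFile.filelist` that live in the mainfile's directory, could look like this:

    import os

    def get_siblings(self, filename: str):
        """ Hypothetical sketch: yields the files from self.filelist that share
        the given file's directory, excluding the file itself. """
        dirname = os.path.dirname(filename)
        for other in self.filelist:
            if os.path.dirname(other) == dirname and other != filename:
                yield other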
@@ -91,6 +91,12 @@ class Calc(Proc):
     def mainfile_file(self) -> File:
         return File(self.mainfile_tmp_path)
 
+    @property
+    def upload(self) -> 'Upload':
+        if not self._upload:
+            self._upload = Upload.get(self.upload_id)
+        return self._upload
+
     def delete(self):
         """
         Delete this calculation and all associated data. This includes all files,
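
The new `Calc.upload` property memoizes the `Upload` document, so repeated accesses within one processing run query the database only once. A usage sketch (the `Calc` constructor arguments are assumed here, not shown in the diff):

    calc = Calc(upload_id='some-upload-id')  # assumed constructor signature
    first = calc.upload     # triggers Upload.get(calc.upload_id)
    second = calc.upload    # returns the cached instance, no second query
    assert first is second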
@@ -162,9 +168,8 @@ class Calc(Proc):
 
     @process
     def process(self):
-        self._upload = Upload.get(self.upload_id)
         logger = self.get_logger()
-        if self._upload is None:
+        if self.upload is None:
             logger.error('calculation upload does not exist')
 
         try:
@@ -181,7 +186,7 @@ class Calc(Proc):
             logger.error('could not close calculation proc log', exc_info=e)
 
         # inform parent proc about completion
-        self._upload.completed_child()
+        self.upload.completed_child()
 
     @task
     def parsing(self):
@@ -223,10 +228,11 @@ class Calc(Proc):
         upload_hash, calc_hash = self.archive_id.split('/')
         additional = dict(
             mainfile=self.mainfile,
-            upload_time=self._upload.upload_time,
+            upload_time=self.upload.upload_time,
             staging=True,
             restricted=False,
-            user_id=self._upload.user_id)
+            user_id=self.upload.user_id,
+            aux_files=list(self.upload.upload_file.get_siblings(self.mainfile)))
 
         with utils.timer(logger, 'indexed', step='index'):
             # persist to elastic search
@@ -303,7 +309,7 @@ class Upload(Chord):
 
     def __init__(self, **kwargs):
         super().__init__(**kwargs)
-        self._upload = None
+        self._upload_file = None
 
     @classmethod
     def get(cls, id):
@@ -421,21 +427,27 @@ class Upload(Chord):
     def uploading(self):
         pass
 
+    @property
+    def upload_file(self):
+        """ The :class:`UploadFile` instance that represents the uploaded file of this upload. """
+        if not self._upload_file:
+            self._upload_file = UploadFile(self.upload_id, local_path=self.local_path)
+        return self._upload_file
+
     @task
     def extracting(self):
         logger = self.get_logger()
         try:
-            self._upload = UploadFile(self.upload_id, local_path=self.local_path)
             with utils.timer(
                     logger, 'upload extracted', step='extracting',
-                    upload_size=self._upload.size):
-                self._upload.extract()
+                    upload_size=self.upload_file.size):
+                self.upload_file.extract()
         except KeyError as e:
             self.fail('process request for non existing upload', level=logging.INFO)
             return
 
         try:
-            self.upload_hash = self._upload.hash()
+            self.upload_hash = self.upload_file.hash()
         except Exception as e:
             self.fail('could not create upload hash', e)
             return
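
`Upload.upload_file` repeats the same hand-rolled memoization idiom as `Calc.upload`, and `extracting` no longer constructs the `UploadFile` itself. On Python 3.8+ the standard library expresses this pattern more compactly with `functools.cached_property`; a sketch of the equivalent, not what this codebase uses:

    from functools import cached_property

    class UploadSketch:  # illustrative stand-in for the Upload document
        def __init__(self, upload_id: str, local_path: str = None):
            self.upload_id = upload_id
            self.local_path = local_path

        @cached_property  # computed on first access, then stored on the instance
        def upload_file(self):
            # UploadFile comes from the codebase, as in the diff above
            return UploadFile(self.upload_id, local_path=self.local_path)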
@@ -451,13 +463,13 @@ class Upload(Chord):
         # TODO: deal with multiple possible parser specs
         with utils.timer(
                 logger, 'upload extracted', step='matching',
-                upload_size=self._upload.size,
-                upload_filecount=len(self._upload.filelist)):
+                upload_size=self.upload_file.size,
+                upload_filecount=len(self.upload_file.filelist)):
             total_calcs = 0
-            for filename in self._upload.filelist:
+            for filename in self.upload_file.filelist:
                 for parser in parsers:
                     try:
-                        potential_mainfile = self._upload.get_file(filename)
+                        potential_mainfile = self.upload_file.get_file(filename)
                         with potential_mainfile.open() as mainfile_f:
                             if parser.is_mainfile(filename, lambda fn: mainfile_f):
                                 mainfile_path = potential_mainfile.os_path
...
@@ -76,6 +76,8 @@ class RepoCalc(ElasticDocument):
     configuration_raw_gid = Keyword()
     XC_functional_name = Keyword()
 
+    aux_files = Keyword()
+
     @property
     def archive_id(self) -> str:
         """ The unique id for this calculation. """
...
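
`Keyword()` declares a non-analyzed string field in elasticsearch_dsl, and since every Elasticsearch field may hold multiple values, the list of aux file names needs no extra mapping work. A minimal illustration, not the actual RepoCalc mapping:

    from elasticsearch_dsl import Document, Keyword

    class ExampleDoc(Document):   # illustrative only
        aux_files = Keyword()     # a list of strings indexes as multiple keywords

    doc = ExampleDoc(aux_files=['aux1', 'aux2'])
    assert len(doc.aux_files) == 2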
@@ -90,12 +90,18 @@ def elastic():
 def mocksearch(monkeypatch):
     uploads_by_hash = {}
     uploads_by_id = {}
+    by_archive_id = {}
 
     def create_from_backend(_, **kwargs):
         upload_hash = kwargs['upload_hash']
         upload_id = kwargs['upload_id']
         uploads_by_hash[upload_hash] = (upload_id, upload_hash)
         uploads_by_id[upload_id] = (upload_id, upload_hash)
+        archive_id = '%s/%s' % (upload_hash, kwargs['calc_hash'])
+        additional = kwargs.pop('additional')
+        kwargs.update(additional)
+        by_archive_id[archive_id] = kwargs
         return {}
 
     def upload_exists(upload_hash):
@@ -112,6 +118,8 @@ def mocksearch(monkeypatch):
     monkeypatch.setattr('nomad.repo.RepoCalc.delete_upload', delete_upload)
     monkeypatch.setattr('nomad.repo.RepoCalc.unstage', lambda *args, **kwargs: None)
 
+    return by_archive_id
+
 
 @pytest.fixture(scope='function')
 def no_warn(caplog):
...
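
Because the `mocksearch` fixture now returns the `by_archive_id` dict, any test that declares `mocksearch` as a parameter receives that dict (pytest injects a fixture's return value) and can inspect exactly what processing handed to the indexing layer. A minimal hedged sketch:

    def test_indexes_aux_files(mocksearch):  # hypothetical test
        # ... run processing here ...
        entry = mocksearch['some_upload_hash/some_calc_hash']  # hypothetical archive id
        assert 'aux_files' in entry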
@@ -66,7 +66,7 @@ def run_processing(uploaded_id: str) -> Upload:
     return upload
 
 
-def assert_processing(upload: Upload):
+def assert_processing(upload: Upload, mocksearch=None):
     assert upload.completed
     assert upload.current_task == 'cleanup'
     assert upload.upload_hash is not None
@@ -83,11 +83,16 @@ def assert_processing(upload: Upload):
             assert 'a test' in f.read()
 
     assert len(calc.errors) == 0
 
+    if mocksearch:
+        repo = mocksearch[calc.archive_id]
+        assert repo is not None
+        assert len(repo.get('aux_files')) == 4
+
 
 @pytest.mark.timeout(30)
-def test_processing(uploaded_id, worker, no_warn):
+def test_processing(uploaded_id, worker, mocksearch, no_warn):
     upload = run_processing(uploaded_id)
-    assert_processing(upload)
+    assert_processing(upload, mocksearch)
 
 
 @pytest.mark.parametrize('uploaded_id', [example_files[1]], indirect=True)
...
@@ -43,7 +43,8 @@ def example_elastic_calc(normalized_template_example: LocalBackend, elastic) \
         additional=dict(
             mainfile='/test/mainfile',
             upload_time=datetime.now(),
-            staging=True, restricted=False, user_id='me@gmail.com'),
+            staging=True, restricted=False, user_id='me@gmail.com',
+            aux_files=['/test/aux1', '/test/aux2']),
         refresh='true')
 
     yield entry
@@ -62,6 +63,8 @@ def assert_elastic_calc(calc: RepoCalc):
         property = key_mappings.get(property, property)
         assert getattr(calc, property) is not None
 
+    assert len(getattr(calc, 'aux_files')) > 0
+
 
 def test_create_elastic_calc(example_elastic_calc: RepoCalc, no_warn):
     assert_elastic_calc(example_elastic_calc)
...