Commit e380af17 authored by Markus Scheidgen

Refactored repo.py module: moved the upload-level operations (calcs, exists, delete, unstage) from RepoCalc class/static methods onto the RepoUpload entity, and adapted the processing code and tests to the new entity-based API (e.g. self.to(RepoUpload).delete()).

parent 3e11115b
@@ -33,7 +33,7 @@ from contextlib import contextmanager
 from nomad import utils, coe_repo, datamodel
 from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile, File
-from nomad.repo import RepoCalc
+from nomad.repo import RepoCalc, RepoUpload
 from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
 from nomad.parsing import parsers, parser_dict
 from nomad.normalizing import normalizers
@@ -383,7 +383,7 @@ class Upload(Chord, datamodel.Upload):
         ArchiveFile.delete_archives(upload_hash=self.upload_hash)
         # delete repo entries
-        RepoCalc.delete_upload(upload_id=self.upload_id)
+        self.to(RepoUpload).delete()
         # delete calc processings
         Calc.objects(upload_id=self.upload_id).delete()
@@ -419,7 +419,7 @@ class Upload(Chord, datamodel.Upload):
             raise NotAllowedDuringProcessing()
         self.in_staging = False
-        RepoCalc.unstage(upload_id=self.upload_id)
+        self.to(RepoUpload).unstage()
         coe_repo.add_upload(self, meta_data)
         self.save()
@@ -465,7 +465,7 @@ class Upload(Chord, datamodel.Upload):
             return
         # check if the file was already uploaded and processed before
-        if RepoCalc.upload_exists(self.upload_hash):
+        if self.to(RepoUpload).exists():
             self.fail('The same file was already uploaded and processed.', level=logging.INFO)
             return
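The processing code now reaches the repository index through the `self.to(RepoUpload)` cast instead of `RepoCalc` class methods. This diff does not show how `datamodel`'s `to()` is implemented; a rough, hypothetical sketch is a representation switch between entity classes that share the same ids (`create_from` is an assumed hook, not a confirmed nomad API):

```python
# Hypothetical sketch of the entity cast used above; the actual
# nomad.datamodel implementation is not shown in this diff and may differ.
class Entity:
    @classmethod
    def create_from(cls, obj):
        # assumed hook: build this representation from another one's ids
        raise NotImplementedError

    def to(self, entity_cls):
        # already the requested representation: return as-is
        if isinstance(self, entity_cls):
            return self
        # otherwise switch to the target representation
        return entity_cls.create_from(self)
```

With such a cast, the upload-level index operations read as `self.to(RepoUpload).delete()`, `.unstage()`, and `.exists()` in the hunks above.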
@@ -55,7 +55,29 @@ class RepoUpload(datamodel.Entity):
     @property
     def calcs(self):
-        return RepoCalc.upload_calcs(self.upload_id)
+        return Search(using=infrastructure.elastic_client, index=config.elastic.index_name) \
+            .query('match', upload_id=self.upload_id) \
+            .scan()
+
+    def delete(self):
+        """ Deletes all repo entries of the given upload. """
+        RepoCalc.search().query('match', upload_id=self.upload_id).delete()
+
+    def exists(self):
+        """ Returns true if there are already calcs from the given upload. """
+        # TODO this is deprecated and should be verified via repository files
+        search = Search(using=infrastructure.elastic_client, index=config.elastic.index_name) \
+            .query('match', upload_hash=self.upload_hash) \
+            .execute()
+        return len(search) > 0
+
+    def unstage(self, staging=False):
+        """ Update the staging property for all repo entries of the given upload. """
+        RepoCalc.update_by_query(self.upload_id, {
+            'inline': 'ctx._source.staging=%s' % ('true' if staging else 'false'),
+            'lang': 'painless'
+        })


 class RepoCalc(ElasticDocument, datamodel.Entity):
@@ -182,25 +204,6 @@ class RepoCalc(ElasticDocument, datamodel.Entity):
         except ConflictError:
             raise AlreadyExists('Calculation %s already exists.' % (self.archive_id))

-    @staticmethod
-    def delete_upload(upload_id):
-        """ Deletes all repo entries of the given upload. """
-        RepoCalc.search().query('match', upload_id=upload_id).delete()
-
-    @classmethod
-    def unstage(cls, upload_id, staging=False):
-        """ Update the staging property for all repo entries of the given upload. """
-        cls.update_by_query(upload_id, {
-            'inline': 'ctx._source.staging=%s' % ('true' if staging else 'false'),
-            'lang': 'painless'
-        })
-
-    @classmethod
-    def update_upload(cls, upload_id, **kwargs):
-        """ Update all entries of given upload with keyword args. """
-        for calc in RepoCalc.search().query('match', upload_id=upload_id):
-            calc.update(**kwargs)
-
     @classmethod
     def update_by_query(cls, upload_id, script):
         """ Update all entries of a given upload via elastic script. """
@@ -222,23 +225,6 @@ class RepoCalc(ElasticDocument, datamodel.Entity):
         """ Perform an elasticsearch and not elasticsearch_dsl search on the Calc index. """
         return infrastructure.elastic_client.search(index=config.elastic.index_name, body=body)

-    @staticmethod
-    def upload_exists(upload_hash):
-        """ Returns true if there are already calcs from the given upload. """
-        # TODO this is deprecated and should be verified via repository files
-        search = Search(using=infrastructure.elastic_client, index=config.elastic.index_name) \
-            .query('match', upload_hash=upload_hash) \
-            .execute()
-        return len(search) > 0
-
-    @staticmethod
-    def upload_calcs(upload_id):
-        """ Returns an iterable over all entries for the given upload_id. """
-        return Search(using=infrastructure.elastic_client, index=config.elastic.index_name) \
-            .query('match', upload_id=upload_id) \
-            .scan()
-
     @property
     def json_dict(self):
         """ A json serializable dictionary representation. """
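With the move, all upload-level index operations are grouped on RepoUpload. A hypothetical driver snippet (the `upload` instance and its ids are assumptions for illustration, not part of this diff):

```python
# Hypothetical usage of the refactored entity API; `upload` is assumed to
# be a processing Upload with upload_id and upload_hash set.
repo_upload = upload.to(RepoUpload)

if repo_upload.exists():             # any calcs indexed under this upload_hash?
    for calc in repo_upload.calcs:   # scan over all indexed entries
        print(calc.upload_id)
    repo_upload.unstage()            # flips staging to false via painless script
    repo_upload.delete()             # removes all entries of the upload
```

Note that `unstage` still delegates to `RepoCalc.update_by_query` with an inline painless script; the `'inline'` script key is the Elasticsearch 5.x spelling, which later Elasticsearch versions renamed to `'source'`.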
@@ -152,10 +152,11 @@ def mocksearch(monkeypatch):
         uploads_by_id.setdefault(calc.upload_id, []).append(calc)
         by_archive_id[calc.archive_id] = calc

-    def upload_exists(upload_hash):
-        return upload_hash in uploads_by_hash
+    def upload_exists(self):
+        return self.upload_hash in uploads_by_hash

-    def delete_upload(upload_id):
+    def upload_delete(self):
+        upload_id = self.upload_id
         if upload_id in uploads_by_id:
             for calc in uploads_by_id[upload_id]:
                 del(by_archive_id[calc.archive_id])
@@ -163,14 +164,15 @@ def mocksearch(monkeypatch):
             del(uploads_by_id[upload_id])
             del(uploads_by_hash[upload_hash])

-    def upload_calcs(upload_id):
-        return uploads_by_id.get(upload_id, [])
+    @property
+    def upload_calcs(self):
+        return uploads_by_id.get(self.upload_id, [])

     monkeypatch.setattr('nomad.repo.RepoCalc.persist', persist)
-    monkeypatch.setattr('nomad.repo.RepoCalc.upload_exists', upload_exists)
-    monkeypatch.setattr('nomad.repo.RepoCalc.delete_upload', delete_upload)
-    monkeypatch.setattr('nomad.repo.RepoCalc.upload_calcs', upload_calcs)
-    monkeypatch.setattr('nomad.repo.RepoCalc.unstage', lambda *args, **kwargs: None)
+    monkeypatch.setattr('nomad.repo.RepoUpload.exists', upload_exists)
+    monkeypatch.setattr('nomad.repo.RepoUpload.delete', upload_delete)
+    monkeypatch.setattr('nomad.repo.RepoUpload.calcs', upload_calcs)
+    monkeypatch.setattr('nomad.repo.RepoUpload.unstage', lambda *args, **kwargs: None)

     return by_archive_id
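One detail worth noting in the fixture: `calcs` is a property on `RepoUpload`, so the mock is wrapped in `@property` before `monkeypatch.setattr` installs it; assigning a descriptor to a class attribute is all it takes. A minimal, self-contained demonstration (class and names are made up for illustration):

```python
class Fake:
    pass

def fake_calcs(self):
    # stands in for the dict-backed lookup in the fixture
    return ['calc-1', 'calc-2']

# equivalent to monkeypatch.setattr('...Fake.calcs', property(fake_calcs))
Fake.calcs = property(fake_calcs)
assert Fake().calcs == ['calc-1', 'calc-2']
```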
@@ -29,7 +29,7 @@ from nomad import utils
 from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile, RepositoryFile
 from nomad.processing import Upload, Calc
 from nomad.processing.base import task as task_decorator
-from nomad.repo import RepoCalc
+from nomad.repo import RepoCalc, RepoUpload
 from tests.test_files import example_file, empty_file
@@ -135,7 +135,7 @@ def test_processing_doublets(uploaded_id, worker, test_user, with_error):
     upload = run_processing(uploaded_id, test_user)
     assert upload.status == 'SUCCESS'
-    assert RepoCalc.upload_exists(upload.upload_hash)  # pylint: disable=E1101
+    assert upload.to(RepoUpload).exists()

     upload = run_processing(uploaded_id, test_user)
     assert upload.status == 'FAILURE'
@@ -74,7 +74,7 @@ def assert_elastic_calc(calc: RepoCalc):
 def test_create_elastic_calc(example_elastic_calc: RepoCalc, no_warn):
     assert_elastic_calc(example_elastic_calc)
-    assert RepoCalc.upload_exists(example_elastic_calc.upload_hash)
+    assert example_elastic_calc.upload.exists()

     get_result: RepoCalc = RepoCalc.get(
         id='%s/%s' % (example_elastic_calc.upload_hash, example_elastic_calc.calc_hash))
@@ -121,6 +121,6 @@ def test_staging_elastic_calc(example_elastic_calc: RepoCalc, no_warn):

 def test_unstage_elastic_calc(example_elastic_calc: RepoCalc, no_warn):
-    RepoCalc.unstage(upload_id='test_upload_id', staging=False)
+    example_elastic_calc.upload.unstage(staging=False)
     assert not RepoCalc.get(id='test_upload_hash/test_calc_hash').staging
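These tests reach the upload via an `upload` accessor on `RepoCalc` that this diff excerpt does not show; presumably it casts to `RepoUpload` using the calc's own ids. A sketch under that assumption (the constructor signature is a guess for illustration):

```python
# Assumed RepoCalc accessor, not shown in this diff.
@property
def upload(self):
    return RepoUpload(upload_id=self.upload_id, upload_hash=self.upload_hash)
```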