Commit 09798467 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Merge branch 'migration' of gitlab.mpcdf.mpg.de:nomad-lab/nomad-FAIR into migration

parents b1ae303c 16b8ce85
Subproject commit 386c64df4c8e4acba3d3339a5b018f1178c5294f
Subproject commit aa7414dad899c0ff568802e80e030c13f97afe3c
......@@ -42,7 +42,7 @@ This module also provides functionality to add parsed calculation data to the db
:undoc-members:
"""
from typing import Type, Callable
from typing import Type, Callable, Tuple
import datetime
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
......@@ -124,13 +124,12 @@ class Upload(Base): # type: ignore
"""
assert upload.uploader is not None
logger = utils.get_logger(__name__, upload_id=upload.upload_id)
repo_db = infrastructure.repository_db
repo_db.begin()
logger = utils.get_logger(__name__, upload_id=upload.upload_id)
def fill_publish_transaction() -> Tuple[Upload, bool]:
has_calcs = False
has_calcs = False
try:
# create upload
coe_upload = Upload(
upload_name=upload.upload_id,
......@@ -154,25 +153,56 @@ class Upload(Base): # type: ignore
coe_calc.apply_calc_with_metadata(calc, context=context)
logger.debug('added calculation, not yet committed', calc_id=coe_calc.calc_id)
# commit
def complete(commit: bool) -> int:
return coe_upload, has_calcs
repo_db.expunge_all()
repo_db.begin()
try:
coe_upload, has_calcs = fill_publish_transaction()
except Exception as e:
logger.error('Unexpected exception.', exc_info=e)
repo_db.rollback()
raise e
# commit
def complete(commit: bool) -> int:
upload_to_commit = coe_upload
try:
if commit:
if has_calcs:
# empty upload case
repo_db.commit()
return coe_upload.coe_upload_id
last_error = None
repeat_count = 0
while True:
try:
repo_db.commit()
break
except Exception as e:
repo_db.rollback()
error = str(e)
if last_error != error:
last_error = error
logger.info(
'repeat publish transaction',
error=error, repeat_count=repeat_count)
repo_db.expunge_all()
repo_db.begin()
upload_to_commit, _ = fill_publish_transaction()
repeat_count += 1
else:
raise e
return upload_to_commit.coe_upload_id
else:
# empty upload case
repo_db.rollback()
return -1
logger.info('added upload')
else:
repo_db.rollback()
logger.info('rolled upload back')
repo_db.expunge_all()
return -1
except Exception as e:
logger.error('Unexpected exception.', exc_info=e)
raise e
return complete
except Exception as e:
logger.error('Unexpected exception.', exc_info=e)
repo_db.rollback()
raise e
return complete
......@@ -182,9 +182,12 @@ def sqlalchemy_repository_db(exists: bool = False, readonly: bool = True, **kwar
params = config.repository_db._asdict()
params.update(**kwargs)
url = 'postgresql://%s:%s@%s:%d/%s' % utils.to_tuple(params, 'user', 'password', 'host', 'port', 'dbname')
# we set a very high isolation level, to prevent conflicts between transactions on the
# We tried to set a very high isolation level, to prevent conflicts between transactions on the
# star-shaped schema, which usually involve read/writes to many tables at once.
engine = create_engine(url, echo=False, isolation_level="SERIALIZABLE")
# Unfortunately, this had weak performance, and postgres wasn't even able to serialize on all
# occasions. We are now simply rolling back and retrying on conflicts.
# engine = create_engine(url, echo=False, isolation_level="SERIALIZABLE")
engine = create_engine(url, echo=False)
repository_db_conn = engine.connect()
repository_db = Session(bind=repository_db_conn, autocommit=True)
......
......@@ -244,6 +244,13 @@ class Proc(Document, metaclass=ProcMetaclass):
assert task in tasks, 'task %s must be one of the classes tasks %s' % (task, str(tasks)) # pylint: disable=E1135
if self.current_task is None:
assert task == tasks[0], "process has to start with first task" # pylint: disable=E1136
elif tasks.index(task) <= tasks.index(self.current_task):
# task is repeated, probably because the celery task of the process was rescheduled
# due to a prior worker failure
self.current_task = task
self.get_logger().warning('task is re-run')
self.save()
return True
else:
assert tasks.index(task) == tasks.index(self.current_task) + 1, \
"tasks must be processed in the right order"
......
......@@ -222,9 +222,6 @@ def create_postgres_infra(monkeysession=None, **kwargs):
connection, _ = infrastructure.sqlalchemy_repository_db(**db_args)
assert connection is not None
# we use a transaction around the session to rollback anything that happens within
# test execution
trans = connection.begin()
db = Session(bind=connection, autocommit=True)
old_connection, old_db = None, None
......@@ -241,7 +238,6 @@ def create_postgres_infra(monkeysession=None, **kwargs):
monkeysession.setattr('nomad.infrastructure.repository_db', old_db)
monkeysession.setattr('nomad.config.repository_db', old_config)
trans.rollback()
db.expunge_all()
db.invalidate()
db.close_all()
......
......@@ -111,6 +111,16 @@ def test_add_upload(processed: processing.Upload):
assert_coe_upload(processed.upload_id, upload_with_metadata)
def test_rollback_upload(processed: processing.Upload, postgres):
assert Upload.from_upload_id(processed.upload_id) is None
upload_with_metadata = processed.to_upload_with_metadata()
assert Upload.publish(upload_with_metadata)(False) == -1
assert Upload.from_upload_id(processed.upload_id) is None
Upload.publish(upload_with_metadata)(True)
assert_coe_upload(processed.upload_id, upload_with_metadata)
# def test_large_upload(processed: processing.Upload, example_user_metadata):
# processed.metadata = example_user_metadata
# upload_with_metadata = processed.to_upload_with_metadata()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment