Commit 4a1be7b4 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added publish retry.

parent 2b27e82b
Pipeline #44828 passed with stages
in 27 minutes and 18 seconds
......@@ -128,65 +128,73 @@ class Upload(Base): # type: ignore
logger = utils.get_logger(__name__, upload_id=upload.upload_id)
publish_filelock = filelock.FileLock(
os.path.join(config.fs.tmp, 'publish.lock'))
logger.info('waiting for filelock')
last_error = None
while True:
try:
publish_filelock.acquire(timeout=15 * 60, poll_intervall=1)
logger.info('acquired filelock')
break
except filelock.Timeout:
logger.warning('could not acquire publish lock after generous timeout')
publish_filelock = filelock.FileLock(
os.path.join(config.fs.tmp, 'publish.lock'))
logger.info('waiting for filelock')
while True:
try:
publish_filelock.acquire(timeout=15 * 60, poll_intervall=1)
logger.info('acquired filelock')
break
except filelock.Timeout:
logger.warning('could not acquire publish lock after generous timeout')
repo_db = infrastructure.repository_db
repo_db.expunge_all()
repo_db.begin()
repo_db = infrastructure.repository_db
repo_db.expunge_all()
repo_db.begin()
try:
has_calcs = False
# create upload
coe_upload = Upload(
upload_name=upload.upload_id,
created=upload.upload_time,
user_id=upload.uploader.id,
is_processed=True)
repo_db.add(coe_upload)
# add calculations and metadata
# reuse the cache for the whole transaction to profit from repeating
# star schema entries for users, ds, topics, etc.
context = PublishContext(upload_id=upload.upload_id)
for calc in upload.calcs:
has_calcs = True
coe_calc = Calc(
coe_calc_id=calc.pid,
checksum=calc.calc_id,
upload=coe_upload)
repo_db.add(coe_calc)
coe_calc.apply_calc_with_metadata(calc, context=context)
logger.debug('added calculation, not yet committed', calc_id=coe_calc.calc_id)
logger.info('filled publish transaction')
upload_id = -1
if has_calcs:
repo_db.commit()
logger.info('committed publish transaction')
upload_id = coe_upload.coe_upload_id
else:
# empty upload case
try:
has_calcs = False
# create upload
coe_upload = Upload(
upload_name=upload.upload_id,
created=upload.upload_time,
user_id=upload.uploader.id,
is_processed=True)
repo_db.add(coe_upload)
# add calculations and metadata
# reuse the cache for the whole transaction to profit from repeating
# star schema entries for users, ds, topics, etc.
context = PublishContext(upload_id=upload.upload_id)
for calc in upload.calcs:
has_calcs = True
coe_calc = Calc(
coe_calc_id=calc.pid,
checksum=calc.calc_id,
upload=coe_upload)
repo_db.add(coe_calc)
coe_calc.apply_calc_with_metadata(calc, context=context)
logger.debug(
'added calculation, not yet committed', calc_id=coe_calc.calc_id)
logger.info('filled publish transaction')
upload_id = -1
if has_calcs:
repo_db.commit()
logger.info('committed publish transaction')
upload_id = coe_upload.coe_upload_id
else:
# empty upload case
repo_db.rollback()
return -1
logger.info('added upload')
return upload_id
except Exception as e:
repo_db.rollback()
return -1
logger.info('added upload')
return upload_id
except Exception as e:
logger.error('Unexpected exception.', exc_info=e)
repo_db.rollback()
raise e
finally:
publish_filelock.release()
logger.info('released filelock')
if last_error != str(e):
last_error = str(e)
logger.error('Retry publish after unexpected execption.', exc_info=e)
else:
logger.error('Unexpected exception.', exc_info=e)
raise e
finally:
publish_filelock.release()
logger.info('released filelock')
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment