Commit 27a37a8d authored by Markus Scheidgen

Publishing to the repo db is now configurable. Celery queues now have different priorities.

parent ec062073
Pipeline #45597 failed with stages in 4 minutes and 20 seconds
@@ -55,6 +55,13 @@ your browser.
 ## Change log
+### v0.4.3
+- more flexible celery routing
+- config via nomad.yml
+- repo_db can be disabled
+- publishing of calculations with failed processing
+- cli for managing running processing tasks
 ### v0.4.2
 - bugfixes regarding the migration
 - better migration configurability and reproducibility
......
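The changelog entry "config via nomad.yml" refers to file-based configuration overrides. Below is a minimal sketch of how such overrides could be applied over the `NomadConfig` sections defined in `nomad/config.py`; the loader function and the exact nomad.yml layout are assumptions for illustration, not code from this commit:

```python
import os
import yaml

def apply_nomad_yml(config_module, path='nomad.yml'):
    # Hypothetical loader: top-level nomad.yml keys are assumed to mirror the
    # NomadConfig sections (celery, fs, elastic, repository_db, ...), so e.g.
    #   repository_db:
    #     publish_enabled: false
    # would disable publishing to the repo db.
    if not os.path.exists(path):
        return
    with open(path) as f:
        overrides = yaml.safe_load(f) or {}
    for section_name, section_overrides in overrides.items():
        section = getattr(config_module, section_name, None)
        if section is None:
            continue
        for key, value in (section_overrides or {}).items():
            setattr(section, key, value)
```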
@@ -155,6 +155,7 @@ class Upload(Base):  # type: ignore
         retries = 0
         while True:
-            publish_filelock = filelock.FileLock(
-                os.path.join(config.fs.tmp, 'publish.lock'))
-            logger.info('waiting for filelock')
+            if config.repository_db.sequential_publish:
+                publish_filelock = filelock.FileLock(
+                    os.path.join(config.fs.tmp, 'publish.lock'))
+                logger.info('waiting for filelock')
@@ -223,5 +224,6 @@ class Upload(Base):  # type: ignore
             logger.error('Unexpected exception.', exc_info=e)
             raise e
         finally:
-            publish_filelock.release()
-            logger.info('released filelock')
+            if config.repository_db.sequential_publish:
+                publish_filelock.release()
+                logger.info('released filelock')
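Both hunks above guard the file lock behind the same `sequential_publish` flag: the lock is only created and acquired when sequential publishing is enabled, and only released in that case. As a standalone sketch of the pattern, using the `filelock` package and the `config` values from this commit (the wrapper function itself is illustrative):

```python
import logging
import os

import filelock

from nomad import config

logger = logging.getLogger(__name__)

def with_sequential_publish(do_publish):
    # Serialize publishes across processes via a file lock, but only when
    # config.repository_db.sequential_publish is enabled. Acquire and release
    # must be guarded by the same flag, as in the hunks above.
    publish_filelock = None
    if config.repository_db.sequential_publish:
        publish_filelock = filelock.FileLock(
            os.path.join(config.fs.tmp, 'publish.lock'))
        logger.info('waiting for filelock')
        publish_filelock.acquire()
    try:
        do_publish()
    finally:
        if publish_filelock is not None:
            publish_filelock.release()
            logger.info('released filelock')
```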
@@ -17,6 +17,7 @@ import os
 import os.path
 import yaml
 import warnings
+from kombu import Queue
 from nomad import utils
@@ -55,7 +56,11 @@ celery = NomadConfig(
     max_memory=64e6, # 64 GB
     timeout=1800, # 1/2 h
     acks_late=True,
-    routing=CELERY_QUEUE_ROUTING
+    routing=CELERY_QUEUE_ROUTING,
+    task_queues=[
+        Queue('calcs', routing_key='calcs', queue_arguments={'x-max-priority': 10}),
+        Queue('uploads', routing_key='uploads', queue_arguments={'x-max-priority': 100})
+    ]
 )
 fs = NomadConfig(
@@ -72,6 +77,8 @@ elastic = NomadConfig(
 )
 repository_db = NomadConfig(
+    sequential_publish=False,
+    publish_enabled=True,
     host='localhost',
     port=5432,
     dbname='nomad_fairdi_repo_db',
......
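The `x-max-priority` queue argument makes RabbitMQ create the queue as a priority queue: messages may carry a priority from 0 up to that maximum, and higher-priority messages are delivered first. A sketch of how a producer could use these queues (the broker URL and the task are placeholders, not code from this commit):

```python
from celery import Celery
from kombu import Queue

app = Celery('nomad.processing', broker='pyamqp://localhost//')

# Same queue declarations as in the config above: 'uploads' supports a
# larger priority range than 'calcs'.
app.conf.task_queues = [
    Queue('calcs', routing_key='calcs', queue_arguments={'x-max-priority': 10}),
    Queue('uploads', routing_key='uploads', queue_arguments={'x-max-priority': 100})
]

@app.task
def process_calc(calc_id):
    ...

# A message sent with priority 9 overtakes priority-0 messages already
# waiting in the 'calcs' queue.
process_calc.apply_async(args=('some-calc-id',), queue='calcs', priority=9)
```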
@@ -54,6 +54,8 @@ app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
 if config.celery.routing == config.CELERY_WORKER_ROUTING:
     app.conf.update(worker_direct=True)
+
+app.conf.task_queues = config.celery.task_queues

 CREATED = 'CREATED'
 PENDING = 'PENDING'
 RUNNING = 'RUNNING'
......
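`worker_direct=True` additionally gives every worker a dedicated queue named `<nodename>.dq`, which is what allows the `CELERY_WORKER_ROUTING` mode to pin all tasks of an upload to one worker. A minimal illustration (node name and task are placeholders):

```python
from celery import Celery

app = Celery('nomad.processing', broker='pyamqp://localhost//')
app.conf.worker_direct = True  # every worker also consumes '<nodename>.dq'

@app.task
def process_calc(calc_id):
    ...

# Route a task to one specific worker through its dedicated queue; the node
# name 'worker1@example.com' is a placeholder.
process_calc.apply_async(args=('some-calc-id',), queue='worker1@example.com.dq')
```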
@@ -412,37 +412,43 @@ class Upload(Proc):
         processing state db.
         """
         logger = self.get_logger()
-        logger.info('started to publish', step='publish')
+        logger.info('started to publish')
         with utils.lnr(logger, 'publish failed'):
             upload_with_metadata = self.to_upload_with_metadata()
+            if config.repository_db.publish_enabled:
                 with utils.timer(
                         logger, 'upload added to repository', step='repo',
                         upload_size=self.upload_files.size):
                     coe_repo.Upload.publish(upload_with_metadata)
-            with utils.timer(
-                    logger, 'staged upload files packed', step='pack',
-                    upload_size=self.upload_files.size):
+            if config.repository_db.publish_enabled:
                 coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
                 if coe_upload is not None:
-                    for coe_calc in coe_upload.calcs:
-                        calc_metadata = coe_calc.to_calc_with_metadata()
-                        calc_metadata.published = True
+                    calcs = [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs]
+                else:
+                    calcs = []
+            else:
+                calcs = upload_with_metadata.calcs
+            with utils.timer(
+                    logger, 'upload metadata updated', step='metadata',
+                    upload_size=self.upload_files.size):
+                for calc in calcs:
+                    calc.published = True
                     self.upload_files.metadata.update(
-                        calc_id=calc_metadata.calc_id, updates=calc_metadata.to_dict())
-                    logger.info('metadata updated after publish to coe repo', step='publish')
+                        calc_id=calc.calc_id, updates=calc.to_dict())
+            with utils.timer(
+                    logger, 'staged upload files packed', step='pack',
+                    upload_size=self.upload_files.size):
                 self.upload_files.pack()
             with utils.timer(
                     logger, 'index updated', step='index',
                     upload_size=self.upload_files.size):
-                coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
-                if coe_upload is not None:
-                    search.publish(
-                        [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs])
+                search.publish(calcs)
             with utils.timer(
                     logger, 'staged upload deleted', step='delete staged',
@@ -572,7 +578,7 @@ class Upload(Proc):
             '',
             'your data %suploaded %s has completed processing.' % (
                 self.name if self.name else '', self.upload_time.isoformat()),
-            'You can review your data on your upload page: %s' % config.services.upload_url
+            'You can review your data on your upload page: %s' % config.upload_url()
         ])
         try:
             infrastructure.send_mail(
......
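The restructured `publish` now computes the list of calcs once, depending on `publish_enabled`, and reuses it for the metadata update, packing, and indexing steps. Stripped of timers, logging, and error handling, the control flow of the hunk amounts to the following sketch (attribute access is condensed into function parameters):

```python
from nomad import config, coe_repo, search

def publish(upload_with_metadata, upload_files):
    if config.repository_db.publish_enabled:
        # Publish to the repo db first; calcs (and their pids) then come from it.
        coe_repo.Upload.publish(upload_with_metadata)
        coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
        if coe_upload is not None:
            calcs = [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs]
        else:
            calcs = []
    else:
        # Repo db disabled: publish from processing metadata alone, so no pids.
        calcs = upload_with_metadata.calcs

    for calc in calcs:
        calc.published = True
        upload_files.metadata.update(calc_id=calc.calc_id, updates=calc.to_dict())

    upload_files.pack()
    search.publish(calcs)
```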
@@ -123,7 +123,7 @@ class Entry(Document):
         self.upload_time = source.upload_time
         self.calc_id = source.calc_id
         self.calc_hash = source.calc_hash
-        self.pid = str(source.pid)
+        self.pid = None if source.pid is None else str(source.pid)
         self.mainfile = source.mainfile
         if source.files is None:
......
@@ -42,6 +42,8 @@ data:
     host: "{{ .Values.postgres.host }}"
     port: {{ .Values.postgres.port }}
     dbname: "{{ .Values.dbname }}"
+    sequential_publish: {{ .Values.postgres.sequential_publish }}
+    publish_enabled: {{ .Values.postgres.publish_enabled }}
   mail:
     host: "{{ .Values.mail.host }}"
     port: {{ .Values.mail.port }}
......
@@ -43,7 +43,7 @@ spec:
           value: "{{ .Values.worker.console_loglevel }}"
         - name: NOMAD_LOGSTASH_LEVEL
           value: "{{ .Values.worker.logstash_loglevel }}"
-        command: ["python", "-m", "celery", "worker", "-A", "nomad.processing", "-Q", "celery,calcs,uploads"]
+        command: ["python", "-m", "celery", "worker", "-A", "nomad.processing", "-Q", "calcs,uploads"]
         livenessProbe:
           exec:
             command:
......
@@ -96,6 +96,8 @@ elastic:
   port: 19200
 postgres:
+  sequential_publish: false
+  publish_enabled: true
   host: enc-preprocessing-nomad.esc
   port: 5432
......
@@ -96,7 +96,8 @@ def celery_includes():
 @pytest.fixture(scope='session')
 def celery_config():
     return {
-        'broker_url': config.rabbitmq_url()
+        'broker_url': config.rabbitmq_url(),
+        'task_queues': config.celery.task_queues
     }
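Passing `config.celery.task_queues` into the pytest celery configuration matters because the embedded test worker only declares queues it knows about; without this, tasks routed to `calcs` or `uploads` would never be consumed in tests. A hypothetical sanity check, assuming the session-scoped `celery_config` above is picked up by the celery pytest plugin:

```python
def test_task_queues_declared(celery_config):
    # The test worker should declare the same priority queues as the
    # production app; this test function is illustrative, not from the commit.
    names = {queue.name for queue in celery_config['task_queues']}
    assert names == {'calcs', 'uploads'}
```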
@@ -517,3 +518,9 @@ def processed(uploaded: Tuple[str, str], test_user: coe_repo.User, proc_infra) -
     Provides a processed upload. Upload was uploaded with test_user.
     """
     return test_processing.run_processing(uploaded, test_user)
+
+
+@pytest.fixture(scope='function', params=[False, True])
+def with_publish_to_coe_repo(monkeypatch, request):
+    monkeypatch.setattr('nomad.config.repository_db.publish_enabled', request.param)
+    return request.param
@@ -20,10 +20,14 @@ import json
 import re
 from nomad import utils, infrastructure
-from nomad.files import UploadFiles, StagingUploadFiles
+from nomad.files import UploadFiles, StagingUploadFiles, PublicUploadFiles
 from nomad.processing import Upload, Calc
 from nomad.processing.base import task as task_decorator, FAILURE, SUCCESS
 from tests.test_search import assert_search_upload
+from tests.test_files import assert_upload_files
+from tests.test_coe_repo import assert_coe_upload


 def test_send_mail(mails, monkeypatch):
     monkeypatch.setattr('nomad.config.mail.enabled', True)
@@ -98,6 +102,28 @@ def test_processing(processed, no_warn, mails, monkeypatch):
     assert re.search(r'Processing completed', mails.messages[0].data.decode('utf-8')) is not None


+def test_publish(processed: Upload, no_warn, example_user_metadata, monkeypatch, with_publish_to_coe_repo):
+    processed.metadata = example_user_metadata
+    n_calcs = processed.total_calcs
+
+    additional_keys = ['with_embargo']
+    if with_publish_to_coe_repo:
+        additional_keys.append('pid')
+
+    processed.publish_upload()
+    try:
+        processed.block_until_complete(interval=.01)
+    except Exception:
+        pass
+
+    assert_coe_upload(processed.upload_id, user_metadata=example_user_metadata)
+    assert_upload_files(
+        processed.upload_id, PublicUploadFiles, n_calcs, additional_keys, published=True)
+    assert_search_upload(processed.upload_id, n_calcs, additional_keys, published=True)
+
+
 @pytest.mark.timeout(10)
 def test_processing_with_warning(proc_infra, test_user, with_warn):
     example_file = 'tests/data/proc/examples_with_warning_template.zip'
......
@@ -225,7 +225,7 @@ class TestUploads:
         assert_upload_files(upload_id, files.StagingUploadFiles, n_calcs)
         assert_search_upload(upload_id, n_calcs)

-    def assert_unstage(self, client, test_user_auth, upload_id, proc_infra, metadata={}):
+    def assert_published(self, client, test_user_auth, upload_id, proc_infra, with_pid=True, metadata={}):
         rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
         upload = self.assert_upload(rv.data)
         n_calcs = upload['calcs']['pagination']['total']
@@ -240,10 +240,13 @@ class TestUploads:
         assert upload['current_process'] == 'publish_upload'
         assert upload['process_running']

+        additional_keys = ['with_embargo']
+        if with_pid:
+            additional_keys.append('pid')
+
         self.assert_upload_does_not_exist(client, upload_id, test_user_auth)
         assert_coe_upload(upload_id, user_metadata=metadata)
-        assert_upload_files(upload_id, files.PublicUploadFiles, n_calcs, additional_keys=['with_embargo', 'pid'], published=True)
-        assert_search_upload(upload_id, n_calcs, additional_keys=['with_embargo', 'pid'], published=True)
+        assert_upload_files(upload_id, files.PublicUploadFiles, n_calcs, additional_keys=additional_keys, published=True)
+        assert_search_upload(upload_id, n_calcs, additional_keys=additional_keys, published=True)

     def assert_upload_does_not_exist(self, client, upload_id: str, test_user_auth):
         # poll until publish/delete completed
@@ -329,11 +332,11 @@ class TestUploads:
             yield True
             monkeypatch.setattr('nomad.processing.data.Upload.cleanup', old_cleanup)

-    def test_delete_unstaged(self, client, test_user_auth, proc_infra, no_warn):
+    def test_delete_published(self, client, test_user_auth, proc_infra, no_warn, with_publish_to_coe_repo):
         rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth)
         upload = self.assert_upload(rv.data)
         self.assert_processing(client, test_user_auth, upload['upload_id'])
-        self.assert_unstage(client, test_user_auth, upload['upload_id'], proc_infra)
+        self.assert_published(client, test_user_auth, upload['upload_id'], proc_infra, with_pid=with_publish_to_coe_repo)

         rv = client.delete('/uploads/%s' % upload['upload_id'], headers=test_user_auth)
         assert rv.status_code == 404
@@ -345,11 +348,11 @@ class TestUploads:
         assert rv.status_code == 200
         self.assert_upload_does_not_exist(client, upload['upload_id'], test_user_auth)

-    def test_post(self, client, test_user_auth, example_upload, proc_infra, no_warn):
+    def test_post(self, client, test_user_auth, example_upload, proc_infra, no_warn, with_publish_to_coe_repo):
         rv = client.put('/uploads/?local_path=%s' % example_upload, headers=test_user_auth)
         upload = self.assert_upload(rv.data)
         self.assert_processing(client, test_user_auth, upload['upload_id'])
-        self.assert_unstage(client, test_user_auth, upload['upload_id'], proc_infra)
+        self.assert_published(client, test_user_auth, upload['upload_id'], proc_infra, with_pid=with_publish_to_coe_repo)

     def test_post_metadata(
         self, client, proc_infra, admin_user_auth, test_user_auth, test_user,
@@ -359,7 +362,7 @@ class TestUploads:
         self.assert_processing(client, test_user_auth, upload['upload_id'])
         metadata = dict(**example_user_metadata)
         metadata['_upload_time'] = datetime.now().isoformat()
-        self.assert_unstage(client, admin_user_auth, upload['upload_id'], proc_infra, metadata)
+        self.assert_published(client, admin_user_auth, upload['upload_id'], proc_infra, metadata=metadata)

     def test_post_metadata_forbidden(self, client, proc_infra, test_user_auth, no_warn):
         rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth)
......
@@ -132,10 +132,11 @@ def assert_search_upload(upload_id, n_calcs: int, additional_keys: List[str] = [
     for hit in search:
         hit = hit.to_dict()

         for key, value in kwargs.items():
-            if key == 'published':
-                assert int(hit.get('pid')) > 0
             assert hit.get(key, None) == value

+        if 'pid' in hit:
+            assert int(hit.get('pid')) > 0
+
         for key in keys:
             assert key in hit
......