Commit 10e0375a authored by Markus Scheidgen

Fixed search publish update bug #127. Made the file system prefix size and celery worker routing configurable.

parent fe385656
......@@ -44,7 +44,7 @@ Arguments:
to the same upload to the same worker.
"""
FSConfig = namedtuple('FSConfig', ['tmp', 'staging', 'public'])
FSConfig = namedtuple('FSConfig', ['tmp', 'staging', 'public', 'prefix_size'])
""" Used to configure file stystem access. """
RepositoryDBConfig = namedtuple('RepositoryDBConfig', ['host', 'port', 'dbname', 'user', 'password'])
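For context, the new `NOMAD_CELERY_ROUTING` setting (used in the deployment templates below) controls whether tasks are routed to shared queues or pinned per worker, so that all tasks of one upload hit the same worker and its local staging files. What follows is a hypothetical sketch of such a router using Celery's documented `task_routes` function protocol; it is not the commit's code, and the queue naming and the `upload_id` keyword argument are assumptions:

```python
import os

ROUTING = os.environ.get('NOMAD_CELERY_ROUTING', 'queue')

def route_task(name, args, kwargs, options, task=None, **kw):
    """Celery task_routes function: pin all tasks of one upload to one queue."""
    upload_id = kwargs.get('upload_id')  # assumed task keyword argument
    if ROUTING == 'worker' and upload_id is not None:
        # a stable per-upload queue name; each worker consumes its own queues,
        # so an upload's staging files stay on the worker that processes it
        return {'queue': 'upload_%s' % upload_id}
    return None  # fall through to the default queues (celery, calcs, uploads)

# would be registered via app.conf.task_routes = (route_task,)
```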
......@@ -96,7 +96,8 @@ celery = CeleryConfig(
fs = FSConfig(
tmp=os.environ.get('NOMAD_FILES_TMP_DIR', '.volumes/fs/tmp'),
staging=os.environ.get('NOMAD_FILES_STAGING_DIR', '.volumes/fs/staging'),
public=os.environ.get('NOMAD_FILES_PUBLIC_DIR', '.volumes/fs/public')
public=os.environ.get('NOMAD_FILES_PUBLIC_DIR', '.volumes/fs/public'),
prefix_size=int(os.environ.get('NOMAD_FILES_PREFIX_SIZE', 2))
)
elastic = ElasticConfig(
host=os.environ.get('NOMAD_ELASTIC_HOST', 'localhost'),
......
......@@ -84,10 +84,10 @@ class PathObject:
else:
self.os_path = os.path.join(bucket, object_id)
if prefix:
if prefix and config.fs.prefix_size > 0:
segments = list(os.path.split(self.os_path))
last = segments[-1]
segments[-1] = last[:3]
segments[-1] = last[:config.fs.prefix_size]
segments.append(last)
self.os_path = os.path.join(*segments)
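The prefix logic fans object ids out into short subdirectories so that no single bucket directory accumulates millions of entries; the new `prefix_size` setting (default 2, replacing the hard-coded 3) controls the fan-out, and `prefix_size=0` disables it. A minimal sketch of the resulting layout:

```python
import os

def prefixed_path(bucket: str, object_id: str, prefix_size: int = 2) -> str:
    # insert a directory named after the first prefix_size characters of the id
    path = os.path.join(bucket, object_id)
    if prefix_size > 0:
        head, last = os.path.split(path)
        path = os.path.join(head, last[:prefix_size], last)
    return path

assert prefixed_path('.volumes/fs/public', 'upload_id_123') == \
    '.volumes/fs/public/up/upload_id_123'
```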
......@@ -101,7 +101,7 @@ class PathObject:
shutil.rmtree(self.os_path)
if len(parent_name) == 3 and basename.startswith(parent_name):
if len(parent_name) == config.fs.prefix_size and basename.startswith(parent_name):
try:
if not os.listdir(parent_directory):
os.rmdir(parent_directory)
......
......@@ -249,7 +249,16 @@ def reset(repo_content_only: bool = False):
try:
shutil.rmtree(config.fs.staging, ignore_errors=True)
shutil.rmtree(config.fs.public, ignore_errors=True)
shutil.rmtree(config.fs.tmp, ignore_errors=True)
# delete the contents of tmp, but keep the tmp directory itself
for sub_path in os.listdir(config.fs.tmp):
path = os.path.join(config.fs.tmp, sub_path)
try:
if os.path.isfile(path):
os.unlink(path)
elif os.path.isdir(path):
    shutil.rmtree(path, ignore_errors=True)
except Exception:
    pass  # best effort cleanup; keep going if an entry cannot be removed
logger.info('files reset')
except Exception as e:
logger.error('exception deleting files', exc_info=e)
......
......@@ -202,7 +202,11 @@ def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
for calc in calcs:
entry = Entry.from_calc_with_metadata(calc)
entry.published = True
yield entry.to_dict(include_meta=True)
entry = entry.to_dict(include_meta=True)
source = entry.pop('_source')
entry['doc'] = source
entry['_op_type'] = 'update'
yield entry
elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates())
refresh()
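This is the fix for #127: the bulk actions yielded on publish change from full index operations into partial updates. The document body moves from `_source` into `doc`, and `_op_type` becomes `update`, so existing index entries are updated in place instead of being replaced. For reference, a minimal standalone sketch of the resulting action shape (index name and id are placeholders):

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

client = Elasticsearch()

# illustrative action shape, matching what the generator above yields
actions = [{
    '_op_type': 'update',        # partial update of an existing document
    '_index': 'nomad_entries',   # hypothetical index name
    '_id': 'some_calc_id',
    'doc': {'published': True},  # the former '_source' payload, wrapped in 'doc'
}]
bulk(client, actions)
```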
......
......@@ -24,7 +24,9 @@ spec:
image: "{{ .Values.images.nomad.name }}:{{ .Values.images.nomad.tag }}"
volumeMounts:
- mountPath: /app/.volumes/fs/public
name: files-volume
name: public-volume
- mountPath: /app/.volumes/fs/staging
name: staging-volume
- mountPath: /nomad
name: nomad-volume
env:
......@@ -75,7 +77,7 @@ spec:
- name: NOMAD_COE_REPO_DB_NAME
value: "{{ .Values.dbname }}"
- name: NOMAD_CELERY_ROUTING
value: "worker"
value: "{{ .Values.worker.routing }}"
command: ["python", "-m", "gunicorn.app.wsgiapp", "--timeout", "3600", "--log-config", "ops/gunicorn.log.conf", "-w", "{{ .Values.api.worker }}", "-b 0.0.0.0:8000", "nomad.api:app"]
livenessProbe:
httpGet:
......@@ -95,10 +97,19 @@ spec:
- name: {{ .Values.images.secret }}
imagePullPolicy: always
volumes:
- name: files-volume
- name: public-volume
hostPath:
path: {{ .Values.volumes.files }}
path: {{ .Values.volumes.public }}
type: Directory
- name: staging-volume
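# when routing is "worker", staging is an in-memory emptyDir local to the pod; otherwise a shared hostPath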
{{ if eq .Values.worker.routing "worker" }}
emptyDir:
medium: 'Memory'
{{ else }}
hostPath:
path: {{ .Values.volumes.staging }}
type: Directory
{{ end }}
- name: nomad-volume
hostPath:
path: {{ .Values.volumes.nomad }}
......
......@@ -29,7 +29,7 @@ spec:
memory: "{{ .Values.worker.memrequest }}Gi"
volumeMounts:
- mountPath: /app/.volumes/fs/public
name: files-volume
name: public-volume
- mountPath: /app/.volumes/fs/staging
name: staging-volume
- mountPath: /nomad
......@@ -84,7 +84,7 @@ spec:
- name: NOMAD_CONSOLE_LOGLEVEL
value: "ERROR"
- name: NOMAD_CELERY_ROUTING
value: "worker"
value: "{{ .Values.worker.routing }}"
command: ["python", "-m", "celery", "worker", "-A", "nomad.processing", "-Q", "celery,calcs,uploads"]
livenessProbe:
exec:
......@@ -108,14 +108,20 @@ spec:
- name: {{ .Values.images.secret }}
imagePullPolicy: always
volumes:
- name: files-volume
- name: public-volume
hostPath:
path: {{ .Values.volumes.files }}
path: {{ .Values.volumes.public }}
type: Directory
- name: staging-volume
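# mirrors the api deployment: in-memory staging emptyDir for per-worker routing, shared hostPath otherwise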
{{ if eq .Values.worker.routing "worker" }}
emptyDir:
medium: 'Memory'
{{ else }}
hostPath:
path: {{ .Values.volumes.staging }}
type: Directory
{{ end }}
- name: nomad-volume
hostPath:
path: {{ .Values.volumes.nomad }}
type: Directory
- name: staging-volume
emptyDir:
medium: 'Memory'
......@@ -47,6 +47,7 @@ worker:
memlimit: 420
console_loglevel: INFO
logstash_loglevel: INFO
routing: "queue"
## Everything concerning the nomad gui
gui:
......@@ -115,6 +116,7 @@ mail:
## Everything concerning the data that is used by the service
volumes:
files: /nomad/fairdi/fs/public
public: /nomad/fairdi/fs/public
staging: /nomad/fairdi/fs/staging
tmp: /nomad/fairdi/fs/tmp
nomad: /nomad
......@@ -28,7 +28,7 @@ from nomad.processing import Upload, Calc, SUCCESS
from tests.conftest import create_auth_headers, clear_elastic
from tests.test_files import example_file, example_file_mainfile, example_file_contents
from tests.test_files import create_staging_upload, create_public_upload
from tests.test_files import create_staging_upload, create_public_upload, assert_upload_files
from tests.test_coe_repo import assert_coe_upload
from tests.test_search import assert_search_upload
......@@ -220,9 +220,9 @@ class TestUploads:
assert upload['tasks_status'] == SUCCESS
assert upload['current_task'] == 'cleanup'
assert not upload['process_running']
upload_files = UploadFiles.get(upload['upload_id'])
assert upload_files is not None
calcs = upload['calcs']['results']
n_calcs = upload['calcs']['pagination']['total']
for calc in calcs:
assert calc['tasks_status'] == SUCCESS
assert calc['current_task'] == 'archiving'
......@@ -235,9 +235,13 @@ class TestUploads:
upload = self.assert_upload(rv.data)
assert len(upload['calcs']['results']) == 1
assert_upload_files(upload_id, files.StagingUploadFiles, n_calcs)
assert_search_upload(upload_id, n_calcs)
def assert_unstage(self, client, test_user_auth, upload_id, proc_infra, metadata={}):
rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
upload = self.assert_upload(rv.data)
n_calcs = upload['calcs']['pagination']['total']
rv = client.post(
'/uploads/%s' % upload_id,
......@@ -251,21 +255,8 @@ class TestUploads:
self.assert_upload_does_not_exist(client, upload_id, test_user_auth)
assert_coe_upload(upload_id, user_metadata=metadata)
assert_search_upload(upload_id, published=True)
upload_files = files.UploadFiles.get(upload_id=upload_id)
assert isinstance(upload_files, files.PublicUploadFiles)
for calc_metadata in upload_files.metadata:
assert calc_metadata.get('published', False)
assert 'with_embargo' in calc_metadata
assert calc_metadata['with_embargo'] == metadata.get('with_embargo', False)
try:
with upload_files.raw_file(calc_metadata['mainfile']) as f:
assert f.read() is not None
except files.Restricted:
assert calc_metadata['with_embargo']
else:
assert not calc_metadata['with_embargo']
assert_upload_files(upload_id, files.PublicUploadFiles, n_calcs, additional_keys=['with_embargo', 'pid'], published=True)
assert_search_upload(upload_id, n_calcs, additional_keys=['with_embargo', 'pid'], published=True)
def assert_upload_does_not_exist(self, client, upload_id: str, test_user_auth):
# poll until publish/delete completed
......
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Generator, Any, Dict
from typing import Generator, Any, Dict, List
import os
import os.path
import shutil
......@@ -429,3 +429,45 @@ class TestPublicUploadFiles(UploadFilesContract):
staging_upload = create_staging_upload(test_upload_id, calc_specs=calc_specs)
staging_upload.pack()
return PublicUploadFiles(test_upload_id, is_authorized=lambda: not protected)
def assert_upload_files(upload_id: str, cls, n_calcs: int, additional_keys: List[str] = [], **kwargs):
"""
Asserts the file system aspects of uploaded data after processing
or publishing.
Arguments:
upload_id: The id of the upload to assert
cls: The :class:`UploadFiles` subclass that this upload should have
n_calcs: The number of expected calcs in the upload
additional_keys: Keys that each calc metadata should have
**kwargs: Key, value pairs that each calc metadata should have
"""
keys = ['calc_id', 'upload_id', 'mainfile', 'calc_hash']
upload_files = UploadFiles.get(upload_id, is_authorized=lambda: True)
assert upload_files is not None
assert isinstance(upload_files, cls)
upload_metadata = upload_files.metadata
assert len(upload_metadata) == n_calcs
for calc_metadata in upload_metadata:
assert 'calc_hash' in calc_metadata
for key in keys:
assert key in calc_metadata
for additional_key in additional_keys:
assert additional_key in calc_metadata
for key, value in kwargs.items():
assert calc_metadata[key] == value
# fetch again without is_authorized to exercise the access checks
upload_files = UploadFiles.get(upload_id)
for calc_metadata in upload_metadata:
try:
with upload_files.raw_file(calc_metadata['mainfile']) as f:
f.read()
with upload_files.archive_file(calc_metadata['calc_id']) as f:
f.read()
with upload_files.archive_log_file(calc_metadata['calc_id']) as f:
f.read()
# no Restricted raised: this must be a public upload without embargo
assert not calc_metadata.get('with_embargo', False) and isinstance(upload_files, PublicUploadFiles)
except Restricted:
assert calc_metadata.get('with_embargo', False) or isinstance(upload_files, StagingUploadFiles)
......@@ -21,7 +21,7 @@ import glob
from io import StringIO
import bravado.exception
from nomad import infrastructure, coe_repo, utils
from nomad import infrastructure, coe_repo, utils, files
from nomad.migration import NomadCOEMigration, SourceCalc, Package
from nomad.infrastructure import repository_db_connection
......@@ -30,6 +30,8 @@ from tests.conftest import create_postgres_infra, create_auth_headers
from tests.bravado_flask import FlaskTestHttpClient
from tests.test_api import create_auth_headers
import tests.utils as test_utils
from tests.test_search import assert_search_upload
from tests.test_files import assert_upload_files
test_source_db_name = 'test_nomad_fairdi_migration_source'
test_target_db_name = 'test_nomad_fairdi_migration_target'
......@@ -306,6 +308,11 @@ def perform_migration_test(migrate_infra, name, test_directory, assertions, kwar
migrate_infra.two_client.raw.get(
upload_id=calc_1['upload_id'], path=calc_1['mainfile']).response().result
assert_search_upload(
calc_1['upload_id'], 2, additional_keys=['with_embargo', 'pid'], published=True)
assert_upload_files(
calc_1['upload_id'], files.PublicUploadFiles, 2, additional_keys=['with_embargo', 'pid'], published=True)
def test_skip_on_same_version(migrate_infra, monkeypatch, caplog):
assertions = dict(migrated=2, source=2, skipped_packages=0)
......
......@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from elasticsearch_dsl import Q
from nomad import datamodel, search, processing, parsing, infrastructure, config, coe_repo
......@@ -111,15 +112,24 @@ def assert_entry(calc_id):
assert results[0]['calc_id'] == calc_id
def assert_search_upload(upload_id, published: bool = False):
def assert_search_upload(upload_id, n_calcs: int, additional_keys: List[str] = [], **kwargs):
keys = ['calc_id', 'upload_id', 'mainfile', 'calc_hash']
refresh_index()
search = Entry.search().query('match_all')[0:10]
assert search.count() == n_calcs
if search.count() > 0:
for hit in search:
hit = hit.to_dict()
if published:
assert int(hit.get('pid')) > 0
assert hit.get('published')
for key, value in kwargs.items():
if key == 'published':
assert int(hit.get('pid')) > 0
assert hit.get(key, None) == value
for key in keys:
assert key in hit
for key in additional_keys:
assert key in hit
for coauthor in hit.get('coauthors', []):
assert coauthor.get('name', None) is not None
......