From 10e0375aec6bbcf6f3f9e2070b659646d6966824 Mon Sep 17 00:00:00 2001
From: Markus Scheidgen <markus.scheidgen@gmail.com>
Date: Thu, 14 Mar 2019 13:39:56 +0100
Subject: [PATCH] Fixed search publish update bug #127. Configurable file
 prefix, worker routing.

---
 nomad/config.py                               |  5 ++-
 nomad/files.py                                |  6 +--
 nomad/infrastructure.py                       | 11 ++++-
 nomad/search.py                               |  6 ++-
 ops/helm/nomad/templates/api-deployment.yaml  | 19 ++++++--
 .../nomad/templates/worker-deployment.yaml    | 20 ++++++---
 ops/helm/nomad/values.yaml                    |  4 +-
 tests/test_api.py                             | 27 ++++--------
 tests/test_files.py                           | 44 ++++++++++++++++++-
 tests/test_migration.py                       |  9 +++-
 tests/test_search.py                          | 18 ++++++--
 11 files changed, 126 insertions(+), 43 deletions(-)

diff --git a/nomad/config.py b/nomad/config.py
index f3af14a347..efa1b034e5 100644
--- a/nomad/config.py
+++ b/nomad/config.py
@@ -44,7 +44,7 @@ Arguments:
         to the same upload to the same worker.
 """
 
-FSConfig = namedtuple('FSConfig', ['tmp', 'staging', 'public'])
+FSConfig = namedtuple('FSConfig', ['tmp', 'staging', 'public', 'prefix_size'])
 """ Used to configure file stystem access. """
 
 RepositoryDBConfig = namedtuple('RepositoryDBConfig', ['host', 'port', 'dbname', 'user', 'password'])
@@ -96,7 +96,8 @@ celery = CeleryConfig(
 fs = FSConfig(
     tmp=os.environ.get('NOMAD_FILES_TMP_DIR', '.volumes/fs/tmp'),
     staging=os.environ.get('NOMAD_FILES_STAGING_DIR', '.volumes/fs/staging'),
-    public=os.environ.get('NOMAD_FILES_PUBLIC_DIR', '.volumes/fs/public')
+    public=os.environ.get('NOMAD_FILES_PUBLIC_DIR', '.volumes/fs/public'),
+    prefix_size=int(os.environ.get('NOMAD_FILES_PREFIX_SIZE', 2))
 )
 elastic = ElasticConfig(
     host=os.environ.get('NOMAD_ELASTIC_HOST', 'localhost'),
diff --git a/nomad/files.py b/nomad/files.py
index c75b87b67c..2229e82fe2 100644
--- a/nomad/files.py
+++ b/nomad/files.py
@@ -84,10 +84,10 @@ class PathObject:
         else:
             self.os_path = os.path.join(bucket, object_id)
 
-        if prefix:
+        if prefix and config.fs.prefix_size > 0:
             segments = list(os.path.split(self.os_path))
             last = segments[-1]
-            segments[-1] = last[:3]
+            segments[-1] = last[:config.fs.prefix_size]
             segments.append(last)
             self.os_path = os.path.join(*segments)
 
@@ -101,7 +101,7 @@ class PathObject:
 
         shutil.rmtree(self.os_path)
 
-        if len(parent_name) == 3 and basename.startswith(parent_name):
+        if len(parent_name) == config.fs.prefix_size and basename.startswith(parent_name):
             try:
                 if not os.listdir(parent_directory):
                     os.rmdir(parent_directory)
diff --git a/nomad/infrastructure.py b/nomad/infrastructure.py
index f79b984e3d..182bc9d72d 100644
--- a/nomad/infrastructure.py
+++ b/nomad/infrastructure.py
@@ -249,7 +249,16 @@ def reset(repo_content_only: bool = False):
     try:
         shutil.rmtree(config.fs.staging, ignore_errors=True)
         shutil.rmtree(config.fs.public, ignore_errors=True)
-        shutil.rmtree(config.fs.tmp, ignore_errors=True)
+        # delete tmp without the folder
+        for sub_path in os.listdir(config.fs.tmp):
+            path = os.path.join(config.fs.tmp, sub_path)
+            try:
+                if os.path.isfile(path):
+                    os.unlink(path)
+                elif os.path.isdir(path): shutil.rmtree(path, ignore_errors=True)
+            except Exception:
+                pass
+
         logger.info('files resetted')
     except Exception as e:
         logger.error('exception deleting files', exc_info=e)
diff --git a/nomad/search.py b/nomad/search.py
index 7a9814e2fa..71e7145be7 100644
--- a/nomad/search.py
+++ b/nomad/search.py
@@ -202,7 +202,11 @@ def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
         for calc in calcs:
             entry = Entry.from_calc_with_metadata(calc)
             entry.published = True
-            yield entry.to_dict(include_meta=True)
+            entry = entry.to_dict(include_meta=True)
+            source = entry.pop('_source')
+            entry['doc'] = source
+            entry['_op_type'] = 'update'
+            yield entry
 
     elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates())
     refresh()
diff --git a/ops/helm/nomad/templates/api-deployment.yaml b/ops/helm/nomad/templates/api-deployment.yaml
index dba04d347e..ca941bc0d0 100644
--- a/ops/helm/nomad/templates/api-deployment.yaml
+++ b/ops/helm/nomad/templates/api-deployment.yaml
@@ -24,7 +24,9 @@ spec:
         image: "{{ .Values.images.nomad.name }}:{{ .Values.images.nomad.tag }}"
         volumeMounts:
         - mountPath: /app/.volumes/fs/public
-          name: files-volume
+          name: public-volume
+        - mountPath: /app/.volumes/fs/staging
+          name: staging-volume
         - mountPath: /nomad
           name: nomad-volume
         env:
@@ -75,7 +77,7 @@ spec:
         - name: NOMAD_COE_REPO_DB_NAME
           value: "{{ .Values.dbname }}"
         - name: NOMAD_CELERY_ROUTING
-          value: "worker"
+          value: "{{ .Values.worker.routing }}"
         command: ["python", "-m", "gunicorn.app.wsgiapp", "--timeout", "3600", "--log-config", "ops/gunicorn.log.conf", "-w", "{{ .Values.api.worker }}", "-b 0.0.0.0:8000", "nomad.api:app"]
         livenessProbe:
           httpGet:
@@ -95,10 +97,19 @@ spec:
       - name: {{ .Values.images.secret }}
       imagePullPolicy: always
       volumes:
-      - name: files-volume
+      - name: public-volume
         hostPath:
-          path: {{ .Values.volumes.files }}
+          path: {{ .Values.volumes.public }}
           type: Directory
+      - name: staging-volume
+        {{ if eq .Values.worker.routing "worker" }}
+        emptyDir:
+          medium: 'Memory'
+        {{ else }}
+        hostPath:
+          path: {{ .Values.volumes.staging }}
+          type: Directory
+        {{ end }}
       - name: nomad-volume
         hostPath:
           path: {{ .Values.volumes.nomad }}
diff --git a/ops/helm/nomad/templates/worker-deployment.yaml b/ops/helm/nomad/templates/worker-deployment.yaml
index 9e133c39c9..c1e8fcfbcf 100644
--- a/ops/helm/nomad/templates/worker-deployment.yaml
+++ b/ops/helm/nomad/templates/worker-deployment.yaml
@@ -29,7 +29,7 @@ spec:
             memory: "{{ .Values.worker.memrequest }}Gi"
         volumeMounts:
         - mountPath: /app/.volumes/fs/public
-          name: files-volume
+          name: public-volume
         - mountPath: /app/.volumes/fs/staging
           name: staging-volume
         - mountPath: /nomad
@@ -84,7 +84,7 @@ spec:
         - name: NOMAD_CONSOLE_LOGLEVEL
           value: "ERROR"
         - name: NOMAD_CELERY_ROUTING
-          value: "worker"
+          value: "{{ .Values.worker.routing }}"
         command: ["python", "-m", "celery", "worker", "-A", "nomad.processing", "-Q", "celery,calcs,uploads"]
         livenessProbe:
           exec:
@@ -108,14 +108,20 @@ spec:
       - name: {{ .Values.images.secret }}
       imagePullPolicy: always
       volumes:
-      - name: files-volume
+      - name: public-volume
         hostPath:
-          path: {{ .Values.volumes.files }}
+          path: {{ .Values.volumes.public }}
           type: Directory
+      - name: staging-volume
+        {{ if eq .Values.worker.routing "worker" }}
+        emptyDir:
+          medium: 'Memory'
+        {{ else }}
+        hostPath:
+          path: {{ .Values.volumes.staging }}
+          type: Directory
+        {{ end }}
       - name: nomad-volume
         hostPath:
           path: {{ .Values.volumes.nomad }}
           type: Directory
-      - name: staging-volume
-        emptyDir:
-          medium: 'Memory'
diff --git a/ops/helm/nomad/values.yaml b/ops/helm/nomad/values.yaml
index c92d322e3e..d52338382b 100644
--- a/ops/helm/nomad/values.yaml
+++ b/ops/helm/nomad/values.yaml
@@ -47,6 +47,7 @@ worker:
   memlimit: 420
   console_loglevel: INFO
   logstash_loglevel: INFO
+  routing: "queue"
 
 ## Everthing concerning the nomad gui
 gui:
@@ -115,6 +116,7 @@ mail:
 
 ## Everything concerning the data that is used by the service
 volumes:
-  files: /nomad/fairdi/fs/public
+  public: /nomad/fairdi/fs/public
+  staging: /nomad/fairdi/fs/staging
   tmp: /nomad/fairdi/fs/tmp
   nomad: /nomad
diff --git a/tests/test_api.py b/tests/test_api.py
index 3d1286b9a2..3901655a26 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -28,7 +28,7 @@ from nomad.processing import Upload, Calc, SUCCESS
 
 from tests.conftest import create_auth_headers, clear_elastic
 from tests.test_files import example_file, example_file_mainfile, example_file_contents
-from tests.test_files import create_staging_upload, create_public_upload
+from tests.test_files import create_staging_upload, create_public_upload, assert_upload_files
 from tests.test_coe_repo import assert_coe_upload
 from tests.test_search import assert_search_upload
 
@@ -220,9 +220,9 @@ class TestUploads:
         assert upload['tasks_status'] == SUCCESS
         assert upload['current_task'] == 'cleanup'
         assert not upload['process_running']
-        upload_files = UploadFiles.get(upload['upload_id'])
-        assert upload_files is not None
+
         calcs = upload['calcs']['results']
+        n_calcs = upload['calcs']['pagination']['total']
         for calc in calcs:
             assert calc['tasks_status'] == SUCCESS
             assert calc['current_task'] == 'archiving'
@@ -235,9 +235,13 @@ class TestUploads:
             upload = self.assert_upload(rv.data)
             assert len(upload['calcs']['results']) == 1
 
+        assert_upload_files(upload_id, files.StagingUploadFiles, n_calcs)
+        assert_search_upload(upload_id, n_calcs)
+
     def assert_unstage(self, client, test_user_auth, upload_id, proc_infra, metadata={}):
         rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
         upload = self.assert_upload(rv.data)
+        n_calcs = upload['calcs']['pagination']['total']
 
         rv = client.post(
             '/uploads/%s' % upload_id,
@@ -251,21 +255,8 @@ class TestUploads:
 
         self.assert_upload_does_not_exist(client, upload_id, test_user_auth)
         assert_coe_upload(upload_id, user_metadata=metadata)
-        assert_search_upload(upload_id, published=True)
-
-        upload_files = files.UploadFiles.get(upload_id=upload_id)
-        assert isinstance(upload_files, files.PublicUploadFiles)
-        for calc_metadata in upload_files.metadata:
-            assert calc_metadata.get('published', False)
-            assert 'with_embargo' in calc_metadata
-            assert calc_metadata['with_embargo'] == metadata.get('with_embargo', False)
-            try:
-                with upload_files.raw_file(calc_metadata['mainfile']) as f:
-                    assert f.read() is not None
-            except files.Restricted:
-                assert calc_metadata['with_embargo']
-            else:
-                assert not calc_metadata['with_embargo']
+        assert_upload_files(upload_id, files.PublicUploadFiles, n_calcs, additional_keys=['with_embargo', 'pid'], published=True)
+        assert_search_upload(upload_id, n_calcs, additional_keys=['with_embargo', 'pid'], published=True)
 
     def assert_upload_does_not_exist(self, client, upload_id: str, test_user_auth):
         # poll until publish/delete completed
diff --git a/tests/test_files.py b/tests/test_files.py
index 0355146683..f372a41279 100644
--- a/tests/test_files.py
+++ b/tests/test_files.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Generator, Any, Dict
+from typing import Generator, Any, Dict, List
 import os
 import os.path
 import shutil
@@ -429,3 +429,45 @@ class TestPublicUploadFiles(UploadFilesContract):
         staging_upload = create_staging_upload(test_upload_id, calc_specs=calc_specs)
         staging_upload.pack()
         return PublicUploadFiles(test_upload_id, is_authorized=lambda: not protected)
+
+
+def assert_upload_files(upload_id: str, cls, n_calcs: int, additional_keys: List[str] = [], **kwargs):
+    """
+    Asserts the files and search index aspect of uploaded data after processing
+    or publishing
+
+    Arguments:
+        upload_id: The id of the upload to assert
+        cls: The :class:`UploadFiles` subclass that this upload should have
+        n_calcs: The number of expected calcs in the upload
+        additional_keys: Keys that each calc metadata should have
+        **kwargs: Key, value pairs that each calc metadata should have
+    """
+    keys = ['calc_id', 'upload_id', 'mainfile', 'calc_hash']
+    upload_files = UploadFiles.get(upload_id, is_authorized=lambda: True)
+    assert upload_files is not None
+    assert isinstance(upload_files, cls)
+
+    upload_metadata = upload_files.metadata
+    assert len(upload_metadata) == n_calcs
+    for calc_metadata in upload_metadata:
+        assert 'calc_hash' in calc_metadata
+        for key in keys:
+            assert key in calc_metadata
+        for additional_key in additional_keys:
+            assert additional_key in calc_metadata
+        for key, value in kwargs.items():
+            assert calc_metadata[key] == value
+
+    upload_files = UploadFiles.get(upload_id)
+    for calc_metadata in upload_metadata:
+        try:
+            with upload_files.raw_file(calc_metadata['mainfile']) as f:
+                f.read()
+            with upload_files.archive_file(calc_metadata['calc_id']) as f:
+                f.read()
+            with upload_files.archive_log_file(calc_metadata['calc_id']) as f:
+                f.read()
+            assert not calc_metadata.get('with_embargo', False) and isinstance(upload_files, PublicUploadFiles)
+        except Restricted:
+            assert calc_metadata.get('with_embargo', False) or isinstance(upload_files, StagingUploadFiles)
diff --git a/tests/test_migration.py b/tests/test_migration.py
index 3f89495118..c9e8a3c010 100644
--- a/tests/test_migration.py
+++ b/tests/test_migration.py
@@ -21,7 +21,7 @@ import glob
 from io import StringIO
 import bravado.exception
 
-from nomad import infrastructure, coe_repo, utils
+from nomad import infrastructure, coe_repo, utils, files
 
 from nomad.migration import NomadCOEMigration, SourceCalc, Package
 from nomad.infrastructure import repository_db_connection
@@ -30,6 +30,8 @@ from tests.conftest import create_postgres_infra, create_auth_headers
 from tests.bravado_flask import FlaskTestHttpClient
 from tests.test_api import create_auth_headers
 import tests.utils as test_utils
+from tests.test_search import assert_search_upload
+from tests.test_files import assert_upload_files
 
 test_source_db_name = 'test_nomad_fairdi_migration_source'
 test_target_db_name = 'test_nomad_fairdi_migration_target'
@@ -306,6 +308,11 @@ def perform_migration_test(migrate_infra, name, test_directory, assertions, kwar
         migrate_infra.two_client.raw.get(
             upload_id=calc_1['upload_id'], path=calc_1['mainfile']).response().result
 
+        assert_search_upload(
+            calc_1['upload_id'], 2, additional_keys=['with_embargo', 'pid'], published=True)
+        assert_upload_files(
+            calc_1['upload_id'], files.PublicUploadFiles, 2, additional_keys=['with_embargo', 'pid'], published=True)
+
 
 def test_skip_on_same_version(migrate_infra, monkeypatch, caplog):
     assertions = dict(migrated=2, source=2, skipped_packages=0)
diff --git a/tests/test_search.py b/tests/test_search.py
index 2d1aafe307..c88637fde0 100644
--- a/tests/test_search.py
+++ b/tests/test_search.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List
 from elasticsearch_dsl import Q
 
 from nomad import datamodel, search, processing, parsing, infrastructure, config, coe_repo
@@ -111,15 +112,24 @@ def assert_entry(calc_id):
     assert results[0]['calc_id'] == calc_id
 
 
-def assert_search_upload(upload_id, published: bool = False):
+def assert_search_upload(upload_id, n_calcs: int, additional_keys: List[str] = [], **kwargs):
+    keys = ['calc_id', 'upload_id', 'mainfile', 'calc_hash']
     refresh_index()
     search = Entry.search().query('match_all')[0:10]
+    assert search.count() == n_calcs
     if search.count() > 0:
         for hit in search:
             hit = hit.to_dict()
-            if published:
-                assert int(hit.get('pid')) > 0
-                assert hit.get('published')
+            for key, value in kwargs.items():
+                if key == 'published':
+                    assert int(hit.get('pid')) > 0
+                assert hit.get(key, None) == value
+
+            for key in keys:
+                assert key in hit
+
+            for key in additional_keys:
+                assert key in hit
 
             for coauthor in hit.get('coauthors', []):
                 assert coauthor.get('name', None) is not None
-- 
GitLab