From f40d3738bb15760f7effab55ac161d2839f583a7 Mon Sep 17 00:00:00 2001
From: Daniel Lehmberg <d.lehmberg@tum.de>
Date: Wed, 17 Jan 2024 12:18:02 +0000
Subject: [PATCH] Integrate statistics logger to logtransfer

---
 docs/howto/oasis/install.md                   |  14 +-
 nomad/cli/admin/run.py                        |  31 ++-
 nomad/config/models.py                        |  21 +-
 nomad/logtransfer.py                          | 182 ++++++++++--------
 nomad/statistics_logger.py                    | 103 ++++++++++
 .../nomad-oasis/configs/nomad.yaml            |   2 +-
 tests/conftest.py                             |  19 +-
 tests/logtransfer/conftest.py                 |  40 +++-
 tests/logtransfer/test_logtransfer.py         |  27 ++-
 tests/logtransfer/test_statistics_logger.py   |  53 +++++
 10 files changed, 369 insertions(+), 123 deletions(-)
 create mode 100644 nomad/statistics_logger.py
 create mode 100644 tests/logtransfer/test_statistics_logger.py

diff --git a/docs/howto/oasis/install.md b/docs/howto/oasis/install.md
index f581cdd1b8..0191e7d4fd 100644
--- a/docs/howto/oasis/install.md
+++ b/docs/howto/oasis/install.md
@@ -10,12 +10,10 @@ central NOMAD installation.
 
 !!! note
 
-    **Register your oasis**
-
-    If you installed (or even just plan to install) a NOMAD Oasis, please take
-    the time to register your Oasis with FAIRmat. This will help us to assist
-    you in an problems and keep you updated on new releases. You can register
-    by filling out this [simple form](https://www.fairmat-nfdi.eu/fairmat/oasis_registration){:target="_blank"}.
+    **Register your Oasis**
+    If you installed (or even just plan to install) a NOMAD Oasis, please
+    [register your Oasis with FAIRmat](https://www.fairmat-nfdi.eu/fairmat/oasis_registration)
+    so we can assist you with any problems and keep you updated on new releases.
 
 ## Quick-start
 
@@ -80,10 +78,10 @@ RAM and CPU for running tools like jupyter, if you opt to use NOMAD NORTH.
 
 ### Sharing data through the logtransfer service and data privacy notice
 
-The NOMAD includes a `logtransfer` service. When enabled this service automatically collects
+NOMAD includes a `logtransfer` service. When enabled, this service automatically collects
 and transfers non-personalized log-data to us. Currently, this service is experimental
 and requires opt-in. However, in upcoming versions of NOMAD Oasis, we might change to opt-out.
-See the instructions in the configuration below on how to enable/disable the `logtransfer`.
+See the instructions in the configuration below on how to enable/disable `logtransfer`.
 
 The service collects log-data and aggregated statistics, such as the number of users or the
 number of uploaded datasets. In any case this data does not personally identify any users or
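
For orientation, a minimal `nomad.yaml` sketch of how the service can be switched
on (the `logtransfer` key and field names follow the settings model added in this
patch; treat the exact nesting as an assumption and adapt it to your deployment):

```yaml
logstash:
  enabled: true
logtransfer:
  enable_logtransfer: true    # opt-in, defaults to false
  enable_statistics: true     # periodic statistics logs
  statistics_interval: 86400  # seconds between statistics logs
```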
diff --git a/nomad/cli/admin/run.py b/nomad/cli/admin/run.py
index d52a0666a1..c190616867 100644
--- a/nomad/cli/admin/run.py
+++ b/nomad/cli/admin/run.py
@@ -54,14 +54,41 @@ def app(with_gui: bool, **kwargs):
 
 
 @run.command(
-    help='Run server that collects and submits logs to the central Nomad instance.'
+    help='Run the service that collects and submits logs to the central Nomad instance.'
 )
 def logtransfer():
     config.meta.service = 'logtransfer'
 
     from nomad.logtransfer import start_logtransfer_service
+    from nomad.statistics_logger import start_statistics_logger
 
-    start_logtransfer_service()
+    if not config.logstash.enabled:
+        raise RuntimeError(
+            'To run the logtransfer service, logstash formatting must be '
+            f'enabled (found {config.logstash.enabled=}).'
+        )
+
+    is_logtransfer_enabled = config.logtransfer.enable_logtransfer
+    is_statistics_logger_enabled = config.logtransfer.enable_statistics
+
+    if not is_logtransfer_enabled and is_statistics_logger_enabled:
+        raise ValueError(
+            f'If {config.logtransfer.enable_statistics=} then the logtransfer '
+            f'service must also be enabled (Got: {config.logtransfer.enable_logtransfer=})'
+        )
+
+    if is_statistics_logger_enabled:
+        statistics_process = start_statistics_logger(fork_mode='process')
+    else:
+        statistics_process = None
+
+    if is_logtransfer_enabled:
+        start_logtransfer_service()
+
+    if statistics_process is not None:
+        if statistics_process.is_alive():
+            statistics_process.kill()
+        statistics_process.join()
 
 
 def run_app(
diff --git a/nomad/config/models.py b/nomad/config/models.py
index d18fd71e27..89a59d2b58 100644
--- a/nomad/config/models.py
+++ b/nomad/config/models.py
@@ -501,16 +501,20 @@ class Logstash(NomadSettings):
 
 
 class Logtransfer(NomadSettings):
-    """
-    Configuration of logtransfer server.
+    """Configuration of logtransfer and statistics service.
 
     Note that other configurations are also used within logtransfer
 
     * class Logstash (Configs: enabled, host, level, tcp_port) such that logs are sent to the logstash proxy
-    * class Oasis (Config: central_nomad_api_url) address to which so send the logs
-    * class FS (Config: tmp) path where collected logfiles are stored.
+    * class Oasis (Config: central_nomad_api_url) address to which the logs are sent
+    * class FS (Config: tmp) path where collected logfiles are stored until they are transferred
     """
 
+    # for logtransfer, see nomad/logtransfer.py
+    enable_logtransfer: bool = Field(
+        False,
+        description='If enabled, a service is started that collects logs and transfers them to the central NOMAD instance.',
+    )
     submit_interval: int = Field(
         60 * 60 * 24,
         description='Time interval in seconds after which logs are transferred.',
@@ -531,6 +535,15 @@ class Logtransfer(NomadSettings):
         False,
         description='Whether to keep the server alive if an unexpected exception is raised. Set to True for testing.',
     )
+    # for statistics (which are submitted to logstash/logtransfer), see nomad/statistics_logger.py
+    enable_statistics: bool = Field(
+        True,
+        description='If enabled, a process is started that periodically generates logs with statistics.',
+    )
+    statistics_interval: int = Field(
+        60 * 60 * 24,
+        description='Time interval in seconds in which statistics are logged.',
+    )
 
 
 class Tests(NomadSettings):
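
For orientation, a minimal sketch (assuming an importable NOMAD environment) of
how the new settings surface on the `config` object; the values checked are the
defaults declared above:

```python
from nomad import config

# Defaults declared in the Logtransfer settings model:
assert config.logtransfer.enable_logtransfer is False  # transfer is opt-in
assert config.logtransfer.enable_statistics is True
assert config.logtransfer.submit_interval == 60 * 60 * 24      # one day
assert config.logtransfer.statistics_interval == 60 * 60 * 24  # one day
```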
diff --git a/nomad/logtransfer.py b/nomad/logtransfer.py
index ced11999ab..c3682d3fb6 100644
--- a/nomad/logtransfer.py
+++ b/nomad/logtransfer.py
@@ -1,3 +1,21 @@
+#
+# Copyright The NOMAD Authors.
+#
+# This file is part of NOMAD. See https://nomad-lab.eu for further info.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 import logging
 import socketserver
 from logging.handlers import RotatingFileHandler
@@ -11,19 +29,16 @@ from datetime import datetime, timedelta
 
 from nomad import config
 
-# It is important to make sure that logstash itself is disabled (otherwise it can happen that logs from the logtransfer
-# server are also included to the logs
-config.logstash.enabled = False
+# logtransfer itself uses plain prints instead of Python logging. The reason is
+# that its own logging could interfere with Nomad logging and might end up being
+# sent to the central Nomad instance as well.
+enable_log_print = True
 
 
-# logger only for the logstash server (none of the logs should be transferred to the central Nomad instance)
-server_logger = logging.getLogger(name='logtransfer_server')
-server_logger.setLevel(logging.INFO)
-server_handler = logging.StreamHandler()
-server_handler.setFormatter(
-    logging.Formatter('%(levelname)s: %(asctime)s %(message)s', '%Y-%m-%d %H:%M:%S')
-)
-server_logger.addHandler(server_handler)
+def log_print(message, level='INFO'):
+    if enable_log_print:
+        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+        formatted_message = f'{level}: {current_time} {message}'
+        print(formatted_message)
 
 
 def get_log_filepath():
@@ -41,8 +56,9 @@ def get_all_rotated_logfiles():
 
     where [index] is an integer (the higher the index the older the logs in the file).
 
-    In contrast, for the active logfile, where logs may still be written, [index] is empty.
-    The list is sorted such that the oldest file with the highest index appears first.
+    In contrast, for the active logfile, where logs may still be written,
+    [index] is empty. The list is sorted such that the oldest file with the
+    highest index appears first.
     """
     rotated_logfiles = []
 
@@ -66,8 +82,9 @@ def clear_logfiles():
     """
     Perform a rollover on the current logfile and then remove all rotated logfiles.
 
-    Note that if the logger is still used than this operation is unsafe and potentially breaks the logger. This
-    function is mainly intended for testing.
+    Note that this operation is unsafe if the logger is still in use and may
+    potentially break the logger. This function is mainly intended for
+    testing.
     """
 
     try:
@@ -94,19 +111,21 @@ def is_empty_logfile():
 class NotifyRotatingFileHandler(RotatingFileHandler):
     """Adapted `RotatingFileHandler` used within in the `Logtransfer` class.
 
-    This adaptation of RotatingFileHandler does not perform a file rollover when `maxBytes` is reached. Instead, it
-    notifies the Logtransfer class thread. This is because data is being sent to the central Nomad, and performing a
-    file rollover at the same time could result in file conflicts and race conditions.
+    This adaptation of RotatingFileHandler does not perform a file rollover
+    when `maxBytes` is reached. Instead, it notifies the Logtransfer class
+    thread. This is because data is being sent to the central Nomad, and
+    performing a file rollover at the same time could result in file conflicts
+    and race conditions.
 
-    The Lock is needed to protect `shouldRollover()` and `doRollover()` as they are not thread-safe (see attribute
-    `stream` in the handler)."""
+    The Lock is needed to protect `shouldRollover()` and `doRollover()` as they
+    are not thread-safe (see attribute `stream` in the handler)."""
 
     rollover_lock = threading.Lock()
     notify_rollover_event: threading.Event = None
 
     def set_event_attribute(self, event: threading.Event):
-        """Set the event to the class on which this handler can notify the thread to safely perform the file
-        rollover."""
+        """Set the event to the class on which this handler can notify the
+        thread to safely perform the file rollover."""
         self.notify_rollover_event = event
 
     def is_event_attribute_set(self):
@@ -115,15 +134,17 @@ class NotifyRotatingFileHandler(RotatingFileHandler):
     def emit(self, record) -> None:
         """Emit a record.
 
-        Output the record to the file, if `maxBytes` (see `RotatingFileHandler`) is reached a
-        `threading.Event` is set (for the event see `Logtransfer.transfer_logs`.
+        Output the record to the file. If `maxBytes` (see
+        `RotatingFileHandler`) is reached, a `threading.Event` is set (for the
+        event see `Logtransfer.transfer_logs`).
         """
         try:
-            with self.rollover_lock:  # shouldRollover and doRollover are not thread-safe
-                # mypy does not recognize the method in the superclass
+            # shouldRollover and doRollover are not thread-safe (needs lock)
+            with self.rollover_lock:
+                # Ignore since mypy does not recognize the method in the superclass
                 if self.shouldRollover(record):  # type: ignore
-                    # print(f'notify main logstransfer thread self.stream.tell()={self.stream.tell()}')
-                    # notify a thread which is responsible for the rollover (and other tasks)
+                    # notify a thread which is responsible for the rollover
+                    # (and other tasks)
                     self.notify_rollover_event.set()
 
                 # write to file
@@ -136,8 +157,8 @@ class Logtransfer:
     """
     Responsible for rotating the logfile and transferring the logs to central Nomad.
 
-    The main method is `transfer_logs` which is intended to run in a thread (concurrently to the logstash proxy server
-    that receives and writes new logs).
+    The main method is `transfer_logs` which is intended to run in a thread
+    (concurrently to the logstash proxy server that receives and writes new logs).
     """
 
     def __init__(self, rotating_file: NotifyRotatingFileHandler):
@@ -149,7 +170,8 @@ class Logtransfer:
 
         if self.submit_interval <= 0 and not isinstance(self.submit_interval, int):
             raise TypeError(
-                f'submit_interval must be a positive integer value. Got {self.submit_interval} '
+                'submit_interval must be a positive integer value. '
+                f'Got {self.submit_interval} '
                 f'with type {type(self.submit_interval)}'
             )
 
@@ -166,8 +188,9 @@ class Logtransfer:
         all_rotated_logfiles = get_all_rotated_logfiles()
 
         if len(all_rotated_logfiles) > 0:
-            server_logger.info(
-                f'collected files: {[os.path.basename(f) for f in all_rotated_logfiles]} in '
+            log_print(
+                f'collected files: '
+                f'{[os.path.basename(f) for f in all_rotated_logfiles]} in '
                 f'directory {config.fs.tmp}'
             )
 
@@ -177,7 +200,7 @@ class Logtransfer:
 
                 yield file_content, path
         else:
-            server_logger.info('No logfiles to submit.')
+            log_print('No logfiles to submit.')
 
     def _submit_logfile_to_central(self, logfile_content: bytes):
         central_post_federation_url = f'{self.central_nomad_api_url}federation/logs/'
@@ -195,12 +218,12 @@ class Logtransfer:
 
             if is_successful:
                 submitted_bytes = ret_request.json()['filesize']
-                server_logger.info(
+                log_print(
                     f'Successfully submitted logfile ({submitted_bytes} bytes) with HTTP status code '
                     f'{ret_request.status_code} to central Oasis at {ret_request.url}.'
                 )
             else:
-                server_logger.info(
+                log_print(
                     f'Submission of logfiles to {central_post_federation_url} failed with HTTP '
                     f'status code {ret_request.status_code}. \n '
                     'logfiles will be included again in next submission.'
@@ -208,7 +231,7 @@ class Logtransfer:
 
         except requests.exceptions.ConnectionError:
             is_successful = False
-            server_logger.info(
+            log_print(
                 f'HTTP connection to {central_post_federation_url} could not be established'
             )
 
@@ -222,8 +245,8 @@ class Logtransfer:
 
     def transfer_logs(self) -> None:
         # (mypy does not recognize the superclass attribute)
-        server_logger.info(
-            f'Start logtransfer thread with '  # type: ignore
+        log_print(
+            'Start logtransfer thread with '  # type: ignore
             f'{self.submit_interval=} sec | '
             f'{self.rotating_file.maxBytes=} bytes'
         )
@@ -244,34 +267,36 @@ class Logtransfer:
             except Exception as e:
                 if not self.raise_unexpected_exceptions:
                     # do not kill the thread and hope it will fix for the next iteration
-                    server_logger.info(f'unexpected exception was raised \n{e}')
+                    log_print(f'unexpected exception was raised \n{e}')
                 else:
                     raise e
 
             next_submission = datetime.now() + timedelta(seconds=self.submit_interval)
             next_submission = next_submission.strftime('%Y-%m-%dT%H:%M:%S')  # type: ignore
-            server_logger.info(f'The next planned submission is at {next_submission}.')
+            log_print(f'The next planned submission is at {next_submission}.')
 
             self.reached_max_bytes_event.clear()
             self.reached_max_bytes_event.wait(self.submit_interval)
             is_notified = self.reached_max_bytes_event.is_set()
 
-            server_logger.info(f'Thread is waking up (is_notified={is_notified}).')
+            log_print(f'Thread is waking up (is_notified={is_notified}).')
 
             with self.rotating_file.rollover_lock:
                 # mypy does not recognize the superclass attribute
                 if self.rotating_file.stream.tell() != 0:  # type: ignore
-                    server_logger.info(
+                    log_print(
                         'Perform rollover on logfile. stream position in '  # type: ignore
                         f'bytes={self.rotating_file.stream.tell()}'
                     )
 
-                    # Note: this should be the only place where doRollover() is called. A call outside this thread
-                    # causes a race condition on the methods where the logfiles are submitted and removed
+                    # Note: this should be the only place where doRollover() is called.
+                    # A call outside this thread causes a race condition on the methods
+                    # where the logfiles are submitted and removed
                     self.rotating_file.doRollover()
                 else:
-                    server_logger.info(
-                        'No rollover on logfile performed because rotating_file.stream.tell() is at position 0.'
+                    log_print(
+                        'No rollover on logfile performed because '
+                        'rotating_file.stream.tell() is at position 0.'
                     )
 
 
@@ -283,10 +308,12 @@ class LogstashTCPHandler(socketserver.StreamRequestHandler):
             logline = self.rfile.readline()
 
             if logline == b'':
-                # "empty byte" signals that client closed connection, nothing more to read
+                # "empty byte" signals that client closed connection, nothing
+                # more to read
                 break
 
-            # remove newlines from the right, because a newline is also internally included in the logstash_logger
+            # remove newlines from the right, because a newline is also
+            # internally included in the logstash_logger
             logline = logline.rstrip()
 
             if len(logline) > 0:
@@ -296,26 +323,18 @@ class LogstashTCPHandler(socketserver.StreamRequestHandler):
         self.server.rotating_file_handler.flush()
 
 
-class TCPServerReuseAddress(socketserver.TCPServer):
+class TCPServerReuseAddress(socketserver.ThreadingTCPServer):
     """
     This class overwrites default class parameters of TCPServer.
-    This is recommended at https://stackoverflow.com/a/42147927)
+    This is recommended at https://stackoverflow.com/a/42147927
     """
 
-    # TODO: consider also if it is better to inherit from ThreadingTCPServer because (from Python docu):
-    #  https://docs.python.org/3/library/socketserver.html#socketserver.ThreadingTCPServer
-    #  On the other hand, if you are building an HTTP server where all data is stored externally (for instance, in the
-    #  file system), a synchronous class will essentially render the service “deaf” while one request is being handled
-    #  – which may be for a very long time if a client is slow to receive all the data it has requested. Here a
-    #  threading or forking server prevents this.
-    #  -- BUT NOTE: this may not work for Windows, because the ThreadingMixIn is only supported for POSIX systems
-
     # this allows to quickly set up the server again after it may have terminated
     allow_reuse_port = True
     allow_reuse_address = True
-    request_queue_size = (
-        20  # default is 5, set to higher value here to avoid ConnectionRefused errors
-    )
+    # the default request_queue_size is 5; set to a higher value here to avoid
+    # ConnectionRefused errors
+    request_queue_size = 20
 
     def __init__(self, logger, *args, **kwargs):
         super(TCPServerReuseAddress, self).__init__(*args, **kwargs)
@@ -330,37 +349,40 @@ def _start_logstash_proxy_server(logger, host, port):
             port = int(port)
         except ValueError:
             raise TypeError(
-                f'Server port must be of type integer (or parseable string). Got port={port} of type {type(port)}'
+                'Server port must be of type integer (or parseable string). '
+                f'Got port={port} of type {type(port)}'
             )
 
     logstash_proxy_server = TCPServerReuseAddress(
         logger, (host, port), LogstashTCPHandler, bind_and_activate=True
     )
-    logstash_proxy_server.timeout = (
-        30  # is closed after timeout period with no requests being received
-    )
+
+    # the server is closed after a timeout period with no requests being received
+    logstash_proxy_server.timeout = 30
 
     with logstash_proxy_server as server:
-        server_logger.info(f'Start logstash proxy server on host={host} port={port}.')
+        log_print(f'Start logstash proxy server on host={host} port={port}.')
 
         try:
-            # NOTE: don't use the serve_forever() method as it does not account for the timeout
+            # NOTE: don't use the serve_forever() method as it does not account
+            # for the timeout
             while True:
                 server.handle_request()
         except Exception as e:
-            server_logger.info('logstash proxy server shutdown unexpectedly')
+            log_print('logstash proxy server shut down unexpectedly')
             raise e
 
 
 def _initialize_logger_and_handler():
-    server_logger.info(
+    log_print(
         f'Initialize logger and handler. Location of intermediate logfiles before '
         f'submission:\n{get_log_filepath()}'
     )
     logstash_logger = logging.getLogger(name='logstash')
-    # Note that backupCount must be a positive number (otherwise no file rollovers is performed).
-    # the number should for performance reasons not be too large. If backupCount is reached then the oldest log file
-    # is deleted. See https://stackoverflow.com/a/56954614
+    # Note that backupCount must be a positive number (otherwise no file
+    # rollover is performed). For performance reasons the number should not be
+    # too large. If backupCount is reached, the oldest log file is deleted.
+    # See https://stackoverflow.com/a/56954614
     rotating_logfile_handler = NotifyRotatingFileHandler(
         filename=get_log_filepath(),
         mode='a',
@@ -379,13 +401,11 @@ def start_logtransfer_service(host=None, port=None):
     """
     Start logtransfer service.
 
-    The two parameters host and port are mainly necessary for testing. If left None, then the values from the
-    Nomad config are used.
+    The two parameters 'host' and 'port' are mainly necessary for testing. If
+    left None, the values from the Nomad config are used.
     """
-    # for appropriate arguments see attributes in
-    # nomad.config.logstash and nomad.config.logstash_proxy and config.oasis.central_nomad_api_url
 
-    server_logger.info(f'Start logtransfer service')
+    log_print('Start logtransfer service')
 
     logstash_logger, rotating_logfile_handler = _initialize_logger_and_handler()
 
@@ -396,9 +416,9 @@ def start_logtransfer_service(host=None, port=None):
     assert rotating_logfile_handler.is_event_attribute_set()
 
     d = threading.Thread(target=transfer_to_central.transfer_logs, name='logtransfer')
-    d.setDaemon(
-        True
-    )  # kill thread immediately if main thread (running the server) terminates
+
+    # kill thread immediately if main thread (running the server) terminates
+    d.daemon = True
     d.start()
 
     if host is None:
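
To illustrate the wire protocol handled above: a client opens a TCP connection to
the proxy and writes newline-terminated logstash (JSON) lines, which
`LogstashTCPHandler` reads with `rfile.readline()` and appends to the rotating
logfile. A minimal sketch (host and port are assumptions; in practice they come
from `config.logstash`):

```python
import json
import socket

# One logstash-formatted JSON record; the trailing newline terminates the line.
record = json.dumps({'message': 'testmsg', 'level': 'INFO'}).encode() + b'\n'

with socket.create_connection(('localhost', 5000)) as sock:  # assumed host/port
    sock.sendall(record)
```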
diff --git a/nomad/statistics_logger.py b/nomad/statistics_logger.py
new file mode 100644
index 0000000000..caa53f86c7
--- /dev/null
+++ b/nomad/statistics_logger.py
@@ -0,0 +1,103 @@
+#
+# Copyright The NOMAD Authors.
+#
+# This file is part of NOMAD. See https://nomad-lab.eu for further info.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import sys
+
+import requests
+import threading
+from multiprocessing import Process
+
+from nomad import config
+from nomad import utils
+
+
+# The logger used to submit the statistics logs (typically with a
+# LogstashHandler attached to the root logger).
+statistics_logstash_log = utils.get_logger(__name__)
+
+
+class StatisticsLogger:
+    """
+    Service that frequently collects statistics and logs them in logstash format.
+
+    Currently, the statistics are collected from the local FastAPI endpoint /info.
+    The statistics logs are eventually transferred to
+    * logstash directly (on central NOMAD)
+    * logtransfer (on a NOMAD Oasis, see logtransfer.py).
+
+    In the future, it would also be possible to collect statistics from
+    different sources or perform aggregations.
+    """
+
+    def __init__(self):
+        self.collect_interval = config.logtransfer.statistics_interval
+        self.raise_exceptions = config.logtransfer.raise_unexpected_exceptions
+        self.stop_thread_event = threading.Event()
+
+    def _collect_statistics(self):
+        """Collect statistics from the FastAPI /info endpoint."""
+        info_response = requests.get(
+            f'http://localhost:{config.services.api_port}/api/v1/info'
+        )
+        statistics = info_response.json()['statistics']
+        return statistics
+
+    def run_statistics_service(self):
+        """Main loop of the statistics service."""
+
+        while True:
+            try:
+                statistics = self._collect_statistics()
+
+                for key, value in statistics.items():
+                    statistics_logstash_log.info('statistics', key=key, value=value)
+
+            except Exception as e:
+                if self.raise_exceptions:
+                    raise e
+
+            # Flush so that prints appear promptly, also from a forked process.
+            sys.stdout.flush()
+
+            # The event can be set to terminate the loop. This is mainly
+            # required for testing.
+            if self.stop_thread_event.wait(self.collect_interval):
+                break
+
+
+def start_statistics_logger(fork_mode='process'):
+    """
+    Initializes and starts a new process of the statistics logger.
+    The returned object is the running process or thread.
+    """
+
+    statistics_logger = StatisticsLogger()
+
+    if fork_mode == 'process':
+        process = Process(target=statistics_logger.run_statistics_service)
+        process.start()
+        return process
+    elif fork_mode == 'thread':
+        statistics_thread = threading.Thread(
+            target=statistics_logger.run_statistics_service
+        )
+
+        statistics_thread.daemon = True
+        statistics_thread.start()
+        return statistics_thread
+    else:
+        raise ValueError(f'{fork_mode=} is not valid (select "process" or "thread").')
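
A minimal usage sketch for the function above (the shortened interval is an
assumption for illustration; thread mode is mainly useful in tests, while the
CLI uses process mode):

```python
from nomad import config
from nomad.statistics_logger import start_statistics_logger

config.logtransfer.statistics_interval = 5  # assumed short interval for a demo

# 'thread' keeps the service in-process; 'process' forks a child instead.
worker = start_statistics_logger(fork_mode='thread')
# Each interval, one 'statistics' record is logged per key/value pair returned
# by the local /info endpoint.
```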
diff --git a/ops/docker-compose/nomad-oasis/configs/nomad.yaml b/ops/docker-compose/nomad-oasis/configs/nomad.yaml
index db2189e1bf..6418fd5e12 100644
--- a/ops/docker-compose/nomad-oasis/configs/nomad.yaml
+++ b/ops/docker-compose/nomad-oasis/configs/nomad.yaml
@@ -15,7 +15,7 @@ meta:
   maintainer_email: 'me@my-oasis.org'
 
 logstash:
-  enable: false
+  enabled: false
 
 mongo:
     db_name: nomad_oasis_v1
diff --git a/tests/conftest.py b/tests/conftest.py
index c1ce1a9e50..d9e19c8f53 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -24,6 +24,7 @@ import time
 from datetime import datetime, timezone
 import shutil
 import os
+import socket
 import elasticsearch.exceptions
 import json
 import logging
@@ -1330,14 +1331,24 @@ def central_logstash_mock():
     class TCPServerStore(socketserver.TCPServer):
         received_content = []
 
+        def set_request_timeout(self, timeout):
+            # Note that this timeout acts on the LogstashCentralHandler socket.
+            # It seems to behave differently from self.timeout and is
+            # particularly useful to interrupt a blocking handle_request()
+            self.RequestHandlerClass.timeout = timeout
+
     class LogstashCentralHandler(socketserver.StreamRequestHandler):
         def handle(self):
-            # print("OPENING SOCKET TO LogstashCentralHandler")
             while True:
-                line = self.rfile.readline()
-                # print(f"received line {line}")
+                try:
+                    line = self.rfile.readline()
+                except socket.timeout:
+                    line = b''  # if timed out, close the connection
+
                 if line == b'':
-                    # print("received closing for LogstashCentralHandler")
                     break
 
                 line = line.strip()
diff --git a/tests/logtransfer/conftest.py b/tests/logtransfer/conftest.py
index 17440c01f3..7bcd4b6a0a 100644
--- a/tests/logtransfer/conftest.py
+++ b/tests/logtransfer/conftest.py
@@ -13,10 +13,10 @@ logging.getLogger('logtransfer_server').setLevel(level=logging.CRITICAL)
 
 
 def start_logtransfer_service():
-    # Change address to such that api_v1 fixture can replace the address to the testserver
+    # Change address such that the api_v1 fixture can point it to the test server
     config.oasis.central_nomad_deployment_url = config.client.url + '/v1'
 
-    # Use a slightly different port to the logstash mock (which uses 5000 by default)
+    # Use a different port from the logstash mock (which uses 5000 by default)
     host, port = config.logstash.host, int(config.logstash.tcp_port) + 1
 
     server_process = Process(
@@ -30,8 +30,8 @@ def start_logtransfer_service():
 
     client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
-    # NOTE: it is in the responsibility of the tests to timeout if no connection can be established
-    #   (use @pytest.mark.timeout( SECONDS ))
+    # NOTE: it is the responsibility of the tests to time out if no connection can
+    #  be established (use @pytest.mark.timeout(SECONDS))
     while True:
         time.sleep(0.001)
         try:
@@ -53,8 +53,21 @@ def kill_logtransfer_service(server_process, client_socket):
 
 
 @pytest.fixture(scope='function')
-def logtransfer_rollover_time(monkeysession):
+def logstash_enabled(monkeysession):
+    monkeysession.setattr('nomad.config.logstash.enabled', True)
+
+    # make sure the root logger has the logstash handler
+    from nomad.utils.structlogging import add_logstash_handler, root
+
+    add_logstash_handler(root)
+
+
+@pytest.fixture(scope='function')
+def logtransfer_rollover_time(api_v1, monkeysession):
+    monkeysession.setattr('nomad.config.logtransfer.enable_statistics', False)
     monkeysession.setattr('nomad.config.logtransfer.submit_interval', 0.1)
+    monkeysession.setattr('nomad.config.logtransfer.raise_unexpected_exceptions', True)
+
     server_process, client_socket = start_logtransfer_service()
 
     yield client_socket
@@ -63,9 +76,11 @@ def logtransfer_rollover_time(monkeysession):
 
 
 @pytest.fixture(scope='function')
-def logtransfer_rollover_space(monkeysession):
+def logtransfer_rollover_space(api_v1, monkeysession):
+    monkeysession.setattr('nomad.config.logtransfer.enable_statistics', False)
     monkeysession.setattr('nomad.config.logtransfer.max_bytes', 500)
     monkeysession.setattr('nomad.config.logtransfer.backup_count', 100)
+    monkeysession.setattr('nomad.config.logtransfer.raise_unexpected_exceptions', True)
 
     server_process, client_socket = start_logtransfer_service()
 
@@ -75,12 +90,19 @@ def logtransfer_rollover_space(monkeysession):
 
 
 @pytest.fixture(scope='function')
-def logtransfer_no_rollover(monkeysession):
-    monkeysession.setattr('nomad.config.logtransfer.max_bytes', 1e100)
-    monkeysession.setattr('nomad.config.logtransfer.submit_interval', 1e100)
+def logtransfer_no_rollover(api_v1, monkeysession):
+    monkeysession.setattr('nomad.config.logtransfer.enable_statistics', False)
+    monkeysession.setattr('nomad.config.logtransfer.max_bytes', 1e8)
+    monkeysession.setattr('nomad.config.logtransfer.submit_interval', 1e8)
+    monkeysession.setattr('nomad.config.logtransfer.raise_unexpected_exceptions', True)
 
     server_process, client_socket = start_logtransfer_service()
 
     yield client_socket
 
     kill_logtransfer_service(server_process, client_socket)
+
+
+@pytest.fixture(scope='function')
+def collect_statistics(monkeysession):
+    monkeysession.setattr('nomad.config.logtransfer.statistics_interval', 1)
diff --git a/tests/logtransfer/test_logtransfer.py b/tests/logtransfer/test_logtransfer.py
index 6fd6b81655..1ef1beb769 100644
--- a/tests/logtransfer/test_logtransfer.py
+++ b/tests/logtransfer/test_logtransfer.py
@@ -9,6 +9,8 @@ import logging
 from nomad.utils.structlogging import LogstashFormatter
 from nomad import config
 
+logtransfer.enable_log_print = False
+
 
 def standard_test_log_record(msg='testmsg') -> bytes:
     record = LogRecord(
@@ -102,9 +104,9 @@ def test_logstash_submit_multiple_logs(
 
     is_successful = False
     while not is_successful:
-        # The logfile may also contain a subset of the messages that were sent at this point
-        # leading to an AssertionError. This loop frequently checks if at some all asserts are True and
-        # then terminates
+        # The logfile may also contain a subset of the messages that were sent
+        # at this point, leading to an AssertionError. This loop frequently
+        # checks whether at some point all asserts are True and then terminates
         if not logtransfer.is_empty_logfile():
             with open(logtransfer.get_log_filepath(), 'rb') as f:
                 file_content = f.read()
@@ -130,7 +132,8 @@ def test_logstash_rollover_time(logtransfer_rollover_time):
     ret1 = logtransfer_rollover_time.send(log1)
     assert ret1 > 0
 
-    # this sleep is needed to trigger the rollover after some time interval (check the fixture for the actual setting)
+    # this sleep is needed to trigger the rollover after some time interval
+    # (check the fixture for the actual setting)
     time.sleep(0.15)
 
     log2 = standard_test_log_record(msg=testlog[1]) + b'\n'
@@ -139,8 +142,8 @@ def test_logstash_rollover_time(logtransfer_rollover_time):
 
     is_successful_assert = False
     while not is_successful_assert:
-        # finish the test as soon as all asserts are passed (this can vary depending on when the server has processed
-        # the log records)
+        # finish the test as soon as all asserts are passed (this can vary
+        # depending on when the server has processed the log records)
         try:
             rolled_log_filepath = logtransfer.get_log_filepath() + '.1'
             assert not logtransfer.is_empty_logfile()
@@ -149,7 +152,6 @@ def test_logstash_rollover_time(logtransfer_rollover_time):
             with open(logtransfer.get_log_filepath(), 'rb') as f:
                 # check active logfile
                 file_content = f.read()
-                # print(f"file_content:\n{file_content}")
                 assert testlog[0].encode() not in file_content
                 assert testlog[1].encode() in file_content
 
@@ -207,9 +209,8 @@ def test_logstash_rollover_space(logtransfer_rollover_space):
         if len(logtransfer.get_all_rotated_logfiles()) > 2:
             break
         else:
-            time.sleep(
-                0.01
-            )  # slow down logging a bit, so that server can perform rollovers
+            # slow down logging a bit, so that server can perform rollovers
+            time.sleep(0.01)
 
     expected_logs = b''.join(expected_logs)
     assert_equal_content_logfiles(expected_logs)
@@ -242,14 +243,11 @@ def test_logtransfer_to_federation_backend(
     with central_logstash_mock as server:
         # server is closed after context manager exits
         ret = logtransfer_rollover_time.send(logstash_line)
-        # print(f"sent logstash line {logstash_line} with ret={ret}")
         server.handle_request()
 
     # only one message was sent
     assert len(central_logstash_mock.received_content) == 1
 
-    # print(f'central_logstash_mock.received_content[0]={central_logstash_mock.received_content[0]}')
-
     # transform to json (also validates that both are valid)
     sent_json = json.loads(logstash_line.rstrip())
     received_json = json.loads(central_logstash_mock.received_content[0])
@@ -258,7 +256,8 @@ def test_logtransfer_to_federation_backend(
     assert 'ip_address' not in sent_json.keys()
     assert 'ip_address' in received_json.keys()
 
-    # remove key and convert back to string to compare that both logs have identical key/values
+    # remove key and convert back to string to compare that both logs have
+    # identical key/values
     received_json.pop('ip_address')
     assert json.dumps(sent_json, sort_keys=True) == json.dumps(
         received_json, sort_keys=True
diff --git a/tests/logtransfer/test_statistics_logger.py b/tests/logtransfer/test_statistics_logger.py
new file mode 100644
index 0000000000..aeef7e8f30
--- /dev/null
+++ b/tests/logtransfer/test_statistics_logger.py
@@ -0,0 +1,53 @@
+import json
+import time
+
+import pytest
+import requests
+
+from nomad import config
+import nomad.statistics_logger as stats
+
+
+@pytest.mark.timeout(3)
+def test_statistics_service(
+    logstash_enabled, central_logstash_mock, collect_statistics, api_v1, elastic_infra
+):
+    # Note: the "elastic" fixture is required here so that info/ request at the
+    # FastAPI backend actually collects statistics.
+
+    statistics_process = stats.start_statistics_logger(fork_mode='process')
+
+    with central_logstash_mock as server:
+        # timeout must be smaller than the interval set in the collect_statistics fixture
+        timeout = 0.5
+        server.set_request_timeout(timeout)
+
+        # server closes when context manager is left
+        server.handle_request()
+
+    while statistics_process.is_alive():
+        statistics_process.kill()
+        time.sleep(0.01)
+
+    # assert that all current statistics ended up at the central logstash mock
+    collected_statistics = requests.get(
+        f'http://localhost:{config.services.api_port}/api/v1/info'
+    ).json()['statistics']
+
+    received_content = []
+    for item in central_logstash_mock.received_content:
+        received_content += [json.loads(item.decode())]
+
+    for expected_key, expected_value in collected_statistics.items():
+        for log_entry in received_content:
+            try:
+                actual_key = log_entry['nomad.statistics_logger.key']
+                actual_value = log_entry['nomad.statistics_logger.value']
+                if actual_key == expected_key and actual_value == expected_value:
+                    # success: found key-value - break inner loop
+                    break
+
+            except KeyError:
+                # there may be other logs that do not contain the statistics keys
+                pass
+        else:
+            # executes if inner-loop was *not* terminated with 'break'
+            assert False, f'Could not find {expected_key=}, {expected_value=} pair.'
-- 
GitLab