From f50e9cc705fa11b9660d87b5143b8d6570a10931 Mon Sep 17 00:00:00 2001
From: Markus Scheidgen <markus.scheidgen@gmail.com>
Date: Wed, 29 Aug 2018 14:17:28 +0200
Subject: [PATCH] Transitioned to structlog.
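
Logging is now based on structlog. nomad.utils configures stdlib logging,
an optional logstash TCP handler, and structlog once at import time. All
modules obtain their logger through nomad.utils.get_logger, which accepts
key/value pairs that are bound to every event the logger emits. The old
DataLogger class and its lnr_error method are replaced by the standalone
lnr ("log and raise") context manager.

A minimal usage sketch (editorial illustration, not part of the diff; the
bound values are made up, the API is the one introduced below):

    from nomad.utils import get_logger, lnr

    logger = get_logger(__name__, upload_id='some_upload_id')
    logger.debug('Initiate upload processing')

    with lnr(logger, 'Save upload time'):
        ...  # exceptions here are logged as structured events, then re-raised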
---
 infrastructure/nomadxt/docker-compose.yml |  6 ++
 nomad/api.py                              | 26 ++++---
 nomad/config.py                           |  6 +-
 nomad/files.py                            | 18 ++---
 nomad/normalizing/normalizer.py           | 13 ++--
 nomad/parsing.py                          |  6 +-
 nomad/processing/app.py                   | 21 +++--
 nomad/processing/handler.py               |  9 ++-
 nomad/processing/handlerdaemon.py         |  3 -
 nomad/processing/state.py                 |  2 +-
 nomad/processing/tasks.py                 | 38 +++++----
 nomad/search.py                           |  8 +-
 nomad/utils.py                            | 95 ++++++++++++-----------
 tests/test_processing.py                  |  2 +
 14 files changed, 136 insertions(+), 117 deletions(-)

diff --git a/infrastructure/nomadxt/docker-compose.yml b/infrastructure/nomadxt/docker-compose.yml
index b98b8dede1..233e507013 100644
--- a/infrastructure/nomadxt/docker-compose.yml
+++ b/infrastructure/nomadxt/docker-compose.yml
@@ -72,6 +72,7 @@ services:
     # the search engine
     elastic:
         image: docker.elastic.co/elasticsearch/elasticsearch:6.3.2
+        container_name: nomadxt_elastic
         volumes:
             - nomadxt_elastic:/usr/share/elasticsearch/data
         ports:
@@ -107,6 +108,7 @@ services:
     worker:
         restart: always
         build: ../../
+        container_name: nomadxt_worker
         environment:
             - NOMAD_MINIO_PORT=9000
             - NOMAD_MINIO_HOST=minio
@@ -129,6 +131,7 @@ services:
     handler:
         restart: always
         build: ../../
+        container_name: nomadxt_handler
         environment:
             - NOMAD_MINIO_PORT=9000
             - NOMAD_MINIO_HOST=minio
@@ -149,6 +152,7 @@ services:
     api:
         restart: always
         build: ../../
+        container_name: nomadxt_api
         environment:
             - NOMAD_MINIO_PORT=9000
             - NOMAD_MINIO_HOST=minio
@@ -174,6 +178,7 @@ services:
     # nomad gui
     gui:
         build: ../../gui/
+        container_name: nomadxt_gui
         ports:
             - ${GUI_HOST_PORT}:8080
         volumes:
@@ -184,6 +189,7 @@ services:
     proxy:
         restart: always
         image: nginx:1.13.9-alpine
+        container_name: nomadxt_proxy
         links:
             - elk
             - gui
diff --git a/nomad/api.py b/nomad/api.py
index 7b1705c14f..f1f982c97f 100644
--- a/nomad/api.py
+++ b/nomad/api.py
@@ -4,12 +4,11 @@
 from flask_restful import Resource, Api, abort
 from datetime import datetime
 import mongoengine.errors
 from flask_cors import CORS
-import logging
 from elasticsearch.exceptions import NotFoundError
 
 from nomad import users, files, search, config
+from nomad.utils import lnr, get_logger
 from nomad.processing import UploadProc
-from nomad.utils import get_logger
 
 base_path = config.services.api_base_path
@@ -117,10 +116,11 @@ class Upload(Resource):
         return _update_and_render(upload), 200
 
     def delete(self, upload_id):
+        logger = get_logger(__name__, upload_id=upload_id, endpoint='upload', action='delete')
+
         try:
             upload = users.Upload.objects(id=upload_id).first()
         except mongoengine.errors.ValidationError:
-            print('###')
             abort(400, message='%s is not a valid upload id.' % upload_id)
 
         if upload is None:
@@ -130,21 +130,20 @@ class Upload(Resource):
         if not (proc.ready() or is_stale or proc.current_task_name == 'uploading'):
             abort(400, message='%s has not finished processing.'
                   % upload_id)
-        logger = get_logger(__name__, upload_id=upload_id)
-        with logger.lnr_error('Delete upload file'):
+        with lnr(logger, 'Delete upload file'):
             try:
                 files.Upload(upload.upload_id).delete()
             except KeyError:
-                logger.error('Upload exist, but file does not exist.')
+                logger.error('Upload exists, but file does not exist')
 
         if proc.upload_hash is not None:
-            with logger.lnr_error('Deleting archives.'):
+            with lnr(logger, 'Deleting archives'):
                 files.delete_archives(proc.upload_hash)
 
-            with logger.lnr_error('Deleting indexed calcs.'):
+            with lnr(logger, 'Deleting indexed calcs'):
                 search.Calc.delete_all(upload_id=proc.upload_id)
 
-        with logger.lnr_error('Deleting user upload.'):
+        with lnr(logger, 'Deleting user upload'):
             upload.delete()
 
         return _render(upload, proc, is_stale), 200
@@ -172,6 +171,8 @@ class RepoCalc(Resource):
 
 class RepoCalcs(Resource):
     def get(self):
+        logger = get_logger(__name__, endpoint='repo', action='get')
+
         # TODO use argparse? bad request reponse an bad params, pagination as decorator
         page = int(request.args.get('page', 1))
         per_page = int(request.args.get('per_page', 10))
@@ -190,7 +191,7 @@ class RepoCalcs(Resource):
         try:
             results = search.Calc.search(body=body)
         except Exception as e:
-            get_logger(__name__).error('Could not execute repo calcs get.', exc_info=e)
+            logger.error('Could not execute repo calcs get', exc_info=e)
             abort(500, message=str(e))
 
         return {
@@ -205,8 +206,10 @@
 
 @app.route('%s/archive/<string:upload_hash>/<string:calc_hash>' % base_path, methods=['GET'])
 def get_calc(upload_hash, calc_hash):
+    logger = get_logger(__name__, endpoint='archive', action='get', upload_hash=upload_hash, calc_hash=calc_hash)
+
     archive_id = '%s/%s' % (upload_hash, calc_hash)
-    logger = get_logger(__name__, archive_id=archive_id)
+
     try:
         url = _external_objects_url(files.archive_url(archive_id))
         return redirect(url, 302)
@@ -224,5 +227,4 @@ api.add_resource(RepoCalc, '%s/repo/<string:upload_hash>/<string:calc_hash>' % b
 
 
 if __name__ == '__main__':
-    logging.basicConfig(level=logging.DEBUG)
     app.run(debug=True, port=8000)
diff --git a/nomad/config.py b/nomad/config.py
index bf78a0187a..73a2eec928 100644
--- a/nomad/config.py
+++ b/nomad/config.py
@@ -18,6 +18,7 @@ This module is used to store all configuration values. It makes use of
 """
 
 import os
+import logging
 from collections import namedtuple
 
 FilesConfig = namedtuple(
@@ -39,7 +40,7 @@ ElasticConfig = namedtuple('ElasticConfig', ['host', 'calc_index'])
 MongoConfig = namedtuple('MongoConfig', ['host', 'users_db'])
 """ Used to configure mongo db. """
 
-LogstashConfig = namedtuple('LogstashConfig', ['enabled', 'host', 'tcp_port'])
+LogstashConfig = namedtuple('LogstashConfig', ['enabled', 'host', 'tcp_port', 'level'])
 """ Used to configure and enable/disable the ELK based centralized logging.
 """
 NomadServicesConfig = namedtuple('NomadServicesConfig', ['api_base_path', 'objects_host', 'objects_port', 'objects_base_path'])
@@ -87,7 +88,8 @@ mongo = MongoConfig(
 logstash = LogstashConfig(
     enabled=True,
     host=os.environ.get('NOMAD_LOGSTASH_HOST', 'localhost'),
-    tcp_port=int(os.environ.get('NOMAD_LOGSTASH_TCPPORT', '5000'))
+    tcp_port=int(os.environ.get('NOMAD_LOGSTASH_TCPPORT', '5000')),
+    level=int(os.environ.get('NOMAD_LOGSTASH_LEVEL', logging.DEBUG))
 )
 services = NomadServicesConfig(
     api_base_path=os.environ.get('NOMAD_API_BASE_PATH', '/nomadxt/api'),
""" NomadServicesConfig = namedtuple('NomadServicesConfig', ['api_base_path', 'objects_host', 'objects_port', 'objects_base_path']) @@ -87,7 +88,8 @@ mongo = MongoConfig( logstash = LogstashConfig( enabled=True, host=os.environ.get('NOMAD_LOGSTASH_HOST', 'localhost'), - tcp_port=int(os.environ.get('NOMAD_LOGSTASH_TCPPORT', '5000')) + tcp_port=int(os.environ.get('NOMAD_LOGSTASH_TCPPORT', '5000')), + level=int(os.environ.get('NOMAD_LOGSTASH_LEVEL', logging.DEBUG)) ) services = NomadServicesConfig( api_base_path=os.environ.get('NOMAD_API_BASE_PATH', '/nomadxt/api'), diff --git a/nomad/files.py b/nomad/files.py index 6fefb9120c..f502201f78 100644 --- a/nomad/files.py +++ b/nomad/files.py @@ -46,7 +46,6 @@ from zipfile import ZipFile, BadZipFile import shutil from minio import Minio import minio.error -import logging import hashlib import base64 from contextlib import contextmanager @@ -54,8 +53,9 @@ import gzip import io import nomad.config as config +from nomad.utils import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) _client = None @@ -69,7 +69,7 @@ if _client is None and 'sphinx' not in sys.modules: def ensure_bucket(name): try: _client.make_bucket(bucket_name=name) - logger.info("Created uploads bucket with name %s." % name) + logger.info('Created uploads bucket', bucket=name) except minio.error.BucketAlreadyOwnedByYou: pass @@ -117,15 +117,16 @@ def upload_put_handler(func: Callable[[str], None]) -> Callable[[], None]: event_name = event_record['eventName'] if event_name == 's3:ObjectCreated:Put': upload_id = event_record['s3']['object']['key'] - logger.debug('Received bucket upload event of for upload %s.' % upload_id) + logger.debug('Received bucket upload event', upload_id=upload_id) yield upload_id break # only one per record, pls else: - logger.debug('Unhanled bucket event %s.' % event_name) + logger.debug('Unhanled bucket event', bucket_event_name=event_name) except KeyError: logger.warning( - 'Unhandled bucket event due to unexprected event format: %s' % - event_record) + 'Unhandled bucket event due to unexprected event format', + bucket_event_record=event_record) + def wrapper(*args, **kwargs) -> None: logger.info('Start listening to uploads notifications.') @@ -141,8 +142,7 @@ def upload_put_handler(func: Callable[[str], None]) -> Callable[[], None]: func(upload_id) except StopIteration: # Using StopIteration to allow clients to stop handling of events. - logging.debug( - 'Handling of upload notifications was stopped via StopIteration.') + logger.debug('Handling of upload notifications was stopped via StopIteration.') return except Exception: pass diff --git a/nomad/normalizing/normalizer.py b/nomad/normalizing/normalizer.py index 478c00cdb1..b63be25e27 100644 --- a/nomad/normalizing/normalizer.py +++ b/nomad/normalizing/normalizer.py @@ -1,10 +1,10 @@ from abc import ABCMeta, abstractmethod from typing import List, Dict, Any -import logging from nomad.parsing import AbstractParserBackend +from nomad.utils import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class Normalizer(metaclass=ABCMeta): @@ -61,11 +61,10 @@ class SystemBasedNormalizer(Normalizer, metaclass=ABCMeta): self._normalize_system(g_index) except KeyError as e: logger.error( - 'Could not read all data for %s. 
diff --git a/nomad/processing/app.py b/nomad/processing/app.py
index 0831e0c72b..b4f04d807f 100644
--- a/nomad/processing/app.py
+++ b/nomad/processing/app.py
@@ -27,17 +27,18 @@ import nomad.patch  # pylint: disable=unused-import
 logger = get_task_logger(__name__.replace('nomad', 'nomad-xt'))
 logger.setLevel(logging.DEBUG)
 
-if config.logstash.enabled:
-    def initialize_logstash(logger=None, loglevel=logging.INFO, **kwargs):
-        handler = logstash.TCPLogstashHandler(
-            config.logstash.host, config.logstash.tcp_port,
-            tags=['celery'], message_type='celery', version=1)
-        handler.setLevel(loglevel)
-        logger.addHandler(handler)
-        return logger
-
-    after_setup_task_logger.connect(initialize_logstash)
-    after_setup_logger.connect(initialize_logstash)
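+# NOTE: logging is now configured centrally in nomad/utils.py at import time
+# (see get_logger), so the celery-specific logstash setup is disabled below.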
+# if config.logstash.enabled:
+#     def initialize_logstash(logger=None, loglevel=logging.DEBUG, **kwargs):
+#         handler = logstash.TCPLogstashHandler(
+#             config.logstash.host, config.logstash.tcp_port, message_type='celery', version=1)
+#         handler.setLevel(loglevel)
+#         logger.addHandler(handler)
+#         return logger
+
+#     after_setup_task_logger.connect(initialize_logstash)
+#     after_setup_logger.connect(initialize_logstash)
 
 app = Celery('nomad.processing', backend=config.celery.backend_url, broker=config.celery.broker_url)
diff --git a/nomad/processing/handler.py b/nomad/processing/handler.py
index eed2a4c75a..805865307e 100644
--- a/nomad/processing/handler.py
+++ b/nomad/processing/handler.py
@@ -19,6 +19,7 @@ from nomad import files, utils, users
 from nomad.processing.tasks import extracting_task, cleanup_task, parse_all_task
 from nomad.processing.state import UploadProc
+from nomad.utils import get_logger, lnr
 
 
 def start_processing(upload_id, proc: UploadProc=None) -> UploadProc:
     """
@@ -63,10 +64,10 @@ def handle_uploads(quit=False):
 
     @files.upload_put_handler
     def handle_upload_put(received_upload_id: str):
-        logger = utils.get_logger(__name__, upload_id=received_upload_id)
+        logger = get_logger(__name__, upload_id=received_upload_id)
         logger.debug('Initiate upload processing')
         try:
-            with logger.lnr_error('Could not load'):
+            with lnr(logger, 'Could not load'):
                 upload = users.Upload.objects(id=received_upload_id).first()
                 if upload is None:
                     logger.error('Upload does not exist')
@@ -76,11 +77,11 @@ def handle_uploads(quit=False):
                 logger.warn('Ignore upload notification, since file is already uploaded')
                 raise StopIteration
 
-            with logger.lnr_error('Save upload time'):
+            with lnr(logger, 'Save upload time'):
                 upload.upload_time = datetime.now()
                 upload.save()
 
-            with logger.lnr_error('Start processing'):
+            with lnr(logger, 'Start processing'):
                 proc = start_processing(received_upload_id, proc=upload.proc)
                 assert proc.is_started
                 upload.proc = proc
diff --git a/nomad/processing/handlerdaemon.py b/nomad/processing/handlerdaemon.py
index f3c0e239fd..f617a200a5 100644
--- a/nomad/processing/handlerdaemon.py
+++ b/nomad/processing/handlerdaemon.py
@@ -1,7 +1,4 @@
-import logging
-
 from nomad.processing.handler import handle_uploads
 
 if __name__ == '__main__':
-    logging.basicConfig(level=logging.DEBUG)
     handle_uploads()
diff --git a/nomad/processing/state.py b/nomad/processing/state.py
index 82939df3d1..46c21ecb88 100644
--- a/nomad/processing/state.py
+++ b/nomad/processing/state.py
@@ -243,7 +243,7 @@ class UploadProc(ProcPipeline):
                     __name__,
                     upload_id=self.upload_id,
                     current_task_name=self.current_task_name)
-                logger.error('Celery task raised exception.', exc_info=result)
+                logger.error('Celery task raised exception', exc_info=result)
             else:
                 self.update(result)
                 might_have_changed = True
diff --git a/nomad/processing/tasks.py b/nomad/processing/tasks.py
index cb16751f8a..f1ea03c13a 100644
--- a/nomad/processing/tasks.py
+++ b/nomad/processing/tasks.py
@@ -35,16 +35,17 @@ def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
     try:
         upload = files.Upload(proc.upload_id)
         upload.open()
+        logger.debug('Opened upload')
     except KeyError as e:
-        logger.debug('Process request for non existing upload')
+        logger.info('Process request for non existing upload')
         proc.fail(e)
         return proc
     except files.UploadError as e:
-        logger.debug('Could not open upload, %s' % e)
+        logger.info('Could not open upload', error=str(e))
         proc.fail(e)
         return proc
     except Exception as e:
-        logger.error('Unknown exception %s', exc_info=e)
+        logger.error('Unknown exception', exc_info=e)
         proc.fail(e)
         return proc
 
@@ -53,11 +54,12 @@ def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
     try:
         proc.upload_hash = upload.hash()
     except files.UploadError as e:
-        logger.error('Could not create upload hash', exc_info=e)
+        logger.error('Could not create upload hash', error=str(e))
         proc.fail(e)
         return proc
 
     if search.Calc.upload_exists(proc.upload_hash):
+        logger.info('Upload hash doublet')
         proc.fail('The same file was already uploaded and processed.')
         return proc
 
@@ -71,7 +73,7 @@ def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
             proc.calc_procs.append(calc_proc)
 
     except files.UploadError as e:
-        logger.warn('Could find parse specs in open upload', exc_info=e)
+        logger.warn('Could not find parse specs in open upload', error=str(e))
         proc.fail(e)
         return proc
 
@@ -131,18 +133,22 @@ def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc:
     assert upload_proc.upload_hash is not None
 
     upload_hash, parser, mainfile = upload_proc.upload_hash, proc.parser_name, proc.mainfile
-    logger = utils.get_logger(__name__, upload_hash=upload_hash, mainfile=mainfile)
+    logger = utils.get_logger(
+        __name__, task=self.name,
+        upload_id=upload_proc.upload_id, upload_hash=upload_hash, mainfile=mainfile)
 
     # parsing
     proc.continue_with(parser)
-    logger.debug('Start parsing with %s' % parser)
     try:
         parser_backend = parser_dict[parser].run(proc.tmp_mainfile)
         if parser_backend.status[0] != 'ParseSuccess':
-            proc.fail(parser_backend.status[1])
+            error = parser_backend.status[1]
+            logger.debug('Failed parsing', parser=parser, error=error)
+            proc.fail(error)
             return proc
+        logger.debug('Completed successfully', parser=parser)
     except Exception as e:
-        logger.warn('Exception wile parsing', exc_info=e)
+        logger.warn('Exception while parsing', parser=parser, exc_info=e)
         proc.fail(e)
         return proc
     _report_progress(self, proc)
@@ -151,12 +157,14 @@ def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc:
     for normalizer in normalizers:
         normalizer_name = normalizer.__name__
         proc.continue_with(normalizer_name)
-        logger.debug('Start %s.' % normalizer)
         try:
             normalizer(parser_backend).normalize()
             if parser_backend.status[0] != 'ParseSuccess':
-                proc.fail(parser_backend.status[1])
+                error = parser_backend.status[1]
+                logger.info('Failed run of %s: %s' % (normalizer, error))
+                proc.fail(error)
                 return proc
+            logger.debug('Completed %s successfully' % normalizer)
         except Exception as e:
             logger.warn('Exception wile normalizing with %s' % normalizer, exc_info=e)
             proc.fail(e)
@@ -173,8 +181,9 @@ def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc:
             upload_id=upload_proc.upload_id,
             mainfile=mainfile,
             upload_time=datetime.now())
+        logger.debug('Indexed successfully')
     except Exception as e:
-        logger.error('Could not index', exc_info=e)
+        logger.error('Failed to index', exc_info=e)
         proc.fail(e)
         return proc
     _report_progress(self, proc)
@@ -185,12 +194,13 @@ def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc:
     try:
         with files.write_archive_json(archive_id) as out:
             parser_backend.write_json(out, pretty=True)
+        logger.debug('Archived successfully')
     except Exception as e:
-        logger.error('Could not write archive', exc_info=e)
+        logger.error('Failed to archive', exc_info=e)
         proc.fail(e)
         return proc
 
-    logger.debug('Completed')
+    logger.debug('Completed processing')
     proc.success()
     return proc
diff --git a/nomad/search.py b/nomad/search.py
index f4da082c64..05ae6f1dd5 100644
--- a/nomad/search.py
+++ b/nomad/search.py
@@ -21,13 +21,13 @@ of search relevant properties.
 
 import elasticsearch.exceptions
 from elasticsearch_dsl import Document, Date, Keyword, Search, connections
-import logging
 import sys
 
 from nomad import config
 from nomad.parsing import LocalBackend
+from nomad.utils import get_logger
 
-logger = logging.getLogger(__name__)
+logger = get_logger(__name__)
 
 # ensure elastic connection
 if 'sphinx' not in sys.modules:
@@ -113,8 +113,8 @@ class Calc(Document):
                 value = backend.get_value(property, 0)
             except KeyError:
                 logger.warning(
-                    'No value for property %s could be extracted for calc %s/%s.' %
-                    (property, upload_hash, calc_hash))
+                    'Missing property value', property=property, upload_id=upload_id,
+                    upload_hash=upload_hash, calc_hash=calc_hash)
                 continue
 
             setattr(calc, property, value)
diff --git a/nomad/utils.py b/nomad/utils.py
index 05b231f2b9..5a752d7083 100644
--- a/nomad/utils.py
+++ b/nomad/utils.py
@@ -1,10 +1,41 @@
 from typing import Union, IO, cast
 import hashlib
 import base64
-import json
 import logging
+import structlog
+from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper, JSONRenderer
+from structlog.stdlib import LoggerFactory
+import logstash
 from contextlib import contextmanager
 
+from nomad import config
+
+
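+# Logging is configured once, at first import: stdlib logging (plus an
+# optional logstash TCP handler) and structlog with JSON event rendering.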
+_logging_is_configured = False
+if not _logging_is_configured:
+    # basic config
+    logging.basicConfig(level=logging.WARNING)
+
+    # configure logstash
+    if config.logstash.enabled:
+        logstash_handler = logstash.TCPLogstashHandler(
+            config.logstash.host,
+            config.logstash.tcp_port, version=1)
+        logstash_handler.setLevel(config.logstash.level)
+        logging.getLogger().addHandler(logstash_handler)
+
+    # configure structlog
+    log_processors = [
+        StackInfoRenderer(),
+        format_exc_info,
+        TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False),
+        JSONRenderer(sort_keys=True)
+    ]
+    structlog.configure(processors=log_processors, logger_factory=LoggerFactory())
+    _logging_is_configured = True
+
 
 def hash(obj: Union[IO, str]) -> str:
     """ First 28 character of an URL safe base 64 encoded sha512 digest. """
@@ -18,54 +47,23 @@ def hash(obj: Union[IO, str]) -> str:
     return base64.b64encode(hash.digest(), altchars=b'-_')[0:28].decode('utf-8')
 
 
-class DataLogger():
-    def __init__(self, logger, **kwargs):
-        self._logger = logger
-        self.data = kwargs
-
-    def _prepare_msg(self, base_msg):
-        return '%s %s' % (base_msg, self._format_data())
-
-    def _format_data(self, ):
-        return json.dumps(self.data)
-
-    def debug(self, msg, *args, **kwargs):
-        self._logger.debug(self._prepare_msg(msg), *args, **kwargs)
-
-    def info(self, msg, *args, **kwargs):
-        self._logger.info(self._prepare_msg(msg), *args, **kwargs)
-
-    def warn(self, msg, *args, **kwargs):
-        self._logger.warn(self._prepare_msg(msg), *args, **kwargs)
-
-    def error(self, msg, *args, **kwargs):
-        self._logger.error(self._prepare_msg(msg), *args, **kwargs)
-
-    def crit(self, msg, *args, **kwargs):
-        self._logger.crit(self._prepare_msg(msg), *args, **kwargs)
-
-    @contextmanager
-    def lnr_error(self, msg, *args, **kwargs):
-        """
-        Will *log and raise* with an error and the given message and args/kwargs
-        on all exceptions.
-        """
-        try:
-            yield
-        except Exception as e:
-            self._logger.error(
-                self._prepare_msg('Exception while: %s' % msg),
-                exc_info=e, *args, **kwargs)
-            raise e
-
-
-def get_logger(name, *args, **kwargs):
+def get_logger(name, **kwargs):
     """
-    Returns a :class:`DataLogger` with the data given as kwargs.
-    A data logger can be used like any other logger, but will add the data to all
-    log output. Allowing more structured logging.
+    Returns a structlog logger that is already attached with a logstash handler.
+    Use additional *kwargs* to pre-bind some values.
     """
-    return DataLogger(logging.getLogger(name), *args, **kwargs)
+    logger = structlog.get_logger(**kwargs)
+    return logger
+
+
+@contextmanager
+def lnr(logger, event, **kwargs):
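+    """ Logs the given event and re-raises on any exception (*log and raise*). """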
""" - return DataLogger(logging.getLogger(name), *args, **kwargs) + logger = structlog.get_logger(**kwargs) + return logger + + +@contextmanager +def lnr(logger, event, **kwargs): + try: + yield + except Exception as e: + logger.error(event, exc_info=e, **kwargs) + raise e class DataObject(dict): @@ -88,3 +85,7 @@ class DataObject(dict): def update(self, dct): return super().update({key: value for key, value in dct.items() if value is not None}) + +if __name__ == '__main__': + logger = get_logger(__name__, test='value') + logger.info('Hi', add='cool') \ No newline at end of file diff --git a/tests/test_processing.py b/tests/test_processing.py index 5f4371af78..8763ee87fd 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -136,6 +136,8 @@ def test_process_non_existing(celery_session_worker): @pytest.mark.parametrize('task', ['extracting', 'parse_all', 'cleanup', 'parsers/vasp']) def test_task_failure(monkeypatch, uploaded_id, celery_session_worker, task): + import logging + logging.getLogger().setLevel(level=logging.CRITICAL) original_continue_with = ProcPipeline.continue_with def continue_with(self: ProcPipeline, current_task): -- GitLab