Commit f50e9cc7 authored by Markus Scheidgen

Transitioned to structlog.

parent 107958a2
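
For context, a minimal sketch of the logging pattern this commit introduces, using the get_logger and lnr helpers added to nomad.utils near the end of this diff (the upload id and the size field are placeholders):

from nomad import files
from nomad.utils import get_logger, lnr

# bind structured context once; structlog attaches it to every event this logger emits
logger = get_logger(__name__, upload_id='some-upload-id', endpoint='upload')

# extra data goes into keyword arguments instead of %-formatted message strings
logger.info('Received upload', size=1024)

# lnr ("log and raise") replaces the old DataLogger.lnr_error context manager:
# it logs the event with exc_info on any exception and re-raises it
with lnr(logger, 'Delete upload file'):
    files.Upload('some-upload-id').delete()
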
......@@ -72,6 +72,7 @@ services:
# the search engine
elastic:
image: docker.elastic.co/elasticsearch/elasticsearch:6.3.2
container_name: nomadxt_elastic
volumes:
- nomadxt_elastic:/usr/share/elasticsearch/data
ports:
......@@ -107,6 +108,7 @@ services:
worker:
restart: always
build: ../../
container_name: nomadxt_worker
environment:
- NOMAD_MINIO_PORT=9000
- NOMAD_MINIO_HOST=minio
......@@ -129,6 +131,7 @@ services:
handler:
restart: always
build: ../../
container_name: nomadxt_handler
environment:
- NOMAD_MINIO_PORT=9000
- NOMAD_MINIO_HOST=minio
......@@ -149,6 +152,7 @@ services:
api:
restart: always
build: ../../
container_name: nomadxt_api
environment:
- NOMAD_MINIO_PORT=9000
- NOMAD_MINIO_HOST=minio
......@@ -174,6 +178,7 @@ services:
# nomad gui
gui:
build: ../../gui/
container_name: nomadxt_gui
ports:
- ${GUI_HOST_PORT}:8080
volumes:
......@@ -184,6 +189,7 @@ services:
proxy:
restart: always
image: nginx:1.13.9-alpine
container_name: nomadxt_proxy
links:
- elk
- gui
......
......@@ -4,12 +4,11 @@ from flask_restful import Resource, Api, abort
from datetime import datetime
import mongoengine.errors
from flask_cors import CORS
import logging
from elasticsearch.exceptions import NotFoundError
from nomad import users, files, search, config
from nomad.utils import lnr, get_logger
from nomad.processing import UploadProc
from nomad.utils import get_logger
base_path = config.services.api_base_path
......@@ -117,10 +116,11 @@ class Upload(Resource):
return _update_and_render(upload), 200
def delete(self, upload_id):
logger = get_logger(__name__, upload_id=upload_id, endpoint='upload', action='delete')
try:
upload = users.Upload.objects(id=upload_id).first()
except mongoengine.errors.ValidationError:
print('###')
abort(400, message='%s is not a valid upload id.' % upload_id)
if upload is None:
......@@ -130,21 +130,20 @@ class Upload(Resource):
if not (proc.ready() or is_stale or proc.current_task_name == 'uploading'):
abort(400, message='%s has not finished processing.' % upload_id)
logger = get_logger(__name__, upload_id=upload_id)
with logger.lnr_error('Delete upload file'):
with lnr(logger, 'Delete upload file'):
try:
files.Upload(upload.upload_id).delete()
except KeyError:
logger.error('Upload exist, but file does not exist.')
logger.error('Upload exists, but file does not exist')
if proc.upload_hash is not None:
with logger.lnr_error('Deleting archives.'):
with lnr(logger, 'Deleting archives'):
files.delete_archives(proc.upload_hash)
with logger.lnr_error('Deleting indexed calcs.'):
with lnr(logger, 'Deleting indexed calcs'):
search.Calc.delete_all(upload_id=proc.upload_id)
with logger.lnr_error('Deleting user upload.'):
with lnr(logger, 'Deleting user upload'):
upload.delete()
return _render(upload, proc, is_stale), 200
......@@ -172,6 +171,8 @@ class RepoCalc(Resource):
class RepoCalcs(Resource):
def get(self):
logger = get_logger(__name__, endpoint='repo', action='get')
# TODO use argparse? bad request response on bad params, pagination as decorator
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 10))
......@@ -190,7 +191,7 @@ class RepoCalcs(Resource):
try:
results = search.Calc.search(body=body)
except Exception as e:
get_logger(__name__).error('Could not execute repo calcs get.', exc_info=e)
logger.error('Could not execute repo calcs get', exc_info=e)
abort(500, message=str(e))
return {
......@@ -205,8 +206,10 @@ class RepoCalcs(Resource):
@app.route('%s/archive/<string:upload_hash>/<string:calc_hash>' % base_path, methods=['GET'])
def get_calc(upload_hash, calc_hash):
logger = get_logger(__name__, endpoint='archive', action='get', upload_hash=upload_hash, calc_hash=calc_hash)
archive_id = '%s/%s' % (upload_hash, calc_hash)
logger = get_logger(__name__, archive_id=archive_id)
try:
url = _external_objects_url(files.archive_url(archive_id))
return redirect(url, 302)
......@@ -224,5 +227,4 @@ api.add_resource(RepoCalc, '%s/repo/<string:upload_hash>/<string:calc_hash>' % b
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
app.run(debug=True, port=8000)
......@@ -18,6 +18,7 @@ This module is used to store all configuration values. It makes use of
"""
import os
import logging
from collections import namedtuple
FilesConfig = namedtuple(
......@@ -39,7 +40,7 @@ ElasticConfig = namedtuple('ElasticConfig', ['host', 'calc_index'])
MongoConfig = namedtuple('MongoConfig', ['host', 'users_db'])
""" Used to configure mongo db. """
LogstashConfig = namedtuple('LogstashConfig', ['enabled', 'host', 'tcp_port'])
LogstashConfig = namedtuple('LogstashConfig', ['enabled', 'host', 'tcp_port', 'level'])
""" Used to configure and enable/disable the ELK based centralized logging. """
NomadServicesConfig = namedtuple('NomadServicesConfig', ['api_base_path', 'objects_host', 'objects_port', 'objects_base_path'])
......@@ -87,7 +88,8 @@ mongo = MongoConfig(
logstash = LogstashConfig(
enabled=True,
host=os.environ.get('NOMAD_LOGSTASH_HOST', 'localhost'),
tcp_port=int(os.environ.get('NOMAD_LOGSTASH_TCPPORT', '5000'))
tcp_port=int(os.environ.get('NOMAD_LOGSTASH_TCPPORT', '5000')),
level=int(os.environ.get('NOMAD_LOGSTASH_LEVEL', logging.DEBUG))
)
services = NomadServicesConfig(
api_base_path=os.environ.get('NOMAD_API_BASE_PATH', '/nomadxt/api'),
......
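
The new level field of LogstashConfig is read from the NOMAD_LOGSTASH_LEVEL environment variable and defaults to logging.DEBUG. A minimal sketch of overriding it; the variable has to be set before nomad.config is first imported, since the module reads the environment at import time:

import logging
import os

# numeric stdlib logging level, e.g. 20 == logging.INFO
os.environ['NOMAD_LOGSTASH_LEVEL'] = str(logging.INFO)

from nomad import config
assert config.logstash.level == logging.INFO
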
......@@ -46,7 +46,6 @@ from zipfile import ZipFile, BadZipFile
import shutil
from minio import Minio
import minio.error
import logging
import hashlib
import base64
from contextlib import contextmanager
......@@ -54,8 +53,9 @@ import gzip
import io
import nomad.config as config
from nomad.utils import get_logger
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
_client = None
......@@ -69,7 +69,7 @@ if _client is None and 'sphinx' not in sys.modules:
def ensure_bucket(name):
try:
_client.make_bucket(bucket_name=name)
logger.info("Created uploads bucket with name %s." % name)
logger.info('Created uploads bucket', bucket=name)
except minio.error.BucketAlreadyOwnedByYou:
pass
......@@ -117,15 +117,16 @@ def upload_put_handler(func: Callable[[str], None]) -> Callable[[], None]:
event_name = event_record['eventName']
if event_name == 's3:ObjectCreated:Put':
upload_id = event_record['s3']['object']['key']
logger.debug('Received bucket upload event of for upload %s.' % upload_id)
logger.debug('Received bucket upload event', upload_id=upload_id)
yield upload_id
break # only one per record, pls
else:
logger.debug('Unhanled bucket event %s.' % event_name)
logger.debug('Unhandled bucket event', bucket_event_name=event_name)
except KeyError:
logger.warning(
'Unhandled bucket event due to unexprected event format: %s' %
event_record)
'Unhandled bucket event due to unexpected event format',
bucket_event_record=event_record)
def wrapper(*args, **kwargs) -> None:
logger.info('Start listening to upload notifications.')
......@@ -141,8 +142,7 @@ def upload_put_handler(func: Callable[[str], None]) -> Callable[[], None]:
func(upload_id)
except StopIteration:
# Using StopIteration to allow clients to stop handling of events.
logging.debug(
'Handling of upload notifications was stopped via StopIteration.')
logger.debug('Handling of upload notifications was stopped via StopIteration.')
return
except Exception:
pass
......
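
The pattern in this module (and throughout the rest of the diff) is to replace %-interpolated messages with a fixed event string plus keyword arguments. The structlog processors configured in nomad.utils below render such a call as a sorted JSON record, provided the stdlib level filter lets it through. A rough sketch of one call and its output (the timestamp value is made up):

from nomad.utils import get_logger

logger = get_logger(__name__)
logger.info('Created uploads bucket', bucket='uploads')
# rendered by TimeStamper + JSONRenderer(sort_keys=True) roughly as:
# {"bucket": "uploads", "event": "Created uploads bucket", "timestamp": "2018-09-03 12:00:00"}
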
from abc import ABCMeta, abstractmethod
from typing import List, Dict, Any
import logging
from nomad.parsing import AbstractParserBackend
from nomad.utils import get_logger
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
class Normalizer(metaclass=ABCMeta):
......@@ -61,11 +61,10 @@ class SystemBasedNormalizer(Normalizer, metaclass=ABCMeta):
self._normalize_system(g_index)
except KeyError as e:
logger.error(
'Could not read all data for %s. Skip section %s: %s' %
(self.__class__.__name__, 'section_system/%d' % g_index, e))
'Could not read all input data', normalizer=self.__class__.__name__,
section='section_system', g_index=g_index, key_error=str(e))
except Exception as e:
logger.error(
'Unexpected error during %s. Skip section %s.' %
(self.__class__.__name__, 'section_system/%d' % g_index),
exc_info=e)
'Unexpected error during normalizing', normalizer=self.__class__.__name__,
section='section_system', g_index=g_index, exc_info=e)
raise e
......@@ -48,14 +48,14 @@ from io import StringIO
import json
import re
import importlib
import logging
from nomadcore.local_backend import LocalBackend as LegacyLocalBackend
from nomadcore.local_backend import Section, Results
from nomad.dependencies import dependencies_dict as dependencies, PythonGit
from nomad.utils import get_logger
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
ParserStatus = Tuple[str, List[str]]
......@@ -367,7 +367,7 @@ class LocalBackend(LegacyParserBackend):
self._errors = parserErrors
def pwarn(self, msg):
logger.debug('Warning in parser: %s' % msg)
logger.debug('Warning in parser', parse_msg=msg)
def _parse_context_uri(self, context_uri: str) -> Tuple[str, int]:
"""
......
......@@ -27,17 +27,16 @@ import nomad.patch # pylint: disable=unused-import
logger = get_task_logger(__name__.replace('nomad', 'nomad-xt'))
logger.setLevel(logging.DEBUG)
if config.logstash.enabled:
def initialize_logstash(logger=None, loglevel=logging.INFO, **kwargs):
handler = logstash.TCPLogstashHandler(
config.logstash.host, config.logstash.tcp_port,
tags=['celery'], message_type='celery', version=1)
handler.setLevel(loglevel)
logger.addHandler(handler)
return logger
after_setup_task_logger.connect(initialize_logstash)
after_setup_logger.connect(initialize_logstash)
# if config.logstash.enabled:
# def initialize_logstash(logger=None, loglevel=logging.DEBUG, **kwargs):
# handler = logstash.TCPLogstashHandler(
# config.logstash.host, config.logstash.tcp_port, message_type='celery', version=1)
# handler.setLevel(loglevel)
# logger.addHandler(handler)
# return logger
# after_setup_task_logger.connect(initialize_logstash)
# after_setup_logger.connect(initialize_logstash)
app = Celery('nomad.processing', backend=config.celery.backend_url, broker=config.celery.broker_url)
......
......@@ -19,6 +19,7 @@ from nomad import files, utils, users
from nomad.processing.tasks import extracting_task, cleanup_task, parse_all_task
from nomad.processing.state import UploadProc
from nomad.utils import get_logger, lnr
def start_processing(upload_id, proc: UploadProc=None) -> UploadProc:
......@@ -63,10 +64,10 @@ def handle_uploads(quit=False):
@files.upload_put_handler
def handle_upload_put(received_upload_id: str):
logger = utils.get_logger(__name__, upload_id=received_upload_id)
logger = get_logger(__name__, upload_id=received_upload_id)
logger.debug('Initiate upload processing')
try:
with logger.lnr_error('Could not load'):
with lnr(logger, 'Could not load'):
upload = users.Upload.objects(id=received_upload_id).first()
if upload is None:
logger.error('Upload does not exist')
......@@ -76,11 +77,11 @@ def handle_uploads(quit=False):
logger.warn('Ignore upload notification, since file is already uploaded')
raise StopIteration
with logger.lnr_error('Save upload time'):
with lnr(logger, 'Save upload time'):
upload.upload_time = datetime.now()
upload.save()
with logger.lnr_error('Start processing'):
with lnr(logger, 'Start processing'):
proc = start_processing(received_upload_id, proc=upload.proc)
assert proc.is_started
upload.proc = proc
......
import logging
from nomad.processing.handler import handle_uploads
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
handle_uploads()
......@@ -243,7 +243,7 @@ class UploadProc(ProcPipeline):
__name__,
upload_id=self.upload_id,
current_task_name=self.current_task_name)
logger.error('Celery task raised exception.', exc_info=result)
logger.error('Celery task raised exception', exc_info=result)
else:
self.update(result)
might_have_changed = True
......
......@@ -35,16 +35,17 @@ def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
try:
upload = files.Upload(proc.upload_id)
upload.open()
logger.debug('Opened upload')
except KeyError as e:
logger.debug('Process request for non existing upload')
logger.info('Process request for non-existing upload')
proc.fail(e)
return proc
except files.UploadError as e:
logger.debug('Could not open upload, %s' % e)
logger.info('Could not open upload', error=str(e))
proc.fail(e)
return proc
except Exception as e:
logger.error('Unknown exception %s', exc_info=e)
logger.error('Unknown exception', exc_info=e)
proc.fail(e)
return proc
......@@ -53,11 +54,12 @@ def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
try:
proc.upload_hash = upload.hash()
except files.UploadError as e:
logger.error('Could not create upload hash', exc_info=e)
logger.error('Could not create upload hash', error=str(e))
proc.fail(e)
return proc
if search.Calc.upload_exists(proc.upload_hash):
logger.info('Upload hash doublet')
proc.fail('The same file was already uploaded and processed.')
return proc
......@@ -71,7 +73,7 @@ def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
proc.calc_procs.append(calc_proc)
except files.UploadError as e:
logger.warn('Could find parse specs in open upload', exc_info=e)
logger.warn('Could not find parse specs in open upload', error=str(e))
proc.fail(e)
return proc
......@@ -131,18 +133,22 @@ def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc:
assert upload_proc.upload_hash is not None
upload_hash, parser, mainfile = upload_proc.upload_hash, proc.parser_name, proc.mainfile
logger = utils.get_logger(__name__, upload_hash=upload_hash, mainfile=mainfile)
logger = utils.get_logger(
__name__, task=self.name,
upload_id=upload_proc.upload_id, upload_hash=upload_hash, mainfile=mainfile)
# parsing
proc.continue_with(parser)
logger.debug('Start parsing with %s' % parser)
try:
parser_backend = parser_dict[parser].run(proc.tmp_mainfile)
if parser_backend.status[0] != 'ParseSuccess':
proc.fail(parser_backend.status[1])
error = parser_backend.status[1]
logger.debug('Failed parsing', parser=parser, error=error)
proc.fail(error)
return proc
logger.debug('Completed successfully', parser=parser)
except Exception as e:
logger.warn('Exception wile parsing', exc_info=e)
logger.warn('Exception while parsing', parser=parser, exc_info=e)
proc.fail(e)
return proc
_report_progress(self, proc)
......@@ -151,12 +157,14 @@ def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc:
for normalizer in normalizers:
normalizer_name = normalizer.__name__
proc.continue_with(normalizer_name)
logger.debug('Start %s.' % normalizer)
try:
normalizer(parser_backend).normalize()
if parser_backend.status[0] != 'ParseSuccess':
proc.fail(parser_backend.status[1])
error = parser_backend.status[1]
logger.info('Failed run of %s: %s' % (normalizer, error))
proc.fail(error)
return proc
logger.debug('Completed %s successfully' % normalizer)
except Exception as e:
logger.warn('Exception while normalizing with %s' % normalizer, exc_info=e)
proc.fail(e)
......@@ -173,8 +181,9 @@ def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc:
upload_id=upload_proc.upload_id,
mainfile=mainfile,
upload_time=datetime.now())
logger.debug('Indexed successfully')
except Exception as e:
logger.error('Could not index', exc_info=e)
logger.error('Failed to index', exc_info=e)
proc.fail(e)
return proc
_report_progress(self, proc)
......@@ -185,12 +194,13 @@ def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc:
try:
with files.write_archive_json(archive_id) as out:
parser_backend.write_json(out, pretty=True)
logger.debug('Archived successfully')
except Exception as e:
logger.error('Could not write archive', exc_info=e)
logger.error('Failed to archive', exc_info=e)
proc.fail(e)
return proc
logger.debug('Completed')
logger.debug('Completed processing')
proc.success()
return proc
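
A convention emerges in these task hunks: anticipated failures (e.g. files.UploadError) are logged with a short error=str(e) field at info/warn level, while unexpected exceptions keep the full traceback via exc_info=e at error level. A minimal sketch of the distinction, mirroring the extracting_task code above (the upload id is a placeholder):

from nomad import files
from nomad.utils import get_logger

logger = get_logger(__name__, upload_id='some-upload-id')
try:
    files.Upload('some-upload-id').open()
except files.UploadError as e:
    # anticipated failure: a short structured error string suffices
    logger.info('Could not open upload', error=str(e))
except Exception as e:
    # unexpected failure: keep the full traceback
    logger.error('Unknown exception', exc_info=e)
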
......@@ -21,13 +21,13 @@ of search relevant properties.
import elasticsearch.exceptions
from elasticsearch_dsl import Document, Date, Keyword, Search, connections
import logging
import sys
from nomad import config
from nomad.parsing import LocalBackend
from nomad.utils import get_logger
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
# ensure elastic connection
if 'sphinx' not in sys.modules:
......@@ -113,8 +113,8 @@ class Calc(Document):
value = backend.get_value(property, 0)
except KeyError:
logger.warning(
'No value for property %s could be extracted for calc %s/%s.' %
(property, upload_hash, calc_hash))
'Missing property value', property=property, upload_id=upload_id,
upload_hash=upload_hash, calc_hash=calc_hash)
continue
setattr(calc, property, value)
......
from typing import Union, IO, cast
import hashlib
import base64
import json
import logging
import structlog
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper, JSONRenderer
from structlog.stdlib import LoggerFactory
import logstash
from contextlib import contextmanager
from nomad import config
_logging_is_configured = False
if not _logging_is_configured:
# basic config
logging.basicConfig(level=logging.WARNING)
# configure logstash
if config.logstash.enabled:
logstash_handler = logstash.TCPLogstashHandler(
config.logstash.host,
config.logstash.tcp_port, version=1)
logstash_handler.setLevel(config.logstash.level)
logging.getLogger().addHandler(logstash_handler)
# configure structlog
log_processors = [
StackInfoRenderer(),
format_exc_info,
TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False),
JSONRenderer(sort_keys=True)
]
structlog.configure(processors=log_processors, logger_factory=LoggerFactory())
_logging_is_configured = True
def hash(obj: Union[IO, str]) -> str:
""" First 28 character of an URL safe base 64 encoded sha512 digest. """
......@@ -18,54 +47,22 @@ def hash(obj: Union[IO, str]) -> str:
return base64.b64encode(hash.digest(), altchars=b'-_')[0:28].decode('utf-8')
class DataLogger():
def __init__(self, logger, **kwargs):
self._logger = logger
self.data = kwargs
def _prepare_msg(self, base_msg):
return '%s %s' % (base_msg, self._format_data())
def _format_data(self, ):
return json.dumps(self.data)
def debug(self, msg, *args, **kwargs):
self._logger.debug(self._prepare_msg(msg), *args, **kwargs)
def info(self, msg, *args, **kwargs):
self._logger.info(self._prepare_msg(msg), *args, **kwargs)
def warn(self, msg, *args, **kwargs):
self._logger.warn(self._prepare_msg(msg), *args, **kwargs)
def error(self, msg, *args, **kwargs):
self._logger.error(self._prepare_msg(msg), *args, **kwargs)
def crit(self, msg, *args, **kwargs):
self._logger.crit(self._prepare_msg(msg), *args, **kwargs)
@contextmanager
def lnr_error(self, msg, *args, **kwargs):
"""
Will *log and raise* with an error and the given message and args/kwargs
on all exceptions.
"""
try:
yield
except Exception as e:
self._logger.error(
self._prepare_msg('Exception while: %s' % msg),
exc_info=e, *args, **kwargs)
raise e
def get_logger(name, *args, **kwargs):
def get_logger(name, **kwargs):
"""
Returns a :class:`DataLogger` with the data given as kwargs.
A data logger can be used like any other logger, but will add the data to all
log output. Allowing more structured logging.
Returns a structlog logger that already has the logstash handler attached.
Use additional *kwargs* to pre-bind some values.
"""
return DataLogger(logging.getLogger(name), *args, **kwargs)
logger = structlog.get_logger(name, **kwargs)
return logger
@contextmanager
def lnr(logger, event, **kwargs):
try:
yield
except Exception as e:
logger.error(event, exc_info=e, **kwargs)
raise e
class DataObject(dict):
......@@ -88,3 +85,7 @@ class DataObject(dict):
def update(self, dct):
return super().update({key: value for key, value in dct.items() if value is not None})
if __name__ == '__main__':
logger = get_logger(__name__, test='value')
logger.info('Hi', add='cool')
\ No newline at end of file
......@@ -136,6 +136,8 @@ def test_process_non_existing(celery_session_worker):