Skip to content
Snippets Groups Projects
Commit d192a67f authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added elk stack configuration. Hooked celery logger into elk stack.

parent 73333905
Branches
Tags
No related merge requests found
......@@ -53,13 +53,14 @@ services:
ports:
- '6379:6379'
# used for central logging
# used for centralized logging
# Centralized logging via a customized ELK stack image (see ./elk/Dockerfile).
# The merged diff left both the old and the new lines in place (image AND
# build, old AND new port comments); this keeps only the post-commit state.
elk:
    restart: always
    build: ./elk/
    hostname: "elk"
    ports:
        - 5601:5601 # kibana web
        - 9200:9200 # elastic search api
        # - 9300:9300 # elastic transport api
        - 5044:5044 # logstash beats
        - 5000:5000 # logstash tcp
# Logstash input pipeline. Two sources:
#  - beats protocol on 5044 (matches the "logstash beats" port mapping)
#  - raw TCP on 5000 with a JSON codec — this is the port the celery workers'
#    python-logstash TCP handler sends to.
input {
beats {
port => 5044
}
tcp {
port => 5000
codec => json
}
}
\ No newline at end of file
# Logstash output pipeline: forward every event to the elasticsearch instance
# running in the same container ("localhost" from logstash's point of view).
# The index/template/document_type settings are intentionally left at their
# defaults for now (see the commented alternatives below).
output {
elasticsearch {
hosts => ["localhost"]
# manage_template => false
# index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
# document_type => "%{[@metadata][type]}"
}
}
\ No newline at end of file
# Custom ELK image: extends the stock sebp/elk image with our own logstash
# pipeline fragments (beats + tcp inputs, simplified elasticsearch output).
FROM sebp/elk
ENV LOGSTASH_PATH_CONF /etc/logstash
# Numeric prefixes control the order logstash loads the pipeline fragments in.
ADD ./02-beats-input.conf ${LOGSTASH_PATH_CONF}/conf.d/02-beats-input.conf
ADD ./30-output.conf ${LOGSTASH_PATH_CONF}/conf.d/30-output.conf
\ No newline at end of file
This image is based on the popular elk-stack docker image:
[github](https://github.com/spujadas/elk-docker),
[readthedocs](http://elk-docker.readthedocs.io/).
## Changes
- disabled ssl for beats communication to logstash server
- added tcp input
- simplified elastic search output (don't know how to set metric and other vars yet :-()
\ No newline at end of file
......@@ -18,6 +18,7 @@ S3Config = namedtuple('S3', ['uploads_bucket', 'repository_bucket', 'archive_buc
# Lightweight, immutable configuration records for each infrastructure service.
# NOTE(review): "RabitMQConfig" misspells RabbitMQ, but the name is part of the
# module's public interface, so it is kept as-is.
RabitMQConfig = namedtuple('RabbitMQ', 'host port user password')
MinioConfig = namedtuple('Minio', 'host port accesskey secret')
FSConfig = namedtuple('FSConfig', 'tmp')
LogstashConfig = namedtuple('LogstashConfig', 'enabled host tcp_port')
s3 = S3Config(
uploads_bucket='uploads',
......@@ -25,17 +26,22 @@ s3 = S3Config(
archive_bucket='archive'
)
# RabbitMQ connection settings. port=None defers to the client library's
# default AMQP port. The merged diff had duplicated every keyword argument
# (old "k = v" and new "k=v" lines), which is a SyntaxError; only the
# post-commit lines are kept.
rabbitmq = RabitMQConfig(
    host='localhost',
    port=None,
    user='rabbitmq',
    password='rabbitmq'
)
# Minio (S3-compatible store) connection settings. The access key / secret are
# the well-known AWS documentation placeholders — development defaults only,
# not real credentials. The merged diff had duplicated every keyword argument,
# which is a SyntaxError; only the post-commit lines are kept.
minio = MinioConfig(
    host='localhost',
    port=9007,
    accesskey='AKIAIOSFODNN7EXAMPLE',
    secret='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
)
# Local filesystem scratch space used for temporary files. The merged diff had
# both the old "tmp = ..." and new "tmp=..." lines (duplicate kwarg); only the
# post-commit line is kept.
fs = FSConfig(
    tmp='./infrastructure/data/tmp'
)
# Centralized logging: ship log records to the ELK stack's logstash TCP input
# (port 5000 in the elk service of the docker-compose setup).
logstash = LogstashConfig(enabled=True, host='localhost', tcp_port=5000)
......@@ -29,7 +29,7 @@ import itertools
import nomad.config as config
# Module-level logger (lower-case name per PEP 8). The merged diff left both
# the old LOGGER and the new logger assignment; only the renamed one is kept.
logger = logging.getLogger(__name__)
_client = Minio('%s:%s' % (config.minio.host, config.minio.port),
access_key=config.minio.accesskey,
......@@ -39,9 +39,9 @@ _client = Minio('%s:%s' % (config.minio.host, config.minio.port),
# ensure all necessary buckets exist
# Create the uploads bucket at import time. Minio raises
# BucketAlreadyOwnedByYou when the bucket exists and we own it — that is the
# normal already-initialized case, so it is only logged at debug level.
# The merged diff had duplicated each log call (old LOGGER + new logger);
# only one call per branch is kept, using lazy %-args instead of eager
# % formatting so the message is only rendered if the level is enabled.
try:
    _client.make_bucket(bucket_name=config.s3.uploads_bucket)
    logger.info("Created uploads bucket with name %s.", config.s3.uploads_bucket)
except minio.error.BucketAlreadyOwnedByYou:
    logger.debug("Uploads bucket with name %s already existed.", config.s3.uploads_bucket)
def get_presigned_upload_url(upload_id):
......@@ -69,18 +69,18 @@ def upload_put_handler(func):
try:
event_name = event_record['eventName']
if event_name == 's3:ObjectCreated:Put':
LOGGER.debug('Received bucket upload event of type %s.' % event_name)
logger.debug('Received bucket upload event of type %s.' % event_name)
upload_id = event_record['s3']['object']['key']
yield upload_id
else:
LOGGER.debug('Unhandled bucket event of type %s.' % event_name)
logger.debug('Unhandled bucket event of type %s.' % event_name)
except KeyError:
LOGGER.warning(
logger.warning(
'Unhandled bucket event due to unexprected event format: %s' %
event_record)
def wrapper(*args, **kwargs):
LOGGER.info('Start listening to uploads notifications.')
logger.info('Start listening to uploads notifications.')
events = _client.listen_bucket_notification(config.s3.uploads_bucket)
......@@ -94,7 +94,7 @@ def upload_put_handler(func):
'Handling of upload notifications was stopped via StopIteration.')
return
except Exception as e:
LOGGER.error(
logger.error(
'Unexpected exception in upload handler for upload:id:' %
upload_id, exc_info=e)
......@@ -132,7 +132,7 @@ class Upload():
return decorated(self, *args, **kwargs)
except Exception as e:
msg = 'Could not %s upload %s.' % (decorated.__name__, self.upload_id)
LOGGER.error(msg, exc_info=e)
logger.error(msg, exc_info=e)
raise UploadError(msg, e)
return wrapper
......
......@@ -14,6 +14,8 @@
from celery import Celery, group, subtask
from celery.result import result_from_tuple
from celery.signals import after_setup_task_logger, after_setup_logger
from celery.utils.log import get_task_logger
import logging
import time
import logstash
......@@ -23,17 +25,17 @@ import nomad.files as files
from nomad.parsers import parsers, parser_dict
# (Removed the commented-out earlier copy of this function — dead code that
# duplicated the definition below with hard-coded host/port.)
if config.logstash.enabled:
    def initialize_logstash(logger=None, loglevel=logging.INFO, **kwargs):
        """Celery logging-signal handler that ships log records to logstash.

        Attaches a TCP logstash handler (host/port from config.logstash) to
        the logger celery passes in via the signal, and returns the logger.
        **kwargs absorbs the remaining signal arguments.
        """
        handler = logstash.TCPLogstashHandler(
            config.logstash.host, config.logstash.tcp_port,
            tags=['celery'], message_type='celery', version=1)
        handler.setLevel(loglevel)
        logger.addHandler(handler)
        return logger
# Hook the logstash handler into celery's logger-setup signals. (Removed the
# commented-out import/connect lines that duplicated the code below.)
# NOTE(review): initialize_logstash is only defined when
# config.logstash.enabled is True — these connect calls presumably belong
# under the same guard; confirm against the original file's indentation,
# which this view does not preserve.
after_setup_task_logger.connect(initialize_logstash)
after_setup_logger.connect(initialize_logstash)
broker_url = 'pyamqp://%s:%s@localhost//' % (config.rabbitmq.user, config.rabbitmq.password)
......@@ -45,7 +47,7 @@ app.conf.update(
result_serializer='pickle',
)
# Celery-aware task logger (replaces the former module-level LOGGER; the
# merged diff had left both assignments in place).
logger = get_task_logger(__name__)
@app.task()
......@@ -86,7 +88,7 @@ def close_upload(parse_results, upload_id):
@app.task()
def parse(mainfile_spec):
    """Celery task: run the selected parser on one mainfile of an upload.

    mainfile_spec is an (upload, mainfile, parser) triple. Returns True so
    group/chord results can be aggregated by the caller.
    """
    upload, mainfile, parser = mainfile_spec
    # Merged diff had this debug line twice (old LOGGER + new logger); kept
    # once, with lazy %-args so formatting is skipped unless DEBUG is on.
    logger.debug('Start parsing mainfile %s/%s with %s.', upload, mainfile, parser)
    parser_dict[parser].run(upload.get_path(mainfile))
    return True
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment