#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
.. autofunc::nomad.utils.create_uuid
.. autofunc::nomad.utils.hash
.. autofunc::nomad.utils.timer

Logging in nomad is structured. Structured logging means that log entries contain
dictionaries with quantities related to the respective events, e.g. having the code,
parser, parser version, calc_id, mainfile, etc. for all events that happen during
calculation processing. This means that :func:`get_logger` and all logger functions
take keyword arguments for structured data. Otherwise, :func:`get_logger` can
be used similarly to the standard *logging.getLogger*.

Depending on the configuration, all logs will also be sent to a central logstash.

.. autofunc::nomad.utils.get_logger
.. autofunc::nomad.utils.hash
.. autofunc::nomad.utils.create_uuid
.. autofunc::nomad.utils.timer
.. autofunc::nomad.utils.lnr
'''

from typing import cast, Any
import logging
import structlog
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper, JSONRenderer
from structlog.stdlib import LoggerFactory
import logstash
from contextlib import contextmanager
import json
import re

from nomad import config, utils


def sanitize_logevent(event: str) -> str:
    '''
    Prepare a log event or message for analysis in the elastic stack.

    Numbers, lists of numbers, and matrices of numbers are collapsed into the
    placeholders ``X``, ``L``, and ``M`` respectively, and the event is capped
    at 120 characters. This makes event strings usable as representatives for
    event classes (rather than concrete event instances with concrete numbers),
    which eases defining aggregations over events.
    '''
    shortened = event[:120]
    # collapse ints and floats into the placeholder X
    without_numbers = re.sub(r'(\d*\.\d+|\d+(\.\d*)?)', 'X', shortened)
    # collapse comma-separated runs of X (optionally bracketed) into L
    without_lists = re.sub(r'((\[|\()\s*)?X\s*(,\s*X)+(\s*(\]|\)))?', 'L', without_numbers)
    # collapse comma-separated runs of X/L (optionally bracketed) into M
    return re.sub(r'((\[|\()\s*)?[XL](,\s*[XL])+(\s*(\]|\)))?', 'M', without_lists)


@contextmanager
def legacy_logger(logger):
    '''
    Context manager that makes the given logger the logger for legacy log entries.

    The previously installed legacy logger is restored on exit, so these
    contexts can be nested without clobbering each other.
    '''
    previous = LogstashHandler.legacy_logger
    LogstashHandler.legacy_logger = logger
    try:
        yield
    finally:
        # restore the prior logger instead of unconditionally clearing it,
        # so nested uses of this context manager behave correctly
        LogstashHandler.legacy_logger = previous


class LogstashHandler(logstash.TCPLogstashHandler):
    '''
    A log handler that emits records to logstash. It also filters logs for being
    structlog entries. All other entries are diverted to a global `legacy_logger`.
    This legacy logger is supposed to be a structlog logger that turns legacy
    records into structlog entries with reasonable binds depending on the current
    execution context (e.g. parsing/normalizing, etc.). If no legacy logger is
    set, they get emitted as usual (e.g. non nomad logs, celery, dbs, etc.)
    '''

    # class-level hook, installed/removed by the ``legacy_logger`` context manager
    legacy_logger = None

    def __init__(self):
        super().__init__(
            config.logstash.host,
            config.logstash.tcp_port, version=1)

    def filter(self, record):
        '''
        Decide whether ``record`` should be emitted to logstash.

        Noisy uvicorn access records (health checks, gui index) are dropped.
        Structlog records (JSON object messages) pass through; all other
        records are diverted to the legacy logger, if one is installed.
        '''
        if record.name == 'uvicorn.access':
            # uvicorn access records carry the request path as the third
            # positional arg; guard the access so a changed args format does
            # not crash the filter
            if record.args and len(record.args) > 2:
                http_access_path = record.args[2]
                if 'alive' in http_access_path or 'gui/index.html' in http_access_path:
                    return False

        if super().filter(record):
            # structlog messages are JSON objects serialized into the msg string
            is_structlog = False
            if isinstance(record.msg, str):
                is_structlog = record.msg.startswith('{') and record.msg.endswith('}')

            if is_structlog:
                return True
            else:
                if LogstashHandler.legacy_logger is None:
                    return True
                else:
                    LogstashHandler.legacy_logger.log(
                        record.levelno, sanitize_logevent(record.msg), args=record.args,
                        exc_info=record.exc_info, stack_info=record.stack_info,
                        legacy_logger=record.name)

                    # the legacy logger will re-emit a structlog version
                    return False

        return False


class LogstashFormatter(logstash.formatter.LogstashFormatterBase):
    '''
    Formats log records (incl. serialized structlog JSON messages) into the
    message dict that is sent to logstash.
    '''

    def format(self, record):
        # parse a structlog JSON message; fall back to a plain event dict for
        # non-structlog records. Named structlog_entry to avoid shadowing the
        # module-level ``structlog`` import.
        try:
            structlog_entry = json.loads(record.getMessage())
        except json.JSONDecodeError:
            structlog_entry = dict(event=record.getMessage())

        # Create message dict
        message = {
            '@timestamp': self.format_timestamp(record.created),
            '@version': '1',
            'event': structlog_entry['event'],
            'host': self.host,
            'path': record.pathname,
            'tags': self.tags,
            'type': self.message_type,

            # Extra Fields
            'level': record.levelname,
            'logger_name': record.name,

            # Nomad specific
            'nomad.service': config.meta.service,
            'nomad.deployment': config.meta.deployment,
            'nomad.version': config.meta.version,
            'nomad.commit': config.meta.commit
        }

        if config.meta.label:
            message['nomad.label'] = config.meta.label

        if record.name.startswith('nomad'):
            # namespace structlog keys for nomad loggers
            for key, value in structlog_entry.items():
                if key in ('event', 'stack_info', 'id', 'timestamp'):
                    continue
                elif key == 'exception':
                    exception_trace = value.strip('\n')
                    message['digest'] = str(value)[-256:]

                    # exclude the last line, which is the exception message and might
                    # vary for different instances of the same exception
                    message['exception_hash'] = utils.hash(
                        exception_trace[:exception_trace.rfind('\n')])
                elif key in ['upload_id', 'calc_id', 'mainfile']:
                    key = 'nomad.%s' % key
                else:
                    key = '%s.%s' % (record.name, key)

                message[key] = value
        else:
            message.update(structlog_entry)

        # Handle uvicorn access events
        if record.name == 'uvicorn.access':
            status_code = getattr(record, 'status_code', None)
            if status_code is not None:
                message['uvicorn.status_code'] = status_code
            scope = getattr(record, 'scope', None)
            if scope is not None:
                message['uvicorn.method'] = scope['method']
                message['uvicorn.path'] = scope['path']
                message['uvicorn.query_string'] = scope['query_string'].decode()
                message['uvicorn.headers'] = {
                    key.decode(): value.decode() for key, value in scope['headers']}
            args = getattr(record, 'args', None)
            if args is not None and len(args) == 5:
                # uvicorn access args: (client, method, path?query, http_version, status)
                _, method, path_w_query, _, status_code = args
                path_w_query_components = path_w_query.split('?', 1)
                path = path_w_query_components[0]
                if len(path_w_query_components) == 2:
                    query_string = path_w_query_components[1]
                    message['uvicorn.query_string'] = query_string
                message['uvicorn.method'] = method
                message['uvicorn.path'] = path
                message['uvicorn.status_code'] = status_code
        else:
            # Add extra fields
            message.update(self.get_extra_fields(record))

        # If exception, add debug info
        if record.exc_info:
            message.update(self.get_debug_fields(record))

        return self.serialize(message)


class ConsoleFormatter(LogstashFormatter):
    ''' Renders the logstash message dict as human readable console output. '''

    # when True, common fields are abbreviated/omitted for a more compact output
    short_format = False

    @classmethod
    def serialize(cls, message_dict):
        from io import StringIO

        logger = message_dict.pop('logger_name', 'unknown logger')
        event = message_dict.pop('event', None)
        level = message_dict.pop('level', 'UNKNOWN')
        exception = message_dict.pop('exception', None)
        time = message_dict.pop('@timestamp', '1970-01-01 12:00:00')

        # drop fields that carry no information on the console
        for key in ['type', 'tags', 'stack_info', 'path', 'message', 'host', '@version', 'digest']:
            message_dict.pop(key, None)
        sorted_keys = sorted(message_dict.keys())

        buffer = StringIO()
        buffer.write('%s %s %s %s' % (
            level.ljust(8), logger.ljust(20)[:20], time.ljust(19)[:19], event))
        if exception is not None:
            buffer.write('\n  - exception: %s' % str(exception).replace('\n', '\n    '))

        for key in sorted_keys:
            print_key = key[6:] if cls.short_format and key.startswith('nomad.') else key
            if not cls.short_format or print_key not in ['deployment', 'service']:
                buffer.write('\n  - %s: %s' % (print_key, str(message_dict.get(key, None))))
        return buffer.getvalue()


def add_logstash_handler(logger):
    ''' Attach a configured :class:`LogstashHandler` to *logger*, unless it already has one. '''
    # idempotent: do nothing if a logstash handler is already attached
    for handler in logger.handlers:
        if isinstance(handler, LogstashHandler):
            return

    logstash_handler = LogstashHandler()
    logstash_handler.formatter = LogstashFormatter(tags=['nomad', config.meta.deployment])
    logstash_handler.setLevel(config.logstash.level)
    logger.addHandler(logstash_handler)


def get_logger(name, **kwargs):
    '''
    Returns a structlog logger that is already attached with a logstash handler.
    Use additional *kwargs* to pre-bind some values to all events.
    '''
    # collapse sub-module names to their first two components,
    # e.g. 'nomad.processing.data' -> 'nomad.processing'
    if name.startswith('nomad.'):
        name = '.'.join(name.split('.', 2)[:2])

    return structlog.get_logger(name, **kwargs)


# configure structlog
log_processors = [
    StackInfoRenderer(),
    format_exc_info,
    TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False),
    JSONRenderer(sort_keys=True)
]

default_factory = LoggerFactory()


def logger_factory(*args):
    ''' Logger factory that forces all structlog-backed stdlib loggers to DEBUG level. '''
    logger = default_factory(*args)
    # level filtering happens in the handlers, not the loggers
    logger.setLevel(logging.DEBUG)
    return logger


structlog.configure(
    processors=cast(Any, log_processors),
    logger_factory=logger_factory,
    wrapper_class=structlog.stdlib.BoundLogger)

# the process-wide root logger that all nomad loggers propagate to
root = logging.getLogger()


# configure logging in general
def configure_logging(console_log_level=config.console_log_level):
    '''
    (Re-)configures logging: ensures a basic configuration exists and applies
    the given console log level and human readable formatting to every
    non-logstash handler on the root logger.
    '''
    logging.basicConfig(level=logging.DEBUG)
    for handler in root.handlers:
        # logstash handlers keep their own level and formatter
        if not isinstance(handler, LogstashHandler):
            handler.setLevel(console_log_level)
            handler.setFormatter(ConsoleFormatter())


configure_logging()

# configure logstash
if config.logstash.enabled:
    add_logstash_handler(root)

    get_logger(__name__).info(
        'setup logging',
        logstash=config.logstash.enabled,
        logstash_host=config.logstash.host,
        logstash_port=config.logstash.tcp_port,
        logstash_level=config.logstash.level)

# configure log levels: silence chatty third-party loggers.
# Use ``logger_name`` (not ``logger``) to avoid leaking a generically named
# module-level binding.
for logger_name in ['elasticsearch', 'urllib3.connectionpool']:
    logging.getLogger(logger_name).setLevel(logging.WARNING)