Commit c0ce0df9 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Refactored general logging data. Added memory data to processing logs.

parent dda4ab99
Pipeline #116227 passed with stages
in 26 minutes and 38 seconds
...@@ -77,7 +77,7 @@ class InfoModel(BaseModel): ...@@ -77,7 +77,7 @@ class InfoModel(BaseModel):
statistics: StatisticsModel = Field(None, description='General NOMAD statistics') statistics: StatisticsModel = Field(None, description='General NOMAD statistics')
search_quantities: dict search_quantities: dict
version: str version: str
release: str deployment: str
git: GitInfoModel git: GitInfoModel
oasis: bool oasis: bool
...@@ -134,7 +134,7 @@ async def get_info(): ...@@ -134,7 +134,7 @@ async def get_info():
if 'optimade' not in s.qualified_name if 'optimade' not in s.qualified_name
}, },
'version': config.meta.version, 'version': config.meta.version,
'release': config.meta.release, 'deployment': config.meta.deployment,
'git': { 'git': {
'ref': gitinfo.ref, 'ref': gitinfo.ref,
'version': gitinfo.version, 'version': gitinfo.version,
......
...@@ -51,9 +51,11 @@ class POPO(dict): ...@@ -51,9 +51,11 @@ class POPO(dict):
'It uses a sub-command structure similar to the git command.')) 'It uses a sub-command structure similar to the git command.'))
@click.option('-v', '--verbose', help='sets log level to info', is_flag=True) @click.option('-v', '--verbose', help='sets log level to info', is_flag=True)
@click.option('--debug', help='sets log level to debug', is_flag=True) @click.option('--debug', help='sets log level to debug', is_flag=True)
@click.option('--log-label', type=str, help='Label applied to log entries.')
@click.pass_context @click.pass_context
def cli(ctx, verbose: bool, debug: bool): def cli(ctx, verbose: bool, debug: bool, log_label: str):
config.meta.service = os.environ.get('NOMAD_SERVICE', 'cli') config.meta.service = os.environ.get('NOMAD_SERVICE', 'cli')
config.meta.label = log_label
if debug: if debug:
config.console_log_level = logging.DEBUG config.console_log_level = logging.DEBUG
......
...@@ -309,8 +309,8 @@ datacite = NomadConfig( ...@@ -309,8 +309,8 @@ datacite = NomadConfig(
meta = NomadConfig( meta = NomadConfig(
version='1.0.0', version='1.0.0',
commit=gitinfo.commit, commit=gitinfo.commit,
release='devel', deployment='devel',
deployment='standard', label=None,
default_domain='dft', default_domain='dft',
service='unknown nomad service', service='unknown nomad service',
name='novel materials discovery (NOMAD)', name='novel materials discovery (NOMAD)',
......
...@@ -900,7 +900,7 @@ def index_entries(entries: List, refresh: bool = False): ...@@ -900,7 +900,7 @@ def index_entries(entries: List, refresh: bool = False):
except Exception as e: except Exception as e:
logger.error('could not create entry index doc', calc_id=entry['entry_id'], exc_info=e) logger.error('could not create entry index doc', calc_id=entry['entry_id'], exc_info=e)
timer_kwargs = {} timer_kwargs: Dict[str, Any] = {}
try: try:
import json import json
timer_kwargs['size'] = len(json.dumps(actions_and_docs)) timer_kwargs['size'] = len(json.dumps(actions_and_docs))
...@@ -1119,7 +1119,7 @@ def update_materials(entries: List, refresh: bool = False): ...@@ -1119,7 +1119,7 @@ def update_materials(entries: List, refresh: bool = False):
all_n_entries += material_doc['n_entries'] all_n_entries += material_doc['n_entries']
# Execute the created actions in bulk. # Execute the created actions in bulk.
timer_kwargs = {} timer_kwargs: Dict[str, Any] = {}
try: try:
import json import json
timer_kwargs['size'] = len(json.dumps(_actions_and_docs_bulks)) timer_kwargs['size'] = len(json.dumps(_actions_and_docs_bulks))
......
...@@ -520,6 +520,10 @@ def proc_task(task, cls_name, self_id, func_attr, process_args, process_kwargs): ...@@ -520,6 +520,10 @@ def proc_task(task, cls_name, self_id, func_attr, process_args, process_kwargs):
might happen in sharded, distributed mongo setups where the object might not might happen in sharded, distributed mongo setups where the object might not
have yet been propagated and therefore appear missing. have yet been propagated and therefore appear missing.
''' '''
if '_meta_label' in process_kwargs:
config.meta.label = process_kwargs['_meta_label']
del(process_kwargs['_meta_label'])
self = unwarp_task(task, cls_name, self_id) self = unwarp_task(task, cls_name, self_id)
logger = self.get_logger() logger = self.get_logger()
...@@ -545,7 +549,7 @@ def proc_task(task, cls_name, self_id, func_attr, process_args, process_kwargs): ...@@ -545,7 +549,7 @@ def proc_task(task, cls_name, self_id, func_attr, process_args, process_kwargs):
# call the process function # call the process function
try: try:
os.chdir(config.fs.working_directory) os.chdir(config.fs.working_directory)
with utils.timer(logger, 'process executed on worker'): with utils.timer(logger, 'process executed on worker', log_memory=True):
# Actually call the process function # Actually call the process function
self.process_status = ProcessStatus.RUNNING self.process_status = ProcessStatus.RUNNING
self.last_status_message = 'Started process: ' + func_attr self.last_status_message = 'Started process: ' + func_attr
...@@ -610,6 +614,8 @@ def process(func): ...@@ -610,6 +614,8 @@ def process(func):
self.process_status = ProcessStatus.PENDING self.process_status = ProcessStatus.PENDING
self.save() self.save()
kwargs['_meta_label'] = config.meta.label
self._run_process(func, args, kwargs) self._run_process(func, args, kwargs)
setattr(wrapper, '__process_unwrapped', func) setattr(wrapper, '__process_unwrapped', func)
......
...@@ -87,7 +87,7 @@ def _pack_log_event(logger, method_name, event_dict): ...@@ -87,7 +87,7 @@ def _pack_log_event(logger, method_name, event_dict):
log_data.update(**{ log_data.update(**{
key: value key: value
for key, value in getattr(logger, '_context', {}).items() for key, value in getattr(logger, '_context', {}).items()
if key not in ['service', 'release', 'upload_id', 'calc_id', 'mainfile', 'process_status']}) if key not in ['service', 'deployment', 'upload_id', 'calc_id', 'mainfile', 'process_status']})
log_data.update(logger=logger.name) log_data.update(logger=logger.name)
return log_data return log_data
......
...@@ -52,6 +52,8 @@ import collections ...@@ -52,6 +52,8 @@ import collections
import logging import logging
import inspect import inspect
import orjson import orjson
import resource
import os
from nomad import config from nomad import config
...@@ -214,7 +216,7 @@ def lnr(logger, event, **kwargs): ...@@ -214,7 +216,7 @@ def lnr(logger, event, **kwargs):
@contextmanager @contextmanager
def timer(logger, event, method='info', lnr_event: str = None, **kwargs): def timer(logger, event, method='info', lnr_event: str = None, log_memory: bool = False, **kwargs):
''' '''
A context manager that takes execution time and produces a log entry with said time. A context manager that takes execution time and produces a log entry with said time.
...@@ -223,22 +225,36 @@ def timer(logger, event, method='info', lnr_event: str = None, **kwargs): ...@@ -223,22 +225,36 @@ def timer(logger, event, method='info', lnr_event: str = None, **kwargs):
event: The log message/event. event: The log message/event.
method: The log method that should be used. Must be a valid logger method name. method: The log method that should be used. Must be a valid logger method name.
Default is 'info'. Default is 'info'.
log_memory: Log process memory usage before and after.
**kwargs: Additional logger data that is passed to the log entry. **kwargs: Additional logger data that is passed to the log entry.
Returns: Returns:
The method yields a dictionary that can be used to add further log data. The method yields a dictionary that can be used to add further log data.
''' '''
kwargs = dict(kwargs)
start = time.time() start = time.time()
if log_memory:
rss_before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
kwargs['pid'] = os.getpid()
kwargs['exec_rss_before'] = rss_before
try: try:
yield kwargs yield kwargs
except Exception as e: except Exception as e:
if lnr_event is not None: if lnr_event is not None:
stop = time.time() stop = time.time()
if log_memory:
rss_after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
kwargs['exec_rss_after'] = rss_after
kwargs['exec_rss_delta'] = rss_before - rss_after
logger.error(lnr_event, exc_info=e, exec_time=stop - start, **kwargs) logger.error(lnr_event, exc_info=e, exec_time=stop - start, **kwargs)
raise e raise e
finally: finally:
stop = time.time() stop = time.time()
if log_memory:
rss_after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
kwargs['exec_rss_after'] = rss_after
kwargs['exec_rss_delta'] = rss_before - rss_after
if logger is None: if logger is None:
print(event, stop - start) print(event, stop - start)
......
...@@ -143,11 +143,12 @@ class LogstashFormatter(logstash.formatter.LogstashFormatterBase): ...@@ -143,11 +143,12 @@ class LogstashFormatter(logstash.formatter.LogstashFormatterBase):
# Nomad specific # Nomad specific
'nomad.service': config.meta.service, 'nomad.service': config.meta.service,
'nomad.release': config.meta.release, 'nomad.deployment': config.meta.deployment,
'nomad.version': config.meta.version, 'nomad.version': config.meta.version,
'nomad.commit': config.meta.commit, 'nomad.commit': config.meta.commit
'nomad.deployment': config.meta.deployment
} }
if config.meta.label:
message['nomad.label'] = config.meta.label
if record.name.startswith('nomad'): if record.name.startswith('nomad'):
for key, value in structlog.items(): for key, value in structlog.items():
...@@ -233,7 +234,7 @@ class ConsoleFormatter(LogstashFormatter): ...@@ -233,7 +234,7 @@ class ConsoleFormatter(LogstashFormatter):
print_key = key[6:] print_key = key[6:]
else: else:
print_key = key print_key = key
if not cls.short_format or print_key not in ['release', 'service']: if not cls.short_format or print_key not in ['deployment', 'service']:
out.write('\n - %s: %s' % (print_key, str(message_dict.get(key, None)))) out.write('\n - %s: %s' % (print_key, str(message_dict.get(key, None))))
return out.getvalue() return out.getvalue()
...@@ -245,7 +246,7 @@ def add_logstash_handler(logger): ...@@ -245,7 +246,7 @@ def add_logstash_handler(logger):
if logstash_handler is None: if logstash_handler is None:
logstash_handler = LogstashHandler() logstash_handler = LogstashHandler()
logstash_handler.formatter = LogstashFormatter(tags=['nomad', config.meta.release]) logstash_handler.formatter = LogstashFormatter(tags=['nomad', config.meta.deployment])
logstash_handler.setLevel(config.logstash.level) logstash_handler.setLevel(config.logstash.level)
logger.addHandler(logstash_handler) logger.addHandler(logstash_handler)
......
...@@ -196,7 +196,7 @@ keycloak: ...@@ -196,7 +196,7 @@ keycloak:
oasis: true oasis: true
meta: meta:
release: 'oasis' deployment: 'oasis'
deployment_id: '<your-host>' deployment_id: '<your-host>'
maintainer_email: '<oasis admin email>' maintainer_email: '<oasis admin email>'
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment