Commit bf563e30 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Refactoring processing and logging.

parent aa120658
......@@ -767,6 +767,8 @@ def get_raw(upload_hash, calc_hash):
:returns: the raw data in body
"""
archive_id = '%s/%s' % (upload_hash, calc_hash)
logger = get_logger(__name__, endpoint='raw', action='get', archive_id=archive_id)
try:
repo = RepoCalc.get(id=archive_id)
except NotFoundError:
......@@ -808,8 +810,11 @@ def get_raw(upload_hash, calc_hash):
return dict(arcname=filename, iterable=iter_content())
yield write(repo.mainfile)
for auxfile in repo.aux_files:
yield write(os.path.join(os.path.dirname(repo.mainfile), auxfile))
try:
for auxfile in repo.aux_files:
yield write(os.path.join(os.path.dirname(repo.mainfile), auxfile))
except Exception as e:
logger.error('Exception while accessing auxfiles.', exc_info=e)
zip_stream = zipstream.ZipFile(mode='w', compression=ZIP_DEFLATED)
zip_stream.paths_to_write = iterator()
......
......@@ -98,4 +98,4 @@ services = NomadServicesConfig(
api_secret=os.environ.get('NOMAD_API_SECRET', 'defaultApiSecret')
)
console_log_level = get_loglevel_from_env('NOMAD_CONSOLE_LOGLEVEL', default_level=logging.CRITICAL)
console_log_level = get_loglevel_from_env('NOMAD_CONSOLE_LOGLEVEL', default_level=logging.ERROR)
......@@ -36,6 +36,7 @@ mongo_client = None
def setup():
""" Creates connections to mongodb and elastic search. """
global elastic_client
setup_logging()
setup_mongo()
setup_elastic()
......@@ -43,6 +44,10 @@ def setup():
user.ensure_test_users()
def setup_logging():
    """ Configures logging/structlog by delegating to :func:`nomad.utils.configure_logging`. """
    utils.configure_logging()
def setup_mongo():
""" Creates connection to mongodb. """
global mongo_client
......
......@@ -24,7 +24,7 @@ calculations, and files
:members:
"""
from typing import List, Any, ContextManager
from typing import List, Any, ContextManager, Tuple, Generator
from datetime import datetime
from elasticsearch.exceptions import NotFoundError
from mongoengine import StringField, BooleanField, DateTimeField, DictField, IntField
......@@ -456,8 +456,32 @@ class Upload(Chord):
self.fail('The same file was already uploaded and processed.', level=logging.INFO)
return
def match_mainfiles(self) -> Generator[Tuple[File, str, object], None, None]:
    """
    Generator function that matches all files in the upload to all parsers to
    determine the upload's mainfiles.

    Returns:
        Tuples of mainfile, filename, and parsers
    """
    for path in self.upload_file.filelist:
        candidate = self.upload_file.get_file(path)
        for parser in parsers:
            try:
                # keep the file open while yielding, so consumers can read it
                with candidate.open() as open_file:
                    matched = parser.is_mainfile(path, lambda fn: open_file)
                    if matched:
                        yield candidate, path, parser
            except Exception as e:
                # a single failing file/parser pair must not abort the whole match
                self.error(
                    'exception while matching pot. mainfile',
                    mainfile=path, exc_info=e)
@task
def parse_all(self):
"""
Identifies mainfile/parser combinations among the upload's files, creates
respective :class:`Calc` instances, and triggers their processing.
"""
logger = self.get_logger()
# TODO: deal with multiple possible parser specs
......@@ -466,25 +490,15 @@ class Upload(Chord):
upload_size=self.upload_file.size,
upload_filecount=len(self.upload_file.filelist)):
total_calcs = 0
for filename in self.upload_file.filelist:
for parser in parsers:
try:
potential_mainfile = self.upload_file.get_file(filename)
with potential_mainfile.open() as mainfile_f:
if parser.is_mainfile(filename, lambda fn: mainfile_f):
mainfile_path = potential_mainfile.os_path
calc = Calc.create(
archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)),
mainfile=filename, parser=parser.name,
mainfile_tmp_path=mainfile_path,
upload_id=self.upload_id)
calc.process()
total_calcs += 1
except Exception as e:
self.error(
'exception while matching pot. mainfile',
mainfile=filename, exc_info=e)
for mainfile, filename, parser in self.match_mainfiles():
calc = Calc.create(
archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)),
mainfile=filename, parser=parser.name,
mainfile_tmp_path=mainfile.os_path,
upload_id=self.upload_id)
calc.process()
total_calcs += 1
# have to save the total_calcs information for chord management
self.spwaned_childred(total_calcs)
......
......@@ -29,7 +29,7 @@ Depending on the configuration all logs will also be send to a central logstash.
.. autofunc::nomad.utils.get_logger
"""
from typing import Union, IO, cast
from typing import Union, IO, cast, List
import hashlib
import base64
import logging
......@@ -110,8 +110,7 @@ def add_logstash_handler(logger):
logger.addHandler(logstash_handler)
_logging_is_configured = False
if not _logging_is_configured:
def configure_logging():
# configure structlog
log_processors = [
StackInfoRenderer(),
......@@ -136,15 +135,13 @@ if not _logging_is_configured:
logging.basicConfig(stream=sys.stdout)
root = logging.getLogger()
for handler in root.handlers:
handler.setLevel(config.console_log_level if 'pytest' not in sys.modules else logging.CRITICAL)
handler.setLevel(config.console_log_level)
# configure logstash
if config.logstash.enabled and 'pytest' not in sys.modules:
if config.logstash.enabled:
add_logstash_handler(root)
root.info('Structlog configured for logstash')
_logging_is_configured = True
def create_uuid() -> str:
""" Returns a web-save base64 encoded random uuid (type 4). """
......@@ -222,3 +219,25 @@ def timer(logger, event, method='info', **kwargs):
logger_method(event, exec_time=stop - start, **kwargs)
else:
logger.error('Uknown logger method %s.' % method)
class archive:
    """ Utilities for archive ids of the form ``<upload_hash>/<calc_hash>``. """

    @staticmethod
    def create(upload_hash: str, calc_hash: str) -> str:
        """ Combines upload and calc hash into a single archive id. """
        return '/'.join([upload_hash, calc_hash])

    @staticmethod
    def items(archive_id: str) -> List[str]:
        """ Splits an archive id into its hash components. """
        return archive_id.split('/')

    @staticmethod
    def item(archive_id: str, index: int) -> str:
        """ Returns the component of the archive id at the given position. """
        parts = archive.items(archive_id)
        return parts[index]

    @staticmethod
    def calc_hash(archive_id: str) -> str:
        """ The calc hash component of the given archive id. """
        return archive.item(archive_id, 1)

    @staticmethod
    def upload_hash(archive_id: str) -> str:
        """ The upload hash component of the given archive id. """
        return archive.item(archive_id, 0)
import pytest
import logging
from mongoengine import connect
from mongoengine.connection import disconnect
from nomad import config, user, infrastructure
@pytest.fixture(scope='session', autouse=True)
def nomad_logging():
    """
    Session-wide, automatically applied fixture that disables the logstash
    handler and sets the console log level to CRITICAL, keeping test output quiet.
    """
    config.logstash = config.logstash._replace(enabled=False)
    config.console_log_level = logging.CRITICAL
@pytest.fixture(scope='session')
def celery_includes():
    """ Overrides the celery test fixture: modules the test workers import. """
    return ['nomad.processing.base']
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment