Commit e1446748 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Fixed session cache overflow in mongodb.

parent f0df9b2f
......@@ -181,4 +181,11 @@ def setup():
from nomad import infrastructure
if not app.config['TESTING']:
# each subprocess is supposed disconnect connect again: https://jira.mongodb.org/browse/PYTHON-2090
try:
from mongoengine import disconnect
disconnect()
except Exception:
pass
infrastructure.setup()
......@@ -127,7 +127,7 @@ def reset_processing(zero_complete_time):
infrastructure.setup_mongo()
def reset_collection(cls):
in_processing = cls.objects(process_status__ne=proc.PROCESS_COMPLETED)
in_processing = cls.objects(process_status__in=[proc.PROCESS_RUNNING, proc.base.PROCESS_CALLED])
print('%d %s processes need to be reset due to incomplete process' % (in_processing.count(), cls.__name__))
in_processing.update(
process_status=None,
......
......@@ -25,7 +25,7 @@ import shutil
from elasticsearch.exceptions import RequestError
from elasticsearch_dsl import connections
from mongoengine import connect, disconnect
from mongoengine.connection import MongoEngineConnectionError
from mongoengine.connection import ConnectionFailure
import smtplib
from email.mime.text import MIMEText
from keycloak import KeycloakOpenID, KeycloakAdmin
......@@ -67,12 +67,12 @@ def setup_files():
os.makedirs(directory)
def setup_mongo():
def setup_mongo(client=False):
''' Creates connection to mongodb. '''
global mongo_client
try:
mongo_client = connect(db=config.mongo.db_name, host=config.mongo.host, port=config.mongo.port)
except MongoEngineConnectionError:
except ConnectionFailure:
disconnect()
mongo_client = connect(db=config.mongo.db_name, host=config.mongo.host, port=config.mongo.port)
......
......@@ -19,12 +19,12 @@ import os
from celery import Celery, Task
from celery.worker.request import Request
from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init, \
celeryd_after_setup
celeryd_after_setup, worker_process_shutdown
from celery.utils import worker_direct
from celery.exceptions import SoftTimeLimitExceeded
from billiard.exceptions import WorkerLostError
from mongoengine import Document, StringField, ListField, DateTimeField, ValidationError
from mongoengine.connection import MongoEngineConnectionError
from mongoengine.connection import ConnectionFailure
from mongoengine.base.metaclasses import TopLevelDocumentMetaclass
from datetime import datetime
import functools
......@@ -46,6 +46,13 @@ if config.logstash.enabled:
@worker_process_init.connect
def setup(**kwargs):
# each subprocess is supposed disconnect connect again: https://jira.mongodb.org/browse/PYTHON-2090
try:
from mongoengine import disconnect
disconnect()
except Exception:
pass
infrastructure.setup()
utils.get_logger(__name__).info(
'celery configured with acks_late=%s' % str(config.celery.acks_late))
......@@ -60,6 +67,13 @@ def capture_worker_name(sender, instance, **kwargs):
worker_hostname = sender
@worker_process_shutdown.connect
def on_worker_process_shutdown(*args, **kwargs):
# We need to make sure not to leave open sessions: https://jira.mongodb.org/browse/PYTHON-2090
from mongoengine.connection import disconnect
disconnect()
app = Celery('nomad.processing', broker=config.rabbitmq_url())
app.conf.update(worker_hijack_root_logger=False)
app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
......@@ -217,7 +231,7 @@ class Proc(Document, metaclass=ProcMetaclass):
obj = cls.objects(**{id_field: id}).first()
except ValidationError as e:
raise InvalidId('%s is not a valid id' % id)
except MongoEngineConnectionError as e:
except ConnectionFailure as e:
raise e
if obj is None:
......
......@@ -16,7 +16,7 @@ click
requests
bravado
pytz
aniso8601
aniso8601<=7
ase==3.19.0
python-keycloak
elasticsearch-dsl==6.4.0
......@@ -41,7 +41,7 @@ structlog
elasticsearch==6.4.0
msgpack<0.6.0
celery[redis]
mongoengine==0.18.2
mongoengine==0.19.1
Werkzeug==0.16.1
flask
flask-restplus
......
......@@ -158,6 +158,8 @@ def worker(mongo, celery_session_worker, celery_inspect):
@pytest.fixture(scope='session')
def mongo_infra(monkeysession):
monkeysession.setattr('nomad.config.mongo.db_name', 'test_db')
# disconnecting and connecting again results in an empty database with mongomock
monkeysession.setattr('mongoengine.disconnect', lambda *args, **kwargs: None)
return infrastructure.setup_mongo()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment