Commit 9f2aa427 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Merge remote-tracking branch 'origin/v0.8.0' into mui4

parent cdc23968
Subproject commit d04f69646ce811c92d8b3ea274d9c9afee9996b2
Subproject commit 5023497ce1651f41de44cb35e953432d450b29f5
......@@ -181,4 +181,11 @@ def setup():
from nomad import infrastructure
if not app.config['TESTING']:
# each subprocess is supposed disconnect connect again: https://jira.mongodb.org/browse/PYTHON-2090
try:
from mongoengine import disconnect
disconnect()
except Exception:
pass
infrastructure.setup()
......@@ -121,6 +121,36 @@ def reset(remove, i_am_really_sure):
infrastructure.reset(remove)
@admin.command(help='Reset all "stuck" in processing uploads and calc in low level mongodb operations.')
@click.option('--zero-complete-time', is_flag=True, help='Sets the complete time to epoch zero.')
def reset_processing(zero_complete_time):
infrastructure.setup_mongo()
def reset_collection(cls):
in_processing = cls.objects(process_status__in=[proc.PROCESS_RUNNING, proc.base.PROCESS_CALLED])
print('%d %s processes need to be reset due to incomplete process' % (in_processing.count(), cls.__name__))
in_processing.update(
process_status=None,
current_process=None,
worker_hostname=None,
celery_task_id=None,
errors=[], warnings=[],
complete_time=datetime.datetime.fromtimestamp(0) if zero_complete_time else datetime.datetime.now(),
current_task=None,
tasks_status=proc.base.CREATED)
in_tasks = cls.objects(tasks_status__in=[proc.PENDING, proc.RUNNING])
print('%d %s processes need to be reset due to incomplete tasks' % (in_tasks.count(), cls.__name__))
in_tasks.update(
current_task=None,
tasks_status=proc.base.CREATED,
errors=[], warnings=[],
complete_time=datetime.datetime.fromtimestamp(0) if zero_complete_time else datetime.datetime.now())
reset_collection(proc.Calc)
reset_collection(proc.Upload)
@admin.command(help='Check and lift embargo of data with expired embargo period.')
@click.option('--dry', is_flag=True, help='Do not lift the embargo, just show what needs to be done.')
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
......
......@@ -27,30 +27,41 @@ from .admin import admin, __run_processing
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.option('--query-mongo', is_flag=True, help='Select query mongo instead of elastic search.')
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--processing-failure-uploads', is_flag=True, help='Select uploads with failed processing')
@click.option('--processing-failure-calcs', is_flag=True, help='Select uploads with calcs with failed processing')
@click.option('--processing-failure', is_flag=True, help='Select uploads where the upload or any calc has failed processing')
@click.option('--processing-incomplete-uploads', is_flag=True, help='Select uploads that have not yet been processed')
@click.option('--processing-incomplete-calcs', is_flag=True, help='Select uploads where any calc has net yot been processed')
@click.option('--processing-incomplete', is_flag=True, help='Select uploads where the upload or any calc has not yet been processed')
@click.option('--processing-necessary', is_flag=True, help='Select uploads where the upload or any calc has either not been processed or processing has failed in the past')
@click.pass_context
def uploads(
ctx, user: str, staging: bool, processing: bool, outdated: bool,
code: typing.List[str], query_mongo: bool):
code: typing.List[str], query_mongo: bool,
processing_failure_uploads: bool, processing_failure_calcs: bool, processing_failure: bool,
processing_incomplete_uploads: bool, processing_incomplete_calcs: bool, processing_incomplete: bool,
processing_necessary: bool):
infrastructure.setup_mongo()
infrastructure.setup_elastic()
query = mongoengine.Q()
calc_query = None
if user is not None:
query &= mongoengine.Q(user_id=user)
query |= mongoengine.Q(user_id=user)
if staging:
query &= mongoengine.Q(published=False)
query |= mongoengine.Q(published=False)
if processing:
query &= mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING)
query |= mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING)
if outdated:
uploads = proc.Calc._get_collection().distinct(
'upload_id',
{'metadata.nomad_version': {'$ne': config.version}})
query &= mongoengine.Q(upload_id__in=uploads)
query |= mongoengine.Q(upload_id__in=uploads)
if code is not None and len(code) > 0:
code_queries = [es.Q('match', **{'dft.code_name': code_name}) for code_name in code]
......@@ -64,9 +75,23 @@ def uploads(
upload['key']
for upload in code_search.execute().aggs['uploads']['buckets']]
query &= mongoengine.Q(upload_id__in=uploads)
query |= mongoengine.Q(upload_id__in=uploads)
if processing_failure_calcs or processing_failure or processing_necessary:
if calc_query is None:
calc_query = mongoengine.Q()
calc_query |= mongoengine.Q(tasks_status=proc.FAILURE)
if processing_failure_uploads or processing_failure or processing_necessary:
query |= mongoengine.Q(tasks_status=proc.FAILURE)
if processing_incomplete_calcs or processing_incomplete or processing_necessary:
if calc_query is None:
calc_query = mongoengine.Q()
calc_query |= mongoengine.Q(process_status__ne=proc.PROCESS_COMPLETED)
if processing_incomplete_uploads or processing_incomplete or processing_necessary:
query |= mongoengine.Q(process_status__ne=proc.PROCESS_COMPLETED)
ctx.obj.query = query
ctx.obj.calc_query = calc_query
ctx.obj.uploads = proc.Upload.objects(query)
ctx.obj.query_mongo = query_mongo
......@@ -85,8 +110,11 @@ def query_uploads(ctx, uploads):
pass
query = ctx.obj.query
if ctx.obj.calc_query is not None:
query |= mongoengine.Q(
upload_id__in=proc.Calc.objects(ctx.obj.calc_query).distinct(field="upload_id"))
if len(uploads) > 0:
query &= mongoengine.Q(upload_id__in=uploads)
query |= mongoengine.Q(upload_id__in=uploads)
return query, proc.Upload.objects(query)
......
......@@ -115,6 +115,11 @@ def run_cli():
return cli() # pylint: disable=E1120,E1123
except ImportError:
import sys
if next(arg for arg in sys.argv if arg == '-v') is not None:
import traceback
traceback.print_exc()
print(
'You are accessing functionality that requires extra dependencies.\n'
'Check the NOMAD documentation or install all extra dependencies:\n'
......
......@@ -262,6 +262,15 @@ class DFTMetadata(MSection):
description='Metadata used for the optimade API.',
a_search='optimade')
def code_name_from_parser(self):
entry = self.m_parent
if entry.parser_name is not None:
from nomad.parsing import parser_dict
parser = parser_dict.get(entry.parser_name)
if hasattr(parser, 'code_name'):
return parser.code_name
return config.services.unavailable_value
def apply_domain_metadata(self, backend):
from nomad import utils
from nomad.normalizing.system import normalized_atom_labels
......@@ -271,15 +280,16 @@ class DFTMetadata(MSection):
upload_id=entry.upload_id, calc_id=entry.calc_id, mainfile=entry.mainfile)
if backend is None:
if entry.parser_name is not None:
from nomad.parsing import parser_dict
parser = parser_dict.get(entry.parser_name)
if hasattr(parser, 'code_name'):
self.code_name = parser.code_name
self.code_name = self.code_name_from_parser()
return
# code and code specific ids
self.code_name = backend.get_value('program_name', 0)
try:
self.code_name = backend.get_value('program_name', 0)
except KeyError as e:
logger.warn('backend after parsing without program_name', exc_info=e)
self.code_name = self.code_name_from_parser()
try:
self.code_version = simplify_version(backend.get_value('program_version', 0))
except KeyError:
......
......@@ -25,7 +25,7 @@ import shutil
from elasticsearch.exceptions import RequestError
from elasticsearch_dsl import connections
from mongoengine import connect, disconnect
from mongoengine.connection import MongoEngineConnectionError
from mongoengine.connection import ConnectionFailure
import smtplib
from email.mime.text import MIMEText
from keycloak import KeycloakOpenID, KeycloakAdmin
......@@ -67,12 +67,12 @@ def setup_files():
os.makedirs(directory)
def setup_mongo():
def setup_mongo(client=False):
''' Creates connection to mongodb. '''
global mongo_client
try:
mongo_client = connect(db=config.mongo.db_name, host=config.mongo.host, port=config.mongo.port)
except MongoEngineConnectionError:
except ConnectionFailure:
disconnect()
mongo_client = connect(db=config.mongo.db_name, host=config.mongo.host, port=config.mongo.port)
......
......@@ -28,6 +28,7 @@ import os
import signal
from nomad import metainfo
from nomad.datamodel.metainfo import m_env as general_nomad_metainfo_env
from .legacy import Backend
from .parser import Parser, MatchingParser
......@@ -52,7 +53,7 @@ class EmptyParser(MatchingParser):
Implementation that produces an empty code_run
'''
def run(self, mainfile: str, logger=None) -> Backend:
backend = Backend(metainfo='public', domain=self.domain, logger=logger)
backend = Backend(metainfo=general_nomad_metainfo_env, domain=self.domain, logger=logger)
backend.openSection('section_run')
backend.addValue('program_name', self.code_name)
backend.closeSection('section_run', 0)
......
......@@ -19,12 +19,12 @@ import os
from celery import Celery, Task
from celery.worker.request import Request
from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init, \
celeryd_after_setup
celeryd_after_setup, worker_process_shutdown
from celery.utils import worker_direct
from celery.exceptions import SoftTimeLimitExceeded
from billiard.exceptions import WorkerLostError
from mongoengine import Document, StringField, ListField, DateTimeField, ValidationError
from mongoengine.connection import MongoEngineConnectionError
from mongoengine.connection import ConnectionFailure
from mongoengine.base.metaclasses import TopLevelDocumentMetaclass
from datetime import datetime
import functools
......@@ -46,6 +46,13 @@ if config.logstash.enabled:
@worker_process_init.connect
def setup(**kwargs):
# each subprocess is supposed disconnect connect again: https://jira.mongodb.org/browse/PYTHON-2090
try:
from mongoengine import disconnect
disconnect()
except Exception:
pass
infrastructure.setup()
utils.get_logger(__name__).info(
'celery configured with acks_late=%s' % str(config.celery.acks_late))
......@@ -60,6 +67,13 @@ def capture_worker_name(sender, instance, **kwargs):
worker_hostname = sender
@worker_process_shutdown.connect
def on_worker_process_shutdown(*args, **kwargs):
# We need to make sure not to leave open sessions: https://jira.mongodb.org/browse/PYTHON-2090
from mongoengine.connection import disconnect
disconnect()
app = Celery('nomad.processing', broker=config.rabbitmq_url())
app.conf.update(worker_hijack_root_logger=False)
app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
......@@ -217,7 +231,7 @@ class Proc(Document, metaclass=ProcMetaclass):
obj = cls.objects(**{id_field: id}).first()
except ValidationError as e:
raise InvalidId('%s is not a valid id' % id)
except MongoEngineConnectionError as e:
except ConnectionFailure as e:
raise e
if obj is None:
......
......@@ -8,7 +8,7 @@ metadata:
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
spec:
{{ if .Values.nodePort }}
{{ if .Values.proxy.nodePort }}
type: NodePort
externalIPs:
- {{ .Values.proxy.nodeIP }}
......@@ -18,7 +18,7 @@ spec:
targetPort: 80
protocol: TCP
name: http
{{ if .Values.nodePort }}
{{ if .Values.proxy.nodePort }}
nodePort: {{ .Values.proxy.nodePort }}
{{ end }}
selector:
......
......@@ -16,7 +16,7 @@ click
requests
bravado
pytz
aniso8601
aniso8601<=7
ase==3.19.0
python-keycloak
elasticsearch-dsl==6.4.0
......@@ -41,7 +41,7 @@ structlog
elasticsearch==6.4.0
msgpack<0.6.0
celery[redis]
mongoengine==0.18.2
mongoengine==0.19.1
Werkzeug==0.16.1
flask
flask-restplus
......
......@@ -158,6 +158,8 @@ def worker(mongo, celery_session_worker, celery_inspect):
@pytest.fixture(scope='session')
def mongo_infra(monkeysession):
monkeysession.setattr('nomad.config.mongo.db_name', 'test_db')
# disconnecting and connecting again results in an empty database with mongomock
monkeysession.setattr('mongoengine.disconnect', lambda *args, **kwargs: None)
return infrastructure.setup_mongo()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment