Commit 6d690b3e authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Implemented send emails after finish processing.

parent 9e049847
Pipeline #43221 canceled with stages
in 32 seconds
......@@ -48,9 +48,12 @@ MongoConfig = namedtuple('MongoConfig', ['host', 'port', 'db_name'])
LogstashConfig = namedtuple('LogstashConfig', ['enabled', 'host', 'tcp_port', 'level'])
""" Used to configure and enable/disable the ELK based centralized logging. """
NomadServicesConfig = namedtuple('NomadServicesConfig', ['api_host', 'api_port', 'api_base_path', 'api_secret', 'admin_password', 'disable_reset'])
NomadServicesConfig = namedtuple('NomadServicesConfig', ['api_host', 'api_port', 'api_base_path', 'api_secret', 'admin_password', 'upload_url', 'disable_reset'])
""" Used to configure nomad services: worker, handler, api """
MailConfig = namedtuple('MailConfig', ['enabled', 'host', 'port', 'user', 'password', 'from_address'])
""" Used to configure how nomad can send email """
files = FilesConfig(
uploads_bucket='uploads',
raw_bucket=os.environ.get('NOMAD_FILES_RAW_BUCKET', 'raw'),
......@@ -115,6 +118,7 @@ services = NomadServicesConfig(
api_base_path=os.environ.get('NOMAD_API_BASE_PATH', '/nomad/api'),
api_secret=os.environ.get('NOMAD_API_SECRET', 'defaultApiSecret'),
admin_password=os.environ.get('NOMAD_API_ADMIN_PASSWORD', 'password'),
upload_url=os.environ.get('NOMAD_UPLOAD_URL', 'http://localhost/nomad/uploads'),
disable_reset=os.environ.get('NOMAD_API_DISABLE_RESET', 'false') == 'true'
)
migration_source_db = RepositoryDBConfig(
......@@ -124,6 +128,14 @@ migration_source_db = RepositoryDBConfig(
user=os.environ.get('NOMAD_MIGRATION_SOURCE_USER', 'nomadlab'),
password=os.environ.get('NOMAD_MIGRATION_SOURCE_PASSWORD', '*')
)
mail = MailConfig(
enabled=True,
host=os.environ.get('NOMAD_SMTP_HOST', 'localhost'),
port=int(os.environ.get('NOMAD_SMTP_PORT', 8995)),
user=os.environ.get('NOMAD_SMTP_USER', None),
password=os.environ.get('NOMAD_SMTP_PASSWORD', None),
from_address=os.environ.get('NOMAD_MAIL_FROM', 'webmaster@nomad-coe.eu')
)
console_log_level = get_loglevel_from_env('NOMAD_CONSOLE_LOGLEVEL', default_level=logging.INFO)
service = os.environ.get('NOMAD_SERVICE', 'unknown nomad service')
......
......@@ -28,6 +28,8 @@ from elasticsearch.exceptions import RequestError
from elasticsearch_dsl import connections
from mongoengine import connect
from passlib.hash import bcrypt
import smtplib
from email.mime.text import MIMEText
from nomad import config, utils
......@@ -326,3 +328,35 @@ def reset_repository_db_schema(**kwargs):
sql_file = os.path.join(os.path.dirname(__file__), 'empty_repository_db.sql')
cur.execute(open(sql_file, 'r').read())
logger.info('(re-)created repository db postgres schema')
def send_mail(name: str, email: str, message: str, subject: str):
if config.mail.host is None or config.mail.host.strip() == '':
return
logger = utils.get_logger(__name__)
server = smtplib.SMTP(config.mail.host, config.mail.port)
if config.mail.port == 995:
try:
server.starttls()
except Exception as e:
logger.warning('Could use TTS', exc_info=e)
if config.mail.user is not None:
try:
server.login("youremailusername", "password")
except Exception as e:
logger.warning('Could not log into mail server', exc_info=e)
msg = MIMEText(message)
msg['Subject'] = subject
msg['From'] = 'nomad@fairdi webmaster'
msg['To'] = name
try:
server.send_message(msg, config.mail.from_address, email)
except Exception as e:
logger.error('Could send email', exc_info=e)
server.quit()
......@@ -30,7 +30,7 @@ import logging
from structlog import wrap_logger
from contextlib import contextmanager
from nomad import utils, coe_repo, datamodel
from nomad import utils, coe_repo, datamodel, config, infrastructure
from nomad.files import PathObject, ArchiveBasedStagingUploadFiles, ExtractError, Calc as FilesCalc
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parsers, parser_dict
......@@ -471,8 +471,18 @@ class Upload(Chord, datamodel.Upload):
@task
def cleanup(self):
# nothing todo with the current processing setup
pass
# send email about process finish
user = self.uploader
name = '%s %s' % (user.first_name, user.last_name)
message = '\n'.join([
'Dear %s,' % name,
'',
'your data %suploaded %s has completed processing.' % (
self.name if self.name else '', self.upload_time.isoformat()),
'You can review your data on your upload page: %s' % config.services.upload_url
])
infrastructure.send_mail(
name=name, email=user.email, message=message, subject='Processing completed')
@property
def processed_calcs(self):
......
......@@ -19,6 +19,8 @@ x-common-variables: &nomad_backend_env
NOMAD_LOGSTASH_HOST: elk
NOMAD_ELASTIC_HOST: elastic
NOMAD_MONGO_HOST: mongo
NOMAD_SMTP_HOST: ''
NOMAD_UPLOAD_URL: ''
services:
# postgres for NOMAD-coe repository API and GUI
......
......@@ -58,6 +58,18 @@ spec:
value: "{{ .Values.postgres.port }}"
- name: NOMAD_COE_REPO_DB_NAME
value: "{{ .Values.dbname }}"
- name: NOMAD_UPLOAD_URL
value: "{{ .Values.uploadurl }}"
- name: NOMAD_SMTP_HOST
value: "{{ .Values.mail.host }}"
- name: NOMAD_SMTP_PORT
value: "{{ .Values.mail.port }}"
- name: NOMAD_SMTP_USER
value: "{{ .Values.mail.user }}"
- name: NOMAD_SMTP_PASSWORD
value: "{{ .Values.mail.password }}"
- name: NOMAD_MAIL_FROM
value: "{{ .Values.mail.from }}"
command: ["python", "-m", "celery", "worker", "-l", "info", "-A", "nomad.processing"]
livenessProbe:
exec:
......
......@@ -72,6 +72,9 @@ rabbitmq:
## A common name/prefix for all dbs and indices.
dbname: fairdi_nomad
## The url for the upload page, used in emails to forward the user
uploadurl: 'http://nomad.fairdi.eu/uploads'
## Databases that are not run within the cluster.
# To run databases in the cluster, use the nomad-full helm chart.
mongo:
......@@ -94,6 +97,13 @@ kibana:
port: 15601
host: enc-preprocessing-nomad.esc
mail:
host: ''
port: 995
user: ''
password: ''
from: 'webmaster@fairdi.eu'
## Everything concerning the data that is used by the service
volumes:
files: /scratch/nomad-fair/fs
......@@ -4,6 +4,12 @@ from sqlalchemy.orm import Session
from mongoengine import connect
from mongoengine.connection import disconnect
from contextlib import contextmanager
from collections import namedtuple
from smtpd import SMTPServer
from threading import Lock, Thread
import asyncore
import time
import pytest
from nomad import config, infrastructure
......@@ -214,3 +220,106 @@ def with_error(caplog):
count += 1
assert count > 0
"""
Fixture for mocked SMTP server for testing.
Based on https://gist.github.com/akheron/cf3863cdc424f08929e4cb7dc365ef23.
"""
RecordedMessage = namedtuple(
'RecordedMessage',
'peer envelope_from envelope_recipients data',
)
class ThreadSafeList:
def __init__(self, *args, **kwds):
self._items = []
self._lock = Lock()
def clear(self):
with self._lock:
self._items = []
def add(self, item):
with self._lock:
self._items.append(item)
def copy(self):
with self._lock:
return self._items[:]
class SMTPServerThread(Thread):
def __init__(self, messages):
super().__init__()
self.messages = messages
self.host_port = None
def run(self):
_messages = self.messages
class _SMTPServer(SMTPServer):
def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
msg = RecordedMessage(peer, mailfrom, rcpttos, data)
_messages.add(msg)
self.smtp = _SMTPServer(('127.0.0.1', config.mail.port), None)
self.host_port = self.smtp.socket.getsockname()
try:
asyncore.loop(timeout=None)
except Exception:
pass
def close(self):
self.smtp.close()
class SMTPServerFixture:
def __init__(self):
self._messages = ThreadSafeList()
self._thread = SMTPServerThread(self._messages)
self._thread.start()
@property
def host_port(self):
'''SMTP server's listening address as a (host, port) tuple'''
while self._thread.host_port is None:
time.sleep(0.1)
return self._thread.host_port
@property
def host(self):
return self.host_port[0]
@property
def port(self):
return self.host_port[1]
@property
def messages(self):
'''A list of RecordedMessage objects'''
return self._messages.copy()
def clear(self):
self._messages.clear()
def close(self):
self._thread.close()
self._thread.join(10)
if self._thread.is_alive():
raise RuntimeError('smtp server thread did not stop in 10 sec')
@pytest.fixture(scope='session')
def smtpd(request):
fixture = SMTPServerFixture()
request.addfinalizer(fixture.close)
return fixture
@pytest.fixture(scope='function', autouse=True)
def mails(smtpd):
smtpd.clear()
yield smtpd
import pytest
from mongoengine import connect, IntField, ReferenceField, BooleanField, EmbeddedDocumentField
from mongoengine.connection import disconnect
from mongoengine import ReferenceField
import time
import logging
import json
import random
import time
from nomad import config
from nomad.processing.base import Proc, Chord, process, task, SUCCESS, FAILURE, RUNNING, PENDING
random.seed(0)
......
......@@ -24,8 +24,9 @@ from datetime import datetime
import shutil
import os.path
import json
import re
from nomad import utils
from nomad import utils, infrastructure
from nomad.files import ArchiveBasedStagingUploadFiles, UploadFiles, StagingUploadFiles
from nomad.processing import Upload, Calc
from nomad.processing.base import task as task_decorator, FAILURE, SUCCESS
......@@ -38,6 +39,13 @@ from tests.test_files import clear_files # pylint: disable=unused-import
example_files = [empty_file, example_file]
def test_send_mail(mails):
infrastructure.send_mail('test name', 'test@email.de', 'test message', 'subjct')
for message in mails.messages:
assert re.search(r'test message', message.data.decode('utf-8')) is not None
@pytest.fixture(scope='function', autouse=True)
def mocks_forall(mockmongo):
pass
......@@ -111,11 +119,14 @@ def assert_processing(upload: Upload):
assert upload_files.metadata.get(calc.calc_id) is not None
# @pytest.mark.timeout(30)
def test_processing(uploaded_id, worker, test_user, no_warn):
@pytest.mark.timeout(30)
def test_processing(uploaded_id, worker, test_user, no_warn, mails):
upload = run_processing(uploaded_id, test_user)
assert_processing(upload)
assert len(mails.messages) == 1
assert re.search(r'Processing completed', mails.messages[0].data.decode('utf-8')) is not None
@pytest.mark.timeout(30)
def test_processing_with_warning(uploaded_id_with_warning, worker, test_user):
......
......@@ -122,6 +122,7 @@ class TestAdmin:
config.services.api_base_path,
config.services.api_secret,
config.services.admin_password,
config.services.upload_url,
True)
monkeypatch.setattr(config, 'services', new_config)
yield None
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment