Commit 2a70925d authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Removed most repository db related artifacts.

parent 6aedbc03
......@@ -76,10 +76,6 @@ tests:
stage: test
image: $TEST_IMAGE
- postgres:latest
# this will cause a warning, as the gitlab ci runner health check will test the wrong
# port on rabbitmq container:
- rabbitmq
- name:
alias: elastic
......@@ -87,14 +83,10 @@ tests:
command: [ "bin/elasticsearch", "-Ediscovery.type=single-node" ]
......@@ -81,13 +81,9 @@ processing of uploaded files and therein contained calculations. We use
[mongoengine]( to program with mongodb.
### PostgreSQL
### Keycloak
A relational database is used to store all user provided metadata: users, datasets
(curated sets of uploaded data), references, comments, DOIs, coauthors, etc.
Furthermore, it is still used to store some of the calculation metadata derived
via parsing. *This will most likely move out of Postgres in the future.* We
use [SQLAlchemy]( as on ORM framework.
### flask, et al.
......@@ -124,7 +120,7 @@ The component library [Material-UI](
### docker
To run a **nomad@FAIRDI** instance, many services have to be orchestrated:
the nomad api, nomad worker, mongodb, Elasticsearch, PostgreSQL, RabbitMQ,
the nomad api, nomad worker, mongodb, Elasticsearch, Keycloak, RabbitMQ,
Elasticstack (logging), the nomad GUI, and a reverse proxy to keep everything together.
Further services might be needed (e.g. JypiterHUB), when nomad grows.
The container platform [Docker]( allows us to provide all services
......@@ -224,7 +220,7 @@ passed, stored, etc. by the various nomad modules.
### Implementation
The different entities have often multiple implementations for different storage systems.
For example, aspects of calculations are stored in files (raw files, calc metadata, archive data),
Postgres (user metadata), Elasticsearch (metadata), and mongodb (processing state).
Elasticsearch (metadata), and mongodb (metadata, processing state).
Different transformation between different implementations exist. See
:py:mod:`nomad.datamodel` for further information.
......@@ -38,10 +38,6 @@
.. automodule::
.. automodule:: nomad.coe_repo
.. automodule:: nomad.api
......@@ -263,7 +263,7 @@ The rest should be mocked or provided by the tests. Make sure that you do no run
worker, as they will fight for tasks in the queue.
cd ops/docker-compose
docker-compose up -d elastic rabbitmq postgres
docker-compose up -d elastic rabbitmq
cd ../..
pytest -svx tests
......@@ -26,10 +26,8 @@ import inspect
from datetime import datetime
import pytz
import random
from flask_oidc import OpenIDConnect
import json
from nomad import config, utils
from nomad import config, utils, infrastructure
base_path =
""" Provides the root path of the nomad APIs. """
......@@ -61,24 +59,6 @@ app.config.RESTPLUS_MASK_SWAGGER = False # type: ignore
app.config.SWAGGER_UI_OPERATION_ID = True # type: ignore
app.config.SWAGGER_UI_REQUEST_DURATION = True # type: ignore
oidc_issuer_url = '%s/realms/%s' % (config.keycloak.server_url.rstrip('/'), config.keycloak.realm_name)
oidc_client_secrets = dict(
auth_uri='%s/protocol/openid-connect/auth' % oidc_issuer_url,
token_uri='%s/protocol/openid-connect/token' % oidc_issuer_url,
userinfo_uri='%s/protocol/openid-connect/userinfo' % oidc_issuer_url,
token_introspection_uri='%s/protocol/openid-connect/token/introspect' % oidc_issuer_url,
oidc_client_secrets_file = os.path.join(config.fs.tmp, 'oidc_client_secrets')
with open(oidc_client_secrets_file, 'wt') as f:
json.dump(dict(web=oidc_client_secrets), f)
def api_base_path_response(env, resp):
resp('200 OK', [('Content-Type', 'text/plain')])
......@@ -90,7 +70,7 @@ def api_base_path_response(env, resp):
app.wsgi_app = DispatcherMiddleware( # type: ignore
api_base_path_response, { app.wsgi_app})
oidc = OpenIDConnect(app)
......@@ -31,131 +31,60 @@ endpoints that require or support authentication.
.. autofunction:: admin_login_required
from typing import Tuple
from flask import g, request
from flask_restplus import abort, Resource, fields
from datetime import datetime
import functools
import basicauth
import jwt
import datetime
from nomad import config, processing, files, utils, coe_repo, infrastructure
from nomad.coe_repo import LoginException
from nomad import config, processing, files, utils, infrastructure, datamodel
from .app import api, RFC3339DateTime, oidc
from .app import api, RFC3339DateTime
class User:
A data class that holds all information for a single user. This can be the logged in
and authenticated user, or other users (i.e. co-authors, etc.).
def __init__(
self, email, name=None, first_name='', last_name='', affiliation=None,
created: datetime = None, **kwargs):
assert email is not None, 'Users must have an email, it is used as unique id' = email
first_name = kwargs.get('firstName', first_name)
last_name = kwargs.get('lastName', last_name)
name = kwargs.get('username', name)
created_timestamp = kwargs.get('createdTimestamp', None)
if len(last_name) > 0 and len(first_name) > 0:
name = '%s, %s' % (last_name, first_name)
elif len(last_name) != 0:
name = last_name
elif len(first_name) != 0:
name = first_name
elif name is None:
name = 'unnamed user' = name
if created is not None:
self.created = None
elif created_timestamp is not None:
self.created = datetime.fromtimestamp(created_timestamp)
self.created = None
# TODO affliation
def _validate_token(require_token: bool = True, **kwargs) -> Tuple[bool, str]:
Uses OIDC to check if the request carries token based authentication and if
this authentication is valid.
Returns: A tuple with bool and potential error message
token = None
if 'Authorization' in request.headers and request.headers['Authorization'].startswith('Bearer '):
token = request.headers['Authorization'].split(None, 1)[1].strip()
if 'access_token' in request.form:
token = request.form['access_token']
elif 'access_token' in request.args:
token = request.args['access_token']
validity = oidc.validate_token(token, **kwargs)
if validity:
g.oidc_id_token = g.oidc_token_info
return (validity is True) or (not require_token), validity
def _get_user():
Retrieves OIDC user info and populate the global flask ``g.user`` variable.
if g.oidc_id_token:
g.user = User(**oidc.user_getinfo([
'email', 'firstName', 'lastName', 'username', 'createdTimestamp']))
except Exception as e:
## TODO logging
raise e
g.user = None
def login_if_available(func):
def login_if_available(token_only: bool = True):
A decorator for API endpoint implementations that might authenticate users, but
provide limited functionality even without users.
@api.response(401, 'Not authorized, some data require authentication and authorization')
@api.doc(security=list('OpenIDConnect Bearer Token'))
def wrapper(*args, **kwargs):
valid, msg = _validate_token(require_token=False)
if valid:
def decorator(func):
@api.response(401, 'Not authorized, some data require authentication and authorization')
@api.doc(security=list('OpenIDConnect Bearer Token'))
def wrapper(*args, **kwargs):
user_or_error = infrastructure.keycloak.authorize_flask(token_only)
if user_or_error is None:
elif isinstance(user_or_error, datamodel.User):
g.user = user_or_error
abort(401, message=user_or_error)
return func(*args, **kwargs)
abort(401, message=msg)
return wrapper
return wrapper
return decorator
def login_really_required(func):
def login_really_required(token_only: bool = True):
A decorator for API endpoint implementations that forces user authentication on
@api.response(401, 'Not authorized, this endpoint required authorization')
@api.doc(security=list('OpenIDConnect Bearer Token'))
def wrapper(*args, **kwargs):
valid, msg = _validate_token(require_token=True)
if valid:
def decorator(func):
@api.response(401, 'Not authorized, this endpoint requires authorization')
def wrapper(*args, **kwargs):
if g.user is None:
abort(401, 'Not authorized, this endpoint requires authorization')
return func(*args, **kwargs)
abort(401, message=msg)
return wrapper
return wrapper
return decorator
def admin_login_required(func):
......@@ -164,13 +93,12 @@ def admin_login_required(func):
@api.response(401, 'Authentication required or not authorized as admin user. Only admin can access this endpoint.')
@api.doc(security=list('OpenIDConnect Bearer Token'))
def wrapper(*args, **kwargs):
if oidc.user_getfield('email') == config.keycloak.adminEmail:
return func(*args, **kwargs)
abort(401, message='Only the admin user can perform reset.')
if not g.user.is_admin:
abort(401, message='Only the admin user use this endpoint')
return func(*args, **kwargs)
return wrapper
......@@ -200,69 +128,11 @@ user_model = api.model('User', {
class AuthResource(Resource):
@api.marshal_with(user_model, skip_none=True, code=200, description='User info send')
def get(self):
if g.user is not None:
return g.user
if 'Authorization' in request.headers and request.headers['Authorization'].startswith('Basic '):
username, password = basicauth.decode(request.headers['Authorization'])
token = infrastructure.keycloak_oidc_client.token(username=username, password=password)
validity = oidc.validate_token(token['access_token'])
except Exception as e:
# TODO logging
abort(401, message='Could not authenticate Basic auth: %s' % str(e))
if validity is not True:
abort(401, message=validity)
g.oidc_id_token = g.oidc_token_info
abort(401, message='Authentication credentials found in your request')
if g.user is None:
abort(401, message='User not authenticated')
return g.user
class UserResource(Resource):
@api.response(400, 'Invalid user data')
@api.marshal_with(user_model, skip_none=True, code=200, description='User created')
def put(self):
Creates a new user account. Currently only the admin user is allows. The
NOMAD-CoE repository GUI should be used to create user accounts for now.
Passwords have to be encrypted by the client with bcrypt and 2y indent.
data = request.get_json()
if data is None:
data = {}
for required_key in ['last_name', 'first_name', 'password', 'email']:
if required_key not in data:
abort(400, message='The %s is missing' % required_key)
if 'user_id' in data:
if coe_repo.User.from_user_id(data['user_id']) is not None:
abort(400, 'User with given user_id %d already exists.' % data['user_id'])
user = coe_repo.User.create_user(
email=data['email'], password=data.get('password', None), crypted=True,
first_name=data['first_name'], last_name=data['last_name'],
created=data.get('created', datetime.utcnow()),
affiliation=data.get('affiliation', None), token=data.get('token', None),
user_id=data.get('user_id', None))
return user, 200
token_model = api.model('Token', {
'user': fields.Nested(user_model),
'token': fields.String(description='The short term token to sign URLs'),
......@@ -286,7 +156,11 @@ class TokenResource(Resource):
URLs towards most API get request, e.g. for file downloads on the
raw or archive api endpoints. Use the token query parameter to sign URLs.
token, expires_at = g.user.get_signature_token()
expires_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=10)
token = jwt.encode(
dict(user=g.user.user_id, exp=expires_at),, 'HS256').decode('utf-8')
return {
'user': g.user,
'token': token,
......@@ -298,18 +172,27 @@ def with_signature_token(func):
A decorator for API endpoint implementations that validates signed URLs.
@api.response(401, 'Invalid or expired signature token')
def wrapper(*args, **kwargs):
token = request.args.get('token', None)
if token is not None:
g.user = coe_repo.User.verify_signature_token(token)
except LoginException:
abort(401, 'Invalid or expired signature token')
decoded = jwt.decode(token,, algorithms=['HS256'])
user = datamodel.User.get(decoded['user'])
if user is None:
abort(401, 'User for token does not exist')
g.user = user
except KeyError:
abort(401, 'Token with invalid/unexpected payload')
except jwt.ExpiredSignatureError:
abort(401, 'Expired token')
except jwt.InvalidTokenError:
abort(401, 'Invalid token')
return func(*args, **kwargs)
wrapper.__name__ = func.__name__
wrapper.__doc__ = func.__doc__
return wrapper
......@@ -322,19 +205,12 @@ def create_authorization_predicate(upload_id, calc_id=None):
if g.user is None:
# guest users don't have authorized access to anything
return False
elif g.user.user_id == 0:
elif g.user.is_admin:
# the admin user does have authorization to access everything
return True
# look in repository
upload = coe_repo.Upload.from_upload_id(upload_id)
if upload is not None:
return upload.user_id == g.user.user_id
# look in staging
staging_upload = processing.Upload.get(upload_id)
if staging_upload is not None:
return str(g.user.user_id) == str(staging_upload.user_id)
# look in mongodb
processing.Upload.get(upload_id).user_id == g.user.user_id
# There are no db entries for the given resource
if files.UploadFiles.get(upload_id) is not None:
......@@ -63,10 +63,10 @@ metadata_model = api.model('MetaData', {
'with_embargo': fields.Boolean(default=False, description='Data with embargo is only visible to the upload until the embargo period ended.'),
'comment': fields.String(description='The comment are shown in the repository for each calculation.'),
'references': fields.List(fields.String, descriptions='References allow to link calculations to external source, e.g. URLs.'),
'coauthors': fields.List(fields.Integer, description='A list of co-authors given by user_id.'),
'shared_with': fields.List(fields.Integer, description='A list of users to share calculations with given by user_id.'),
'coauthors': fields.List(fields.String, description='A list of co-authors given by user_id.'),
'shared_with': fields.List(fields.String, description='A list of users to share calculations with given by user_id.'),
'_upload_time': RFC3339DateTime(description='Overrride the upload time.'),
'_uploader': fields.Integer(description='Override the uploader with the given user id.'),
'_uploader': fields.String(description='Override the uploader with the given user id.'),
'datasets': fields.List(fields.Nested(model=dataset_model, skip_none=True), description='A list of datasets.')
......@@ -20,7 +20,7 @@ from pymongo import UpdateOne
import threading
import elasticsearch_dsl as es
from nomad import processing as proc, config, infrastructure, utils, search, files, coe_repo
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel
from .admin import admin
......@@ -90,17 +90,15 @@ def ls(ctx, uploads):
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USER', nargs=1)
@click.argument('EMAIL', nargs=1)
@click.argument('UPLOADS', nargs=-1)
def chown(ctx, user, uploads):
def chown(ctx, email, uploads):
_, uploads = query_uploads(ctx, uploads)
print('%d uploads selected, changing its owner ...' % uploads.count())
user_id = user
user = coe_repo.User.from_user_id(int(user_id))
user = datamodel.User.get_by_email(email)
for upload in uploads:
upload.user_id = user_id
......@@ -142,26 +140,17 @@ def index(ctx, uploads):
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-coe-repo', help='Also attempt to delete from repository db', is_flag=True)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
def rm(ctx, uploads, with_coe_repo, skip_es, skip_mongo, skip_files):
def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
_, uploads = query_uploads(ctx, uploads)
logger = utils.get_logger(__name__)
print('%d uploads selected, deleting ...' % uploads.count())
if with_coe_repo:
from nomad import coe_repo
for upload in uploads:
# delete repository db entry
if with_coe_repo:
# delete elastic
if not skip_es:
# Copyright 2018 Markus Scheidgen
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an"AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import click
import time
import datetime
import os
import os.path
import re
import shutil
import multiprocessing
import queue
import json
from nomad import config, infrastructure
from nomad.migration import NomadCOEMigration, SourceCalc, Package, missing_calcs_data
from .client import client
def _Migration(**kwargs) -> NomadCOEMigration:
return NomadCOEMigration(**kwargs)
def _setup():
pass'Migrate data from NOMAD CoE to nomad@FAIRDI')
@click.option('-h', '--host',, help='The migration repository source db host, default is "%s".' %
@click.option('-p', '--port', default=config.migration_source_db.port, help='The migration repository source db port, default is %d.' % config.migration_source_db.port)
@click.option('-u', '--user', default=config.migration_source_db.user, help='The migration repository source db user, default is %s.' % config.migration_source_db.user)
@click.option('-w', '--password', default=config.migration_source_db.password, help='The migration repository source db password.')
@click.option('-db', '--dbname', default=config.migration_source_db.dbname, help='The migration repository source db name, default is %s.' % config.migration_source_db.dbname)
@click.option('--migration-version', default=0, type=int, help='The version number, only packages with lower or no number will be migrated.')
@click.option('--package-directory', default=config.fs.migration_packages, help='The directory used as bucket for upload packages, default is %s.' % config.fs.migration_packages)
@click.option('--compress-packages', is_flag=True, help='Turn on compression for creating migration packages')
def migration(
host, port, user, password, dbname, migration_version, package_directory, compress_packages):
global _setup
def _setup():
readony=True, host=host, port=port, user=user, password=password, dbname=dbname)
global _Migration
def _Migration(**kwargs):
return NomadCOEMigration(
migration_version=migration_version, package_directory=package_directory,
compress_packages=compress_packages, **kwargs)
@migration.command(help='Create/update the coe repository db migration index')
@click.option('--drop', help='Drop the existing index, otherwise it will only add new data.', is_flag=True)
@click.option('--with-metadata', help='Extract metadata for each calc and add it to the index.', is_flag=True)
@click.option('--per-query', default=100, help='We index many objects with one query. Default is 100.')
@click.option('--start-pid', type=int, default=-1, help='Only index calculations with PID greater equal the given value')
def index(drop, with_metadata, per_query, start_pid):
start = time.time()
indexed_total = 0
indexed_calcs = 0
for calc, total in _Migration().source_calc_index(
drop=drop, with_metadata=with_metadata, per_query=int(per_query), start_pid=start_pid):
indexed_total += 1
indexed_calcs += 1 if calc is not None else 0
eta = total * ((time.time() - start) / indexed_total)
'indexed: %8d, calcs: %8d, total: %8d, ETA: %s\r' %
(indexed_total, indexed_calcs, total, datetime.timedelta(seconds=eta)), end='')