Commit 25fb6941 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Merge branch 'offline' into 'master'


See merge request !6
parents 6b6d78fd b30be7cb
Pipeline #37282 passed with stages
in 4 minutes and 35 seconds
......@@ -60,7 +60,6 @@ tests:
# fix issue with running elastic in gitlab ci runner:
command: [ "bin/elasticsearch", "-Ediscovery.type=single-node" ]
- mongo:latest
......@@ -22,5 +22,11 @@
"workbench.colorCustomizations": {
"editorError.foreground": "#FF2222",
"editorOverviewRuler.errorForeground": "#FF2222",
"editorWarning.foreground": "#FF5500",
"editorOverviewRuler.warningForeground": "#FF5500"
\ No newline at end of file
......@@ -61,8 +61,12 @@ COPY --from=build /install/.dependencies/nomad-meta-info /app/.dependencies/noma
# copy the documentation, its files will be served by the API
COPY --from=build /install/docs/.build /app/docs/.build
RUN useradd -ms /bin/bash nomad
RUN mkdir -p /app/.volumes/fs
RUN mkdir -p /nomad
RUN useradd -ms /bin/bash nomad
RUN chown -R nomad /app
RUN chown -R nomad /nomad
USER nomad
VOLUME /app/.volumes/fs
VOLUME /nomad
......@@ -109,14 +109,11 @@ The images are build via *docker-compose* and don't have to be created manually.
We have multiple *docker-compose* files that must be used together.
- `docker-compose.yml` containes the base definitions for all services
- `` configures services for development (notably builds images for nomad services)
- `docker-compose.override.yml` configures services for development (notably builds images for nomad services)
- `` configures services for production (notable uses a pre-build image for nomad services that was build during CI/CD)
It is sufficient to use the implicit `docker-compose.yml` only (like in the command below).
To also use `` replace `docker-compose` with
`docker-compose -f docker-compose.yml -f`.
The biggest difference is that `*.dev.*` exposes more ports to you host, which can
be beneficial for debugging.
The `override` will be used automatically.
There is also an `.env` file. For development you can use `.env_dev`:
......@@ -206,7 +203,12 @@ development, like running them in a debugger, profiler, etc.
### Run the nomad worker manually
To simply run a worker do (from the root)
To simply run a worker with the installed nomad cli, do (from the root)
nomad run worker
To run it manually with celery, do (from the root)
celery -A nomad.processing worker -l info
......@@ -228,6 +230,11 @@ watchmedo auto-restart -d ./nomad -p '*.py' -- celery worker -l info -A
### Run the api
Either with docker, or:
nomad run api
Or manually:
python nomad/
......@@ -42,10 +42,14 @@ services:
- elk
- /nomad:/nomad
- elk
- /nomad:/nomad
......@@ -25,6 +25,7 @@ content-type: application/json
DELETE http://localhost:9200/calcs HTTP/1.1
POST http://localhost:9200/calcs/_update_by_query HTTP/1.1
......@@ -20,7 +20,7 @@ from elasticsearch.exceptions import NotFoundError
from datetime import datetime
import os.path
from nomad import config
from nomad import config, infrastructure
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile
from nomad.utils import get_logger
from nomad.processing import Upload, NotAllowedDuringProcessing
......@@ -41,6 +41,11 @@ auth = HTTPBasicAuth()
api = Api(app)
def setup():
def verify_password(username_or_token, password):
# first try to authenticate by token
......@@ -180,6 +185,9 @@ class UploadsRes(Resource):
:jsonparam string name: An optional name for the upload.
:jsonparem string local_path: An optional path the a file that is already on the server.
In this case, uploading a file won't be possible, the local file is processed
immediatly as if it was uploaded.
:reqheader Content-Type: application/json
:resheader Content-Type: application/json
:status 200: upload successfully created
......@@ -189,7 +197,19 @@ class UploadsRes(Resource):
if json_data is None:
json_data = {}
upload = Upload.create(user=g.user, name=json_data.get('name'))
upload = Upload.create(
if upload.local_path is not None:
logger = get_logger(
__name__, endpoint='uploads', action='post', upload_id=upload.upload_id)
logger.debug('file uploaded offline')
upload.upload_time =
logger.debug('initiated processing')
return upload.json_dict, 200
......@@ -718,6 +738,8 @@ def get_calc(upload_hash, calc_hash):
def call_admin_operation(operation):
if operation == 'repair_uploads':
if operation == 'reset':
abort(400, message='Unknown operation %s' % operation)
......@@ -17,17 +17,34 @@ Simple client library for the nomad api that allows to bulk upload files via she
import os.path
import os
import sys
import subprocess
import shlex
import time
import sys
import requests
from requests.auth import HTTPBasicAuth
import click
api_base = 'http://localhost/nomad/api'
user = ''
pw = 'nomad'
def handle_common_errors(func):
def wrapper(*args, **kwargs):
func(*args, **kwargs)
except requests.exceptions.ConnectionError:
'\nCould not connect to nomad at %s. '
'Check connection and host/port options.' % api_base)
return wrapper
def upload_file(file_path, name=None, user='', pw='nomad'):
def upload_file(file_path, name=None, offline=False):
Upload a file to nomad.
......@@ -40,14 +57,20 @@ def upload_file(file_path, name=None, user='', pw='nomad'):
if name is None:
name = os.path.basename(file_path)
upload ='%s/uploads' % api_base, data={name: name}, auth=auth).json()
post_data = dict(name=name)
if offline:
click.echo('process offline: %s' % file_path)
upload_cmd = upload['upload_command']
upload_cmd = upload_cmd.replace('your_file', file_path)
upload ='%s/uploads' % api_base, json=post_data, auth=auth).json()
if not offline:
upload_cmd = upload['upload_command']
upload_cmd = upload_cmd.replace('local_file', file_path)
print('File uploaded')
click.echo('uploaded: %s' % file_path)
while True:
upload = requests.get('%s/uploads/%s' % (api_base, upload['upload_id']), auth=auth).json()
......@@ -59,18 +82,118 @@ def upload_file(file_path, name=None, user='', pw='nomad'):
total, successes, failures = (
calcs_pagination[key] for key in ('total', 'successes', 'failures'))
ret = '\n' if status in ('SUCCESS', 'FAILURE') else '\r'
'status: %s; task: %s; parsing: %d/%d/%d' %
(status, upload['current_task'], successes, failures, total))
'status: %s; task: %s; parsing: %d/%d/%d %s' %
(status, upload['current_task'], successes, failures, total, ret), end='')
if status in ('SUCCESS', 'FAILURE'):
if status == 'FAILURE':
click.echo('There have been errors:')
for error in upload['errors']:
click.echo(' %s' % error)
def walk_through_files(path, extension='.zip'):
Returns all abs path of all files in a sub tree of the given path that match
the given extension.
path (str): the directory
extension (str): the extension, incl. '.', e.g. '.zip' (default)
for (dirpath, _, filenames) in os.walk(path):
for filename in filenames:
if filename.endswith(extension):
yield os.path.abspath(os.path.join(dirpath, filename))
@click.option('--host', default='localhost', help='The host nomad runs on, default is "localhost".')
@click.option('--port', default=80, help='the port nomad runs with, default is 80.')
def cli(host: str, port: int):
global api_base
api_base = 'http://%s:%d/nomad/api' % (host, port)
help='Upload files to nomad. The given path can be a single file or a directory. '
'All .zip files in a directory will be uploaded.')
@click.argument('PATH', nargs=-1, required=True, type=click.Path(exists=True))
help='Optional name for the upload of a single file. Will be ignored on directories.')
'--offline', is_flag=True, default=False,
help='Upload files "offline": files will not be uploaded, but processed were they are. '
'Only works when run on the nomad host.')
def upload(path, name: str, offline: bool):
paths = path
click.echo('uploading files from %s paths' % len(paths))
for path in paths:
click.echo('uploading %s' % path)
if os.path.isfile(path):
name = name if name is not None else os.path.basename(path)
upload_file(path, name, offline)
elif os.path.isdir(path):
for file_path in walk_through_files(path):
name = os.path.basename(file_path)
upload_file(file_path, name, offline)
click.echo('Unknown path type %s.' % path)
@cli.command(help='Attempts to reset the nomad.')
def reset():
response ='%s/admin/reset' % api_base, auth=HTTPBasicAuth(user, pw))
if response.status_code != 200:
click.echo('API return %s' % str(response.status_code))
sys.exit(1)'Run a nomad service locally (outside docker).')
def run():
@run.command(help='Run the nomad development worker.')
def worker():
from nomad import processing['worker', '--loglevel=INFO'])
@run.command(help='Run the nomad development api.')
def api():
from nomad import infrastructure, api
infrastructure.setup(), port=8000)
@cli.command(help='Runs tests and linting. Useful before commit code.')
def qa():
os.chdir(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
ret_code = 0
click.echo('Run tests ...')
ret_code += os.system('python -m pytest tests')
click.echo('Run code style checks ...')
ret_code += os.system('python -m pycodestyle --ignore=E501,E701 nomad tests')
click.echo('Run linter ...')
ret_code += os.system('python -m pylint --load-plugins=pylint_mongoengine nomad tests')
click.echo('Run static type checks ...')
ret_code += os.system('python -m mypy --ignore-missing-imports --follow-imports=silent --no-strict-optional nomad tests')
if __name__ == '__main__':
if len(sys.argv) > 3 or len(sys.argv) == 1:
print('usage is: <client> filte_to_upload [upload_name]')
upload_file(sys.argv[1], sys.argv[2] if len(sys.argv) == 3 else None)
cli() # pylint: disable=E1120
......@@ -85,3 +85,5 @@ services = NomadServicesConfig(
api_base_path=os.environ.get('NOMAD_API_BASE_PATH', '/nomad/api'),
api_secret=os.environ.get('NOMAD_API_SECRET', 'defaultApiSecret')
console_log_level = getattr(logging, os.environ.get('NOMAD_CONSOLE_LOGLEVEL', 'INFO'), 'INFO')
......@@ -43,6 +43,7 @@ import os
import os.path
import logging
import subprocess
import shutil
_meta_info_path = './submodules/nomad-meta-info/meta_info/nomad_meta_info/'
_logger = logging.getLogger(__name__)
......@@ -204,8 +205,12 @@ if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Install dependencies from NOMAD-coe.')
parser.add_argument('--dev', help='pip install with -e', action='store_true')
parser.add_argument('--clean', help='remove old deps first', action='store_true')
args = parser.parse_args()
if args.clean and os.path.exists(base_dir):
......@@ -53,9 +53,6 @@ from nomad import config, utils
class Objects:
Object store like abstraction based on a regular file system.
def _os_path(cls, bucket: str, name: str, ext: str) -> str:
if ext is not None and ext != '':
......@@ -74,22 +71,6 @@ class Objects:
return os.path.abspath(path)
def open(cls, bucket: str, name: str, ext: str = None, *args, **kwargs) -> IO:
""" Open an object like you would a file, e.g. with 'rb', etc. """
return open(cls._os_path(bucket, name, ext), *args, **kwargs)
except FileNotFoundError:
raise KeyError()
def delete(cls, bucket: str, name: str, ext: str = None) -> None:
""" Delete a single object. """
os.remove(cls._os_path(bucket, name, ext))
except FileNotFoundError:
raise KeyError()
def delete_all(cls, bucket: str, prefix: str = ''):
""" Delete all files with given prefix, prefix must denote a directory. """
......@@ -98,11 +79,6 @@ class Objects:
except FileNotFoundError:
def exists(cls, bucket: str, name: str, ext: str = None) -> bool:
""" Returns True if object exists. """
return os.path.exists(cls._os_path(bucket, name, ext))
class File:
......@@ -127,19 +103,22 @@ class File:
def open(self, *args, **kwargs) -> IO:
""" Opens the object with he given mode, etc. """
self.logger.debug('open file')
return, self.object_id, self.ext, *args, **kwargs)
return open(self.os_path, *args, **kwargs)
except FileNotFoundError:
raise KeyError()
def delete(self) -> None:
""" Deletes the file with the given object id. """
Objects.delete(self.bucket, self.object_id, self.ext)
self.logger.debug('file deleted')
except FileNotFoundError:
raise KeyError()
def exists(self) -> bool:
""" Returns true if object exists. """
return Objects.exists(self.bucket, self.object_id, self.ext)
return os.path.exists(self.os_path)
def os_path(self) -> str:
......@@ -163,6 +142,10 @@ class UploadFile(File):
upload_id: The upload of this uploaded file.
local_path: Optional override for the path used to store/access the upload
on the server. This can be usefull to create uploads for files that
were not uploaded but put to the server in another way, e.g. offline
processing, syncing with other data, etc.
upload_extract_dir: The path of the tmp directory with the extracted contents.
......@@ -172,7 +155,7 @@ class UploadFile(File):
formats = ['zip']
""" A human readable list of supported file formats. """
def __init__(self, upload_id: str) -> None:
def __init__(self, upload_id: str, local_path: str = None) -> None:
......@@ -180,6 +163,7 @@ class UploadFile(File):
self.upload_extract_dir: str = os.path.join(config.fs.tmp, 'uploads_extracted', upload_id)
self.filelist: List[str] = None
self._local_path = local_path
# There is not good way to capsule decorators in a class:
......@@ -195,6 +179,10 @@ class UploadFile(File):
raise FileError(msg, e)
return wrapper
def os_path(self):
return self._local_path if self._local_path is not None else super().os_path
def hash(self) -> str:
""" Calculates the first 28 bytes of a websafe base64 encoded SHA512 of the upload. """
......@@ -255,6 +243,16 @@ class UploadFile(File):
""" Returns the tmp directory relative version of a filename. """
return os.path.join(self.upload_extract_dir, filename)
def delete(self) -> None:
""" Deletes the file with the given object id. """
# Do not delete local files, no matter what
if self._local_path is None:
self.logger.debug('file deleted')
except FileNotFoundError:
raise KeyError()
def is_valid(self):
return is_zipfile(self.os_path)
# Copyright 2018 Markus Scheidgen
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an"AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
This module provides function to establish connections to the database, searchengine, etc.
infrastructure services.
import shutil
from mongoengine import connect
from elasticsearch_dsl import connections
from elasticsearch.exceptions import RequestError
from nomad import config, utils
logger = utils.get_logger(__name__)
elastic_client = None
""" The elastic search client. """
mongo_client = None
""" The pymongo mongodb client. """
def setup():
""" Creates connections to mongodb and elastic search. """
global elastic_client
from nomad import user
def setup_mongo():
""" Creates connection to mongodb. """
global mongo_client
mongo_client = connect(db=config.mongo.users_db,, port=config.mongo.port)'setup mongo connection')
def setup_elastic():
""" Creates connection to elastic search. """
global elastic_client
elastic_client = connections.create_connection(hosts=[])'setup elastic connection')
from nomad.repo import RepoCalc
except RequestError as e:
if e.status_code == 400 and 'resource_already_exists_exception' in e.error:
pass # happens if two services try this at the same time
raise e
else:'init elastic index')
def reset():
""" Resets the databases mongo/user and elastic/calcs. Be careful. """
from nomad import user
from nomad.repo import RepoCalc
shutil.rmtree(config.fs.objects, ignore_errors=True)
shutil.rmtree(config.fs.tmp, ignore_errors=True)
......@@ -18,21 +18,16 @@ import time
from celery import Celery
from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init
from mongoengine import Document, StringField, ListField, DateTimeField, IntField, \
connect, ValidationError, BooleanField
ValidationError, BooleanField
from mongoengine.connection import MongoEngineConnectionError
from mongoengine.base.metaclasses import TopLevelDocumentMetaclass
from pymongo import ReturnDocument
from datetime import datetime
import sys
from nomad import config, utils
from nomad import config, utils, infrastructure
import nomad.patch # pylint: disable=unused-import
def mongo_connect():
return connect(db=config.mongo.users_db,, port=config.mongo.port)
if config.logstash.enabled:
def initialize_logstash(logger=None, loglevel=logging.DEBUG, **kwargs):
......@@ -41,14 +36,15 @@ if config.logstash.enabled:
worker_process_init.connect(lambda **kwargs: mongo_connect())