diff --git a/.vscode/launch.json b/.vscode/launch.json
index 51439e8689c64d1b50b9b721214d6def20013224..29b01f918c446d3899134d9f7fdb212423dbc2c4 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -68,13 +68,13 @@
       ]
     },
     {
-      "name": "Python: tests/test_utils.py",
+      "name": "Python: tests/processing/test_base.py",
       "type": "python",
       "request": "launch",
       "cwd": "${workspaceFolder}",
       "program": "${workspaceFolder}/.pyenv/bin/pytest",
       "args": [
-        "-sv", "tests/test_utils.py"
+        "-sv", "tests/processing/test_base.py::test_fail"
       ]
     },
     {
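
For reference, the single-test selection used by the updated launch configuration can also be run outside the VS Code debugger; the snippet below is purely illustrative and not part of this change.

# Illustrative only: run the same test selection without the VS Code debugger.
import sys
import pytest

if __name__ == '__main__':
    # -s disables output capture, -v is verbose; the node id selects one test function.
    sys.exit(pytest.main(['-sv', 'tests/processing/test_base.py::test_fail']))
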
diff --git a/infrastructure/nomadxt/docker-compose.yml b/infrastructure/nomadxt/docker-compose.yml
index 2eceb4d49dc4266a156ba2216e21a5f245c3a5f3..dfa748c32626b9e6dff915b2867c74041744b145 100644
--- a/infrastructure/nomadxt/docker-compose.yml
+++ b/infrastructure/nomadxt/docker-compose.yml
@@ -54,6 +54,7 @@ services:
             - RABBITMQ_DEFAULT_USER=rabbitmq
             - RABBITMQ_DEFAULT_PASS=rabbitmq
             - RABBITMQ_DEFAULT_VHOST=/
+            - RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit log_levels [{default,debug}]
         ports:
             - ${RABBITMQ_HOST_PORT}:5672
         volumes:
diff --git a/nomad/api.py b/nomad/api.py
index 639a726927b8da8442afdac6d8a72743fa37ed41..17706851fadeb435c9e5ee8ecadcb9e13882f5e6 100644
--- a/nomad/api.py
+++ b/nomad/api.py
@@ -4,8 +4,10 @@ from flask_cors import CORS
 from elasticsearch.exceptions import NotFoundError
 
 from nomad import config, files
-from nomad.utils import get_logger
-from nomad.data import Calc, Upload, User, InvalidId, NotAllowedDuringProcessing, me
+from nomad.utils import get_logger, create_uuid
+from nomad.processing import Upload, Calc, InvalidId, NotAllowedDuringProcessing
+from nomad.search import CalcElasticDocument
+from nomad.user import me
 
 base_path = config.services.api_base_path
 
@@ -135,7 +137,8 @@ class UploadsRes(Resource):
         if json_data is None:
             json_data = {}
 
-        return Upload.create(user=me, name=json_data.get('name', None)).json_dict, 200
+        upload = Upload.create(upload_id=create_uuid(), user=me, name=json_data.get('name'))
+        return upload.json_dict, 200
 
 
 class UploadRes(Resource):
@@ -191,17 +194,18 @@ class UploadRes(Resource):
         :param string upload_id: the id for the upload
         :resheader Content-Type: application/json
         :status 200: upload successfully updated and retrieved
-        :status 400: bad upload id
         :status 404: upload with id does not exist
         :returns: the :class:`nomad.data.Upload` instance
         """
+        # TODO calc paging
         try:
-            return Upload.get(upload_id=upload_id).json_dict, 200
-        except InvalidId:
-            abort(400, message='%s is not a valid upload id.' % upload_id)
+            result = Upload.get(upload_id).json_dict
         except KeyError:
             abort(404, message='Upload with id %s does not exist.' % upload_id)
 
+        result['calcs'] = [calc.json_dict for calc in Calc.objects(upload_id=upload_id)]
+        return result, 200
+
     def delete(self, upload_id):
         """
         Deletes an existing upload. Only ``is_ready`` or ``is_stale`` uploads
@@ -224,9 +228,9 @@ class UploadRes(Resource):
         :returns: the :class:`nomad.data.Upload` instance with the latest processing state
         """
         try:
-            return Upload.get(upload_id=upload_id).delete().json_dict, 200
-        except InvalidId:
-            abort(400, message='%s is not a valid upload id.' % upload_id)
+            upload = Upload.get(upload_id)
+            upload.delete()
+            return upload.json_dict, 200
         except KeyError:
             abort(404, message='Upload with id %s does not exist.' % upload_id)
         except NotAllowedDuringProcessing:
@@ -236,7 +240,7 @@ class UploadRes(Resource):
 class RepoCalcRes(Resource):
     def get(self, upload_hash, calc_hash):
         try:
-            return Calc.get(id='%s/%s' % (upload_hash, calc_hash)).json_dict, 200
+            return CalcElasticDocument.get(id='%s/%s' % (upload_hash, calc_hash)).json_dict, 200
         except NotFoundError:
             abort(404, message='There is no calculation for %s/%s' % (upload_hash, calc_hash))
         except Exception as e:
@@ -255,7 +259,7 @@ class RepoCalcsRes(Resource):
         assert per_page > 0
 
         try:
-            search = Calc.search().query('match_all')
+            search = CalcElasticDocument.search().query('match_all')
             search = search[(page - 1) * per_page: page * per_page]
             return {
                 'pagination': {
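
For clarity, the paging applied to the elasticsearch_dsl search above is plain from/size slicing; the short sketch below only illustrates the arithmetic with made-up request values and is not part of the change.

# Illustration of the paging arithmetic used in RepoCalcsRes.get (hypothetical values).
page, per_page = 3, 10            # query parameters, page is 1-based
start = (page - 1) * per_page     # index of the first hit on this page -> 20
end = page * per_page             # exclusive upper bound -> 30
assert (start, end) == (20, 30)
# elasticsearch_dsl turns search[start:end] into an ES from/size request, e.g.
# CalcElasticDocument.search().query('match_all')[start:end]
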
diff --git a/nomad/data.py b/nomad/data.py
deleted file mode 100644
index b73e647460526da8539899e4c9d7235c9a35ad30..0000000000000000000000000000000000000000
--- a/nomad/data.py
+++ /dev/null
@@ -1,400 +0,0 @@
-# Copyright 2018 Markus Scheidgen
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an"AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-This module comprises a set of persistent document classes that hold all user related
-data. These are information about users, their uploads and datasets, the associated
-calculations, and files
-
-
-.. autoclass:: Calc
-    :members:
-.. autoclass:: Upload
-    :members:
-.. autoclass:: DataSet
-.. autoclass:: User
-
-"""
-
-from typing import List
-import sys
-from datetime import datetime
-import elasticsearch.exceptions
-from elasticsearch_dsl import Document as ElasticDocument, Date, Keyword, Search, connections
-from mongoengine import \
-    Document, EmailField, StringField, BooleanField, DateTimeField, \
-    ListField, DictField, ReferenceField, connect
-import mongoengine.errors
-
-from nomad import config, files
-from nomad.processing import UploadProc, CalcProc
-from nomad.parsing import LocalBackend
-from nomad.utils import get_logger, lnr
-
-logger = get_logger(__name__)
-
-# ensure elastic and mongo connections
-if 'sphinx' not in sys.modules:
-    client = connections.create_connection(hosts=[config.elastic.host])
-    connect(db=config.mongo.users_db, host=config.mongo.host)
-
-key_mappings = {
-    'basis_set_type': 'program_basis_set_type',
-    'chemical_composition': 'chemical_composition_bulk_reduced'
-}
-
-
-class AlreadyExists(Exception): pass
-
-
-class InvalidId(Exception): pass
-
-
-class NotAllowedDuringProcessing(Exception): pass
-
-
-class Calc(ElasticDocument):
-    """
-    Instances of this class represent calculations. This class manages the elastic
-    search index entry, files, and archive for the respective calculation.
-    Instances should be created directly, but via the static :func:`create_from_backend`
-    function.
-
-    The attribute list, does not include the various repository properties generated
-    while parsing, including ``program_name``, ``program_version``, etc.
-
-    Attributes:
-        calc_hash: The hash that identified the calculation within an upload
-        upload_hash: The hash of the upload
-        upload_id: The id of the upload used to create this calculation
-        mainfile: The mainfile (including path in upload) that was used to create this calc
-    """
-    calc_hash = Keyword()
-
-    upload_time = Date()
-    upload_id = Keyword()
-    upload_hash = Keyword()
-    mainfile = Keyword()
-
-    program_name = Keyword()
-    program_version = Keyword()
-
-    chemical_composition = Keyword()
-    basis_set_type = Keyword()
-    atom_species = Keyword()
-    system_type = Keyword()
-    crystal_system = Keyword()
-    space_group_number = Keyword()
-    configuration_raw_gid = Keyword()
-    XC_functional_name = Keyword()
-
-    class Index:
-        name = config.elastic.calc_index
-
-    @property
-    def archive_id(self) -> str:
-        """ The unique id for this calculation. """
-        return '%s/%s' % (self.upload_hash, self.calc_hash)
-
-    def delete(self):
-        """
-        Delete this calculation and all associated data. This includes all files,
-        the archive, and this search index entry.
-        """
-        # delete the archive
-        files.delete_archive(self.archive_id)
-
-        # delete the search index entry
-        super().delete()
-
-    @staticmethod
-    def es_search(body):
-        """ Perform an elasticsearch and not elasticsearch_dsl search on the Calc index. """
-        return client.search(index=config.elastic.calc_index, body=body)
-
-    @staticmethod
-    def delete_all(**kwargs):
-        for calc in Calc.search().query('match', **kwargs).execute():
-            calc.delete()
-
-    @staticmethod
-    def create_from_backend(
-            backend: LocalBackend, upload_id: str, upload_hash: str, calc_hash: str, **kwargs) \
-            -> 'Calc':
-        """
-        Create a new calculation instance. The data from the given backend
-        will be used. Additional meta-data can be given as *kwargs*. ``upload_id``,
-        ``upload_hash``, and ``calc_hash`` are mandatory.
-        This will create a elastic search entry and store the backend data to the
-        archive.
-
-        Arguments:
-            backend: The parsing/normalizing backend that contains the calculation data.
-            upload_hash: The upload hash of the originating upload.
-            upload_id: The upload id of the originating upload.
-            calc_hash: The upload unique hash for this calculation.
-            kwargs: Additional arguments not stored in the backend.
-
-        Raises:
-            AlreadyExists: If the calculation already exists in elastic search. We use
-                the elastic document lock here. The elastic document is ided via the
-                ``archive_id``.
-        """
-        assert upload_hash is not None and calc_hash is not None and upload_id is not None
-        kwargs.update(dict(upload_hash=upload_hash, calc_hash=calc_hash, upload_id=upload_id))
-
-        calc = Calc(meta=dict(id='%s/%s' % (upload_hash, calc_hash)))
-
-        for property in Calc._doc_type.mapping:
-            property = key_mappings.get(property, property)
-
-            if property in kwargs:
-                value = kwargs[property]
-            else:
-                try:
-                    value = backend.get_value(property, 0)
-                except KeyError:
-                    logger.warning(
-                        'Missing property value', property=property, upload_id=upload_id,
-                        upload_hash=upload_hash, calc_hash=calc_hash)
-                    continue
-
-            setattr(calc, property, value)
-
-        # persist to elastic search
-        try:
-            calc.save(op_type='create')
-        except Exception:
-            raise AlreadyExists('Calculation %s does already exist.' % (calc.archive_id))
-
-        # persist the archive
-        with files.write_archive_json(calc.archive_id) as out:
-            backend.write_json(out, pretty=True)
-
-        return calc
-
-    @property
-    def json_dict(self):
-        """ A json serializable dictionary representation. """
-        data = self.to_dict()
-
-        upload_time = data.get('upload_time', None)
-        if upload_time is not None and isinstance(upload_time, datetime):
-            data['upload_time'] = data['upload_time'].isoformat()
-
-        data['archive_id'] = self.archive_id
-
-        return {key: value for key, value in data.items() if value is not None}
-
-    @staticmethod
-    def upload_exists(upload_hash):
-        """ Returns true if there are already calcs from the given upload. """
-        search = Search(using=client, index=config.elastic.calc_index) \
-            .query('match', upload_hash=upload_hash) \
-            .execute()
-
-        return len(search) > 0
-
-
-if 'sphinx' not in sys.modules:
-    try:
-        Calc.init()
-    except elasticsearch.exceptions.RequestError as e:
-        if e.status_code == 400 and 'resource_already_exists_exception' in e.error:
-            pass  # happens if two services try this at the same time
-        else:
-            raise e
-
-
-class User(Document):
-    """ Represents users in the database. """
-    email = EmailField(primary=True)
-    name = StringField()
-
-
-class Upload(Document):
-    """
-    Represents uploads in the databases. Provides persistence access to the files storage,
-    and processing state.
-
-    Attributes:
-        file_name: Optional user provided upload name
-        upload_id: The upload id. Generated by the database.
-        in_staging: True if the upload is still in staging and can be edited by the uploader.
-        is_private: True if the upload and its derivitaves are only visible to the uploader.
-        proc: The :class:`nomad.processing.UploadProc` that holds the processing state.
-        created_time: The timestamp this upload was created.
-        upload_time: The timestamp when the system realised the upload.
-        proc_time: The timestamp when the processing realised finished by the system.
-    """
-
-    name = StringField(default=None)
-
-    in_staging = BooleanField(default=True)
-    is_private = BooleanField(default=False)
-
-    presigned_url = StringField()
-    upload_time = DateTimeField()
-    create_time = DateTimeField()
-
-    proc_time = DateTimeField()
-    proc = DictField()
-
-    user = ReferenceField(User, required=True)
-
-    meta = {
-        'indexes': [
-            'proc.upload_hash',
-            'user'
-        ]
-    }
-
-    @staticmethod
-    def user_uploads(user: User) -> List['Upload']:
-        """ Returns all uploads for the given user. Currently returns all uploads. """
-        return [upload.update_proc() for upload in Upload.objects()]
-
-    @staticmethod
-    def get(upload_id: str) -> 'Upload':
-        try:
-            upload = Upload.objects(id=upload_id).first()
-        except mongoengine.errors.ValidationError:
-            raise InvalidId('Invalid upload id')
-
-        if upload is None:
-            raise KeyError('Upload does not exist')
-
-        upload.update_proc()
-        return upload
-
-    def logger(self, **kwargs):
-        return get_logger(__name__, upload_id=self.upload_id, cls='Upload', **kwargs)
-
-    def delete(self) -> 'Upload':
-        logger = self.logger(action='delete')
-
-        self.update_proc()
-        if not (self.is_ready or self.is_stale or self._proc.current_task_name == 'uploading'):
-            raise NotAllowedDuringProcessing()
-
-        with lnr(logger, 'Delete upload file'):
-            try:
-                files.Upload(self.upload_id).delete()
-            except KeyError:
-                if self._proc.current_task_name == 'uploading':
-                    logger.debug('Upload exist, but file does not exist. It was probably aborted and deleted.')
-                else:
-                    logger.debug('Upload exist, but uploaded file does not exist.')
-
-        if self._proc.upload_hash is not None:
-            with lnr(logger, 'Deleting calcs'):
-                Calc.delete_all(upload_id=self.upload_id)
-
-        with lnr(logger, 'Deleting upload'):
-            super().delete()
-
-        return self
-
-    @staticmethod
-    def create(user: User, name: str=None) -> 'Upload':
-        """
-        Creates a new upload for the given user, a user given name is optional.
-        It will populate the record with a signed url and pending :class:`UploadProc`.
-        The upload will be already saved to the database.
-        """
-        upload = Upload(user=user, name=name)
-        upload.save()
-
-        upload.presigned_url = files.get_presigned_upload_url(upload.upload_id)
-        upload.create_time = datetime.now()
-        upload.proc = UploadProc(upload.upload_id)
-        upload.save()
-
-        upload.update_proc()
-
-        return upload
-
-    @property
-    def upload_id(self) -> str:
-        return self.id.__str__()
-
-    @property
-    def is_stale(self) -> bool:
-        proc = self._proc
-        if proc.current_task_name == proc.task_names[0] and self.upload_time is None:
-            return (datetime.now() - self.create_time).days > 1
-        else:
-            return False
-
-    @property
-    def is_ready(self) -> bool:
-        return self._proc.status in ['SUCCESS', 'FAILURE']
-
-    @property
-    def _proc(self):
-        """ Cast the internal mongo dict to an actual :class:`UploadProc` instance. """
-        # keep the instance cached
-        if '__proc' not in self.__dict__:
-            self.__dict__['__proc'] = UploadProc(**self.proc)
-
-        return self.__dict__['__proc']
-
-    def update_proc(self) -> 'Upload':
-        """ Updates this instance with information from the celery results backend. """
-        if self._proc.update_from_backend():
-            self.proc = self._proc
-            self.save()
-
-        return self
-
-    @property
-    def json_dict(self) -> dict:
-        """ A json serializable dictionary representation. """
-        data = {
-            'name': self.name,
-            'upload_id': self.upload_id,
-            'presigned_url': files.external_objects_url(self.presigned_url),
-            'create_time': self.create_time.isoformat() if self.create_time is not None else None,
-            'upload_time': self.upload_time.isoformat() if self.upload_time is not None else None,
-            'proc_time': self.proc_time.isoformat() if self.proc_time is not None else None,
-            'is_stale': self.is_stale,
-            'is_ready': self.is_ready,
-            'proc': self._proc
-        }
-        return {key: value for key, value in data.items() if value is not None}
-
-
-class DataSet(Document):
-    name = StringField()
-    description = StringField()
-    doi = StringField()
-
-    user = ReferenceField(User)
-    calcs = ListField(StringField)
-
-    meta = {
-        'indexes': [
-            'user',
-            'doi',
-            'calcs'
-        ]
-    }
-
-# provid a fake user for testing
-me = None
-if 'sphinx' not in sys.modules:
-    me = User.objects(email='me@gmail.com').first()
-    if me is None:
-        me = User(email='me@gmail.com', name='Me Meyer')
-        me.save()
diff --git a/nomad/processing/__init__.py b/nomad/processing/__init__.py
index 27448b50bee390b22b8e56119bf4cc6db6c72acd..87e89555ee32e25a5faded3e201b7e8587d8f419 100644
--- a/nomad/processing/__init__.py
+++ b/nomad/processing/__init__.py
@@ -51,7 +51,6 @@ Initiate processing
 
 """
 
-from nomad.processing.app import app
-from nomad.processing import tasks
-from nomad.processing.state import ProcPipeline, UploadProc, CalcProc
-from nomad.processing.handler import handle_uploads, handle_uploads_thread, start_processing
+from nomad.processing.base import app, InvalidId, ProcNotRegistered
+from nomad.processing.data import Upload, Calc, NotAllowedDuringProcessing
+from nomad.processing.handler import handle_uploads, handle_uploads_thread
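
The package now re-exports the document-based processing API. A hedged sketch of how the exported names are meant to be used together (assumes configured MongoDB/Celery/file-storage services; `me` is the development user from nomad.user):

# Sketch only: exercising the re-exported API; requires the configured services.
from nomad.processing import Upload
from nomad.user import me
from nomad.utils import create_uuid

upload = Upload.create(upload_id=create_uuid(), user=me, name='example upload')
print(upload.json_dict['presigned_url'])  # the client PUTs the file to this url
# once the file arrives, the handler (see nomad.processing.handler) calls:
# upload.process()
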
diff --git a/nomad/processing/app.py b/nomad/processing/app.py
deleted file mode 100644
index 5aecae6307c2815d45e558b5783a24d917e2994b..0000000000000000000000000000000000000000
--- a/nomad/processing/app.py
+++ /dev/null
@@ -1,287 +0,0 @@
-# Copyright 2018 Markus Scheidgen
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an"AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import List
-from contextlib import contextmanager
-import inspect
-import logging
-import celery
-from celery import Task
-from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init
-from mongoengine import Document, StringField, ListField, DateTimeField, IntField, \
-    ReferenceField, connect
-from pymongo import ReturnDocument
-from datetime import datetime
-
-from nomad import config, utils
-import nomad.patch  # pylint: disable=unused-import
-
-
-class Celery(celery.Celery):
-
-    def mongo_connect(self):
-        return connect(db=config.mongo.users_db, host=config.mongo.host)
-
-    def __init__(self):
-        if config.logstash.enabled:
-            def initialize_logstash(logger=None, loglevel=logging.DEBUG, **kwargs):
-                utils.add_logstash_handler(logger)
-                return logger
-
-            after_setup_task_logger.connect(initialize_logstash)
-            after_setup_logger.connect(initialize_logstash)
-
-        worker_process_init.connect(lambda **kwargs: self.mongo_connect())
-
-        super().__init__(
-            'nomad.processing', backend=config.celery.backend_url, broker=config.celery.broker_url)
-
-        self.add_defaults(dict(
-            accept_content=['json', 'pickle'],
-            task_serializer=config.celery.serializer,
-            result_serializer=config.celery.serializer,
-        ))
-
-
-app = Celery()
-
-
-PENDING = 'PENDING'
-RUNNING = 'RUNNING'
-FAILURE = 'FAILURE'
-SUCCESS = 'SUCCESS'
-
-
-class InvalidId(Exception): pass
-
-
-class AsyncDocumentNotRegistered(Exception): pass
-
-
-class Proc(Document):
-    """
-    Base class for objects involved in processing and need persistent processing
-    state.
-
-    Attributes:
-        current_task: the currently running or last completed task
-        status: the overall status of the processing
-        errors: a list of errors that happened during processing. Error fail a processing
-            run
-        warnings: a list of warnings that happened during processing. Warnings do not
-            fail a processing run
-        create_time: the time of creation (not the start of processing)
-        proc_time: the time that processing completed (successfully or not)
-    """
-
-    meta = {
-        'abstract': True,
-    }
-
-    tasks: List[str] = None
-    """ the ordered list of tasks that comprise a processing run """
-
-    current_task = StringField(default=None)
-    status = StringField(default='CREATED')
-
-    errors = ListField(StringField, default=[])
-    warnings = ListField(StringField, default=[])
-
-    create_time = DateTimeField(required=True)
-    complete_time = DateTimeField()
-
-    @property
-    def completed(self) -> bool:
-        return self.status in [SUCCESS, FAILURE]
-
-    def __init__(self, **kwargs):
-        assert self.__class__.tasks is not None and len(self.tasks) > 0, \
-            """ the class attribute tasks must be overwritten with an acutal list """
-        assert 'status' not in kwargs, \
-            """ do not set the status manually, its managed """
-
-        kwargs.setdefault('create_time', datetime.now())
-        super().__init__(**kwargs)
-        self.status = PENDING if self.current_task is None else RUNNING
-
-    @classmethod
-    def get(cls, obj_id):
-        try:
-            obj = cls.objects(id=obj_id).first()
-        except Exception as e:
-            raise InvalidId('%s is not a valid id' % obj_id)
-
-        if obj is None:
-            raise KeyError('%s with id %s does not exist' % (cls.__name__, obj_id))
-
-        return obj
-
-    def get_id(self):
-        return self.id.__str__()
-
-    def fail(self, error):
-        assert not self.completed
-
-        self.status = FAILURE
-        self.errors = [str(error) for error in errors]
-        self.complete_time = datetime.now()
-
-        self.save()
-
-    def warning(self, *warnings):
-        assert not self.completed
-
-        for warning in warnings:
-            self.warnings.append(str(warning))
-
-    def continue_with(self, task):
-        assert task in self.tasks
-        assert self.tasks.index(task) == self.tasks.index(self.current_task) + 1
-
-        if self.status == FAILURE:
-            return False
-
-        if self.status == PENDING:
-            assert self.current_task is None
-            assert task == self.tasks[0]
-            self.status = RUNNING
-
-        self.current_task = task
-        self.save()
-        return True
-
-    def complete(self):
-        if self.status != FAILURE:
-            assert self.status == RUNNING
-            self.status = SUCCESS
-            self.save()
-
-    @property
-    def json_dict(self) -> dict:
-        """ A json serializable dictionary representation. """
-        data = {
-            'tasks': self.tasks,
-            'current_task': self.current_task,
-            'status': self.status,
-            'errors': self.errors,
-            'warnings': self.warnings,
-            'create_time': self.create_time.isoformat() if self.create_time is not None else None,
-            'complete_time': self.complete_time.isoformat() if self.complete_time is not None else None,
-        }
-        return {key: value for key, value in data.items() if value is not None}
-
-
-def task(func):
-    def wrapper(self, *args, **kwargs):
-        if self.status == 'FAILURE':
-            return
-
-        self.continue_with(func.__name__)
-        try:
-            func(self, *args, **kwargs)
-        except Exception as e:
-            self.fail(e)
-
-    return wrapper
-
-
-def process(func):
-    @app.task(bind=True, name=func.__name__, ignore_results=True)
-    def task_func(task, cls_name, self_id):
-        all_cls = AsyncDocument.__subclasses__()
-        cls = next((cls for cls in all_cls if cls.__name__ == cls_name), None)
-        if cls is None:
-            raise AsyncDocumentNotRegistered('Document type %s not registered for async methods' % cls_name)
-
-        self = cls.get(self_id)
-
-        try:
-            func(self)
-        except Exception as e:
-            self.fail(e)
-            raise e
-
-    def wrapper(self, *args, **kwargs):
-        assert len(args) == 0 and len(kwargs) == 0
-        self.save()
-
-        self_id = self.get_id()
-        cls_name = self.__class__.__name__
-
-        return task_func.s(cls_name, self_id).delay()
-
-    return wrapper
-
-
-class Upload(Proc):
-
-    data = StringField(default='Hello, World')
-    processed_calcs = IntField(default=0)
-    total_calcs = IntField(default=-1)
-
-    @process
-    def after_upload(self):
-        self.extract()
-        self.parse()
-
-    @process
-    def after_parse(self):
-        self.cleanup()
-
-    @task
-    def extract(self):
-        print('now extracting')
-
-    @task
-    def parse(self):
-        Calc(upload=self).parse()
-        Calc(upload=self).parse()
-        self.total_calcs = 2
-        self.save()
-        self.check_calcs_complete(more_calcs=0)
-
-    @task
-    def cleanup(self):
-        print('cleanup')
-
-    def check_calcs_complete(self, more_calcs=1):
-        # use a primitive but atomic pymongo call to increase the number of calcs
-        updated_raw = Upload._get_collection().find_one_and_update(
-            {'_id': self.id},
-            {'$inc': {'processed_calcs': more_calcs}},
-            return_document=ReturnDocument.AFTER)
-
-        updated_total_calcs = updated_raw['total_calcs']
-        updated_processed_calcs = updated_raw['processed_calcs']
-
-        print('%d:%d' % (updated_processed_calcs, updated_total_calcs))
-        if updated_processed_calcs == updated_total_calcs and updated_total_calcs != -1:
-            self.after_parse()
-
-
-class Calc(Proc):
-
-    upload = ReferenceField(Upload, required=True)
-
-    @process
-    def parse(self):
-        print('parsee')
-        self.upload.check_calcs_complete()
-
-
-if __name__ == '__main__':
-    connect(db=config.mongo.users_db, host=config.mongo.host)
-    tds = [Upload(), Upload(), Upload()]
-    for td in tds:
-        td.after_upload()
diff --git a/nomad/proc.py b/nomad/processing/base.py
similarity index 81%
rename from nomad/proc.py
rename to nomad/processing/base.py
index a4c451c12c674717512d2f0235fb5b60600ff406..ae09173d96b6faf402002ea3a1b70acdc98a7b42 100644
--- a/nomad/proc.py
+++ b/nomad/processing/base.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import List, cast
+from typing import List, cast, Any
 import types
 from contextlib import contextmanager
 import collections
@@ -28,6 +28,7 @@ from mongoengine.connection import MongoEngineConnectionError
 from mongoengine.base.metaclasses import TopLevelDocumentMetaclass
 from pymongo import ReturnDocument
 from datetime import datetime
+import sys
 
 from nomad import config, utils
 import nomad.patch  # pylint: disable=unused-import
@@ -57,6 +58,10 @@ app.add_defaults(dict(
     result_serializer=config.celery.serializer,
 ))
 
+# ensure the mongo connection
+if 'sphinx' not in sys.modules:
+    connect(db=config.mongo.users_db, host=config.mongo.host)
+
 
 PENDING = 'PENDING'
 RUNNING = 'RUNNING'
@@ -67,7 +72,7 @@ SUCCESS = 'SUCCESS'
 class InvalidId(Exception): pass
 
 
-class AsyncDocumentNotRegistered(Exception): pass
+class ProcNotRegistered(Exception): pass
 
 
 class ProcMetaclass(TopLevelDocumentMetaclass):
@@ -113,7 +118,7 @@ class Proc(Document, metaclass=ProcMetaclass):
         proc_time: the time that processing completed (successfully or not)
     """
 
-    meta = {
+    meta: Any = {
         'abstract': True,
     }
 
@@ -134,6 +139,11 @@ class Proc(Document, metaclass=ProcMetaclass):
         """ Returns True of the process has failed or succeeded. """
         return self.status in [SUCCESS, FAILURE]
 
+    def get_logger(self):
+        return utils.get_logger(
+            __name__, current_task=self.current_task, process=self.__class__.__name__,
+            status=self.status)
+
     @classmethod
     def create(cls, **kwargs):
         """ Factory method that must be used instead of regular constructor. """
@@ -150,37 +160,72 @@ class Proc(Document, metaclass=ProcMetaclass):
         return self
 
     @classmethod
-    def get(cls, obj_id):
-        """ Loads the object from the database. """
+    def get_by_id(cls, id: str, id_field: str):
         try:
-            obj = cls.objects(id=str(obj_id)).first()
+            obj = cls.objects(**{id_field: id}).first()
         except ValidationError as e:
-            raise InvalidId('%s is not a valid id' % obj_id)
+            raise InvalidId('%s is not a valid id' % id)
         except MongoEngineConnectionError as e:
             raise e
 
         if obj is None:
-            raise KeyError('%s with id %s does not exist' % (cls.__name__, obj_id))
+            raise KeyError('%s with id %s does not exist' % (cls.__name__, id))
 
         return obj
 
-    def fail(self, *errors):
+    @classmethod
+    def get(cls, obj_id):
+        return cls.get_by_id(str(obj_id), 'id')
+
+    @staticmethod
+    def log(logger, log_level, msg, **kwargs):
+        # TODO there seems to be a bug in structlog, cannot use logger.log
+        if log_level == logging.ERROR:
+            logger.error(msg, **kwargs)
+        elif log_level == logging.WARNING:
+            logger.warning(msg, **kwargs)
+        elif log_level == logging.INFO:
+            logger.info(msg, **kwargs)
+        elif log_level == logging.DEBUG:
+            logger.debug(msg, **kwargs)
+        else:
+            logger.critical(msg, **kwargs)
+
+    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
         """ Allows to fail the process. Takes strings or exceptions as args. """
-        assert not self.completed
+        assert not self.completed, 'Cannot fail a completed process.'
+
+        failed_with_exception = False
 
         self.status = FAILURE
+
+        logger = self.get_logger(**kwargs)
+        for error in errors:
+            if isinstance(error, Exception):
+                failed_with_exception = True
+                Proc.log(logger, log_level, 'task failed with exception', exc_info=error, **kwargs)
+
         self.errors = [str(error) for error in errors]
         self.complete_time = datetime.now()
 
-        print(self.errors)
+        if not failed_with_exception:
+            errors_str = "; ".join([str(error) for error in errors])
+            Proc.log(logger, log_level, 'task failed', errors=errors_str, **kwargs)
+
+        logger.debug('process failed')
+
         self.save()
 
-    def warning(self, *warnings):
+    def warning(self, *warnings, log_level=logging.WARNING, **kwargs):
         """ Allows to save warnings. Takes strings or exceptions as args. """
         assert not self.completed
 
+        logger = self.get_logger(**kwargs)
+
         for warning in warnings:
-            self.warnings.append(str(warning))
+            warning = str(warning)
+            self.warnings.append(warning)
+            Proc.log(logger, log_level, 'task with warning', warning=warning)
 
     def _continue_with(self, task):
         tasks = self.__class__.tasks
@@ -198,16 +243,21 @@ class Proc(Document, metaclass=ProcMetaclass):
             assert self.current_task is None
             assert task == tasks[0]
             self.status = RUNNING
+            self.current_task = task
+            self.get_logger().debug('started process')
+        else:
+            self.current_task = task
+            self.get_logger().debug('successfully completed task')
 
-        self.current_task = task
         self.save()
         return True
 
     def _complete(self):
         if self.status != FAILURE:
-            assert self.status == RUNNING
+            assert self.status == RUNNING, 'Can only complete a running process.'
             self.status = SUCCESS
             self.save()
+            self.get_logger().debug('completed process')
 
     def incr_counter(self, field, value=1, other_fields=None):
         """
@@ -254,6 +304,7 @@ class Proc(Document, metaclass=ProcMetaclass):
             'tasks': getattr(self.__class__, 'tasks'),
             'current_task': self.current_task,
             'status': self.status,
+            'completed': self.completed,
             'errors': self.errors,
             'warnings': self.warnings,
             'create_time': self.create_time.isoformat() if self.create_time is not None else None,
@@ -281,7 +332,7 @@ def task(func):
         except Exception as e:
             self.fail(e)
 
-        if self.__class__.tasks[-1] == self.current_task:
+        if self.__class__.tasks[-1] == self.current_task and not self.completed:
             self._complete()
 
     setattr(wrapper, '__task_name', func.__name__)
@@ -305,7 +356,7 @@ def proc_task(task, cls_name, self_id, func_attr):
     cls = next((cls for cls in all_cls if cls.__name__ == cls_name), None)
     if cls is None:
         logger.error('document not a subcass of Proc')
-        raise AsyncDocumentNotRegistered('document type %s not a subclass of Proc' % cls_name)
+        raise ProcNotRegistered('document %s not a subclass of Proc' % cls_name)
 
     try:
         self = cls.get(self_id)
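
The decorators above drive a simple task pipeline: `@task` methods advance `current_task` and complete the process after the last task, while `@process` methods run asynchronously via celery. A minimal, hypothetical subclass for illustration (assuming the metaclass registers `@task` methods in definition order and a MongoDB connection is available):

# Hypothetical Proc subclass; class and task names are made up.
from nomad.processing.base import Proc, task, SUCCESS

class TwoStepProc(Proc):
    @task
    def first(self):
        pass   # PENDING -> RUNNING, current_task = 'first'

    @task
    def second(self):
        pass   # last task in the pipeline, so _complete() sets SUCCESS

proc = TwoStepProc.create()   # factory method; sets create_time and PENDING status
proc.first()
proc.second()
assert proc.status == SUCCESS and proc.completed
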
diff --git a/nomad/processing/data.py b/nomad/processing/data.py
new file mode 100644
index 0000000000000000000000000000000000000000..832da1b85be7456a47537fdd6768b8197a7a82d1
--- /dev/null
+++ b/nomad/processing/data.py
@@ -0,0 +1,369 @@
+# Copyright 2018 Markus Scheidgen
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an"AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module comprises a set of persistent document classes that hold all user-related
+data: uploads, their associated calculations, the corresponding processing state, and
+files.
+
+
+.. autoclass:: Calc
+    :members:
+.. autoclass:: Upload
+    :members:
+
+"""
+
+from typing import List, Any
+import sys
+from datetime import datetime
+from mongoengine import \
+    Document, EmailField, StringField, BooleanField, DateTimeField, \
+    ListField, DictField, ReferenceField, IntField, connect
+import mongoengine.errors
+import logging
+
+from nomad import config, files, utils
+from nomad.search import CalcElasticDocument
+from nomad.user import User, me
+from nomad.processing.base import Proc, process, task, PENDING
+from nomad.parsing import LocalBackend, parsers, parser_dict
+from nomad.normalizing import normalizers
+from nomad.utils import get_logger, lnr
+
+
+class NotAllowedDuringProcessing(Exception): pass
+
+
+class Calc(Proc):
+    """
+    Instances of this class represent calculations. This class manages the elastic
+    search index entry, files, and archive for the respective calculation.
+
+    It also contains the calculation's processing logic and state.
+
+    The attribute list does not include the various repository properties generated
+    while parsing, such as ``program_name``, ``program_version``, etc.
+
+    Attributes:
+        archive_id: the hash based archive id of the calc
+        parser: the name of the parser used to process this calc
+        upload_id: the id of the upload used to create this calculation
+        mainfile: the mainfile (including path in upload) that was used to create this calc
+        mainfile_tmp_path: path to the mainfile extracted for processing
+    """
+    archive_id = StringField(primary_key=True)
+    upload_id = StringField()
+    mainfile = StringField()
+    parser = StringField()
+    mainfile_tmp_path = StringField()
+
+    meta: Any = {
+        'indexes': [
+            'upload_id'
+        ]
+    }
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._parser_backend = None
+        self._upload = None
+
+    @classmethod
+    def get(cls, id):
+        return cls.get_by_id(id, 'archive_id')
+
+    def delete(self):
+        """
+        Delete this calculation and all associated data. This includes all files,
+        the archive, and this search index entry.
+        """
+        # delete the archive
+        if self.archive_id is not None:
+            files.delete_archive(self.archive_id)
+
+        # delete the search index entry
+        elastic_entry = CalcElasticDocument.get(self.archive_id)
+        if elastic_entry is not None:
+            elastic_entry.delete()
+
+        # delete this mongo document
+        super().delete()
+
+    def get_logger(self, **kwargs):
+        upload_hash, calc_hash = self.archive_id.split('/')
+        logger = super().get_logger()
+        logger = logger.bind(
+            upload_id=self.upload_id, mainfile=self.mainfile,
+            upload_hash=upload_hash, calc_hash=calc_hash, **kwargs)
+        return logger
+
+    @property
+    def json_dict(self):
+        """ A json serializable dictionary representation. """
+        data = {
+            'archive_id': self.archive_id,
+            'mainfile': self.mainfile,
+            'upload_id': self.upload_id,
+            'parser': self.parser
+        }
+        data.update(super().json_dict)
+        return {key: value for key, value in data.items() if value is not None}
+
+    @process
+    def process(self):
+        self._upload = Upload.get(self.upload_id)
+        if self._upload is None:
+            self.get_logger().error('calculation upload does not exist')
+
+        try:
+            self.parsing()
+            self.normalizing()
+            self.archiving()
+        finally:
+            self._upload.calc_proc_completed()
+
+    @task
+    def parsing(self):
+        self._parser_backend = parser_dict[self.parser].run(self.mainfile_tmp_path)
+        if self._parser_backend.status[0] != 'ParseSuccess':
+            error = self._parser_backend.status[1]
+            self.fail(error, log_level=logging.DEBUG)
+
+    @task
+    def normalizing(self):
+        for normalizer in normalizers:
+            normalizer_name = normalizer.__name__
+            normalizer(self._parser_backend).normalize()
+            if self._parser_backend.status[0] != 'ParseSuccess':
+                error = self._parser_backend.status[1]
+                self.fail(error, normalizer=normalizer_name, log_level=logging.WARNING)
+                return
+            self.get_logger().debug(
+                'completed normalizer successfully', normalizer=normalizer_name)
+
+    @task
+    def archiving(self):
+        upload_hash, calc_hash = self.archive_id.split('/')
+        # persist to elastic search
+        CalcElasticDocument.create_from_backend(
+            self._parser_backend,
+            upload_hash=upload_hash,
+            calc_hash=calc_hash,
+            upload_id=self.upload_id,
+            mainfile=self.mainfile,
+            upload_time=self._upload.upload_time)
+
+        # persist the archive
+        with files.write_archive_json(self.archive_id) as out:
+            self._parser_backend.write_json(out, pretty=True)
+
+
+class Upload(Proc):
+    """
+    Represents uploads in the database. Provides persistent access to the file storage
+    and the processing state.
+
+    Attributes:
+        name: optional user-provided upload name
+        additional_metadata: optional user-provided additional metadata
+        upload_id: the upload id generated by the database
+        in_staging: true if the upload is still in staging and can be edited by the uploader
+        is_private: true if the upload and its derivatives are only visible to the uploader
+        presigned_url: the presigned url for file upload
+        upload_time: the timestamp when the system registered the upload
+        upload_hash: the hash of the uploaded file
+    """
+    id_field = 'upload_id'
+
+    upload_id = StringField(primary_key=True)
+
+    name = StringField(default=None)
+    additional_metadata = DictField(default=None)
+
+    in_staging = BooleanField(default=True)
+    is_private = BooleanField(default=False)
+
+    presigned_url = StringField()
+    upload_time = DateTimeField()
+    upload_hash = StringField(default=None)
+
+    processed_calcs = IntField(default=0)
+    total_calcs = IntField(default=-1)
+
+    user = ReferenceField(User, required=True)
+
+    meta: Any = {
+        'indexes': [
+            'upload_hash',
+            'user'
+        ]
+    }
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self._upload = None
+
+    @classmethod
+    def get(cls, id):
+        return cls.get_by_id(id, 'upload_id')
+
+    @classmethod
+    def user_uploads(cls, user: User) -> List['Upload']:
+        """ Returns all uploads for the given user. Currently returns all uploads. """
+        return cls.objects()
+
+    def get_logger(self, **kwargs):
+        logger = super().get_logger()
+        logger = logger.bind(upload_id=self.upload_id, **kwargs)
+        return logger
+
+    def delete(self):
+        logger = self.get_logger(task='delete')
+
+        if not (self.completed or self.is_stale or self.current_task == 'uploading'):
+            raise NotAllowedDuringProcessing()
+
+        with lnr(logger, 'delete upload file'):
+            try:
+                files.Upload(self.upload_id).delete()
+            except KeyError:
+                if self.current_task == 'uploading':
+                    logger.debug(
+                        'Upload exists, but file does not exist. '
+                        'It was probably aborted and deleted.')
+                else:
+                    logger.debug('Upload exists, but uploaded file does not exist.')
+
+        with lnr(logger, 'deleting calcs'):
+            for calc in Calc.objects(upload_id=self.upload_id):
+                calc.delete()
+
+        with lnr(logger, 'deleting upload'):
+            super().delete()
+
+    @classmethod
+    def create(cls, **kwargs) -> 'Upload':
+        """
+        Creates a new upload for the given user; a user-given name is optional.
+        It will populate the record with a presigned upload url and a pending processing state.
+        The upload will already be saved to the database.
+        """
+        self = super().create(**kwargs)
+        self.presigned_url = files.get_presigned_upload_url(self.upload_id)
+        self._continue_with('uploading')
+        return self
+
+    @property
+    def is_stale(self) -> bool:
+        if self.current_task == 'uploading' and self.upload_time is None:
+            return (datetime.now() - self.create_time).days > 1
+        else:
+            return False
+
+    @property
+    def json_dict(self) -> dict:
+        """ A json serializable dictionary representation. """
+        data = {
+            'name': self.name,
+            'additional_metadata': self.additional_metadata,
+            'upload_id': self.upload_id,
+            'presigned_url': files.external_objects_url(self.presigned_url),
+            'upload_time': self.upload_time.isoformat() if self.upload_time is not None else None,
+            'is_stale': self.is_stale,
+        }
+        data.update(super().json_dict)
+        return {key: value for key, value in data.items() if value is not None}
+
+    @process
+    def process(self):
+        self.extracting()
+        self.parse_all()
+
+    @task
+    def uploading(self):
+        pass
+
+    @task
+    def extracting(self):
+        logger = self.get_logger()
+        try:
+            self._upload = files.Upload(self.upload_id)
+            self._upload.open()
+            logger.debug('opened upload')
+        except KeyError as e:
+            self.fail('process request for non-existing upload', log_level=logging.INFO)
+            return
+
+        try:
+            self.upload_hash = self._upload.hash()
+        except files.UploadError as e:
+            self.fail('could not create upload hash', e)
+            return
+
+        if CalcElasticDocument.upload_exists(self.upload_hash):
+            self.fail('The same file was already uploaded and processed.', log_level=logging.INFO)
+            return
+
+    @task
+    def parse_all(self):
+        # TODO: deal with multiple possible parser specs
+        self.total_calcs = 0
+        for filename in self._upload.filelist:
+            for parser in parsers:
+                try:
+                    if parser.is_mainfile(filename, lambda fn: self._upload.open_file(fn)):
+                        tmp_mainfile = self._upload.get_path(filename)
+                        calc = Calc.create(
+                            archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)),
+                            mainfile=filename, parser=parser.name,
+                            mainfile_tmp_path=tmp_mainfile,
+                            upload_id=self.upload_id)
+
+                        calc.process()
+                        self.total_calcs += 1
+                except Exception as e:
+                    self.warning(
+                        'exception while matching potential mainfile',
+                        mainfile=filename, exc_info=e)
+
+        if self.total_calcs == 0:
+            self.cleanup()
+
+        # have to save the total_calcs information
+        self.save()
+
+    @task
+    def cleanup(self):
+        try:
+            upload = files.Upload(self.upload_id)
+        except KeyError as e:
+            self.fail('Upload does not exist', exc_info=e)
+            return
+
+        upload.close()
+        self.get_logger().debug('closed upload')
+
+    def calc_proc_completed(self):
+        processed_calcs, (total_calcs,) = self.incr_counter(
+            'processed_calcs', other_fields=['total_calcs'])
+
+        if processed_calcs == total_calcs:
+            self.cleanup()
+
+    @property
+    def calcs(self):
+        return Calc.objects(upload_id=self.upload_id)
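
`Upload.calc_proc_completed` relies on the atomic counter in `Proc.incr_counter`, so exactly one Calc (the last one to finish) sees `processed_calcs == total_calcs` and triggers `cleanup`. Conceptually it is the raw pymongo pattern sketched below; database and collection names are made up for illustration.

# Conceptual equivalent of the counter logic, not part of the actual code.
from pymongo import MongoClient, ReturnDocument

uploads = MongoClient()['nomad']['upload']   # hypothetical db/collection names

def calc_proc_completed(upload_id: str) -> None:
    doc = uploads.find_one_and_update(
        {'_id': upload_id},
        {'$inc': {'processed_calcs': 1}},    # atomic increment, safe under concurrency
        return_document=ReturnDocument.AFTER)
    if doc['processed_calcs'] == doc['total_calcs']:
        print('last calc finished, cleanup() would run now')
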
diff --git a/nomad/processing/handler.py b/nomad/processing/handler.py
index 2f528c7409ecc47114d7f93762e5eea132b8ffbd..948de664b40c689f8bfb29faa05d8968de70bcc7 100644
--- a/nomad/processing/handler.py
+++ b/nomad/processing/handler.py
@@ -17,40 +17,10 @@ from threading import Thread
 
 from nomad import files, utils
 
-from nomad.processing.tasks import extracting_task, cleanup_task, parse_all_task
-from nomad.processing.state import UploadProc
+from nomad.processing.data import Upload
 from nomad.utils import get_logger, lnr
 
 
-def start_processing(upload_id, proc: UploadProc=None) -> UploadProc:
-    """ Starts the processing tasks via celery canvas. """
-
-    if proc is not None:
-        proc = UploadProc(**proc)
-    else:
-        proc = UploadProc(upload_id)
-
-    # Keep the results of the last task is the workflow.
-    # The last task is started by another task, therefore it
-    # is not the end of the main task chain.
-    finalize = cleanup_task.s()
-    finalize_result = finalize.freeze()
-
-    # start the main chain
-    main_chain = extracting_task.s(proc) | parse_all_task.s(finalize)
-    main_chain_result = main_chain.delay()
-
-    # Create a singular result tree. This might not be the right way to do it.
-    finalize_result.parent = main_chain_result
-
-    # Keep the result as tuple that also includes all parents, i.e. the whole
-    # serializable tree
-    proc.celery_task_ids = finalize_result.as_tuple()
-    proc.status = 'STARTED'
-
-    return proc
-
-
 def handle_uploads(quit=False):
     """
     Starts a daemon that will listen to files for new uploads. For each new
@@ -64,13 +34,12 @@ def handle_uploads(quit=False):
 
     @files.upload_put_handler
     def handle_upload_put(received_upload_id: str):
-        from nomad.data import Upload
         logger = get_logger(__name__, upload_id=received_upload_id)
         logger.debug('Initiate upload processing')
         try:
             with lnr(logger, 'Could not load'):
                 try:
-                    upload = Upload.get(upload_id=received_upload_id)
+                    upload = Upload.get(received_upload_id)
                 except KeyError as e:
                     logger.error('Upload does not exist')
                     raise e
@@ -81,15 +50,11 @@ def handle_uploads(quit=False):
 
             with lnr(logger, 'Save upload time'):
                 upload.upload_time = datetime.now()
-                upload.save()
 
             with lnr(logger, 'Start processing'):
-                proc = start_processing(received_upload_id, proc=upload.proc)
-                assert proc.is_started
-                upload.proc = proc
-                upload.save()
+                upload.process()
 
-        except Exception:
+        except Exception as e:
             logger.error('Exception while handling upload put notification.', exc_info=e)
 
         if quit:
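
With the separate state objects gone, the handler only loads the `Upload` document and calls its `process` method. For local testing the daemon can be run once in the foreground; a sketch, assuming the usual services are configured:

# Sketch: handle a single upload-put notification and then return (quit=True).
from nomad.processing.handler import handle_uploads

if __name__ == '__main__':
    handle_uploads(quit=True)
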
diff --git a/nomad/processing/state.py b/nomad/processing/state.py
deleted file mode 100644
index ef60611fa7b4baa8d6e75e98987bc126dd0fcda4..0000000000000000000000000000000000000000
--- a/nomad/processing/state.py
+++ /dev/null
@@ -1,305 +0,0 @@
-# Copyright 2018 Markus Scheidgen
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an"AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import List, Any, Union, cast
-from celery.result import AsyncResult, result_from_tuple
-import itertools
-import time
-
-from nomad import utils
-from nomad.normalizing import normalizers
-from nomad.processing.app import app
-
-
-class ProcPipeline(utils.DataObject):
-    """
-    Arguments:
-        task_names: A list of task names in pipeline order.
-
-    Attributes:
-        current_task_name: Name of the currently running task.
-        status: Aggregated status for the whole process. Celery status names as convention.
-        errors: A list of potential error that caused failure.
-    """
-    def __init__(self, task_names: List[str], *args, **kwargs) -> None:
-        super().__init__(*args)
-        self.task_names: List[str] = task_names
-        self.current_task_name: str = None
-        self.status: str = 'PENDING'
-        self.errors: List[str] = []
-        self.warnings: List[str] = []
-
-        self.update(kwargs)
-
-    def continue_with(self, task_name: str) -> bool:
-        """ Upadtes itself with information about the new current task. """
-
-        assert self.status != 'SUCCESS', 'Cannot continue on completed workflow.'
-
-        if self.status == 'FAILURE':
-            return False
-        else:
-            self.status = 'PROGRESS'
-            self.current_task_name = task_name
-            return True
-
-    def success(self) -> None:
-        self.status = 'SUCCESS'
-
-    @property
-    def is_started(self) -> bool:
-        """ True, if the task is started. """
-        return self.status is not 'PENDING'
-
-    def fail(self, e: Union[List[str], List[Exception], Exception, str]):
-        """ Allows tasks to mark this processing as failed. All following task will do nothing. """
-        raw_errors: Union[List[str], List[Exception]] = None
-        if isinstance(e, list):
-            raw_errors = e
-        else:
-            raw_errors = cast(Union[List[str], List[Exception]], [e])
-
-        for error in raw_errors:
-            if isinstance(error, str):
-                self.errors.append(error)
-            elif isinstance(error, Exception):
-                self.errors.append(error.__str__())
-            else:
-                assert False, 'Unknown error'
-
-        self.status = 'FAILURE'
-
-
-class CalcProc(ProcPipeline):
-    """
-    Used to represent the state of an calc processing. It is used to provide
-    information to the user (via api, users) and act as a state object within the
-    more complex calc processing task. Keep in mind that this task might become several
-    celery tasks in the future.
-
-    Arguments:
-        upload_hash: The hash that identifies the upload in the archive.
-        mainfile: The path to the mainfile in the upload.
-        parser_name: The name of the parser to use/used.
-        tmp_mainfile: The full path to the mainfile in the local fs.
-
-    Attributes:
-        calc_hash: The mainfile hash that identifies the calc in the archive.
-        celery_task_id: The celery task id for the calc parse celery task.
-    """
-    def __init__(self, mainfile, parser_name, tmp_mainfile, *args, **kwargs):
-        task_names = [
-            [parser_name],
-            [n.__name__ for n in normalizers],
-            ['archiving']
-        ]
-
-        super().__init__(task_names=list(itertools.chain(*task_names)), *args)
-
-        self.mainfile = mainfile
-        self.parser_name = parser_name
-        self.tmp_mainfile = tmp_mainfile
-
-        self.calc_hash = utils.hash(mainfile)
-
-        self.celery_task_id: str = None
-
-        self.update(kwargs)
-
-    def update_from_backend(self) -> bool:
-        """ Consults results backend and updates. Returns if object might have changed. """
-        if self.status in ['FAILURE', 'SUCCESS']:
-            return False
-        if self.celery_task_id is None:
-            return False
-
-        celery_task_result = AsyncResult(self.celery_task_id, app=app)
-        if celery_task_result.ready():
-            self.update(celery_task_result.result)
-            return True
-        else:
-            info = celery_task_result.info
-            if info is not None:
-                self.update(info)
-                return True
-
-        return False
-
-
-class UploadProc(ProcPipeline):
-    """
-    Used to represent the state of an upload processing. It is used to provide
-    information to the user (via api, users) and act as a state object that is passed
-    from celery task to celery task.
-
-    It is serializable (JSON, pickle). Internally it stores the
-    :class:`~celery.results.AsyncResults` instance in serialized *tuple* form to
-    keep connected to the results backend.
-
-    Warning:
-        You have to call :func:`forget` eventually to free all resources and the celery
-        results backend.
-
-        In any case, results will be deleted after 1 day, depending on the `configuration
-        <http://docs.celeryproject.org/en/latest/userguide/configuration.html#result-expires>`_.
-
-    Arguments:
-        upload_id: The id of the uploaded file in the object storage,
-                   see also :mod:`nomad.files`.
-        task_names: The names of all tasks in pipeline order.
-
-    Attributes:
-        upload_hash: The hash of the uploaded file. E.g., used for archive/repo ids.
-        calc_procs: The state data for all child calc processings.
-        celery_task_ids: Serialized form of the celery async_results tree for the processing.
-    """
-    def __init__(self, upload_id: str, *args, **kwargs) -> None:
-        assert upload_id is not None
-        # TODO there should be a way to read the names from the tasks
-        # but currently this is not possible due to import cycles
-        task_names = ['uploading', 'extracting', 'parse_all', 'cleanup']
-        super().__init__(task_names, *args)
-
-        self.upload_id = upload_id
-        self.upload_hash: str = None
-
-        self.calc_procs: List[CalcProc] = []
-
-        self.celery_task_ids: Any = None
-
-        self.update(kwargs)
-
-        if not self.is_started:
-            self.continue_with(task_names[0])
-
-    def update(self, dct):
-        # Since the data might be updated from deserialized dicts, list and CalcProc
-        # instances might by dicts, or mongoengines BaseList, BaseDicts. This overwrite
-        # replaces it.
-        # TODO there might be a better solution, or the problem solves itself, when
-        # moving away from mongo.
-        if 'calc_procs' in dct:
-            calc_procs = dct['calc_procs']
-            for idx, calc_proc_dct in enumerate(calc_procs):
-                if not isinstance(calc_proc_dct, CalcProc):
-                    calc_procs[idx] = CalcProc(**calc_proc_dct)
-            if type(calc_procs) != list:
-                dct['calc_procs'] = list(calc_procs)
-
-        super().update(dct)
-
-    @property
-    def _celery_task_result(self) -> AsyncResult:
-        """
-        The celery async_result in its regular usable, but not serializable form.
-
-        We use the tuple form to allow serialization (i.e. storage). Keep in mind
-        that the sheer `task_id` is not enough, because it does not contain
-        the parent tasks, i.e. result tree.
-        See `third comment <https://github.com/celery/celery/issues/1328>`_
-        for details.
-        """
-        assert self.celery_task_ids is not None
-
-        return result_from_tuple(self.celery_task_ids, app=app)
-
-    def update_from_backend(self) -> bool:
-        """
-        Consults the result backend and updates itself with the available results.
-        Will only update not completed processings.
-
-        Returns:
-            Whether the object might have changed.
-        """
-        assert self.is_started, 'Run is not yet started.'
-
-        if self.status in ['SUCCESS', 'FAILURE']:
-            return False
-
-        if self.celery_task_ids is None:
-            return False
-
-        celery_task_result = self._celery_task_result
-        task_index = len(self.task_names)
-        might_have_changed = False
-        while celery_task_result is not None:
-            task_index -= 1
-            if celery_task_result.ready():
-                result = celery_task_result.result
-                if isinstance(result, Exception):
-                    self.fail(result)
-                    self.current_task_name = self.task_names[task_index]
-                    logger = utils.get_logger(
-                        __name__,
-                        upload_id=self.upload_id,
-                        current_task_name=self.current_task_name)
-                    logger.error('Celery task raised exception', exc_info=result)
-                else:
-                    self.update(result)
-                might_have_changed = True
-                break
-            else:
-                if celery_task_result.status == 'PROGRESS':
-                    # get potential info
-                    result = celery_task_result.info
-                    if result is not None:
-                        self.update(result)
-                        break
-
-                celery_task_result = celery_task_result.parent
-
-        if self.calc_procs is not None:
-            for calc_proc in self.calc_procs:
-                if calc_proc.update_from_backend():
-                    might_have_changed = True
-
-        return might_have_changed
-
-    def forget(self) -> None:
-        """ Forget the results of a completed run; free all resources in the results backend. """
-        # TODO, this is not forgetting the parse task in the parse_all header, right?
-        assert self.ready(), 'Run is not completed.'
-
-        celery_task_result = self._celery_task_result
-        while celery_task_result is not None:
-            celery_task_result.forget()
-            celery_task_result = celery_task_result.parent
-
-    def ready(self) -> bool:
-        """ Returns: True if the task has been executed. """
-        self.update_from_backend()
-        return self.status in ['FAILURE', 'SUCCESS']
-
-    def get(self, interval=1, timeout=None) -> 'UploadProc':
-        """
-        Blocks until the processing has finished. It uses the given interval
-        to continuously consult the results backend.
-
-        Arguments:
-            interval: a period to sleep between updates
-            timeout: a rough timeout after which to terminate, even if unfinished
-
-        Returns: An updated instance of itself with all the results.
-        """
-        assert self.is_started, 'Run is not yet started.'
-
-        slept = 0
-        while not self.ready() and (timeout is None or slept < timeout):
-            time.sleep(interval)
-            slept += interval
-            self.update_from_backend()
-
-        self.update_from_backend()
-
-        return self
diff --git a/nomad/processing/tasks.py b/nomad/processing/tasks.py
deleted file mode 100644
index 1adced9e44b317699ed6bf681db56926ef3a31c1..0000000000000000000000000000000000000000
--- a/nomad/processing/tasks.py
+++ /dev/null
@@ -1,206 +0,0 @@
-# Copyright 2018 Markus Scheidgen
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import List
-from celery import Task, chord, group
-from celery.canvas import Signature
-from datetime import datetime
-
-from nomad import files, utils
-from nomad.parsing import parsers, parser_dict
-from nomad.normalizing import normalizers
-import nomad.patch  # pylint: disable=unused-import
-
-from nomad.processing.app import app
-from nomad.processing.state import UploadProc, CalcProc
-
-
-def _report_progress(task, dct):
-    if not task.request.called_directly:
-        task.update_state(state='PROGRESS', meta=dct)
-
-
-@app.task(bind=True, name='extracting')
-def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
-    logger = utils.get_logger(__name__, task=task.name, upload_id=proc.upload_id)
-    if not proc.continue_with(task.name):
-        return proc
-
-    _report_progress(task, proc)
-
-    try:
-        upload = files.Upload(proc.upload_id)
-        upload.open()
-        logger.debug('Opened upload')
-    except KeyError as e:
-        logger.info('Process request for non existing upload')
-        proc.fail(e)
-        return proc
-    except files.UploadError as e:
-        logger.info('Could not open upload', error=str(e))
-        proc.fail(e)
-        return proc
-    except Exception as e:
-        logger.error('Unknown exception', exc_info=e)
-        proc.fail(e)
-        return proc
-
-    logger.debug('Upload opened')
-
-    try:
-        proc.upload_hash = upload.hash()
-    except files.UploadError as e:
-        logger.error('Could not create upload hash', error=str(e))
-        proc.fail(e)
-        return proc
-
-    from nomad.data import Calc
-    if Calc.upload_exists(proc.upload_hash):
-        logger.info('Upload hash doublet')
-        proc.fail('The same file was already uploaded and processed.')
-        return proc
-
-    try:
-        # TODO: deal with multiple possible parser specs
-        for filename in upload.filelist:
-            for parser in parsers:
-                try:
-                    if parser.is_mainfile(filename, lambda fn: upload.open_file(fn)):
-                        tmp_mainfile = upload.get_path(filename)
-                        calc_proc = CalcProc(filename, parser.name, tmp_mainfile)
-                        proc.calc_procs.append(calc_proc)
-                except Exception as e:
-                    logger.warn('Exception while matching pot. mainfile.', mainfile=filename)
-                    proc.warnings.append('Exception while matching pot. mainfile %s.' % filename)
-
-    except files.UploadError as e:
-        logger.warn('Could not find parse specs in open upload', error=str(e))
-        proc.fail(e)
-        return proc
-
-    return proc
-
-
-@app.task(bind=True, name='cleanup')
-def cleanup_task(task, calc_procs: List[CalcProc], upload_proc: UploadProc) -> UploadProc:
-    logger = utils.get_logger(__name__, task=task.name, upload_id=upload_proc.upload_id)
-    if upload_proc.continue_with(task.name):
-
-        _report_progress(task, upload_proc)
-
-        try:
-            upload = files.Upload(upload_proc.upload_id)
-        except KeyError as e:
-            logger.warn('Upload does not exist')
-            upload_proc.fail(e)
-            return upload_proc
-
-        try:
-            upload.close()
-        except Exception as e:
-            logger.error('Could not close upload', exc_info=e)
-            upload_proc.fail(e)
-            return upload_proc
-
-        logger.debug('Closed upload')
-        upload_proc.success()
-
-    return upload_proc
-
-
-@app.task(bind=True, name='parse_all')
-def parse_all_task(task: Task, upload_proc: UploadProc, cleanup: Signature) -> UploadProc:
-    cleanup = cleanup.clone(args=(upload_proc,))
-    if not upload_proc.continue_with(task.name):
-        chord(group())(cleanup)
-        return upload_proc
-
-    # prepare the group of parallel calc processings
-    parses = group(parse_task.s(calc_proc, upload_proc) for calc_proc in upload_proc.calc_procs)
-
-    # save the calc processing task ids to the overall processing
-    for idx, child in enumerate(parses.freeze().children):
-        upload_proc.calc_procs[idx].celery_task_id = child.task_id
-
-    # initiate the chord that runs calc processings first, and close_upload afterwards
-    chord(parses)(cleanup)
-
-    return upload_proc
-
-
-@app.task(bind=True, name='parse')
-def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc:
-    assert upload_proc.upload_hash is not None
-
-    upload_hash, parser, mainfile = upload_proc.upload_hash, proc.parser_name, proc.mainfile
-    logger = utils.get_logger(
-        __name__, task=self.name,
-        upload_id=upload_proc.upload_id, upload_hash=upload_hash, mainfile=mainfile)
-
-    # parsing
-    proc.continue_with(parser)
-    try:
-        parser_backend = parser_dict[parser].run(proc.tmp_mainfile)
-        if parser_backend.status[0] != 'ParseSuccess':
-            error = parser_backend.status[1]
-            logger.debug('Failed parsing', parser=parser, error=error)
-            proc.fail(error)
-            return proc
-        logger.debug('Completed successfully', parser=parser)
-    except Exception as e:
-        logger.warn('Exception while parsing', parser=parser, exc_info=e)
-        proc.fail(e)
-        return proc
-    _report_progress(self, proc)
-
-    # normalization
-    for normalizer in normalizers:
-        normalizer_name = normalizer.__name__
-        proc.continue_with(normalizer_name)
-        try:
-            normalizer(parser_backend).normalize()
-            if parser_backend.status[0] != 'ParseSuccess':
-                error = parser_backend.status[1]
-                logger.info('Failed run of %s: %s' % (normalizer, error))
-                proc.fail(error)
-                return proc
-            logger.debug('Completed %s successfully' % normalizer)
-        except Exception as e:
-            logger.warn('Exception while normalizing with %s' % normalizer, exc_info=e)
-            proc.fail(e)
-            return proc
-    _report_progress(self, proc)
-
-    # update search
-    proc.continue_with('archiving')
-    try:
-        from nomad.data import Calc
-        Calc.create_from_backend(
-            parser_backend,
-            upload_hash=upload_hash,
-            calc_hash=proc.calc_hash,
-            upload_id=upload_proc.upload_id,
-            mainfile=mainfile,
-            upload_time=datetime.now())
-        logger.debug('Archived successfully')
-    except Exception as e:
-        logger.error('Failed to archive', exc_info=e)
-        proc.fail(e)
-        return proc
-    _report_progress(self, proc)
-
-    logger.debug('Completed processing')
-
-    proc.success()
-    return proc
diff --git a/nomad/search.py b/nomad/search.py
new file mode 100644
index 0000000000000000000000000000000000000000..1c870f336550a5d235e4d72da193a4913e9b8562
--- /dev/null
+++ b/nomad/search.py
@@ -0,0 +1,159 @@
+# Copyright 2018 Markus Scheidgen
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import elasticsearch.exceptions
+from elasticsearch_dsl import Document as ElasticDocument, Search, Date, Keyword, connections
+from datetime import datetime
+
+from nomad import config
+from nomad.parsing import LocalBackend
+from nomad.utils import get_logger
+
+logger = get_logger(__name__)
+
+# ensure the elastic connection
+if 'sphinx' not in sys.modules:
+    client = connections.create_connection(hosts=[config.elastic.host])
+
+key_mappings = {
+    'basis_set_type': 'program_basis_set_type',
+    'chemical_composition': 'chemical_composition_bulk_reduced'
+}
+
+
+class AlreadyExists(Exception): pass
+
+
+class CalcElasticDocument(ElasticDocument):
+    """
+    Elastic search document that represents a calculation. It is intended as a
+    component of :class:`Calc` and should only be created by its parent :class:`Calc`
+    instance, via the :func:`create_from_backend` factory method.
+    """
+    class Index:
+        name = config.elastic.calc_index
+
+    calc_hash = Keyword()
+    mainfile = Keyword()
+    upload_hash = Keyword()
+    upload_id = Keyword()
+
+    upload_time = Date()
+
+    program_name = Keyword()
+    program_version = Keyword()
+
+    chemical_composition = Keyword()
+    basis_set_type = Keyword()
+    atom_species = Keyword()
+    system_type = Keyword()
+    crystal_system = Keyword()
+    space_group_number = Keyword()
+    configuration_raw_gid = Keyword()
+    XC_functional_name = Keyword()
+
+    @property
+    def archive_id(self) -> str:
+        """ The unique id for this calculation. """
+        return '%s/%s' % (self.upload_hash, self.calc_hash)
+
+    @classmethod
+    def create_from_backend(
+            cls, backend: LocalBackend, upload_id: str, upload_hash: str, calc_hash: str,
+            **kwargs) -> 'CalcElasticDocument':
+        """
+        Create a new calculation instance in elastic search. The data from the given backend
+        will be used. Additional meta-data can be given as *kwargs*. ``upload_id``,
+        ``upload_hash``, and ``calc_hash`` are mandatory.
+        This will create an elastic search entry and store the backend data to the
+        archive.
+
+        Arguments:
+            backend: The parsing/normalizing backend that contains the calculation data.
+            upload_hash: The upload hash of the originating upload.
+            upload_id: The upload id of the originating upload.
+            calc_hash: The upload-unique hash for this calculation.
+            kwargs: Additional arguments not stored in the backend.
+
+        Raises:
+            AlreadyExists: If the calculation already exists in elastic search. Uniqueness
+                is enforced via the elastic document id, which is the ``archive_id``.
+        """
+        assert upload_hash is not None and calc_hash is not None and upload_id is not None
+        kwargs.update(dict(upload_hash=upload_hash, calc_hash=calc_hash, upload_id=upload_id))
+
+        # prepare the entry with all necessary properties from the backend
+        calc = cls(meta=dict(id='%s/%s' % (upload_hash, calc_hash)))
+        for property in cls._doc_type.mapping:
+            property = key_mappings.get(property, property)
+
+            if property in kwargs:
+                value = kwargs[property]
+            else:
+                try:
+                    value = backend.get_value(property, 0)
+                except KeyError:
+                    logger.warning(
+                        'Missing property value', property=property, upload_id=upload_id,
+                        upload_hash=upload_hash, calc_hash=calc_hash)
+                    continue
+
+            setattr(calc, property, value)
+
+        # persist to elastic search
+        try:
+            calc.save(op_type='create')
+        except elasticsearch.exceptions.ConflictError:
+            raise AlreadyExists('Calculation %s already exists.' % calc.archive_id)
+
+        return calc
+
+    @staticmethod
+    def es_search(body):
+        """ Perform an elasticsearch and not elasticsearch_dsl search on the Calc index. """
+        return client.search(index=config.elastic.calc_index, body=body)
+
+    @staticmethod
+    def upload_exists(upload_hash):
+        """ Returns true if there are already calcs from the given upload. """
+        search = Search(using=client, index=config.elastic.calc_index) \
+            .query('match', upload_hash=upload_hash) \
+            .execute()
+
+        return len(search) > 0
+
+    @property
+    def json_dict(self):
+        """ A json serializable dictionary representation. """
+        data = self.to_dict()
+
+        upload_time = data.get('upload_time', None)
+        if upload_time is not None and isinstance(upload_time, datetime):
+            data['upload_time'] = data['upload_time'].isoformat()
+
+        data['archive_id'] = self.archive_id
+
+        return {key: value for key, value in data.items() if value is not None}
+
+
+if 'sphinx' not in sys.modules:
+    try:
+        CalcElasticDocument.init()
+    except elasticsearch.exceptions.RequestError as e:
+        if e.status_code == 400 and 'resource_already_exists_exception' in e.error:
+            pass  # happens if two services try this at the same time
+        else:
+            raise e
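
For orientation, a minimal usage sketch of the new search module (not part of the change set). It assumes a reachable elastic instance, so that the module-level connection and CalcElasticDocument.init() succeed, and a LocalBackend already filled by a parser/normalizer run; the hash values and the mainfile path are purely illustrative.

```python
# Sketch only: index one parsed calculation and expose its JSON form.
from nomad.parsing import LocalBackend
from nomad.search import AlreadyExists, CalcElasticDocument


def index_calc(backend: LocalBackend, upload_id: str, upload_hash: str, calc_hash: str) -> dict:
    """ Index a single calc; raises AlreadyExists if its archive_id is already taken. """
    calc = CalcElasticDocument.create_from_backend(
        backend, upload_id=upload_id, upload_hash=upload_hash,
        calc_hash=calc_hash, mainfile='illustrative/mainfile.xml')
    return calc.json_dict  # json serializable dict, includes the archive_id

# doublet detection as used by the processing tasks:
# CalcElasticDocument.upload_exists(upload_hash) -> bool
```
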
diff --git a/nomad/user.py b/nomad/user.py
new file mode 100644
index 0000000000000000000000000000000000000000..755e1611fe1e2569cd9418e9211ce92471f8e54d
--- /dev/null
+++ b/nomad/user.py
@@ -0,0 +1,33 @@
+import sys
+from mongoengine import Document, EmailField, StringField, ReferenceField, ListField
+
+
+class User(Document):
+    """ Represents users in the database. """
+    email = EmailField(primary_key=True)
+    name = StringField()
+
+
+class DataSet(Document):
+    name = StringField()
+    description = StringField()
+    doi = StringField()
+
+    user = ReferenceField(User)
+    calcs = ListField(StringField())
+
+    meta = {
+        'indexes': [
+            'user',
+            'doi',
+            'calcs'
+        ]
+    }
+
+# provide a fake user for testing
+me = None
+if 'sphinx' not in sys.modules:
+    me = User.objects(email='me@gmail.com').first()
+    if me is None:
+        me = User(email='me@gmail.com', name='Me Meyer')
+        me.save()
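
A hedged sketch of the new user documents in use (not part of the change set). It assumes a mongo connection is established before import, as the processing module does, and that DataSet.calcs holds calc archive ids; the names are illustrative.

```python
# Sketch only: a DataSet owned by the fake test user, referencing calcs by archive id.
from nomad.user import DataSet, me


def create_dataset(name: str, archive_ids: list) -> DataSet:
    dataset = DataSet(name=name, user=me, calcs=archive_ids)
    dataset.save()
    return dataset


owned_datasets = DataSet.objects(user=me)  # served by the 'user' index
```
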
diff --git a/nomad/utils.py b/nomad/utils.py
index 5ffcb6c89b954a5a9a958858a9de0ddaa00ccfd3..620ff0ae5a556f920bb19cc51924db10550ec2a3 100644
--- a/nomad/utils.py
+++ b/nomad/utils.py
@@ -10,6 +10,7 @@ from contextlib import contextmanager
 import json
 import os
 import sys
+import uuid
 
 from nomad import config
 
@@ -98,6 +99,10 @@ if not _logging_is_configured:
     _logging_is_configured = True
 
 
+def create_uuid() -> str:
+    return base64.b64encode(uuid.uuid4().bytes, altchars=b'-_').decode('utf-8')[0:-2]
+
+
 def hash(obj: Union[IO, str]) -> str:
     """ First 28 character of an URL safe base 64 encoded sha512 digest. """
     hash = hashlib.sha512()
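
A small sketch of what create_uuid yields; the function body is repeated here only to keep the snippet self-contained.

```python
# Sketch only: a URL-safe, 22 character id derived from a random UUID;
# the two '=' padding characters of the 24 character base64 string are cut.
import base64
import uuid


def create_uuid() -> str:
    return base64.b64encode(uuid.uuid4().bytes, altchars=b'-_').decode('utf-8')[0:-2]


upload_id = create_uuid()
assert len(upload_id) == 22
assert '+' not in upload_id and '/' not in upload_id  # replaced by '-' and '_'
```
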
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000000000000000000000000000000000000..9803c09c0035fec4db3d155bc5194646aae75ac7
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,49 @@
+import pytest
+from mongoengine import connect
+from mongoengine.connection import disconnect
+
+from nomad import config
+
+
+@pytest.fixture(scope='session')
+def celery_includes():
+    return ['nomad.processing.base']
+
+
+@pytest.fixture(scope='session')
+def celery_config():
+    return {
+        'broker_url': config.celery.broker_url,
+        'result_backend': config.celery.backend_url,
+        'accept_content': ['json', 'pickle'],
+        'task_serializer': config.celery.serializer,
+        'result_serializer': config.celery.serializer
+    }
+
+
+@pytest.fixture(scope='function', autouse=True)
+def mongomock(monkeypatch):
+    def mock_connect(**kwargs):
+        return connect('test_db', host='mongomock://localhost')
+
+    disconnect()
+    connection = mock_connect()
+    monkeypatch.setattr('nomad.processing.base.mongo_connect', mock_connect)
+    yield
+    connection.drop_database('test_db')
+
+
+@pytest.fixture(scope='function')
+def mocksearch(monkeypatch):
+    uploads = []
+
+    def create_from_backend(_, **kwargs):
+        upload_hash = kwargs.get('upload_hash', None)
+        uploads.append(upload_hash)
+        return {}
+
+    def upload_exists(upload_hash):
+        return upload_hash in uploads
+
+    monkeypatch.setattr('nomad.search.CalcElasticDocument.create_from_backend', create_from_backend)
+    monkeypatch.setattr('nomad.search.CalcElasticDocument.upload_exists', upload_exists)
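
A hedged sketch of how a test consumes these fixtures: mongomock is autouse, so every test already runs against an in-memory mongo, while mocksearch is requested explicitly wherever elastic access must be stubbed. It assumes the same environment as tests/processing/test_data.py (in particular that importing nomad.user works without a real service setup).

```python
# Sketch only: mongomock is applied automatically, mocksearch on request.
from nomad.processing import Upload
from nomad.user import me


def test_create_upload(mocksearch):
    # runs against the in-memory mongomock connection; no elastic needed
    upload = Upload.create(upload_id='test_upload', user=me)
    assert upload.upload_id == 'test_upload'
```
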
diff --git a/tests/test_proc.py b/tests/processing/test_base.py
similarity index 65%
rename from tests/test_proc.py
rename to tests/processing/test_base.py
index 7da2c042e03647bf9c7922349e76720c0f57eb6a..74c1903d10aa4125f74d20f85d0cf55b21b8d2bf 100644
--- a/tests/test_proc.py
+++ b/tests/processing/test_base.py
@@ -4,35 +4,7 @@ from mongoengine.connection import disconnect
 import time
 
 from nomad import config
-from nomad.proc import Proc, process, task, SUCCESS, FAILURE, RUNNING, PENDING
-
-
-@pytest.fixture(scope='session')
-def celery_includes():
-    return ['nomad.proc']
-
-
-@pytest.fixture(scope='session')
-def celery_config():
-    return {
-        'broker_url': config.celery.broker_url,
-        'result_backend': config.celery.backend_url,
-        'accept_content': ['json', 'pickle'],
-        'task_serializer': config.celery.serializer,
-        'result_serializer': config.celery.serializer
-    }
-
-
-@pytest.fixture(scope='function')
-def mongomock(monkeypatch):
-    def mock_connect(**kwargs):
-        return connect('test_db', host='mongomock://localhost')
-
-    disconnect()
-    connection = mock_connect()
-    monkeypatch.setattr('nomad.proc.mongo_connect', mock_connect)
-    yield
-    connection.drop_database('test_db')
+from nomad.processing.base import Proc, process, task, SUCCESS, FAILURE, RUNNING, PENDING
 
 
 def assert_proc(proc, current_task, status=SUCCESS, errors=0, warnings=0):
@@ -80,7 +52,7 @@ class FailTasks(Proc):
         self.fail('fail fail fail')
 
 
-def test_fail(mongomock):
+def test_fail():
     p = FailTasks.create()
     p.will_fail()
 
@@ -102,7 +74,7 @@ class SimpleProc(Proc):
         pass
 
 
-def test_simple_process(celery_session_worker, mongomock):
+def test_simple_process(celery_session_worker):
     p = SimpleProc.create()
     p.process()
     p.block_until_complete()
@@ -116,7 +88,7 @@ class TaskInProc(Proc):
         pass
 
 
-def test_task_as_proc(celery_session_worker, mongomock):
+def test_task_as_proc(celery_session_worker):
     p = TaskInProc.create()
     p.process()
     p.block_until_complete()
@@ -129,17 +101,14 @@ class ParentProc(Proc):
     @process
     @task
     def spawn_children(self):
-        print('## spawn')
         ChildProc.create(parent=self).process()
 
     @process
     @task
     def after_children(self):
-        print('## after')
         pass
 
     def on_child_complete(self):
-        print('## on complete')
         if self.incr_counter('children') == 1:
             self.after_children()
 
@@ -153,8 +122,12 @@ class ChildProc(Proc):
         self.parent.on_child_complete()
 
 
-def test_counter(celery_session_worker, mongomock):
+def test_counter(celery_session_worker):
     p = ParentProc.create()
     p.spawn_children()
     p.block_until_complete()
     assert_proc(p, 'after_children')
+
+    # wait for session worker to complete all open tasks
+    # otherwise uncompleted task requests will bleed into the next tests
+    time.sleep(1)
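
For the failure paths exercised here and in tests/processing/test_data.py, a hedged sketch of an assertion helper in the spirit of assert_proc; it relies only on the current_task, status, and errors attributes that these tests already use.

```python
# Sketch only: the failure-case counterpart to assert_proc.
from nomad.processing.base import FAILURE, Proc


def assert_failed_proc(proc: Proc, current_task: str):
    # the task that raised or called self.fail() remains the current task
    assert proc.current_task == current_task
    assert proc.status == FAILURE
    assert len(proc.errors) > 0
```
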
diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py
new file mode 100644
index 0000000000000000000000000000000000000000..e45159bf9e2c4123138f549b2f740d81347f346c
--- /dev/null
+++ b/tests/processing/test_data.py
@@ -0,0 +1,148 @@
+# Copyright 2018 Markus Scheidgen
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+To run this test, a celery worker must be running. The test worker provided by
+the celery pytest plugin is currently not working. It results in a timeout when
+reading from the redis result backend, even though all tasks apparently ended successfully.
+"""
+
+from typing import Generator
+import pytest
+import logging
+from datetime import datetime
+
+from nomad import config, files
+from nomad.processing import Upload, Calc
+from nomad.processing.base import task as task_decorator
+from nomad.user import me
+from nomad.search import CalcElasticDocument
+
+from tests.test_files import example_file, empty_file
+
+# import fixtures
+from tests.test_files import clear_files  # pylint: disable=unused-import
+
+example_files = [empty_file, example_file]
+
+
+@pytest.fixture(scope='function', autouse=True)
+def mocksearch_forall(mocksearch):
+    pass
+
+
+@pytest.fixture(scope='function', params=example_files)
+def uploaded_id(request, clear_files) -> Generator[str, None, None]:
+    example_file = request.param
+    example_upload_id = example_file.replace('.zip', '')
+    files._client.fput_object(config.files.uploads_bucket, example_upload_id, example_file)
+
+    yield example_upload_id
+
+
+def run_processing(uploaded_id: str) -> Upload:
+    upload = Upload.create(upload_id=uploaded_id, user=me)
+    upload.upload_time = datetime.now()
+
+    assert upload.status == 'RUNNING'
+    assert upload.current_task == 'uploading'
+
+    upload.process()  # pylint: disable=E1101
+    upload.block_until_complete(interval=.1)
+
+    return upload
+
+
+def assert_processing(upload: Upload):
+    assert upload.completed
+    assert upload.current_task == 'cleanup'
+    assert upload.upload_hash is not None
+    assert len(upload.errors) == 0
+    assert upload.status == 'SUCCESS'
+
+    for calc in Calc.objects(upload_id=upload.upload_id):
+        assert calc.parser is not None
+        assert calc.mainfile is not None
+        assert calc.status == 'SUCCESS', calc.archive_id
+        assert len(calc.errors) == 0
+
+
+@pytest.mark.timeout(30)
+def test_processing(uploaded_id, celery_session_worker):
+    upload = run_processing(uploaded_id)
+    assert_processing(upload)
+
+
+@pytest.mark.parametrize('uploaded_id', [example_files[1]], indirect=True)
+def test_processing_doublets(uploaded_id, celery_session_worker, caplog):
+    caplog.set_level(logging.CRITICAL)
+
+    upload = run_processing(uploaded_id)
+    assert upload.status == 'SUCCESS'
+    assert CalcElasticDocument.upload_exists(upload.upload_hash)  # pylint: disable=E1101
+
+    upload = run_processing(uploaded_id)
+    assert upload.status == 'FAILURE'
+    assert len(upload.errors) > 0
+    assert 'already' in upload.errors[0]
+
+
+@pytest.mark.timeout(30)
+def test_process_non_existing(celery_session_worker, caplog):
+    caplog.set_level(logging.CRITICAL)
+    upload = run_processing('__does_not_exist')
+
+    assert upload.completed
+    assert upload.current_task == 'extracting'
+    assert upload.status == 'FAILURE'
+    assert len(upload.errors) > 0
+
+
+@pytest.mark.parametrize('task', ['extracting', 'parse_all', 'cleanup', 'parsing'])
+@pytest.mark.timeout(30)
+def test_task_failure(monkeypatch, uploaded_id, celery_session_worker, task, caplog):
+    caplog.set_level(logging.CRITICAL)
+
+    # mock the task method to raise exceptions
+    if hasattr(Upload, task):
+        cls = Upload
+    elif hasattr(Calc, task):
+        cls = Calc
+    else:
+        assert False
+
+    def mock(self):
+        raise Exception('fail for test')
+    mock.__name__ = task
+    mock = task_decorator(mock)
+
+    monkeypatch.setattr('nomad.processing.data.%s.%s' % (cls.__name__, task), mock)
+
+    # run the test
+    upload = run_processing(uploaded_id)
+
+    assert upload.completed
+
+    if task != 'parsing':
+        assert upload.status == 'FAILURE'
+        assert upload.current_task == task
+        assert len(upload.errors) > 0
+    elif len(upload.calcs) > 0:  # pylint: disable=E1101
+        assert upload.status == 'SUCCESS'
+        assert upload.current_task == 'cleanup'
+        assert len(upload.errors) > 0
+        for calc in upload.calcs:  # pylint: disable=E1101
+            assert calc.status == 'FAILURE'
+            assert calc.current_task == 'parsing'
+            assert len(calc.errors) > 0
diff --git a/tests/test_api.py b/tests/test_api.py
index b75298d4de768b09ce02f2bded9bfef0c87e0b59..340f542e19f5e63bba2e77b442e4d0a28e2f82b3 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -14,18 +14,17 @@ services_config = config.services._asdict()
 services_config.update(api_base_path='')
 config.services = config.NomadServicesConfig(**services_config)
 
-from nomad import api, files, processing  # noqa
-from nomad.data import Upload  # noqa
+from nomad import api, files  # noqa
+from nomad.processing import Upload, handle_uploads_thread  # noqa
 
-from tests.test_processing import example_files  # noqa
+from tests.processing.test_data import example_files  # noqa
 from tests.test_files import assert_exists  # noqa
 
 # import fixtures
 from tests.test_files import clear_files, archive_id  # noqa pylint: disable=unused-import
 from tests.test_normalizing import normalized_vasp_example  # noqa pylint: disable=unused-import
 from tests.test_parsing import parsed_vasp_example  # noqa pylint: disable=unused-import
-from tests.test_data import example_calc  # noqa pylint: disable=unused-import
-from tests.test_processing import celery_config, celery_includes, mocksearch  # noqa pylint: disable=unused-import
+from tests.test_search import example_elastic_calc  # noqa pylint: disable=unused-import
 
 
 @pytest.fixture(scope='function')
@@ -70,11 +69,6 @@ def test_no_uploads(client):
     assert_uploads(rv.data, count=0)
 
 
-def test_bad_upload_id(client):
-    rv = client.get('/uploads/bad_id')
-    assert rv.status_code == 400
-
-
 def test_not_existing_upload(client):
     rv = client.get('/uploads/123456789012123456789012')
     assert rv.status_code == 404
@@ -88,7 +82,7 @@ def test_stale_upload(client):
     assert rv.status_code == 200
     upload_id = assert_upload(rv.data)['upload_id']
 
-    upload = Upload.get(upload_id=upload_id)
+    upload = Upload.get(upload_id)
     upload.create_time = datetime.now() - timedelta(days=2)
     upload.save()
 
@@ -163,9 +157,9 @@ def test_upload_to_upload(client, file):
 
 
 @pytest.mark.parametrize("file", example_files)
-@pytest.mark.timeout(30)
+@pytest.mark.timeout(10)
 def test_processing(client, file, celery_session_worker, mocksearch):
-    handle_uploads_thread = processing.handle_uploads_thread(quit=True)
+    handler = handle_uploads_thread(quit=True)
 
     rv = client.post('/uploads')
     assert rv.status_code == 200
@@ -176,7 +170,7 @@ def test_processing(client, file, celery_session_worker, mocksearch):
     cmd = files.create_curl_upload_cmd(upload_url).replace('<ZIPFILE>', file)
     subprocess.call(shlex.split(cmd))
 
-    handle_uploads_thread.join()
+    handler.join()
 
     while True:
         time.sleep(1)
@@ -185,20 +179,25 @@ def test_processing(client, file, celery_session_worker, mocksearch):
         assert rv.status_code == 200
         upload = assert_upload(rv.data)
         assert 'upload_time' in upload
-        if upload['proc']['status'] in ['SUCCESS', 'FAILURE']:
+        if upload['completed']:
             break
 
-    proc = upload['proc']
-    assert proc['status'] == 'SUCCESS'
-    assert 'calc_procs' in proc
-    assert proc['calc_procs'] is not None
-    assert proc['current_task_name'] == 'cleanup'
-    assert len(proc['task_names']) == 4
-    assert_exists(config.files.uploads_bucket, upload['upload_id'])
+    assert len(upload['tasks']) == 4
+    assert upload['status'] == 'SUCCESS'
+    assert upload['current_task'] == 'cleanup'
+    calcs = upload['calcs']
+    for calc in calcs:
+        assert calc['status'] == 'SUCCESS'
+        assert calc['current_task'] == 'archiving'
+        assert len(calc['tasks']) == 3
+    assert_exists(config.files.uploads_bucket, upload['upload_id'])
+
+    time.sleep(1)
 
 
-def test_repo_calc(client, example_calc):
-    rv = client.get('/repo/%s/%s' % (example_calc.upload_hash, example_calc.calc_hash))
+def test_repo_calc(client, example_elastic_calc):
+    rv = client.get(
+        '/repo/%s/%s' % (example_elastic_calc.upload_hash, example_elastic_calc.calc_hash))
     assert rv.status_code == 200
 
 
@@ -207,7 +206,7 @@ def test_non_existing_repo_cals(client):
     assert rv.status_code == 404
 
 
-def test_repo_calcs(client, example_calc):
+def test_repo_calcs(client, example_elastic_calc):
     rv = client.get('/repo')
     assert rv.status_code == 200
     data = json.loads(rv.data)
@@ -217,7 +216,7 @@ def test_repo_calcs(client, example_calc):
     assert len(results) >= 1
 
 
-def test_repo_calcs_pagination(client, example_calc):
+def test_repo_calcs_pagination(client, example_elastic_calc):
     rv = client.get('/repo?page=1&per_page=1')
     assert rv.status_code == 200
     data = json.loads(rv.data)
diff --git a/tests/test_processing.py b/tests/test_processing.py
deleted file mode 100644
index e1de1628d943da87e55411f3b38779343ec1fafa..0000000000000000000000000000000000000000
--- a/tests/test_processing.py
+++ /dev/null
@@ -1,172 +0,0 @@
-# Copyright 2018 Markus Scheidgen
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-To run this test, a celery worker must be running. The test worker provided by
-the celery pytest plugin is currently not working. It results in a timeout when
-reading from the redis result backend, even though all tasks apparently ended successfully.
-"""
-
-from typing import Generator
-import pytest
-import time
-import logging
-
-from nomad import config, files
-from nomad.data import Calc
-from nomad.processing import start_processing, ProcPipeline
-
-from tests.test_files import example_file, empty_file
-
-# import fixtures
-from tests.test_files import clear_files  # pylint: disable=unused-import
-
-example_files = [empty_file, example_file]
-
-
-@pytest.fixture(scope='function')
-def mocksearch(monkeypatch):
-    uploads = []
-
-    def create_from_backend(_, **kwargs):
-        upload_hash = kwargs.get('upload_hash', None)
-        uploads.append(upload_hash)
-        return {}
-
-    def upload_exists(upload_hash):
-        return upload_hash in uploads
-
-    monkeypatch.setattr('nomad.data.Calc.create_from_backend', create_from_backend)
-    monkeypatch.setattr('nomad.data.Calc.upload_exists', upload_exists)
-
-
-@pytest.fixture(scope='function', autouse=True)
-def mocksearch_forall(mocksearch):
-    pass
-
-
-@pytest.fixture(scope='session')
-def celery_includes():
-    return ['nomad.processing.tasks']
-
-
-@pytest.fixture(scope='session')
-def celery_config():
-    return {
-        'broker_url': config.celery.broker_url,
-        'result_backend': config.celery.backend_url,
-        'accept_content': ['json', 'pickle'],
-        'task_serializer': config.celery.serializer,
-        'result_serializer': config.celery.serializer
-    }
-
-
-@pytest.fixture(scope='function', params=example_files)
-def uploaded_id(request, clear_files) -> Generator[str, None, None]:
-    example_file = request.param
-    example_upload_id = example_file.replace('.zip', '')
-    files._client.fput_object(config.files.uploads_bucket, example_upload_id, example_file)
-
-    yield example_upload_id
-
-
-@pytest.mark.timeout(30)
-def test_processing(uploaded_id, celery_session_worker):
-    upload_proc = start_processing(uploaded_id)
-
-    upload_proc.update_from_backend()
-
-    assert upload_proc.status in ['PENDING', 'STARTED', 'PROGRESS']
-
-    while not upload_proc.ready():
-        time.sleep(1)
-        upload_proc.update_from_backend()
-
-    assert upload_proc.ready()
-    assert upload_proc.current_task_name == 'cleanup'
-    assert upload_proc.upload_hash is not None
-    assert len(upload_proc.errors) == 0
-    assert upload_proc.status == 'SUCCESS'
-    for calc_proc in upload_proc.calc_procs:
-        assert calc_proc.parser_name is not None
-        assert calc_proc.mainfile is not None
-        assert calc_proc.calc_hash is not None
-        assert calc_proc.status == 'SUCCESS'
-        assert len(calc_proc.errors) == 0
-
-    upload_proc.forget()
-
-
-@pytest.mark.parametrize('uploaded_id', [example_files[1]], indirect=True)
-def test_processing_doublets(uploaded_id, celery_session_worker, caplog):
-    caplog.set_level(logging.CRITICAL)
-    upload_proc = start_processing(uploaded_id)
-    upload_proc.get()
-    assert upload_proc.status == 'SUCCESS'
-
-    assert Calc.upload_exists(upload_proc.upload_hash)
-
-    upload_proc = start_processing(uploaded_id)
-    upload_proc.get()
-    assert upload_proc.status == 'FAILURE'
-    assert len(upload_proc.errors) > 0
-    assert 'already' in upload_proc.errors[0]
-
-
-@pytest.mark.timeout(30)
-def test_process_non_existing(celery_session_worker, caplog):
-    caplog.set_level(logging.CRITICAL)
-    upload_proc = start_processing('__does_not_exist')
-
-    upload_proc.get()
-
-    assert upload_proc.ready()
-    upload_proc.forget()
-
-    assert upload_proc.current_task_name == 'extracting'
-    assert upload_proc.status == 'FAILURE'
-    assert len(upload_proc.errors) > 0
-
-
-@pytest.mark.parametrize('task', ['extracting', 'parse_all', 'cleanup', 'parsers/vasp'])
-def test_task_failure(monkeypatch, uploaded_id, celery_session_worker, task, caplog):
-    caplog.set_level(logging.CRITICAL)
-
-    original_continue_with = ProcPipeline.continue_with
-
-    def continue_with(self: ProcPipeline, current_task):
-        if task == current_task:
-            raise Exception('fail for test')
-
-        return original_continue_with(self, current_task)
-
-    monkeypatch.setattr('nomad.processing.state.ProcPipeline.continue_with', continue_with)
-
-    upload_proc = start_processing(uploaded_id)
-    upload_proc.get()
-
-    assert upload_proc.ready()
-
-    if task != 'parsers/vasp':
-        assert upload_proc.status == 'FAILURE'
-        assert upload_proc.current_task_name == task
-        assert len(upload_proc.errors) > 0
-    elif len(upload_proc.calc_procs) > 0:  # ignore the empty example upload
-        assert upload_proc.status == 'FAILURE'
-        assert upload_proc.current_task_name == 'cleanup'
-        assert len(upload_proc.errors) > 0
-        for calc_proc in upload_proc.calc_procs:
-            assert calc_proc.status == 'FAILURE'
-            assert calc_proc.current_task_name == 'parser/vasp'
-            assert len(calc_proc.errors) > 0
diff --git a/tests/test_data.py b/tests/test_search.py
similarity index 64%
rename from tests/test_data.py
rename to tests/test_search.py
index 26fbdf2729bc129d0a1e816a4ab6f124cee44519..be0fb2a22a339dc7d0cf2e317d5cd0cd81ed4585 100644
--- a/tests/test_data.py
+++ b/tests/test_search.py
@@ -21,7 +21,7 @@ from elasticsearch import NotFoundError
 
 from nomad import config
 from nomad.parsing import LocalBackend
-from nomad.data import Calc, AlreadyExists, key_mappings
+from nomad.search import AlreadyExists, CalcElasticDocument, key_mappings
 
 from tests.test_normalizing import normalized_vasp_example  # pylint: disable=unused-import
 from tests.test_parsing import parsed_vasp_example  # pylint: disable=unused-import
@@ -29,16 +29,17 @@ from tests.test_files import assert_not_exists
 
 
 @pytest.fixture(scope='function')
-def example_calc(normalized_vasp_example: LocalBackend, caplog) -> Generator[Calc, None, None]:
+def example_elastic_calc(normalized_vasp_example: LocalBackend, caplog) \
+        -> Generator[CalcElasticDocument, None, None]:
     try:
         caplog.set_level(logging.ERROR)
-        Calc.get(id='test_upload_hash/test_calc_hash').delete()
+        CalcElasticDocument.get(id='test_upload_hash/test_calc_hash').delete()
     except Exception:
         pass
     finally:
         caplog.set_level(logging.WARNING)
 
-    entry = Calc.create_from_backend(
+    entry = CalcElasticDocument.create_from_backend(
         normalized_vasp_example,
         upload_hash='test_upload_hash',
         calc_hash='test_calc_hash',
@@ -58,28 +59,27 @@ def example_calc(normalized_vasp_example: LocalBackend, caplog) -> Generator[Cal
         caplog.set_level(logging.WARNING)
 
 
-def assert_calc(calc: Calc):
+def assert_elastic_calc(calc: CalcElasticDocument):
     assert calc is not None
-    for property in Calc._doc_type.mapping:
+    for property in CalcElasticDocument._doc_type.mapping:
         property = key_mappings.get(property, property)
         assert getattr(calc, property) is not None
 
 
-def test_create(example_calc: Calc):
-    assert_calc(example_calc)
-    assert Calc.upload_exists(example_calc.upload_hash)
+def test_create_elastic_calc(example_elastic_calc: CalcElasticDocument):
+    assert_elastic_calc(example_elastic_calc)
+    assert CalcElasticDocument.upload_exists(example_elastic_calc.upload_hash)
 
-    get_result: Calc = Calc.get(id='%s/%s' % (example_calc.upload_hash, example_calc.calc_hash))
-    assert_calc(get_result)
+    get_result: CalcElasticDocument = CalcElasticDocument.get(
+        id='%s/%s' % (example_elastic_calc.upload_hash, example_elastic_calc.calc_hash))
+    assert_elastic_calc(get_result)
 
-    json_dict = get_result.json_dict
-    assert 'archive_id' in json_dict
 
-
-def test_create_existing(example_calc: Calc, normalized_vasp_example, caplog):
+def test_create_existing_elastic_calc(
+        example_elastic_calc: CalcElasticDocument, normalized_vasp_example, caplog):
     try:
         caplog.set_level(logging.ERROR)
-        Calc.create_from_backend(
+        CalcElasticDocument.create_from_backend(
             normalized_vasp_example,
             upload_hash='test_upload_hash',
             calc_hash='test_calc_hash',
@@ -94,13 +94,13 @@ def test_create_existing(example_calc: Calc, normalized_vasp_example, caplog):
         assert False
 
 
-def test_delete(example_calc: Calc, caplog):
-    example_calc.delete()
+def test_delete_elastic_calc(example_elastic_calc: CalcElasticDocument, caplog):
+    example_elastic_calc.delete()
 
     assert_not_exists(config.files.archive_bucket, 'test_upload_hash/test_calc_hash')
     try:
         caplog.set_level(logging.ERROR)
-        Calc.get(id='test_upload_hash/test_calc_hash')
+        CalcElasticDocument.get(id='test_upload_hash/test_calc_hash')
         assert False
     except NotFoundError:
         pass
@@ -108,19 +108,3 @@ def test_delete(example_calc: Calc, caplog):
         assert False
     finally:
         caplog.set_level(logging.WARNING)
-
-
-def test_delete_all(example_calc: Calc, caplog):
-    Calc.delete_all(upload_id=example_calc.upload_id)
-
-    assert_not_exists(config.files.archive_bucket, 'test_upload_hash/test_calc_hash')
-    try:
-        caplog.set_level(logging.ERROR)
-        Calc.get(id='test_upload_hash/test_calc_hash')
-        assert False
-    except NotFoundError:
-        pass
-    else:
-        assert False
-    finally:
-        caplog.set_level(logging.WARNING)
\ No newline at end of file