Commit f3d4cd72 authored by Markus Scheidgen's avatar Markus Scheidgen

Updated documentation.

parent a5485fee
Binary image files changed (no preview): docs/components.png, 75.3 KB → 82 KB
......@@ -48,6 +48,7 @@ extensions = [
'sphinxcontrib.httpdomain',
'sphinxcontrib.autohttp.flask',
'sphinxcontrib.autohttp.flaskqref',
'celery.contrib.sphinx'
]
# Add any paths that contain templates here, relative to this directory.
......
......@@ -7,8 +7,8 @@ and infrastructure with a simplified and improved architecture.
.. toctree::
:maxdepth: 2
setup
introduction
setup
api
reference
contributing
......@@ -14,12 +14,64 @@ search engines, etc.)
.. figure:: components.png
:alt: nomad xt components
The main components of nomad xt and their dependencies.
The main modules of nomad xt
Principles
Nomad xt uses a series of 3rd party technologies that already solve most of nomad's
processing, storage, availability, and scaling goals:
minio.io
^^^^^^^^
http://minio.io is an S3-compatible object storage API that can be used and scaled in
various cloud and HPC contexts, running on a simple file system, NAS, or actual object
storage. We use it to store uploaded files, raw repo files for download, and archive files.
Minio enables clients to download and upload files via presigned URLs. This allows us to provide
selective, secure, and scalable access to data files.
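For illustration, a minimal sketch of creating a presigned upload URL with the minio
Python client (endpoint, bucket, and credentials here are placeholders, not nomad's
actual configuration):

.. code-block:: python

    from datetime import timedelta
    from minio import Minio

    # placeholder endpoint and credentials, for illustration only
    client = Minio('localhost:9007', access_key='AKIAIOSFODNN7EXAMPLE',
                   secret_key='EXAMPLESECRETKEY', secure=False)

    # a client can PUT its file directly to this URL, without going through the API
    url = client.presigned_put_object('uploads', 'my_upload.zip',
                                      expires=timedelta(hours=1))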
celery
^^^^^^
http://celeryproject.org (incl. rabbitmq, redis) is a popular combination for realizing
long-running tasks in internet applications. We use it to drive the processing of uploaded files.
It allows us to transparently distribute processing load, while keeping processing state available.
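A minimal, generic sketch of this pattern (broker and backend URLs are assumptions for a
local setup, not nomad's configuration):

.. code-block:: python

    from celery import Celery

    # the broker (RabbitMQ) distributes task requests,
    # the result backend (Redis) stores (intermediate) results
    app = Celery('example', broker='amqp://localhost', backend='redis://localhost')

    @app.task
    def add(x, y):
        return x + y

    result = add.delay(2, 3)   # enqueue the task via the broker
    print(result.get())        # fetch the result from the backend -> 5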
elastic search
^^^^^^^^^^^^^^
Elastic search is used to store repository data (not the raw files, they are kept in minio).
Elastic search allows for flexible, scalable search and analytics.
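For illustration, a hypothetical query with the elasticsearch-dsl client (the index name
is an assumption; the fields are those shown in the repository entries of the API docs):

.. code-block:: python

    from elasticsearch_dsl import Search, connections

    connections.create_connection(hosts=['localhost'])

    # hypothetical index name; fields as they appear in repository entries
    search = Search(index='calcs').query('match', program_name='VASP')
    for hit in search[:10]:
        print(hit.upload_hash, hit.calc_hash)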
mongodb
^^^^^^^
Mongo is used to store all other user-relevant data, like users, data sets, DOIs, etc.
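A minimal sketch with pymongo (database, collection, and fields here are hypothetical;
the actual document classes live in :py:mod:`nomad.data`):

.. code-block:: python

    from pymongo import MongoClient

    # hypothetical database and collection names, for illustration only
    db = MongoClient('mongodb://localhost:27017')['nomad']
    db.users.insert_one({'name': 'Jane Doe', 'email': 'jane@example.com'})
    user = db.users.find_one({'email': 'jane@example.com'})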
elastic stack
^^^^^^^^^^^^^
The *elastic stack* (previously *ELK* stack) is a central logging, metrics, and monitoring
solution that collects data within the cluster and provides a flexible analytics frontend
for said data.
Data model
----------
* simple first, complicated only when necessary
* adopting generic established 3rd party solutions before implementing specific solutions
* only unidirectional dependencies between components/modules, no cycles
* only one language: Python (except the GUI, of course)
\ No newline at end of file
.. figure:: data.png
:alt: nomad xt data model
The main data classes in nomad xt
See :py:mod:`nomad.data` for further information.
Processing
----------
.. figure:: proc.png
:alt: nomad xt processing workflow
The workflow of nomad xt processing tasks
See :py:mod:`nomad.processing` for further information.
Design principles
-----------------
- simple first, complicated only when necessary
- adopting generic, established 3rd party solutions before implementing specific solutions
- only unidirectional dependencies between components/modules, no cycles
- only one language: Python (except the GUI, of course)
\ No newline at end of file
......@@ -55,7 +55,7 @@ You can also run services selectively, e.g.
```
docker-compose up -d redis rabbitmq minio minio-config mongo elastic elk
docker-compose up worker handler
docker-compose up api gui
docker-compose up api gui proxy
```
If you run the ELK stack (and enable logstash in nomad/config.py),
......@@ -70,10 +70,8 @@ mc config host add minio http://localhost:9007 AKIAIOSFODNN7EXAMPLE wJalrXUtnFEM
### Serving minio, api, gui from one nginx
The docker-compose is set up to serve all client-accessible services from one webserver
via nginx *proxy_pass* directives.
This is currently part of the gui image/container. The api is served at `/nomadxt/api`,
minio at `/nomadxt/objects` and the gui at `/nomadxt`. This also means that there
is some URL rewriting and prefixing in the api and gui.
via nginx *proxy_pass* directives. This is done in the `proxy` container.
### Run the nomad worker manually
You can run the worker, handler, api, and gui as part of the docker infrastructure, like
......
......@@ -5,7 +5,7 @@ from elasticsearch.exceptions import NotFoundError
from nomad import config, files
from nomad.utils import get_logger
from nomad.data import Calc, Upload, User, InvalidId, NotAllowedDuringProcessing, me
from nomad.data import Calc, Upload, InvalidId, NotAllowedDuringProcessing, me
base_path = config.services.api_base_path
......@@ -23,7 +23,7 @@ class UploadsRes(Resource):
"""
Get a list of the current user's uploads.
.. :quickref: Get a list of the current user's uploads.
.. :quickref: upload; Get a list of the current user's uploads.
**Example request**:
......@@ -79,7 +79,7 @@ class UploadsRes(Resource):
a *presigned* upload URL. PUT a file to this URL to do the actual upload and
initiate the processing.
.. :quickref: Create a new upload.
.. :quickref: upload; Create a new upload.
**Example request**:
......@@ -125,6 +125,7 @@ class UploadsRes(Resource):
}
}
]
:jsonparam string name: An optional name for the upload.
:reqheader Content-Type: application/json
:resheader Content-Type: application/json
......@@ -146,7 +147,7 @@ class UploadRes(Resource):
processing infrastructure. You can use this endpoint to periodically pull the
new processing state until the upload ``is_ready``.
.. :quickref: Get an update for an existing upload.
.. :quickref: upload; Get an update for an existing upload.
**Example request**:
......@@ -188,6 +189,7 @@ class UploadRes(Resource):
}
}
]
:param string upload_id: the id for the upload
:resheader Content-Type: application/json
:status 200: upload successfully updated and retrieved
......@@ -207,7 +209,7 @@ class UploadRes(Resource):
Deletes an existing upload. Only ``is_ready`` or ``is_stale`` uploads
can be deleted. Deleting an upload in processing is not allowed.
.. :quickref: Delete an existing upload.
.. :quickref: upload; Delete an existing upload.
**Example request**:
......@@ -235,6 +237,59 @@ class UploadRes(Resource):
class RepoCalcRes(Resource):
def get(self, upload_hash, calc_hash):
"""
Get calculation data in repository form, which only entails the quantities shown
in the repository. This is basically the elastic search index entry for the
requested calculation. Calcs are referenced via *upload_hash*, *calc_hash*
pairs.
.. :quickref: repo; Get calculation data in repository form.
**Example request**:
.. sourcecode:: http
GET /nomadxt/api/repo/W36aqCzAKxOCfIiMFsBJh3nHPb4a/7ddvtfRfZAvc3Crr7jOJ8UH0T34I HTTP/1.1
Accept: application/json
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: application/json
{
"calc_hash":"7ddvtfRfZAvc3Crr7jOJ8UH0T34I",
"upload_time":"2018-08-30T08:41:51.771367",
"upload_id":"5b87adb813a441000a70a968",
"upload_hash":"W36aqCzAKxOCfIiMFsBJh3nHPb4a",
"mainfile":"RopD3Mo8oMV_-E5bh8uW5PiiCRkH1/data/BrK_svSi/TFCC010.CAB/vasprun.xml.relax1",
"program_name":"VASP",
"program_version":"4.6.35 3Apr08 complex parallel LinuxIFC",
"chemical_composition_bulk_reduced":"BrKSi2",
"program_basis_set_type":"plane waves",
"atom_species":[
35,
19,
14,
14
],
"system_type":"Bulk",
"crystal_system":"orthorhombic",
"space_group_number":47,
"configuration_raw_gid":"sq6wTJjRKb2VTajoDLVWDxHCgyN6i",
"XC_functional_name":"GGA_X_PBE"
}
:param string upload_hash: the hash of the upload (from uploaded file contents)
:param string calc_hash: the hash of the calculation (from mainfile)
:resheader Content-Type: application/json
:status 200: calc successfully retrieved
:status 404: calc with given hashes does not exist
:returns: the repository calculation entry
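For illustration only, the same request could be issued with Python *requests*
(the host is hypothetical; the path prefix corresponds to ``config.services.api_base_path``):
.. sourcecode:: python

    import requests

    # hypothetical host; hashes taken from the example request above
    url = ('http://localhost/nomadxt/api/repo/'
           'W36aqCzAKxOCfIiMFsBJh3nHPb4a/7ddvtfRfZAvc3Crr7jOJ8UH0T34I')
    calc = requests.get(url).json()
    print(calc['program_name'])  # 'VASP' in the example above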
"""
try:
return Calc.get(id='%s/%s' % (upload_hash, calc_hash)).json_dict, 200
except NotFoundError:
......@@ -245,6 +300,64 @@ class RepoCalcRes(Resource):
class RepoCalcsRes(Resource):
def get(self):
"""
Get *'all'* calculations in repository form, paginated.
.. :quickref: repo; Get *'all'* calculations in repository form, paginated.
**Example request**:
.. sourcecode:: http
GET /nomadxt/api/repo?page=1&per_page=25 HTTP/1.1
Accept: application/json
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: application/json
{
"pagination":{
"total":1,
"page":1,
"per_page":25
},
"results":[
{
"calc_hash":"7ddvtfRfZAvc3Crr7jOJ8UH0T34I",
"upload_time":"2018-08-30T08:41:51.771367",
"upload_id":"5b87adb813a441000a70a968",
"upload_hash":"W36aqCzAKxOCfIiMFsBJh3nHPb4a",
"mainfile":"RopD3Mo8oMV_-E5bh8uW5PiiCRkH1/data/BrK_svSi/TFCC010.CAB/vasprun.xml.relax1",
"program_name":"VASP",
"program_version":"4.6.35 3Apr08 complex parallel LinuxIFC",
"chemical_composition_bulk_reduced":"BrKSi2",
"program_basis_set_type":"plane waves",
"atom_species":[
35,
19,
14,
14
],
"system_type":"Bulk",
"crystal_system":"orthorhombic",
"space_group_number":47,
"configuration_raw_gid":"sq6wTJjRKb2VTajoDLVWDxHCgyN6i",
"XC_functional_name":"GGA_X_PBE"
}
]
}
:qparam int page: the page starting with 1
:qparam int per_page: desired calcs per page
:resheader Content-Type: application/json
:status 200: calcs successfully retrieved
:returns: a list of repository entries in ``results`` and pagination info
"""
logger = get_logger(__name__, endpoint='repo', action='get')
# TODO use argparse? bad request response on bad params, pagination as decorator
......@@ -272,6 +385,26 @@ class RepoCalcsRes(Resource):
@app.route('%s/archive/<string:upload_hash>/<string:calc_hash>' % base_path, methods=['GET'])
def get_calc(upload_hash, calc_hash):
"""
Get calculation data in archive form. Calcs are referenced via *upload_hash*, *calc_hash*
pairs.
.. :quickref: archive; Get calculation data in archive form.
**Example request**:
.. sourcecode:: http
GET /nomadxt/api/archive/W36aqCzAKxOCfIiMFsBJh3nHPb4a/7ddvtfRfZAvc3Crr7jOJ8UH0T34I HTTP/1.1
Accept: application/json
:param string upload_hash: the hash of the upload (from uploaded file contents)
:param string calc_hash: the hash of the calculation (from mainfile)
:resheader Content-Type: application/json
:status 200: calc successfully retrieved
:status 404: calc with given hashes does not exist
:returns: the metainfo formatted JSON data of the requested calculation
"""
logger = get_logger(__name__, endpoint='archive', action='get', upload_hash=upload_hash, calc_hash=calc_hash)
archive_id = '%s/%s' % (upload_hash, calc_hash)
......
......@@ -15,8 +15,17 @@
"""
This module comprises a set of persistent document classes that hold all user-related
data. This includes information about users, their uploads and datasets, the associated
calculations, and files
calculations, and files.
.. figure:: data.png
:alt: nomad xt data model
The main nomad xt data classes and their associations
The blue properties are exposed via our REST API.
For detailed information on :class:`nomad.processing.UploadProc` and
:class:`nomad.processing.CalcProc` refer to :py:mod:`nomad.processing`.
.. autoclass:: Calc
:members:
......
......@@ -13,29 +13,56 @@
# limitations under the License.
"""
Processing comprises everything that is necessary to take a file uploaded by a user
and process it until we have all data necessary for repository, archive, and
potentially further services. This includes storing respective data and information
in the data services (e.g. *minio*, *mongo*, or *elastic*).
Processing comprises everything that is necessary to take an uploaded user file,
process it, and store all necessary data for *repository*, *archive*, and potential
future services (e.g. *encyclopedia*).
A further responsibility of this module is to provide state information about
running and completed processings. It also needs to provide static information about
the existing processing steps and tasks.
Processing is built on top of *celery* (http://www.celeryproject.org/).
Celery provides a task-based programming model for distributed computing. It uses
a broker, e.g. a distributed task queue like *RabbitMQ*, to distribute task requests,
and a result backend, e.g. a *Redis* database, to access (intermediate) task results.
This combination allows us to easily distribute processing work while having
the processing state, i.e. (intermediate) results, always available.
This module is structured into our *celery app* (``app.py``), the task definitions
(``tasks.py``), classes that represent state for processing *uploads* and their
*calculations* (``state.py``), and the *handler* service that initiates processing
based on file storage notifications (``handler.py``, ``handlerdaemon.py``).
This module does not contain the functions to do the actual work. Those are encapsulated
in :py:mod:`nomad.files`, :py:mod:`nomad.search`, :py:mod:`nomad.users`,
:py:mod:`nomad.parsing`, and :py:mod:`nomad.normalizing`.
Represent processing state
--------------------------
Processing app
--------------
Refer to http://www.celeryproject.org/ to learn about celery apps and workers. The
nomad celery app uses a *RabbitMQ* broker and *Redis* result backend. It uses *pickle*
for serialization of arguments and results (usually instances of the :class:`Proc` state
classes).
Processing tasks
----------------
There are *upload* processing tasks (extracting, parse_all, cleanup) and a calculation
processing task (parse). The upload processing tasks are meant to be executed in sequence.
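As a rough sketch (the actual wiring lives in ``tasks.py``; argument passing is simplified
here), the parallel parsing followed by a single cleanup corresponds to celery's canvas
primitives:
.. code-block:: python

    from celery import chord, group

    # run one parse task per matched calculation in parallel; the cleanup task runs
    # once after all of them have finished (celery prepends the list of parse
    # results as its first argument)
    chord(
        group(parse_task.s(calc_proc, upload_proc) for calc_proc in upload_proc.calc_procs)
    )(cleanup_task.s(upload_proc))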
.. figure:: process.png
.. figure:: proc.png
:alt: nomad xt processing workflow
This is the basic workflow of a nomad xt upload processing.
.. autoclass:: ProcPipeline
.. autotask:: nomad.processing.tasks.extracting_task
.. autotask:: nomad.processing.tasks.cleanup_task
.. autotask:: nomad.processing.tasks.parse_all_task
.. autotask:: nomad.processing.tasks.parse_task
Represent processing state
--------------------------
.. autoclass:: Proc
:members:
.. autoclass:: UploadProc
:members:
......@@ -53,5 +80,5 @@ Initiate processing
from nomad.processing.app import app
from nomad.processing import tasks
from nomad.processing.state import ProcPipeline, UploadProc, CalcProc
from nomad.processing.state import Proc, UploadProc, CalcProc
from nomad.processing.handler import handle_uploads, handle_uploads_thread, start_processing
......@@ -100,6 +100,7 @@ def handle_uploads(quit=False):
def handle_uploads_thread(quit=True):
""" Same as :func:`handle_uploads` but run in a separate thread. """
thread = Thread(target=lambda: handle_uploads(quit))
thread.start()
return thread
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Just starts a handler that will initiate upload processing on uploaded file
notifications from the file store. See :func:`nomad.processing.handler.handle_uploads`.
"""
from nomad.processing.handler import handle_uploads
if __name__ == '__main__':
......
......@@ -22,15 +22,24 @@ from nomad.normalizing import normalizers
from nomad.processing.app import app
class ProcPipeline(utils.DataObject):
class Proc(utils.DataObject):
"""
This is a base class for processing state. Processing state is represented as a
sequence of pseudo tasks (not necessarily celery tasks). This class keeps the
task names, the currently active task, and a general *status* that can be
``PENDING``, ``PROGRESS``, ``SUCCESS``, or ``FAILURE``. A processing can fail
at each *task*; successive tasks are only executed if the processing has not
yet failed.
The processing state can be persisted as JSON (e.g. in mongodb).
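A hedged usage sketch (``continue_with`` as used by the tasks in this package; the
concrete task names are illustrative assumptions):
.. code-block:: python

    proc = Proc(task_names=['extracting', 'parse_all', 'cleanup'])

    # each pseudo task first checks whether the processing may continue,
    # i.e. that no previous task has failed
    if proc.continue_with('extracting'):
        pass  # ... do the actual extraction work ...

    print(proc.current_task_name, proc.status)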
Arguments:
task_names: A list of task names in pipeline order.
task_names: a list of task names in sequence
Attributes:
current_task_name: Name of the currently running task.
status: Aggregated status for the whole process. Celery status names as convention.
errors: A list of potential errors that caused failure.
current_task_name: name of the currently running task
status: aggregated status for the whole process, celery status names as convention
errors: a list of potential errors that caused failure
"""
def __init__(self, task_names: List[str], *args, **kwargs) -> None:
super().__init__(*args)
......@@ -81,22 +90,25 @@ class ProcPipeline(utils.DataObject):
self.status = 'FAILURE'
class CalcProc(ProcPipeline):
class CalcProc(Proc):
"""
Used to represent the state of a calc processing. It is used to provide
information to the user (via api, users) and act as a state object within the
more complex calc processing task. Keep in mind that this task might become several
more complex calc processing task.
Currently just pseudo tasks. Keep in mind that this task might become several
celery tasks in the future.
The list of tasks is derived from the given parser, and the list of defined
normalizers (:mod:`nomad.normalizing`).
Arguments:
upload_hash: The hash that identifies the upload in the archive.
mainfile: The path to the mainfile in the upload.
parser_name: The name of the parser to use/used.
tmp_mainfile: The full path to the mainfile in the local fs.
upload_hash: the hash that identifies the upload in the archive
mainfile: the path to the mainfile in the upload
parser_name: the name of the parser to use/used
tmp_mainfile: the full path to the mainfile in the local fs
Attributes:
calc_hash: The mainfile hash that identifies the calc in the archive.
celery_task_id: The celery task id for the calc parse celery task.
calc_hash: the mainfile hash that identifies the calc in the archive
celery_task_id: the celery task id for the calc parse celery task
"""
def __init__(self, mainfile, parser_name, tmp_mainfile, *args, **kwargs):
task_names = [
......@@ -137,7 +149,7 @@ class CalcProc(ProcPipeline):
return False
class UploadProc(ProcPipeline):
class UploadProc(Proc):
"""
Used to represent the state of an upload processing. It is used to provide
information to the user (via api, users) and act as a state object that is passed
......@@ -147,6 +159,10 @@ class UploadProc(ProcPipeline):
:class:`~celery.results.AsyncResults` instance in serialized *tuple* form to
keep connected to the results backend.
The state of calculation processings (:class:`CalcProc`) is kept as a list of
children. This might be inappropriate in the future, when there are uploads with
thousands of calculations.
Warning:
You have to call :func:`forget` eventually to free all resources and the celery
results backend.
......@@ -155,14 +171,13 @@ class UploadProc(ProcPipeline):
<http://docs.celeryproject.org/en/latest/userguide/configuration.html#result-expires>`_.
Arguments:
upload_id: The id of the uploaded file in the object storage,
see also :mod:`nomad.files`.
task_name: The names of all task in pipeline order.
upload_id: the id of the uploaded file in the object storage,
see also :mod:`nomad.files`
Attributes:
upload_hash: The hash of the uploaded file. E.g., used for archive/repo ids.
calc_procs: The state data for all child calc processings.
celery_task_ids: Serialized form of the celery async_results tree for the processing.
upload_hash: the hash of the uploaded file; e.g., used for archive/repo ids
calc_procs: the state data for all child calc processings
celery_task_ids: serialized form of the celery async_results tree for the processing
"""
def __init__(self, upload_id: str, *args, **kwargs) -> None:
assert upload_id is not None
......
......@@ -33,6 +33,16 @@ def _report_progress(task, dct):
@app.task(bind=True, name='extracting')
def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
"""
The task that extracts an already uploaded file. It goes through all files in
the upload and identifies parser/mainfile matches. The matched parser, mainfile
pairs are stored in the upload state.
Arguments:
proc: The :class:`UploadProc` instance that represents the upload processing.
Returns: an updated version of the upload processing state.
"""
logger = utils.get_logger(__name__, task=task.name, upload_id=proc.upload_id)
if not proc.continue_with(task.name):
return proc
......@@ -94,6 +104,15 @@ def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
@app.task(bind=True, name='cleanup')
def cleanup_task(task, calc_procs: List[CalcProc], upload_proc: UploadProc) -> UploadProc:
"""
This task is used to remove any extracted files after the upload has completed processing.
Arguments:
calc_procs: a list of calc processing states
upload_proc: the upload processing state
Returns: an updated upload processing state
"""
logger = utils.get_logger(__name__, task=task.name, upload_id=upload_proc.upload_id)
if upload_proc.continue_with(task.name):
......@@ -121,6 +140,17 @@ def cleanup_task(task, calc_procs: List[CalcProc], upload_proc: UploadProc) -> U
@app.task(bind=True, name='parse_all')
def parse_all_task(task: Task, upload_proc: UploadProc, cleanup: Signature) -> UploadProc:
"""
Initiates the processing for all mainfile, parser pairs stored in the upload
processing state. Parsing is run asynchronously and in parallel via a celery *chord*.
Arguments:
upload_proc: the upload processing state
cleanup: a signature for the cleanup task that is used to call the cleanup task
when all parse tasks (i.e. the chord) have finished
Returns: an updated upload processing state
"""
cleanup = cleanup.clone(args=(upload_proc,))
if not upload_proc.continue_with(task.name):
chord(group())(cleanup)
......@@ -141,6 +171,17 @@ def parse_all_task(task: Task, upload_proc: UploadProc, cleanup: Signature) -> U
@app.task(bind=True, name='parse')
def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc:
"""
This task processes a calculation based on a parser, mainfile pair. It reports
intermediate progress via the logical *'tasks'* defined in the
calculation processing state.
Arguments:
proc: the calculation processing state
upload_proc: the upload processing state
Returns: an updated calculation processing state.
"""
assert upload_proc.upload_hash is not None
upload_hash, parser, mainfile = upload_proc.upload_hash, proc.parser_name, proc.mainfile
......
......@@ -25,7 +25,7 @@ import logging
from nomad import config, files
from nomad.data import Calc
from nomad.processing import start_processing, ProcPipeline
from nomad.processing import start_processing, Proc
from tests.test_files import example_file, empty_file