Commit d53a7e5a authored by Markus Scheidgen

Refactored the 'Upload' class.

parent 4c803105
from typing import Tuple
from flask import Flask, request, redirect
from flask_restful import Resource, Api, abort
from datetime import datetime
import mongoengine.errors
from flask_cors import CORS
from elasticsearch.exceptions import NotFoundError
from nomad import users, files, config
from nomad.utils import lnr, get_logger
from nomad.processing import UploadProc
from nomad.data import Calc
from nomad import config, files
from nomad.utils import get_logger
from nomad.data import Calc, Upload, User, InvalidId, NotAllowedDuringProcessing
base_path = config.services.api_base_path
@@ -22,136 +18,46 @@ api = Api(app)
# provide a fake user for testing
me = users.User.objects(email='me@gmail.com').first()
me = User.objects(email='me@gmail.com').first()
if me is None:
me = users.User(email='me@gmail.com', name='Me Meyer')
me = User(email='me@gmail.com', name='Me Meyer')
me.save()
def _external_objects_url(url):
""" Replaces the given internal object storage url (minio) with an URL that allows
external access. """
port_with_colon = ''
if config.services.objects_port > 0:
port_with_colon = ':%d' % config.services.objects_port
return url.replace(
'%s:%s' % (config.minio.host, config.minio.port),
'%s%s%s' % (config.services.objects_host, port_with_colon, config.services.objects_base_path))
def _updated_proc(upload: users.Upload) -> Tuple[UploadProc, bool]:
is_stale = False
if upload.proc:
proc = UploadProc(**upload.proc)
if proc.update_from_backend():
upload.proc = proc
upload.save()
if proc.current_task_name == proc.task_names[0] and upload.upload_time is None:
is_stale = (datetime.now() - upload.create_time).days > 1
else:
proc = None
return proc, is_stale
def _render(upload: users.Upload, proc: UploadProc, is_stale: bool) -> dict:
data = {
'name': upload.name,
'upload_id': upload.upload_id,
'presigned_url': _external_objects_url(upload.presigned_url),
'presigned_orig': upload.presigned_url,
'create_time': upload.create_time.isoformat() if upload.create_time is not None else None,
'upload_time': upload.upload_time.isoformat() if upload.upload_time is not None else None,
'proc_time': upload.proc_time.isoformat() if upload.proc_time is not None else None,
'is_stale': is_stale,
'is_ready': proc.status in ['SUCCESS', 'FAILURE'],
'proc': proc
}
return {key: value for key, value in data.items() if value is not None}
def _update_and_render(upload: users.Upload) -> dict:
"""
If the given upload has a processing state attached, attempt to update this
state and store the results before the upload is rendered for the client.
"""
proc, is_stale = _updated_proc(upload)
return _render(upload, proc, is_stale)
class Uploads(Resource):
class UploadsRes(Resource):
def get(self):
return [_update_and_render(user) for user in users.Upload.objects()], 200
return [upload.json_dict for upload in Upload.user_uploads(me)], 200
def post(self):
json_data = request.get_json()
if json_data is None:
json_data = {}
upload = users.Upload(user=me, name=json_data.get('name'))
upload.save()
upload.presigned_url = files.get_presigned_upload_url(upload.upload_id)
upload.create_time = datetime.now()
upload.proc = UploadProc(upload.upload_id)
upload.save()
return _update_and_render(upload), 200
return Upload.create(user=me, name=json_data.get('name', None)).json_dict, 200
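# A hedged usage sketch (not part of this commit): exercising the new POST
# endpoint with the `requests` library. Host, port, and base path are
# assumptions; the '/uploads' route itself is registered via api.add_resource
# further below.
import requests

response = requests.post(
    'http://localhost:8000/uploads',  # assumed host/port and empty api_base_path
    json={'name': 'my-first-upload'})
upload = response.json()
# the returned json_dict carries a presigned URL the client can PUT the file to
print(upload['upload_id'], upload['presigned_url'])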
class Upload(Resource):
class UploadRes(Resource):
def get(self, upload_id):
try:
upload = users.Upload.objects(id=upload_id).first()
except mongoengine.errors.ValidationError:
return Upload.get(upload_id=upload_id).json_dict, 200
except InvalidId:
abort(400, message='%s is not a valid upload id.' % upload_id)
if upload is None:
except KeyError:
abort(404, message='Upload with id %s does not exist.' % upload_id)
return _update_and_render(upload), 200
def delete(self, upload_id):
logger = get_logger(__name__, upload_id=upload_id, endpoint='upload', action='delete')
try:
upload = users.Upload.objects(id=upload_id).first()
except mongoengine.errors.ValidationError:
return Upload.get(upload_id=upload_id).delete().json_dict, 200
except InvalidId:
abort(400, message='%s is not a valid upload id.' % upload_id)
if upload is None:
except KeyError:
abort(404, message='Upload with id %s does not exist.' % upload_id)
proc, is_stale = _updated_proc(upload)
if not (proc.ready() or is_stale or proc.current_task_name == 'uploading'):
abort(400, message='%s has not finished processing.' % upload_id)
with lnr(logger, 'Delete upload file'):
try:
files.Upload(upload.upload_id).delete()
except KeyError:
if upload.proc['current_task_name'] == 'uploading':
logger.debug('Upload exists, but the file does not exist. It was probably aborted and deleted.')
else:
logger.debug('Upload exists, but the uploaded file does not exist.')
if proc.upload_hash is not None:
with lnr(logger, 'Deleting calcs'):
Calc.delete_all(upload_id=proc.upload_id)
with lnr(logger, 'Deleting user upload'):
upload.delete()
return _render(upload, proc, is_stale), 200
except NotAllowedDuringProcessing:
abort(400, message='You must not delete an upload during processing.')
class RepoCalc(Resource):
class RepoCalcRes(Resource):
def get(self, upload_hash, calc_hash):
try:
return Calc.get(id='%s/%s' % (upload_hash, calc_hash)).json_dict, 200
@@ -161,7 +67,7 @@ class RepoCalc(Resource):
abort(500, message=str(e))
class RepoCalcs(Resource):
class RepoCalcsRes(Resource):
def get(self):
logger = get_logger(__name__, endpoint='repo', action='get')
@@ -195,7 +101,7 @@ def get_calc(upload_hash, calc_hash):
archive_id = '%s/%s' % (upload_hash, calc_hash)
try:
url = _external_objects_url(files.archive_url(archive_id))
url = files.external_objects_url(files.archive_url(archive_id))
return redirect(url, 302)
except KeyError:
abort(404, message='Archive %s does not exist.' % archive_id)
@@ -204,10 +110,10 @@ def get_calc(upload_hash, calc_hash):
abort(500, message='Could not access the archive.')
api.add_resource(Uploads, '%s/uploads' % base_path)
api.add_resource(Upload, '%s/uploads/<string:upload_id>' % base_path)
api.add_resource(RepoCalcs, '%s/repo' % base_path)
api.add_resource(RepoCalc, '%s/repo/<string:upload_hash>/<string:calc_hash>' % base_path)
api.add_resource(UploadsRes, '%s/uploads' % base_path)
api.add_resource(UploadRes, '%s/uploads/<string:upload_id>' % base_path)
api.add_resource(RepoCalcsRes, '%s/repo' % base_path)
api.add_resource(RepoCalcRes, '%s/repo/<string:upload_hash>/<string:calc_hash>' % base_path)
if __name__ == '__main__':
......
@@ -13,27 +13,37 @@
# limitations under the License.
"""
The interface towards our search engine. It allows storing calculations as documents
of search-relevant properties.
.. autoclass:: nomad.search.Calc
This module comprises a set of persistent document classes that hold all user-related
data: information about users, their uploads and datasets, the associated
calculations, and files.

.. autoclass:: nomad.data.Calc
.. autoclass:: nomad.data.Upload
.. autoclass:: nomad.data.DataSet
.. autoclass:: nomad.data.User
"""
from typing import List
import sys
from datetime import datetime
import elasticsearch.exceptions
from elasticsearch_dsl import Document, Date, Keyword, Search, connections
from elasticsearch_dsl import Document as ElasticDocument, Date, Keyword, Search, connections
from mongoengine import \
Document, EmailField, StringField, BooleanField, DateTimeField, \
ListField, DictField, ReferenceField, connect
import mongoengine.errors
from nomad import config, files
from nomad.processing import UploadProc, CalcProc
from nomad.parsing import LocalBackend
from nomad.utils import get_logger
from nomad.utils import get_logger, lnr
logger = get_logger(__name__)
# ensure elastic connection
# ensure elastic and mongo connections
if 'sphinx' not in sys.modules:
client = connections.create_connection(hosts=[config.elastic.host])
connect(db=config.mongo.users_db, host=config.mongo.host)
key_mappings = {
'basis_set_type': 'program_basis_set_type',
@@ -44,7 +54,13 @@ key_mappings = {
class AlreadyExists(Exception): pass
class Calc(Document):
class InvalidId(Exception): pass
class NotAllowedDuringProcessing(Exception): pass
class Calc(ElasticDocument):
"""
Instances of this class represent calculations. This class manages the elastic
search index entry, files, and archive for the respective calculation.
@@ -155,7 +171,7 @@ class Calc(Document):
# persist to elastic search
try:
calc.save(op_type='create')
except Exception as e:
except Exception:
raise AlreadyExists('Calculation %s already exists.' % calc.archive_id)
# persist the archive
@@ -197,36 +213,176 @@ if 'sphinx' not in sys.modules:
raise e
# Taken from the IndexManifest (scala, NOMAD-coe)
#
# //calculation identifiers (these are treated separately in the importer)
# IndexEntry("calculation_gid"), +++
# IndexEntry("archive_gid"), +++
# //calculation data
# IndexEntry("main_file_uri"), +++
# IndexEntry("calculation_uploader_name"),
# IndexEntry("calculation_upload_date"),
# IndexEntry("calculation_pid"),
# IndexEntry("program_name"), +++ + version
# IndexEntry("stats_meta_present"),
# //system related data
# IndexEntry("atom_species"), // from normaliser stats +++ DONE
# IndexEntry("system_composition"), // from normaliser springer
# IndexEntry("system_reweighted_composition"), // ???
# IndexEntry("system_type"), // from normaliser system-type +++ DONE
# IndexEntry("crystal_system"), // from normaliser symmetry +++ DONE
# IndexEntry("space_group_number"), // from normaliser symmetry +++ DONE
# IndexEntry("springer_id"), // from normaliser springer
# IndexEntry("springer_classification"),// from normaliser springer
# IndexEntry("configuration_raw_gid"), // from normaliser stats DONE
# //computational setup
# IndexEntry("electronic_structure_method"), +++
# IndexEntry("XC_functional_name"), // from normaliser stats does XC_functional_type +++ DONE
# IndexEntry("program_basis_set_type"), +++
# //data related to other projects ingested into Nomad(e.g. AFlowLib)
# IndexEntry("prototype_aflow_id"), // from normalizer prototypes
# IndexEntry("prototype_aflow_url"), // from normalizer prototypes
# //auxiliary data of the query
# IndexEntry("number_of_single_configurations", Some(intField), Some { calc =>
# Seq(calc.sectionTable(Seq("section_run", "section_system")).lengthL)
# })
class User(Document):
""" Represents users in the database. """
email = EmailField(primary_key=True)
name = StringField()
class Upload(Document):
"""
Represents uploads in the database. Provides access to the file storage
and the processing state.
Attributes:
name: Optional, user-provided upload name.
upload_id: The upload id. Generated by the database.
in_staging: True if the upload is still in staging and can be edited by the uploader.
is_private: True if the upload and its derivatives are only visible to the uploader.
proc: The :class:`nomad.processing.UploadProc` that holds the processing state.
create_time: The timestamp this upload was created.
upload_time: The timestamp when the system received the uploaded file.
proc_time: The timestamp when the system finished processing the upload.
"""
name = StringField(default=None)
in_staging = BooleanField(default=True)
is_private = BooleanField(default=False)
presigned_url = StringField()
upload_time = DateTimeField()
create_time = DateTimeField()
proc_time = DateTimeField()
proc = DictField()
user = ReferenceField(User, required=True)
meta = {
'indexes': [
'proc.upload_hash',
'user'
]
}
@staticmethod
def user_uploads(user: User) -> List['Upload']:
""" Returns all uploads for the given user. Currently returns all uploads. """
return [upload.update_proc() for upload in Upload.objects()]
@staticmethod
def get(upload_id: str) -> 'Upload':
try:
upload = Upload.objects(id=upload_id).first()
except mongoengine.errors.ValidationError:
raise InvalidId('Invalid upload id')
if upload is None:
raise KeyError('Upload does not exist')
upload.update_proc()
return upload
def logger(self, **kwargs):
return get_logger(__name__, upload_id=self.upload_id, cls='Upload', **kwargs)
def delete(self) -> 'Upload':
logger = self.logger(action='delete')
self.update_proc()
if not (self.is_ready or self.is_stale or self._proc.current_task_name == 'uploading'):
raise NotAllowedDuringProcessing()
with lnr(logger, 'Delete upload file'):
try:
files.Upload(self.upload_id).delete()
except KeyError:
if self._proc.current_task_name == 'uploading':
logger.debug('Upload exists, but the file does not exist. It was probably aborted and deleted.')
else:
logger.debug('Upload exists, but the uploaded file does not exist.')
if self._proc.upload_hash is not None:
with lnr(logger, 'Deleting calcs'):
Calc.delete_all(upload_id=self.upload_id)
with lnr(logger, 'Deleting upload'):
super().delete()
return self
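# Descriptive note on the order above: delete() removes the uploaded file
# first, then the indexed calcs (when an upload_hash exists), and the mongo
# document itself last.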
@staticmethod
def create(user: User, name: str = None) -> 'Upload':
"""
Creates a new upload for the given user; a user-provided name is optional.
It populates the record with a presigned URL and a pending :class:`UploadProc`,
and saves the upload to the database.
"""
upload = Upload(user=user, name=name)
upload.save()
upload.presigned_url = files.get_presigned_upload_url(upload.upload_id)
upload.create_time = datetime.now()
upload.proc = UploadProc(upload.upload_id)
upload.save()
upload.update_proc()
return upload
@property
def upload_id(self) -> str:
return str(self.id)
@property
def is_stale(self) -> bool:
proc = self._proc
if proc.current_task_name == proc.task_names[0] and self.upload_time is None:
return (datetime.now() - self.create_time).days > 1
else:
return False
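# Illustration of the rule above: an upload whose processing never advanced
# past the first task and whose file never arrived (upload_time is None)
# counts as stale once it is more than a day past its create_time.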
@property
def is_ready(self) -> bool:
return self._proc.status in ['SUCCESS', 'FAILURE']
@property
def _proc(self):
""" Cast the internal mongo dict to an actual :class:`UploadProc` instance. """
# keep the instance cached
if '__proc' not in self.__dict__:
self.__dict__['__proc'] = UploadProc(**self.proc)
return self.__dict__['__proc']
def update_proc(self) -> 'Upload':
""" Updates this instance with information from the celery results backend. """
if self._proc.update_from_backend():
self.proc = self._proc
self.save()
return self
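# Judging by the guard above, update_from_backend() reports whether the celery
# backend actually held newer state, so polling callers only trigger a mongo
# save when something changed.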
@property
def json_dict(self) -> dict:
""" A json serializable dictionary representation. """
data = {
'name': self.name,
'upload_id': self.upload_id,
'presigned_url': files.external_objects_url(self.presigned_url),
'create_time': self.create_time.isoformat() if self.create_time is not None else None,
'upload_time': self.upload_time.isoformat() if self.upload_time is not None else None,
'proc_time': self.proc_time.isoformat() if self.proc_time is not None else None,
'is_stale': self.is_stale,
'is_ready': self.is_ready,
'proc': self._proc
}
return {key: value for key, value in data.items() if value is not None}
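# For illustration, with hypothetical values: a freshly created upload might
# serialize to
#
#     {
#         'upload_id': '5b2c...',
#         'presigned_url': 'http://localhost:9007/objects/uploads/5b2c...',
#         'create_time': '2018-09-01T12:00:00',
#         'is_stale': False,
#         'is_ready': False,
#         'proc': <pending UploadProc>
#     }
#
# Keys whose value is None (e.g. upload_time before the file arrives) are
# filtered out by the dict comprehension above.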
class DataSet(Document):
name = StringField()
description = StringField()
doi = StringField()
user = ReferenceField(User)
calcs = ListField(StringField())
meta = {
'indexes': [
'user',
'doi',
'calcs'
]
}
@@ -331,3 +331,15 @@ def delete_archive(archive_id: str):
_client.remove_object(bucket, archive_id)
except minio.error.NoSuchKey:
raise KeyError()
def external_objects_url(url):
""" Replaces the given internal object storage url (minio) with an URL that allows
external access. """
port_with_colon = ''
if config.services.objects_port > 0:
port_with_colon = ':%d' % config.services.objects_port
return url.replace(
'%s:%s' % (config.minio.host, config.minio.port),
'%s%s%s' % (config.services.objects_host, port_with_colon, config.services.objects_base_path))
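# A worked example with hypothetical config values: if minio is reachable
# internally at 'minio:9000' and the object store is exposed externally at
# 'localhost:9007' under '/objects', then
#
#     external_objects_url('http://minio:9000/uploads/42?X-Amz-Signature=abc')
#
# yields
#
#     'http://localhost:9007/objects/uploads/42?X-Amz-Signature=abc'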
@@ -15,7 +15,7 @@
from datetime import datetime
from threading import Thread
from nomad import files, utils, users
from nomad import files, utils
from nomad.processing.tasks import extracting_task, cleanup_task, parse_all_task
from nomad.processing.state import UploadProc
@@ -64,14 +64,16 @@ def handle_uploads(quit=False):
@files.upload_put_handler
def handle_upload_put(received_upload_id: str):
from nomad.data import Upload
logger = get_logger(__name__, upload_id=received_upload_id)
logger.debug('Initiate upload processing')
try:
with lnr(logger, 'Could not load'):
upload = users.Upload.objects(id=received_upload_id).first()
if upload is None:
logger.error('Upload does not exist')
raise Exception()
try:
upload = Upload.get(upload_id=received_upload_id)
except KeyError as e:
logger.error('Upload does not exist')
raise e
if upload.upload_time is not None:
logger.warn('Ignoring upload notification, since the file is already uploaded')
@@ -88,7 +90,7 @@
upload.save()
except Exception:
pass
logger.error('Exception while handling upload put notification.', exc_info=e)
if quit:
raise StopIteration
......
@@ -18,7 +18,6 @@ from celery.canvas import Signature
from datetime import datetime
from nomad import files, utils
from nomad.data import Calc
from nomad.parsing import parsers, parser_dict
from nomad.normalizing import normalizers
import nomad.patch # pylint: disable=unused-import
@@ -66,6 +65,7 @@ def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
proc.fail(e)
return proc
from nomad.data import Calc
if Calc.upload_exists(proc.upload_hash):
logger.info('Upload hash doublet')
proc.fail('The same file was already uploaded and processed.')
@@ -185,6 +185,7 @@ def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc:
# update search
proc.continue_with('archiving')
try:
from nomad.data import Calc
Calc.create_from_backend(
parser_backend,
upload_hash=upload_hash,
......
@@ -14,7 +14,8 @@ services_config = config.services._asdict()
services_config.update(api_base_path='')
config.services = config.NomadServicesConfig(**services_config)
from nomad import api, files, processing, users # noqa
from nomad import api, files, processing # noqa
from nomad.data import Upload # noqa
from tests.test_processing import example_files # noqa
from tests.test_files import assert_exists # noqa
@@ -36,7 +37,7 @@ def client():
client = api.app.test_client()
yield client
users.Upload._get_collection().drop()
Upload._get_collection().drop()
def assert_uploads(upload_json_str, count=0, **kwargs):
@@ -87,7 +88,7 @@ def test_stale_upload(client):
assert rv.status_code == 200
upload_id = assert_upload(rv.data)['upload_id']
upload = users.Upload.objects(id=upload_id).first()
upload = Upload.get(upload_id=upload_id)
upload.create_time = datetime.now() - timedelta(days=2)
upload.save()
@@ -157,7 +158,8 @@ def test_upload_to_upload(client, file):
handle_uploads_thread.join()
assert_exists(config.files.uploads_bucket, upload['upload_id'])
upload_id = upload['upload_id']
assert_exists(config.files.uploads_bucket, upload_id)
@pytest.mark.parametrize("file", example_files)
......