Commit 112cbcec authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Enabled pep8 checks and made corrections.

parent 7112878f
{
    "python.venvPath": "${workspaceFolder}/.pyenv",
    "python.pythonPath": "${workspaceFolder}/.pyenv/bin/python",
    "git.ignoreLimitWarning": true,
    "editor.rulers": [90],
    "editor.renderWhitespace": "all",
    "editor.tabSize": 4,
    "files.trimTrailingWhitespace": true,
    "git.enableSmartCommit": true,
    "eslint.autoFixOnSave": true,
    "python.linting.pylintArgs": [
        "--disable=all",
        "--enable=F,E,unreachable,duplicate-key,unnecessary-semicolon,global-variable-not-assigned,unused-variable,binary-op-exception,bad-format-string,anomalous-backslash-in-string,bad-open-mode,unused-import"
    ],
    "python.linting.pep8Enabled": true,
    "python.linting.pep8Args": ["--ignore=E501"]
}
\ No newline at end of file
......@@ -15,27 +15,27 @@
version: '3'
services:
    minio:
        restart: always
        image: minio/minio:RELEASE.2018-06-08T03-49-38Z
        # image: minio/minio
        ports:
            - 9007:9000
        volumes:
            - ./data/minio:/data
            - ./config/minio:/root/.minio
        environment:
            - "MINIO_ACCESS_KEY=AKIAIOSFODNN7EXAMPLE"
            - "MINIO_SECRET_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
        command: server /data
    rabbitmq:
        image: rabbitmq
        hostname: "rabbitmq"
        environment:
            - "RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG"
            - "RABBITMQ_DEFAULT_USER=rabbitmq"
            - "RABBITMQ_DEFAULT_PASS=rabbitmq"
            - "RABBITMQ_DEFAULT_VHOST=/"
        ports:
            - 5672:5672
......@@ -20,22 +20,22 @@ MinioConfig = namedtuple('Minio', ['host', 'port', 'accesskey', 'secret'])
FSConfig = namedtuple('FSConfig', ['tmp'])
# Concrete configuration values; kwargs match the namedtuple fields above.
# NOTE(review): credentials here are the well-known AWS documentation example
# keys used for the local dev minio instance — not real secrets.
s3 = S3Config(
    uploads_bucket='uploads',
    repository_bucket='repository',
    archive_bucket='archive'
)

rabbitmq = RabitMQConfig(
    host='localhost',
    port=None,
    user='rabbitmq',
    password='rabbitmq'
)

minio = MinioConfig(
    host='localhost',
    port=9007,
    accesskey='AKIAIOSFODNN7EXAMPLE',
    secret='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
)

fs = FSConfig(
    tmp='./infrastructure/data/tmp'
)
......@@ -13,7 +13,9 @@
# limitations under the License.
"""
This module (and its main class :class:`Files`) represents an abstraction for the
NOMAD file storage system.

Responsibilities: create, access files; create, receive, notify on, and access uploads.
"""
import os
......@@ -27,75 +29,89 @@ import nomad.config as config
logger = logging.getLogger(__name__)

# Module-level minio client used by all functions in this module.
_client = Minio('%s:%s' % (config.minio.host, config.minio.port),
                access_key=config.minio.accesskey,
                secret_key=config.minio.secret,
                secure=False)

# ensure all neccessary buckets exist
try:
    _client.make_bucket(bucket_name=config.s3.uploads_bucket)
    logger.info("Created uploads bucket with name %s." % config.s3.uploads_bucket)
except BucketAlreadyOwnedByYou:
    # bucket already present from a previous run — nothing to do
    logger.debug("Uploads bucket with name %s already existed." % config.s3.uploads_bucket)
def get_presigned_upload_url(upload_id):
    """
    Returns a presigned PUT url that allows uploading the zip file for the
    given upload id without further authentication.
    """
    return _client.presigned_put_object(config.s3.uploads_bucket, upload_id)
def create_curl_upload_cmd(presigned_url, file_dummy='<ZIPFILE>'):
    """
    Creates a curl command line that PUTs a zip file to the given presigned url.

    Arguments:
        presigned_url: the presigned PUT url to upload to.
        file_dummy: placeholder (or actual path) to use for the uploaded file;
            callers may keep the default and str.replace it later.

    Returns: the curl command as a string.
    """
    # 'octet-stream' — the previous 'octet-steam' was a typo in the MIME type
    headers = 'Content-Type: application/octet-stream'
    return 'curl -X PUT "%s" -H "%s" -F file=@%s' % (presigned_url, headers, file_dummy)
def upload(upload_id):
    """Returns an :class:`Upload` instance for the given upload id."""
    return Upload(upload_id)
def upload_notifications(events):
    """
    Generator that yields the upload id for each s3:ObjectCreated:Put event
    in the given stream of minio bucket-notification events. Other event
    types are logged and skipped.
    """
    for event in events:
        # each event carries a list of record dicts; iterate the records,
        # not the list itself (iterating the list was the previous bug)
        for notification in event['Records']:
            event_name = notification['eventName']
            if event_name == 's3:ObjectCreated:Put':
                upload_id = notification['s3']['object']['key']
                yield upload_id
            else:
                logger.debug('Unhandled bucket event of type %s.' % event_name)


def upload_put_handler(func):
    """
    Decorator that turns *func* into a blocking listener for upload PUT
    notifications: *func* is called with each new upload id. The handler can
    stop the loop by raising StopIteration; any other exception is logged
    and listening continues.
    """
    def wrapper(*args, **kwargs):
        logger.info('Start listening to uploads notifications.')

        # the events generator blocks and yields indefinitely
        events = _client.listen_bucket_notification(config.s3.uploads_bucket)
        notifications = upload_notifications(events)
        for upload_id in notifications:
            try:
                func(upload_id)
            except StopIteration:
                logger.debug(
                    'Handling of upload notifications was stopped via StopIteration.')
                return
            except Exception as e:
                # '%s' placeholder was missing before, which raised TypeError
                logger.error(
                    'Unexpected exception in upload handler for upload_id: %s.' % upload_id,
                    exc_info=e)

    return wrapper
class Upload():
    """
    Context-manager style wrapper around one uploaded zip file in the uploads
    bucket: downloads it to a local tmp path, extracts it, and exposes the
    contained files.

    Attributes:
        upload_id: the bucket object key of the upload.
        upload_file: local path of the downloaded zip file.
        upload_extract_dir: local directory the zip is extracted into.
        filelist: names of the contained files; None until :meth:`open` ran.
    """
    def __init__(self, upload_id):
        self.upload_id = upload_id
        self.upload_file = '%s/uploads/%s.zip' % (config.fs.tmp, upload_id)
        self.upload_extract_dir = '%s/uploads_extracted/%s' % (config.fs.tmp, upload_id)
        self.filelist = None

    def open(self):
        """Downloads the upload from the bucket and extracts it locally."""
        _client.fget_object(config.s3.uploads_bucket, self.upload_id, self.upload_file)

        # with-block guarantees the zip handle is closed even if extractall raises
        with ZipFile(self.upload_file) as zip_file:
            zip_file.extractall(self.upload_extract_dir)
            self.filelist = [zip_info.filename for zip_info in zip_file.filelist]

    def close(self):
        """Removes the downloaded zip and the extracted directory tree."""
        os.remove(self.upload_file)
        shutil.rmtree(self.upload_extract_dir)

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc, exc_tb):
        self.close()

    def open_file(self, filename, *args, **kwargs):
        """Opens the named extracted file; extra args are passed to open()."""
        return open('%s/%s' % (self.upload_extract_dir, filename), *args, **kwargs)
......@@ -22,17 +22,18 @@ broker_url = 'pyamqp://%s:%s@localhost//' % (config.rabbitmq.user, config.rabbit
backend_url = 'rpc://localhost'

app = Celery('nomad.processing', backend=backend_url, broker=broker_url)

# pickle serialization so tasks can pass project objects (e.g. Upload) around
app.conf.update(
    accept_content=['pickle'],
    task_serializer='pickle',
    result_serializer='pickle',
)

LOGGER = logging.getLogger(__name__)
class Parser():
"""
Instances specify a parser. It allows to find *main files* from given uploaded
and extracted files. Further, allows to run the parser on those 'main files'.
Instances specify a parser. It allows to find *main files* from given uploaded
and extracted files. Further, allows to run the parser on those 'main files'.
"""
def __init__(self, name, main_file_re, main_contents_re):
self.name = name
......@@ -67,7 +68,8 @@ parsers = [
)
]
# fast lookup of parsers by name
parser_dict = {parser.name: parser for parser in parsers}
@app.task()
def find_mainfiles(upload):
......@@ -79,16 +81,19 @@ def find_mainfiles(upload):
return mainfile_specs
@app.task()
def open_upload(upload_id):
    """
    Celery task that creates an Upload for *upload_id*, opens it (downloads
    and extracts the zip), and returns it for downstream tasks.
    """
    new_upload = files.upload(upload_id)
    new_upload.open()
    return new_upload
@app.task()
def close_upload(upload):
    """Celery task that closes *upload*, removing its local files."""
    upload.close()
@app.task()
def parse(mainfile_spec):
upload, mainfile, parser = mainfile_spec
......@@ -97,13 +102,14 @@ def parse(mainfile_spec):
return True
@app.task()
def dmap(it, callback):
    """
    Celery task that maps the *callback* subtask over the iterable *it*,
    executing all calls as one parallel group.
    """
    callback = subtask(callback)
    return group(callback.clone([arg]) for arg in it)()
if __name__ == '__main__':
    # ad-hoc manual test: run the full workflow on the example vasp upload
    upload_id = 'examples_vasp.zip'
    # chain: open the upload, find its main files, then parse each in parallel
    parsing_workflow = (open_upload.s(upload_id) | find_mainfiles.s() | dmap.s(parse.s()))
    print(~parsing_workflow)
import unittest
from minio import ResponseError
import requests
import logging
import time
from unittest import TestCase
from threading import Thread
import subprocess
......@@ -13,52 +11,56 @@ import nomad.config as config
# object key shared by all tests; removed again in tearDown
test_upload_id = '__test_upload_id'


def upload_test_file():
    """Uploads the example zip via a curl subprocess using a presigned url."""
    example_file = './data/examples_vasp.zip'
    upload_url = files.get_presigned_upload_url(test_upload_id)
    cmd = files.create_curl_upload_cmd(upload_url).replace('<ZIPFILE>', example_file)
    subprocess.call(shlex.split(cmd))
class FilesTests(TestCase):
    """Integration tests for nomad.files against a running minio instance."""

    def tearDown(self):
        # best-effort cleanup; a missing object is fine
        try:
            files._client.remove_object(config.s3.uploads_bucket, test_upload_id)
        except ResponseError:
            pass

    def test_presigned_url(self):
        url = files.get_presigned_upload_url(test_upload_id)

        self.assertIsNotNone(url)
        self.assertIsInstance(url, str)

    def test_upload(self):
        upload_test_file()

        with files.upload(test_upload_id) as upload:
            self.assertEqual(106, len(upload.filelist))
            # now just try to open the first file (not directory), without error
            for filename in upload.filelist:
                if filename.endswith('.xml'):
                    upload.open_file(filename).close()
                    break

    def test_upload_notification(self):
        @files.upload_put_handler
        def handle_upload_put(upload_id):
            self.assertEqual(test_upload_id, upload_id)
            # stop listening after the first notification
            raise StopIteration

        def handle_uploads():
            # the decorator supplies the real upload_id from the notification
            handle_upload_put(upload_id='provided by decorator')

        handle_uploads_thread = Thread(target=handle_uploads)
        handle_uploads_thread.start()

        upload_test_file()

        handle_uploads_thread.join()
if __name__ == '__main__':
    # keep output quiet apart from warnings while the test suite runs
    logging.basicConfig(level=logging.WARNING)
    unittest.main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment