Commit 3e68f100 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added main file detection part of processing.

parent 2d82a1d9
......@@ -16,6 +16,11 @@ docker-compose build
docker-compose up
```
Optionally, register the infrastructure's minio host with the minio client (mc).
```
mc config host add minio http://localhost:9007 AKIAIOSFODNN7EXAMPLE wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
```
### Run the celery worker (TODO: should be moved to docker)
```
......@@ -23,6 +28,15 @@ celery -A nomad.processing worker -l info
```
You can use a different log level (e.g. switch `info` to `debug`).
Use watchdog during development. Install it (i.e. the [fixed](https://github.com/gorakhargosh/watchdog/issues/330) version for macOS):
```
pip install git+https://github.com/gorakhargosh/watchdog.git
```
Now use this to auto-reload the worker:
```
watchmedo auto-restart -d ./nomad -p '*.py' -- celery worker -l info -A nomad.processing
```
### Run tests.
```
......
......@@ -19,6 +19,7 @@ Responsibilities: create, access files; create, receive, notify on, and access u
import pika
import os
from zipfile import ZipFile
import shutil
from minio import Minio
from minio.error import BucketAlreadyOwnedByYou
import logging
......@@ -76,17 +77,23 @@ class Upload():
def __init__(self, upload_id):
    """
    Derive the temporary file system locations used for the given upload.

    Only attributes are set here; the archive is neither downloaded nor opened.

    Arguments:
        upload_id: The id of the upload; used to name the temporary archive file
            and the directory the archive is extracted to.
    """
    tmp_dir = config.fs.tmp
    self.upload_id = upload_id
    # The archive is opened on entering the context; until then there is none.
    self._zipFile = None
    self.upload_file = '%s/uploads/%s.zip' % (tmp_dir, upload_id)
    self.upload_extract_dir = '%s/uploads_extracted/%s' % (tmp_dir, upload_id)
def __enter__(self):
    """
    Download the upload archive from the uploads bucket, open it, and extract all
    of its members into the upload's extract directory.

    If opening or extracting the archive fails, everything created so far is
    removed again before the error is re-raised. This is necessary because
    ``__exit__`` is not called when ``__enter__`` itself raises.

    Returns: This upload, for use as the target of the ``with`` statement.
    """
    _client.fget_object(config.s3.uploads_bucket, self.upload_id, self.upload_file)
    try:
        self._zipFile = ZipFile(self.upload_file)
        self._zipFile.extractall(self.upload_extract_dir)
    except Exception:
        # Undo the partial setup; the original error is re-raised below.
        if self._zipFile is not None:
            self._zipFile.close()
            self._zipFile = None
        if os.path.exists(self.upload_file):
            os.remove(self.upload_file)
        # The directory might not exist yet (or only partially), hence ignore_errors.
        shutil.rmtree(self.upload_extract_dir, ignore_errors=True)
        raise
    return self
def __exit__(self, exc_type, exc, exc_tb):
    """
    Close the archive and remove the downloaded archive file and the extracted
    directory.

    Every cleanup step is attempted even if an earlier one raises, so that a
    failing step does not leave the remaining temporary data behind. Errors from
    the cleanup steps still propagate to the caller.

    Returns: None, i.e. an exception raised inside the ``with`` block is not
        suppressed.
    """
    try:
        self._zipFile.close()
    finally:
        try:
            os.remove(self.upload_file)
        finally:
            shutil.rmtree(self.upload_extract_dir)
@property
def filelist(self):
    """
    The filenames of all members of the upload's archive.

    Returns: A list of strings, one per archive member, in archive order. The
        names can be passed to ``open`` of this upload.
    """
    # Callers work with plain names (regex matching, ``str.endswith``), hence
    # names are returned instead of the underlying ZipInfo objects.
    return [zip_info.filename for zip_info in self._zipFile.filelist]
def open(self, filename, *args, **kwargs):
    """
    Open a file of the extracted upload.

    Arguments:
        filename: The path of the file, relative to the upload's extract directory.
        *args, **kwargs: Passed on unchanged to the builtin ``open``.

    Returns: Whatever the builtin ``open`` returns for the resulting path.
    """
    extracted_path = '%s/%s' % (self.upload_extract_dir, filename)
    return open(extracted_path, *args, **kwargs)
......@@ -13,6 +13,7 @@
# limitations under the License.
from celery import Celery, chain, chord, group
import re
import nomad.config as config
import nomad.files as files
......@@ -21,15 +22,55 @@ backend_url = 'rpc://localhost'
# The celery application that the tasks in this module are registered with.
app = Celery('nomad.processing', broker=broker_url, backend=backend_url)
class Parser():
    """
    Instances specify a parser. A parser is identified by its name and allows to
    find *main files* among the files of an extracted upload.

    Arguments:
        name: The name that identifies the parser.
        main_file_re: A regular expression that the filename of a main file has to
            match.
        main_contents_re: A regular expression that the beginning of the contents
            of a main file has to match.
    """

    # Only this many characters from the start of a file are read and matched.
    _contents_probe_size = 500

    def __init__(self, name, main_file_re, main_contents_re):
        self.name = name
        self._main_file_re = re.compile(main_file_re)
        self._main_contents_re = re.compile(main_contents_re)

    def matches(self, upload, filename):
        """
        Check whether the given file is a main file for this parser.

        The file is only opened if its name matches the filename expression.

        Arguments:
            upload: The upload that contains the file; must provide
                ``open(filename)`` returning an object with ``read`` and ``close``.
            filename: The name of the file within the upload.

        Returns: A match object (truthy) if both the filename and the beginning of
            the file contents match, None otherwise.

        Raises: Whatever ``upload.open`` or reading the file raises.
        """
        if not self._main_file_re.match(filename):
            return None

        # Open outside of the try block: if opening fails there is nothing to
        # close and the original error must reach the caller unmasked.
        file = upload.open(filename)
        try:
            return self._main_contents_re.match(file.read(self._contents_probe_size))
        finally:
            file.close()
# All known parsers. Each one pairs a filename pattern with a pattern for the
# beginning of the file contents (see Parser).
parsers = [
Parser(
name='VaspRun',
# Any file with an .xml extension is a candidate.
main_file_re=r'^.*\.xml$',
main_contents_re=(
# The adjacent string literals below are concatenated into one pattern that
# expects the XML declaration followed by <modeling>, <generator>, and a
# "program" entry with the value vasp.
# NOTE(review): the `?` at the start of each following fragment applies to the
# last token of the fragment before it (it makes the preceding `\s*` lazy or
# the preceding closing `>` optional) -- presumably unintended; confirm before
# changing the pattern.
r'^\s*<\?xml version="1\.0" encoding="ISO-8859-1"\?>\s*'
r'?\s*<modeling>'
r'?\s*<generator>'
r'?\s*<i name="program" type="string">\s*vasp\s*</i>'
r'?'
)
)
]
@app.task()
def process(upload_id):
    """
    Detect the main files of an upload and start a parse task for each of them.

    Every file of the upload is checked against every known parser; each
    (file, parser) pair that matches results in one parse task.

    Arguments:
        upload_id: The id of the upload to process.

    Returns: The result of starting the group of parse tasks via ``delay``.
    """
    mainfiles = []
    with files.upload(upload_id) as upload:
        for filename in upload.filelist:
            for parser in parsers:
                if parser.matches(upload, filename):
                    # Only the parser name is passed on to the parse task.
                    mainfiles.append((filename, parser.name))

    return group([parse.s(mainfile, parser) for mainfile, parser in mainfiles]).delay()
@app.task()
def parse(mainfile, parser):
    """
    Report which main file is parsed with which parser.

    Currently no parsing takes place; the task only formats a message.

    Arguments:
        mainfile: The name of the main file.
        parser: The name of the parser.

    Returns: The message string.
    """
    return 'parsed %s with %s' % (mainfile, parser)
if __name__ == '__main__':
    # Manual smoke run: process the example upload and print the result.
    # `~signature` is celery shorthand for calling the task and waiting for
    # its result.
    print(~process.s('examples_vasp.zip'))
......@@ -36,6 +36,11 @@ class FilesTests(TestCase):
with files.upload(test_upload_id) as upload:
self.assertEqual(106, len(upload.filelist))
# now just try to open the first file (not directory), without error
for filename in upload.filelist:
if filename.endswith('.xml'):
upload.open(filename).close()
break
def test_upload_notification(self):
@files.upload_put_handler
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment