Skip to content
Snippets Groups Projects
Commit 866c9b71 authored by Markus Scheidgen
Browse files

More experiments with celery canvas.

parent 3e68f100
No related branches found
No related tags found
No related merge requests found
......@@ -47,7 +47,7 @@ def create_curl_upload_cmd(presigned_url):
return 'curl -X PUT "%s" -H "Content-Type: application/octet-steam" -F file=@<ZIPFILE>' % presigned_url
def upload(upload_id):
return Upload(upload_id)
return Upload(upload_id)
def upload_put_handler(func):
def wrapper(*args, **kwargs):
......@@ -78,22 +78,25 @@ class Upload():
self.upload_id = upload_id
self.upload_file = '%s/uploads/%s.zip' % (config.fs.tmp, upload_id)
self.upload_extract_dir = '%s/uploads_extracted/%s' % (config.fs.tmp, upload_id)
self._zipFile = None
self.filelist = None
def __enter__(self):
def open(self):
_client.fget_object(config.s3.uploads_bucket, self.upload_id, self.upload_file)
self._zipFile = ZipFile(self.upload_file)
self._zipFile.extractall(self.upload_extract_dir)
return self
zipFile = ZipFile(self.upload_file)
zipFile.extractall(self.upload_extract_dir)
self.filelist = [zipInfo.filename for zipInfo in zipFile.filelist]
zipFile.close()
def __exit__(self, exc_type, exc, exc_tb):
self._zipFile.close()
def close(self):
    """
    Remove the local files of this upload: the zip file at ``upload_file``
    and the directory tree at ``upload_extract_dir``.

    The directory is removed even when deleting the zip file fails, so a
    failed cleanup of one does not leave the other behind. Errors from
    either step still propagate to the caller.
    """
    try:
        os.remove(self.upload_file)
    finally:
        # Always attempt to drop the extracted tree, even if the zip file
        # could not be removed (e.g. it is already gone).
        shutil.rmtree(self.upload_extract_dir)
@property
def filelist(self):
return [zipInfo.filename for zipInfo in self._zipFile.filelist]
def __enter__(self):
    """ Open the upload when a ``with`` block is entered and return it. """
    self.open()
    return self
def open(self, filename, *args, **kwargs):
def __exit__(self, exc_type, exc, exc_tb):
    """
    Close the upload when the ``with`` block is left. Returns ``None``, so
    an exception raised inside the block is not suppressed.
    """
    self.close()
def open_file(self, filename, *args, **kwargs):
    """
    Open a file below the extraction directory of this upload.

    Arguments:
        filename: Path of the file, relative to ``upload_extract_dir``.
        *args, **kwargs: Passed on unchanged to the builtin ``open``.

    Returns: The file object produced by the builtin ``open``.
    """
    path = '%s/%s' % (self.upload_extract_dir, filename)
    return open(path, *args, **kwargs)
......@@ -12,15 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from celery import Celery, chain, chord, group
from celery import Celery, group, subtask
import re
import logging
import nomad.config as config
import nomad.files as files
broker_url = 'pyamqp://%s:%s@localhost//' % (config.rabbitmq.user, config.rabbitmq.password)
backend_url = 'rpc://localhost'
app = Celery('nomad.processing', backend=backend_url, broker=broker_url)
# Use pickle for task messages and results instead of celery's default
# serializer. Presumably this is needed because tasks in this module pass
# plain Python objects between each other (open_upload returns the upload
# object itself) -- TODO confirm.
# NOTE(review): unpickling can execute arbitrary code, so this is only safe
# as long as everything that can write to the broker/backend is trusted.
app.conf.update(
accept_content = ['pickle'],
task_serializer = 'pickle',
result_serializer = 'pickle',
)
LOGGER = logging.getLogger(__name__)
class Parser():
"""
......@@ -32,13 +39,18 @@ class Parser():
self._main_file_re = re.compile(main_file_re)
self._main_contents_re = re.compile(main_contents_re)
def matches(self, upload, filename):
def is_mainfile(self, upload, filename):
if self._main_file_re.match(filename):
file = None
try:
file = upload.open(filename)
file = upload.open_file(filename)
return self._main_contents_re.match(file.read(500))
finally:
file.close()
if file:
file.close()
def run(self, upload, filename):
    """ Placeholder for running this parser on a file; does nothing yet. """
    return None
parsers = [
......@@ -55,22 +67,43 @@ parsers = [
)
]
parser_dict = { parser.name: parser for parser in parsers }
@app.task()
def process(upload_id):
mainfiles = list()
with files.upload(upload_id) as upload:
for filename in upload.filelist:
for parser in parsers:
if parser.matches(upload, filename):
mainfiles.append((filename, parser.name))
def find_mainfiles(upload):
mainfile_specs = list()
for filename in upload.filelist:
for parser in parsers:
if parser.is_mainfile(upload, filename):
mainfile_specs.append((upload, filename, parser.name))
return group([parse.s(mainfile, parser) for mainfile, parser in mainfiles]).delay()
return mainfile_specs
@app.task()
def open_upload(upload_id):
    """
    Task that creates the upload object for the given id via ``files.upload``,
    opens it, and returns the opened upload object.
    """
    opened = files.upload(upload_id)
    opened.open()
    return opened
@app.task()
def parse(mainfile, parser):
return 'parsed %s with %s' % (mainfile, parser)
def close_upload(upload):
upload.close()
@app.task()
def parse(mainfile_spec):
    """
    Task that runs a parser on a single mainfile.

    Arguments:
        mainfile_spec: A ``(upload, mainfile, parser)`` tuple, where ``parser``
            is the name under which the parser is stored in ``parser_dict``.

    Returns: ``True`` after the parser's ``run`` has returned.

    Raises: ``KeyError`` if ``parser_dict`` has no parser with the given name.
    """
    upload, mainfile, parser = mainfile_spec
    # Hand the values to the logger as arguments so that the message is only
    # formatted when debug logging is actually enabled.
    LOGGER.debug('Start parsing mainfile %s/%s with %s.', upload, mainfile, parser)
    parser_dict[parser].run(upload, mainfile)
    return True
@app.task()
def dmap(it, callback):
    """
    Task that maps a callback task over the items of an iterable.

    ``callback`` is passed through ``subtask`` and then cloned once per item of
    ``it``, each clone receiving that item as its only argument. The clones
    are bundled into one ``group``, which is called right away.

    Returns: Whatever calling the group returns.
    """
    signature = subtask(callback)
    fan_out = group(signature.clone([item,]) for item in it)
    return fan_out()
if __name__ == '__main__':
print(~process.s('examples_vasp.zip'))
upload_id = 'examples_vasp.zip'
parsing_workflow = (open_upload.s(upload_id) | find_mainfiles.s() | dmap.s(parse.s()))
print(~parsing_workflow)
......@@ -39,7 +39,7 @@ class FilesTests(TestCase):
# now just try to open the first file (not directory), without error
for filename in upload.filelist:
if filename.endswith('.xml'):
upload.open(filename).close()
upload.open_file(filename).close()
break
def test_upload_notification(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment