Commit 8918e397 authored by Alvin Noe Ladines
Browse files

Added msgpacked function in uploads

parent 64a67b2e
Pipeline #69809 passed with stages
in 20 minutes and 6 seconds
...@@ -19,9 +19,11 @@ from mongoengine import Q ...@@ -19,9 +19,11 @@ from mongoengine import Q
from pymongo import UpdateOne from pymongo import UpdateOne
import threading import threading
import elasticsearch_dsl as es import elasticsearch_dsl as es
import json
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel
from .admin import admin from .admin import admin
from nomad.archive import write_archive
@admin.group(help='Upload related commands') @admin.group(help='Upload related commands')
...@@ -231,6 +233,30 @@ def rm(ctx, uploads, skip_es, skip_mongo, skip_files): ...@@ -231,6 +233,30 @@ def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
delete_upload(upload, skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files) delete_upload(upload, skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)
@uploads.command(help='Create msgpack file for upload')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def msgpacked(ctx, uploads):
    """Write a msgpack archive for every JSON archive zip of the selected uploads.

    The uploads are resolved with ``query_uploads``. Uploads whose files are not
    ``files.PublicUploadFiles`` are skipped. For each of the ``public`` and
    ``restricted`` archive zips, all ``.json`` members are loaded and handed to
    ``write_archive``; the target path is the zip's path with ``.zip`` replaced
    by ``.msg``.

    Arguments:
        ctx: The click context, passed on to ``query_uploads``.
        uploads: The ``UPLOADS`` command line arguments, passed on to ``query_uploads``.
    """
    json_ext = '.json'

    def iterator(zf, names):
        # Lazily yields one (calc_id, parsed JSON) pair per zip member, so only one
        # entry is parsed at a time.
        for name in names:
            # Cut off exactly the extension. ``str.strip('.json')`` would instead remove
            # any leading/trailing run of the characters '.', 'j', 's', 'o', 'n' and
            # thereby truncate calc ids that start or end with one of them.
            calc_id = name[:-len(json_ext)]
            with zf.open(name) as f:
                yield (calc_id, json.load(f))

    _, uploads = query_uploads(ctx, uploads)
    for upload in uploads:
        upload_files = files.UploadFiles.get(upload_id=upload.upload_id)
        if not isinstance(upload_files, files.PublicUploadFiles):
            continue

        for access in ['public', 'restricted']:
            # NOTE(review): the zip handle is not closed here; presumably
            # ``upload_files`` owns it -- confirm against ``open_zip_file``.
            zf = upload_files.open_zip_file('archive', access, upload_files._archive_ext)
            archive_name = zf.filename.replace('.zip', '.msg')
            # Only members carrying the full '.json' extension are archive entries;
            # this matches the suffix that ``iterator`` removes.
            names = [name for name in zf.namelist() if name.endswith(json_ext)]
            print('writing msgpack archive %s' % archive_name)
            write_archive(archive_name, len(names), iterator(zf, names))
def __run_processing( def __run_processing(
ctx, uploads, parallel: int, process: Callable[[proc.Upload], None], label: str): ctx, uploads, parallel: int, process: Callable[[proc.Upload], None], label: str):
_, uploads = query_uploads(ctx, uploads) _, uploads = query_uploads(ctx, uploads)
......
...@@ -134,6 +134,15 @@ class TestAdminUploads: ...@@ -134,6 +134,15 @@ class TestAdminUploads:
assert Upload.objects(upload_id=upload_id).first() is None assert Upload.objects(upload_id=upload_id).first() is None
assert Calc.objects(upload_id=upload_id).first() is None assert Calc.objects(upload_id=upload_id).first() is None
def test_create_msgpack(self, published):
    """The ``msgpacked`` admin command exits cleanly for a published upload and reports the archive it writes."""
    runner = click.testing.CliRunner()
    cli_args = ['admin', 'uploads', 'msgpacked', published.upload_id]
    # catch_exceptions=False lets errors raised inside the command fail the test directly.
    outcome = runner.invoke(cli, cli_args, catch_exceptions=False, obj=utils.POPO())

    assert outcome.exit_code == 0
    assert 'writing msgpack archive' in outcome.stdout
def test_index(self, published): def test_index(self, published):
upload_id = published.upload_id upload_id = published.upload_id
calc = Calc.objects(upload_id=upload_id).first() calc = Calc.objects(upload_id=upload_id).first()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment