Commit d0c80d3e authored by Markus Scheidgen

Added a new example upload script for parallel upload.

parent 5d6e8e49
"""
This example shows how to read files from many sources (here .tar.gz files),
chunk the data into even-sized uploads and upload/process them in parallel.
"""
from bravado.requests_client import RequestsClient
from bravado.client import SwaggerClient
from urllib.parse import urlparse
import time
import os.path
import sys
# config
nomad_url = 'http://labdev-nomad.esc.rzg.mpg.de/fairdi/nomad/testing/api'
user = 'leonard.hofstadter@nomad-fairdi.tests.de'
password = 'password'
approx_upload_size = 1 * 1024  # small value for testing; the intended production size is 32 GB (32 * 1024**3)
parallel_uploads = 6
# create the bravado client
host = urlparse(nomad_url).netloc.split(':')[0]
http_client = RequestsClient()
http_client.set_basic_auth(host, user, password)
client = SwaggerClient.from_url('%s/swagger.json' % nomad_url, http_client=http_client)
def source_generator():
"""
Yields all data sources.
"""
yield os.path.join(os.path.dirname(__file__), 'example-1.tar.gz')
yield os.path.join(os.path.dirname(__file__), 'example-2.tar.gz')
yield os.path.join(os.path.dirname(__file__), 'example-3.tar.gz')
def source_file_generator(source):
"""
Yields [filepath, file] tuples from :func:`source_generator`
"""
pass
def calc_generator(source_file):
    """ Placeholder; not yet implemented. """
    pass
def upload_generator(zip_streams):
"""
    Yields nomad uploads that are already uploaded but still processing.

    Note: draft; the body below relies on several names (e.g. files, compress,
    zipfile_name, Response, Restricted, zipstream) that are not defined or imported
    in this file.
    """
size = 0
streamed_files: Set[str] = set()
def generator():
""" Stream a zip file with all files using zipstream. """
def iterator():
"""
Replace the directory based iter of zipstream with an iter over all given
files.
"""
for zipped_filename, upload_filename, upload_files in files:
if zipped_filename in streamed_files:
continue
streamed_files.add(zipped_filename)
# Write a file to the zipstream.
try:
with upload_files.raw_file(upload_filename, 'rb') as f:
def iter_content():
while True:
data = f.read(1024 * 64)
if not data:
break
yield data
yield dict(arcname=zipped_filename, iterable=iter_content())
except KeyError:
# files that are not found, will not be returned
pass
except Restricted:
# due to the streaming nature, we cannot raise 401 here
# we just leave it out in the download
pass
compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
zip_stream = zipstream.ZipFile(mode='w', compression=compression, allowZip64=True)
zip_stream.paths_to_write = iterator()
for chunk in zip_stream:
yield chunk
response = Response(stream_with_context(generator()), mimetype='application/zip')
response.headers['Content-Disposition'] = 'attachment; filename={}'.format(zipfile_name)
return response
def upload_and_process():
"""
Uses the chain of generators to upload data sequentially, awaits processing in
parallel.
"""
upload_file = os.path.join(os.path.dirname(__file__), 'external_project_example.zip')
# upload data
print('uploading a file with "external_id/AcAg/vasp.xml" inside ...')
with open(upload_file, 'rb') as f:
upload = client.uploads.upload(file=f).response().result
print('processing ...')
while upload.tasks_running:
upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
time.sleep(5)
print('processed: %d, failures: %d' % (upload.processed_calcs, upload.failed_calcs))
# check if processing was a success
if upload.tasks_status != 'SUCCESS':
print('something went wrong')
print('errors: %s' % str(upload.errors))
# delete the unsuccessful upload
client.uploads.delete_upload(upload_id=upload.upload_id).response().result
sys.exit(1)
# publish data
print('publishing ...')
client.uploads.exec_upload_operation(upload_id=upload.upload_id, payload={
'operation': 'publish',
'metadata': {
# these metadata are applied to all calcs in the upload
'comment': 'Data from a cool external project',
'references': ['http://external.project.eu'],
'calculations': [
{
# these metadata are only applied to the calc identified by its 'mainfile'
'mainfile': 'external_id/AcAg/vasp.xml',
# 'coauthors': ['sheldon.cooper@ucla.edu'], this does not YET work with emails,
# Currently you have to use user_ids: leonard (the uploader, who is automatically an author) is 2 and sheldon is 1.
                    # Ask the NOMAD developers how to look up user_ids.
'coauthors': [1],
                    # If users demand, we could implement specific metadata keys (e.g. 'external_id', 'external_url') for external projects.
                    # This would allow searching for external ids directly, or even API endpoints that work with external ids.
# 'external_id': 'external_id',
# 'external_url': 'http://external.project.eu/data/calc/external_id/'
}
]
}
}).response().result
while upload.process_running:
upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
time.sleep(1)
if upload.tasks_status != 'SUCCESS' or len(upload.errors) > 0:
print('something went wrong')
print('errors: %s' % str(upload.errors))
# delete the unsuccessful upload
client.uploads.delete_upload(upload_id=upload.upload_id).response().result
sys.exit(1)
# search for data
result = client.repo.search(paths=['external_id']).response().result
if result.pagination.total == 0:
print('not found')
sys.exit(1)
elif result.pagination.total > 1:
        print('my ids are not specific enough, bummer ... or did I upload stuff multiple times?')
# The results key holds an array with the current page data
print('Found the following calcs for my "external_id".')
print(', '.join(calc['calc_id'] for calc in result.results))
# download data
calc = result.results[0]
client.raw.get(upload_id=calc['upload_id'], path=calc['mainfile']).response()
print('Download of first calc works.')
# download urls, e.g. for curl
print('Possible download URLs are:')
print('%s/raw/%s/%s' % (nomad_url, calc['upload_id'], calc['mainfile']))
print('%s/raw/%s/%s/*' % (nomad_url, calc['upload_id'], os.path.dirname(calc['mainfile'])))
# direct download urls without having to search before
print('%s/raw/query?paths=external_id' % nomad_url)
"""
This example shows how to read files from many sources (here .tar.gz files),
chunk the data into even-sized uploads and upload/process them in parallel. The assumption
is that each source file is much smaller than the targeted upload size.
"""
from typing import Iterator, Iterable, Union, Tuple
from bravado.requests_client import RequestsClient
from bravado.client import SwaggerClient
from urllib.parse import urlparse, urlencode
import requests
import time
import os.path
import tarfile
import io
import zipfile
import zipstream
# config
nomad_url = 'http://labdev-nomad.esc.rzg.mpg.de/fairdi/nomad/testing/api'
user = 'leonard.hofstadter@nomad-fairdi.tests.de'
password = 'password'
approx_upload_size = 32 * 1024 * 1024 * 1024 # you can make it really small for testing
max_parallel_uploads = 9
# create the bravado client
host = urlparse(nomad_url).netloc.split(':')[0]
http_client = RequestsClient()
http_client.set_basic_auth(host, user, password)
client = SwaggerClient.from_url('%s/swagger.json' % nomad_url, http_client=http_client)
def source_generator() -> Iterable[Tuple[str, Union[str, None]]]:
"""
    Yields all data sources as (path to .tar.gz, prefix) tuples. The prefix denotes
    a subdirectory to put the contents in; use None for no prefix.
"""
yield os.path.join(os.path.dirname(__file__), 'example-1.tar.gz'), 'example_1'
yield os.path.join(os.path.dirname(__file__), 'example-2.tar.gz'), 'example_2'
yield os.path.join(os.path.dirname(__file__), 'example-3.tar.gz'), 'example_3'
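
# A hypothetical variant (not used below): discover the sources instead of listing them
# explicitly. It assumes the .tar.gz files live next to this script and derives the prefix
# from each file name, e.g. 'example-1.tar.gz' -> 'example-1'.
def source_generator_from_directory(source_dir: str = os.path.dirname(__file__) or '.') -> Iterable[Tuple[str, Union[str, None]]]:
    for file_name in sorted(os.listdir(source_dir)):
        if file_name.endswith('.tar.gz'):
            yield os.path.join(source_dir, file_name), file_name[:-len('.tar.gz')]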
def upload_next_data(sources: Iterator[Tuple[str, str]], upload_name='next upload'):
"""
    Reads data from the given sources iterator. Creates and uploads a .zip-stream of
    approximately the configured size. Returns the upload, or raises StopIteration if the
    sources iterator was empty. Should be called repeatedly on the same iterator until it
    is exhausted.
"""
# potentially raises StopIteration before being streamed
first_source = next(sources)
def iterator():
"""
        Yields dicts with keys arcname and iterable, as required by the zipstream
        library. Reads from the sources iterator until the zip-stream has the desired size.
"""
size = 0
first = True
        while True:
if first:
source_file, prefix = first_source
first = False
else:
try:
source_file, prefix = next(sources)
except StopIteration:
break
source_tar = tarfile.open(source_file)
source = source_tar.fileobj
bufsize = source_tar.copybufsize
for source_member in source_tar.getmembers():
if not source_member.isfile():
continue
target = io.BytesIO()
source.seek(source_member.offset_data)
tarfile.copyfileobj( # type: ignore
source, target, source_member.size, tarfile.ReadError, bufsize)
size += source_member.size
target.seek(0)
def iter_content():
while True:
data = target.read(io.DEFAULT_BUFFER_SIZE)
if not data:
break
yield data
name = source_member.name
if prefix is not None:
name = os.path.join(prefix, name)
                yield dict(arcname=name, iterable=iter_content())
if size > approx_upload_size:
break
# create the zip-stream from the iterator above
zip_stream = zipstream.ZipFile(mode='w', compression=zipfile.ZIP_STORED, allowZip64=True)
zip_stream.paths_to_write = iterator()
user = client.auth.get_user().response().result
token = user.token
url = nomad_url + '/uploads/?%s' % urlencode(dict(name=upload_name))
def content():
for chunk in zip_stream:
if len(chunk) != 0:
yield chunk
# stream .zip to nomad
response = requests.put(url=url, headers={'X-Token': token}, data=content())
if response.status_code != 200:
raise Exception('nomad return status %d' % response.status_code)
upload_id = response.json()['upload_id']
return client.uploads.get_upload(upload_id=upload_id).response().result
def publish_upload(upload):
client.uploads.exec_upload_operation(upload_id=upload.upload_id, payload={
'operation': 'publish',
'metadata': {
# these metadata are applied to all calcs in the upload
'comment': 'Data from a cool external project',
'references': ['http://external.project.eu']
}
}).response()
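# Note: besides the upload-wide metadata above, the publish payload can also carry
# per-calc metadata via a 'calculations' list keyed by 'mainfile', for example
# (illustrative values taken from the non-parallel example above):
# 'calculations': [{'mainfile': 'external_id/AcAg/vasp.xml', 'coauthors': [1]}]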
if __name__ == '__main__':
source_iter = iter(source_generator())
all_uploaded = False
processing_completed = False
# run until there are no more uploads and everything is processed (and published)
while not (all_uploaded and processing_completed):
# process existing uploads
while True:
uploads = client.uploads.get_uploads().response().result
for upload in uploads.results:
if not upload.process_running:
if upload.tasks_status == 'SUCCESS':
print('publish %s(%s)' % (upload.name, upload.upload_id))
publish_upload(upload)
elif upload.tasks_status == 'FAILURE':
print('could not process %s(%s)' % (upload.name, upload.upload_id))
client.uploads.delete_upload(upload_id=upload.upload_id).response().result
if uploads.pagination.total < max_parallel_uploads:
# still processing some, but there is room for more uploads
break
else:
# wait for processing
time.sleep(10)
# add a new upload
if all_uploaded:
processing_completed = uploads.pagination.total == 0
try:
upload = upload_next_data(source_iter)
processing_completed = False
print('uploaded %s(%s)' % (upload.name, upload.upload_id))
except StopIteration:
all_uploaded = True
except Exception as e:
print('could not upload next upload: %s' % str(e))
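# Possible sanity check after the loop (a sketch, not part of the script above): once
# everything is published, no uploads should remain in staging for this user.
# remaining = client.uploads.get_uploads().response().result
# assert remaining.pagination.total == 0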
@@ -291,8 +291,11 @@ class UploadListResource(Resource):
with open(upload_path, 'wb') as f:
received_data = 0
received_last = 0
while not request.stream.is_exhausted:
while True:
data = request.stream.read(io.DEFAULT_BUFFER_SIZE)
if len(data) == 0:
break
received_data += len(data)
received_last += len(data)
if received_last > 1e9:
@@ -301,6 +304,8 @@ class UploadListResource(Resource):
logger.info('received streaming data', size=received_data)
f.write(data)
print(received_data)
except Exception as e:
logger.warning('Error on streaming upload', exc_info=e)
abort(400, message='Some IO went wrong, download probably aborted/disrupted.')
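
For context, the hunk above replaces the check on request.stream.is_exhausted with a read-until-empty loop. A minimal sketch of the resulting logic, with the error handling and periodic progress logging omitted:

with open(upload_path, 'wb') as f:
    received_data = 0
    while True:
        data = request.stream.read(io.DEFAULT_BUFFER_SIZE)
        if len(data) == 0:
            break
        received_data += len(data)
        f.write(data)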