diff --git a/examples/external_project_parallel_upload/example-1.tar.gz b/examples/external_project_parallel_upload/example-1.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..75eeee81bc990d103421db27a574a54afb39d040 Binary files /dev/null and b/examples/external_project_parallel_upload/example-1.tar.gz differ diff --git a/examples/external_project_parallel_upload/example-2.tar.gz b/examples/external_project_parallel_upload/example-2.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..445ef22a4be2329ca5ee112e4fbebd385cd64731 Binary files /dev/null and b/examples/external_project_parallel_upload/example-2.tar.gz differ diff --git a/examples/external_project_parallel_upload/example-3.tar.gz b/examples/external_project_parallel_upload/example-3.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..17f4eb89cfbac709c0bb77a5df36ed6e500f2cfc Binary files /dev/null and b/examples/external_project_parallel_upload/example-3.tar.gz differ diff --git a/examples/external_project_parallel_upload/externalproject.py b/examples/external_project_parallel_upload/externalproject.py deleted file mode 100644 index 26c5d920c98365ac25d2501aca71acbe98771b99..0000000000000000000000000000000000000000 --- a/examples/external_project_parallel_upload/externalproject.py +++ /dev/null @@ -1,184 +0,0 @@ -""" -This example shows how to read files from many sources (here .tar.gz files), -chunk the data into even sized uploads and upload/process them in parallel. -""" - -from bravado.requests_client import RequestsClient -from bravado.client import SwaggerClient -from urllib.parse import urlparse -import time -import os.path -import sys - -# config -nomad_url = 'http://labdev-nomad.esc.rzg.mpg.de/fairdi/nomad/testing/api' -user = 'leonard.hofstadter@nomad-fairdi.tests.de' -password = 'password' -approx_upload_size = 1 * 1024 # 32 * 1024^3 -parallel_uploads = 6 - -# create the bravado client -host = urlparse(nomad_url).netloc.split(':')[0] -http_client = RequestsClient() -http_client.set_basic_auth(host, user, password) -client = SwaggerClient.from_url('%s/swagger.json' % nomad_url, http_client=http_client) - -def source_generator(): - """ - Yields all data sources. - """ - yield os.path.join(os.path.dirname(__file__), 'example-1.tar.gz') - yield os.path.join(os.path.dirname(__file__), 'example-2.tar.gz') - yield os.path.join(os.path.dirname(__file__), 'example-3.tar.gz') - -def source_file_generator(source): - """ - Yields [filepath, file] tuples from :func:`source_generator` - """ - pass - -def calc_generator(source_file): - pass - -def upload_generator(zip_streams): - """ - Yields nomad uploads that are already uploaded, but are still processing. - """ - size = 0 - streamed_files: Set[str] = set() - - def generator(): - """ Stream a zip file with all files using zipstream. """ - def iterator(): - """ - Replace the directory based iter of zipstream with an iter over all given - files. - """ - for zipped_filename, upload_filename, upload_files in files: - if zipped_filename in streamed_files: - continue - streamed_files.add(zipped_filename) - - # Write a file to the zipstream. 
- try: - with upload_files.raw_file(upload_filename, 'rb') as f: - def iter_content(): - while True: - data = f.read(1024 * 64) - if not data: - break - yield data - - yield dict(arcname=zipped_filename, iterable=iter_content()) - except KeyError: - # files that are not found, will not be returned - pass - except Restricted: - # due to the streaming nature, we cannot raise 401 here - # we just leave it out in the download - pass - - compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED - zip_stream = zipstream.ZipFile(mode='w', compression=compression, allowZip64=True) - zip_stream.paths_to_write = iterator() - - for chunk in zip_stream: - yield chunk - - response = Response(stream_with_context(generator()), mimetype='application/zip') - response.headers['Content-Disposition'] = 'attachment; filename={}'.format(zipfile_name) - return response - - -def upload_and_process(): - """ - Uses the chain of generators to upload data sequentially, awaits processing in - parallel. - """ - -upload_file = os.path.join(os.path.dirname(__file__), 'external_project_example.zip') - - - -# upload data -print('uploading a file with "external_id/AcAg/vasp.xml" inside ...') -with open(upload_file, 'rb') as f: - upload = client.uploads.upload(file=f).response().result - -print('processing ...') -while upload.tasks_running: - upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result - time.sleep(5) - print('processed: %d, failures: %d' % (upload.processed_calcs, upload.failed_calcs)) - -# check if processing was a success -if upload.tasks_status != 'SUCCESS': - print('something went wrong') - print('errors: %s' % str(upload.errors)) - # delete the unsuccessful upload - client.uploads.delete_upload(upload_id=upload.upload_id).response().result - sys.exit(1) - -# publish data -print('publishing ...') -client.uploads.exec_upload_operation(upload_id=upload.upload_id, payload={ - 'operation': 'publish', - 'metadata': { - # these metadata are applied to all calcs in the upload - 'comment': 'Data from a cool external project', - 'references': ['http://external.project.eu'], - 'calculations': [ - { - # these metadata are only applied to the calc identified by its 'mainfile' - 'mainfile': 'external_id/AcAg/vasp.xml', - - # 'coauthors': ['sheldon.cooper@ucla.edu'], this does not YET work with emails, - # Currently you have to use user_ids: leonard (the uploader, who is automatically an author) is 2 and sheldon is 1. - # Ask NOMAD developers about how to find out about user_ids. - 'coauthors': [1], - - # If users demand, we can implement a specific metadata keys (e.g. 'external_id', 'external_url') for external projects. 
-                # This could allow to directly search for, or even have API endpoints that work with external_ids
-                # 'external_id': 'external_id',
-                # 'external_url': 'http://external.project.eu/data/calc/external_id/'
-            }
-        ]
-
-
-    }
-}).response().result
-
-while upload.process_running:
-    upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
-    time.sleep(1)
-if upload.tasks_status != 'SUCCESS' or len(upload.errors) > 0:
-    print('something went wrong')
-    print('errors: %s' % str(upload.errors))
-    # delete the unsuccessful upload
-    client.uploads.delete_upload(upload_id=upload.upload_id).response().result
-    sys.exit(1)
-
-
-# search for data
-result = client.repo.search(paths=['external_id']).response().result
-if result.pagination.total == 0:
-    print('not found')
-    sys.exit(1)
-elif result.pagination.total > 1:
-    print('my ids are not specific enough, bummer ... or did I uploaded stuff multiple times?')
-# The results key holds an array with the current page data
-print('Found the following calcs for my "external_id".')
-print(', '.join(calc['calc_id'] for calc in result.results))
-
-# download data
-calc = result.results[0]
-client.raw.get(upload_id=calc['upload_id'], path=calc['mainfile']).response()
-print('Download of first calc works.')
-
-# download urls, e.g. for curl
-print('Possible download URLs are:')
-print('%s/raw/%s/%s' % (nomad_url, calc['upload_id'], calc['mainfile']))
-print('%s/raw/%s/%s/*' % (nomad_url, calc['upload_id'], os.path.dirname(calc['mainfile'])))
-
-# direct download urls without having to search before
-print('%s/raw/query?paths=external_id' % nomad_url)
diff --git a/examples/external_project_parallel_upload/upload.py b/examples/external_project_parallel_upload/upload.py
new file mode 100644
index 0000000000000000000000000000000000000000..85bc32dec63aae0d6414d473b40f4964b172089f
--- /dev/null
+++ b/examples/external_project_parallel_upload/upload.py
@@ -0,0 +1,175 @@
+"""
+This example shows how to read files from many sources (here .tar.gz files),
+chunk the data into even-sized uploads, and upload/process them in parallel. The
+assumption is that each source file is much smaller than the targeted upload size.
+"""
+
+from typing import Iterator, Iterable, Union, Tuple
+from bravado.requests_client import RequestsClient
+from bravado.client import SwaggerClient
+from urllib.parse import urlparse, urlencode
+import requests
+import time
+import os.path
+import tarfile
+import io
+import zipfile
+import zipstream
+
+# config
+nomad_url = 'http://labdev-nomad.esc.rzg.mpg.de/fairdi/nomad/testing/api'
+user = 'leonard.hofstadter@nomad-fairdi.tests.de'
+password = 'password'
+approx_upload_size = 32 * 1024 * 1024 * 1024  # you can make it really small for testing
+max_parallel_uploads = 9
+
+# create the bravado client
+host = urlparse(nomad_url).netloc.split(':')[0]
+http_client = RequestsClient()
+http_client.set_basic_auth(host, user, password)
+client = SwaggerClient.from_url('%s/swagger.json' % nomad_url, http_client=http_client)
+
+
+def source_generator() -> Iterable[Tuple[str, Union[str, None]]]:
+    """
+    Yields all data sources as (path to .tgz, prefix) tuples. The prefix denotes
+    a subdirectory to put the contents in; use None for no prefix.
+    """
+    yield os.path.join(os.path.dirname(__file__), 'example-1.tar.gz'), 'example_1'
+    yield os.path.join(os.path.dirname(__file__), 'example-2.tar.gz'), 'example_2'
+    yield os.path.join(os.path.dirname(__file__), 'example-3.tar.gz'), 'example_3'
+
+
+def upload_next_data(sources: Iterator[Tuple[str, Union[str, None]]], upload_name='next upload'):
+    """
+    Reads data from the given sources iterator. Creates and uploads a .zip-stream of
+    approximately approx_upload_size. Returns the upload, or raises StopIteration if
+    the sources iterator was empty. Should be used repeatedly on the same iterator until it is empty.
+    """
+
+    # potentially raises StopIteration before being streamed
+    first_source = next(sources)
+
+    def iterator():
+        """
+        Yields dicts with keys arcname, iterable, as required for the zipstream
+        library. Will read from the sources until the zip-stream has the desired size.
+        """
+        size = 0
+        first = True
+        while True:
+            if first:
+                source_file, prefix = first_source
+                first = False
+            else:
+                try:
+                    source_file, prefix = next(sources)
+                except StopIteration:
+                    break
+
+            source_tar = tarfile.open(source_file)
+            source = source_tar.fileobj
+            bufsize = source_tar.copybufsize
+            for source_member in source_tar.getmembers():
+                if not source_member.isfile():
+                    continue
+
+                # copy the member's data from the tar into a buffer that we can stream
+                target = io.BytesIO()
+                source.seek(source_member.offset_data)
+                tarfile.copyfileobj(  # type: ignore
+                    source, target, source_member.size, tarfile.ReadError, bufsize)
+
+                size += source_member.size
+                target.seek(0)
+
+                def iter_content():
+                    while True:
+                        data = target.read(io.DEFAULT_BUFFER_SIZE)
+                        if not data:
+                            break
+                        yield data
+
+                # apply the optional prefix to the path within the zip
+                name = source_member.name
+                if prefix is not None:
+                    name = os.path.join(prefix, name)
+
+                yield dict(arcname=name, iterable=iter_content())
+
+            if size > approx_upload_size:
+                break
+
+    # create the zip-stream from the iterator above
+    zip_stream = zipstream.ZipFile(mode='w', compression=zipfile.ZIP_STORED, allowZip64=True)
+    zip_stream.paths_to_write = iterator()
+
+    user = client.auth.get_user().response().result
+    token = user.token
+    url = nomad_url + '/uploads/?%s' % urlencode(dict(name=upload_name))
+
+    def content():
+        for chunk in zip_stream:
+            if len(chunk) != 0:
+                yield chunk
+
+    # stream the .zip to nomad
+    response = requests.put(url=url, headers={'X-Token': token}, data=content())
+
+    if response.status_code != 200:
+        raise Exception('nomad returned status %d' % response.status_code)
+
+    upload_id = response.json()['upload_id']
+
+    return client.uploads.get_upload(upload_id=upload_id).response().result
+
+
+def publish_upload(upload):
+    client.uploads.exec_upload_operation(upload_id=upload.upload_id, payload={
+        'operation': 'publish',
+        'metadata': {
+            # these metadata are applied to all calcs in the upload
+            'comment': 'Data from a cool external project',
+            'references': ['http://external.project.eu']
+        }
+    }).response()
+
+
+if __name__ == '__main__':
+    source_iter = iter(source_generator())
+    all_uploaded = False
+    processing_completed = False
+
+    # run until there are no more uploads and everything is processed (and published)
+    while not (all_uploaded and processing_completed):
+        # process existing uploads
+        while True:
+            uploads = client.uploads.get_uploads().response().result
+            for upload in uploads.results:
+                if not upload.process_running:
+                    if upload.tasks_status == 'SUCCESS':
+                        print('publish %s(%s)' % (upload.name, upload.upload_id))
+                        publish_upload(upload)
+                    elif upload.tasks_status == 'FAILURE':
+                        print('could not process %s(%s)' % (upload.name, upload.upload_id))
+                        client.uploads.delete_upload(upload_id=upload.upload_id).response().result
+
+            if uploads.pagination.total < max_parallel_uploads:
+                # still processing some, but there is room for more uploads
+                break
+            else:
+                # wait for processing
+                time.sleep(10)
+
+        # add a new upload
+        if all_uploaded:
+            processing_completed = uploads.pagination.total == 0
+
+        try:
+            upload = upload_next_data(source_iter)
+            processing_completed = False
+            print('uploaded %s(%s)' % (upload.name, upload.upload_id))
+        except StopIteration:
+            all_uploaded = True
+        except Exception as e:
+            print('could not upload next upload: %s' % str(e))
diff --git a/examples/external_project_example/external_project_example.zip b/examples/external_project_upload/external_project_example.zip
similarity index 100%
rename from examples/external_project_example/external_project_example.zip
rename to examples/external_project_upload/external_project_example.zip
diff --git a/examples/external_project_example/externalproject.py b/examples/external_project_upload/upload.py
similarity index 100%
rename from examples/external_project_example/externalproject.py
rename to examples/external_project_upload/upload.py
diff --git a/nomad/app/api/upload.py b/nomad/app/api/upload.py
index 8fb6c10d7cc20881df0fa66234bbccc506731939..3a23e3d2d01aaf17629c9d4c97110c5da150ab6a 100644
--- a/nomad/app/api/upload.py
+++ b/nomad/app/api/upload.py
@@ -291,8 +291,11 @@ class UploadListResource(Resource):
             with open(upload_path, 'wb') as f:
                 received_data = 0
                 received_last = 0
-                while not request.stream.is_exhausted:
+                while True:
                     data = request.stream.read(io.DEFAULT_BUFFER_SIZE)
+                    if len(data) == 0:
+                        break
+
                     received_data += len(data)
                     received_last += len(data)
                     if received_last > 1e9:
@@ -301,6 +304,8 @@ class UploadListResource(Resource):
                         logger.info('received streaming data', size=received_data)
                     f.write(data)
 
+            logger.info('received complete streaming data', size=received_data)
+
         except Exception as e:
             logger.warning('Error on streaming upload', exc_info=e)
             abort(400, message='Some IO went wrong, download probably aborted/disrupted.')
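
Note on using the new example: source_generator is the only part a user has to adapt, since upload.py only relies on the (path to .tgz, prefix) tuples it yields. A minimal sketch of such an adaptation for a directory of archives; the ./archives path and the prefix naming scheme are assumptions, not part of the example:

# Hypothetical replacement for source_generator in upload.py; the ./archives
# directory and the prefix derived from the file name are assumptions.
import glob
import os.path
from typing import Iterable, Tuple, Union


def source_generator() -> Iterable[Tuple[str, Union[str, None]]]:
    # yield (path to .tar.gz, prefix) tuples, as upload_next_data expects
    for path in sorted(glob.glob(os.path.join('archives', '*.tar.gz'))):
        prefix = os.path.basename(path).replace('.tar.gz', '')
        yield path, prefix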
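The removed externalproject.py also demonstrated per-calculation metadata when publishing (coauthors, per-mainfile entries), which the new publish_upload does not use. If that is needed, the payload could be extended along these lines; this is a sketch based on the removed example, and the mainfile path and coauthor user id are placeholders:

# Sketch of a publish payload with per-calc metadata, following the removed example;
# the mainfile and the coauthor user id are placeholders.
payload = {
    'operation': 'publish',
    'metadata': {
        # applied to all calcs in the upload
        'comment': 'Data from a cool external project',
        'references': ['http://external.project.eu'],
        'calculations': [{
            # applied only to the calc identified by its mainfile
            'mainfile': 'external_id/AcAg/vasp.xml',
            # user ids, not emails (yet); ask the NOMAD developers about user ids
            'coauthors': [1]
        }]
    }
}
# then, analogous to publish_upload:
# client.uploads.exec_upload_operation(upload_id=upload.upload_id, payload=payload).response()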
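The change to nomad/app/api/upload.py replaces the werkzeug-specific request.stream.is_exhausted check with the plain read-until-empty idiom: read() returns b'' once the request body is consumed. The same pattern in isolation, as a minimal stand-alone Flask sketch (not the actual NOMAD endpoint; the route and target file are assumptions):

# Minimal sketch of the read-until-empty pattern used in the diff; this is a
# stand-alone Flask example, not the actual NOMAD upload endpoint.
import io
from flask import Flask, request

app = Flask(__name__)


@app.route('/uploads', methods=['PUT'])
def upload():
    received = 0
    with open('/tmp/upload.zip', 'wb') as f:
        while True:
            data = request.stream.read(io.DEFAULT_BUFFER_SIZE)
            if len(data) == 0:
                break  # read() yields b'' once the request body is exhausted
            received += len(data)
            f.write(data)
    return 'received %d bytes\n' % received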