Commit a5da4b66 authored by Markus Scheidgen

Removed archive json/zip support.

parent 036e6b23
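
In short: archives are no longer stored as per-calc JSON files inside zip files but as msgpack files accessed through UploadFiles.read_archive. A rough sketch of the change in usage, based on the calls touched in this diff (not a verbatim quote of either version; upload_files is a UploadFiles instance, calc_id a calculation id):

# before: one JSON file per calc, opened like a regular file
with upload_files.archive_file(calc_id) as f:
    archive_data = json.load(f)

# after: one msgpack archive per upload and access level, entries resolved by calc id
with upload_files.read_archive(calc_id) as archive:
    archive_data = archive[calc_id].to_dict()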
@@ -17,19 +17,21 @@ The archive API of the nomad@FAIRDI APIs. This API is about serving processed
(parsed and normalized) calculation data in nomad's *meta-info* format.
'''
from typing import Dict, Any, List
from typing import Dict, Any
from io import BytesIO
import os.path
from flask import send_file, request, g
from flask import request, g
from flask_restplus import abort, Resource, fields
import json
import orjson
import importlib
import urllib.parse
import metainfo
from nomad.files import UploadFiles, Restricted
from nomad import search, config, archive
from nomad.archive import query_archive
from nomad import search, config
from nomad.app import common
from .auth import authenticate, create_authorization_predicate
@@ -64,12 +66,9 @@ class ArchiveCalcLogResource(Resource):
abort(404, message='Upload %s does not exist.' % upload_id)
try:
return send_file(
upload_files.archive_log_file(calc_id, 'rb'),
mimetype='text/plain',
as_attachment=True,
cache_timeout=0,
attachment_filename='%s.log' % archive_id)
with upload_files.read_archive(calc_id) as archive:
data = archive[calc_id]['processing_logs']
return '\n'.join([json.dumps(entry.to_dict()) for entry in data])
except Restricted:
abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
except KeyError:
@@ -81,7 +80,7 @@ class ArchiveCalcResource(Resource):
@api.doc('get_archive_calc')
@api.response(404, 'The upload or calculation does not exist')
@api.response(401, 'Not authorized to access the data.')
@api.response(200, 'Archive data sent')
@api.response(200, 'Archive data sent', headers={'Content-Type': 'application/json'})
@authenticate(signature_token=True)
def get(self, upload_id, calc_id):
'''
@@ -91,19 +90,15 @@ class ArchiveCalcResource(Resource):
'''
archive_id = '%s/%s' % (upload_id, calc_id)
upload_file = UploadFiles.get(
upload_files = UploadFiles.get(
upload_id, is_authorized=create_authorization_predicate(upload_id, calc_id))
if upload_file is None:
if upload_files is None:
abort(404, message='Archive %s does not exist.' % upload_id)
try:
return send_file(
upload_file.archive_file(calc_id, 'rb'),
mimetype='application/json',
as_attachment=True,
cache_timeout=0,
attachment_filename='%s.json' % archive_id)
with upload_files.read_archive(calc_id) as archive:
return archive[calc_id].to_dict()
except Restricted:
abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
except KeyError:
@@ -163,7 +158,7 @@ class ArchiveDownloadResource(Resource):
calc_id = entry['calc_id']
if upload_files is None or upload_files.upload_id != upload_id:
if upload_files is not None:
upload_files.close_zipfile_cache()
upload_files.close()
upload_files = UploadFiles.get(
upload_id, create_authorization_predicate(upload_id))
@@ -172,12 +167,15 @@ class ArchiveDownloadResource(Resource):
common.logger.error('upload files do not exist', upload_id=upload_id)
continue
upload_files.open_zipfile_cache()
with upload_files.read_archive(calc_id) as archive:
f = BytesIO(orjson.dumps(
archive[calc_id].to_dict(),
option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS))
yield (
'%s.%s' % (calc_id, upload_files._archive_ext), calc_id,
lambda calc_id: upload_files.archive_file(calc_id, 'rb'),
lambda calc_id: upload_files.archive_file_size(calc_id))
yield (
'%s.%s' % (calc_id, 'json'), calc_id,
lambda calc_id: f,
lambda calc_id: f.getbuffer().nbytes)
manifest[calc_id] = {
key: entry[key]
@@ -186,7 +184,7 @@
}
if upload_files is not None:
upload_files.close_zipfile_cache()
upload_files.close()
try:
manifest_contents = json.dumps(manifest).encode('utf-8')
@@ -285,42 +283,39 @@ class ArchiveQueryResource(Resource):
data = []
calcs = results['results']
archive_readers: List[archive.ArchiveReader] = []
upload_files = None
current_upload_id = None
for entry in calcs:
upload_id = entry['upload_id']
calc_id = entry['calc_id']
if current_upload_id is None or current_upload_id != upload_id:
if upload_files is None or current_upload_id != upload_id:
if upload_files is not None:
upload_files.close()
upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))
if upload_files is None:
return []
for archive_reader in archive_readers:
if archive_reader is not None:
archive_reader.close()
archive_readers = [
archive.ArchiveReader(f) if f is not None else None
for f in upload_files.archive_file_msgs()]
current_upload_id = upload_id
if entry['with_embargo']:
archive_reader = archive_readers[1]
access = 'restricted'
else:
archive_reader = archive_readers[0]
if archive_reader is None:
continue
data.append(
{
'calc_id': calc_id,
'parser_name': entry['parser_name'],
'archive': archive.query_archive(
archive_reader, {calc_id: query_schema})[calc_id]
})
access = 'public'
try:
with upload_files.read_archive(calc_id, access) as archive:
data.append({
'calc_id': calc_id,
'parser_name': entry['parser_name'],
'archive': query_archive(
archive, {calc_id: query_schema})[calc_id]
})
except Restricted:
# optimization: do not try to access restricted data of the same upload again
pass
# assign archive data to results
results['results'] = data
......
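
For context, ArchiveQueryResource above now resolves the requested sections directly from the msgpack archive via query_archive. A minimal sketch of that flow outside of Flask, with upload_id, calc_id and access given; the call signatures are the ones used in the hunks above, while the shape of query_schema is an assumption:

from nomad.files import UploadFiles
from nomad.archive import query_archive

upload_files = UploadFiles.get(upload_id)  # the API additionally passes an is_authorized predicate
query_schema = {'section_run': '*'}        # assumed schema: select all of section_run

with upload_files.read_archive(calc_id, access) as archive:
    result = query_archive(archive, {calc_id: query_schema})[calc_id]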
@@ -463,7 +463,7 @@ class RawFileQueryResource(Resource):
if upload_files is None or upload_files.upload_id != upload_id:
logger.info('opening next upload for raw file streaming', upload_id=upload_id)
if upload_files is not None:
upload_files.close_zipfile_cache()
upload_files.close()
upload_files = UploadFiles.get(upload_id)
@@ -471,8 +471,6 @@
logger.error('upload files do not exist', upload_id=upload_id)
continue
upload_files.open_zipfile_cache()
def open_file(upload_filename):
return upload_files.raw_file(upload_filename, 'rb')
@@ -506,7 +504,7 @@ class RawFileQueryResource(Resource):
}
if upload_files is not None:
upload_files.close_zipfile_cache()
upload_files.close()
logger.info('streaming raw file manifest')
try:
@@ -536,10 +534,6 @@ def respond_to_get_raw_files(upload_id, files, compress=False, strip=False):
if upload_files is None:
abort(404, message='The upload with id %s does not exist.' % upload_id)
# the zipfile cache allows accessing many raw files from public upload files without
# having to re-open the underlying zip files all the time
upload_files.open_zipfile_cache()
if strip:
common_prefix_len = len(utils.common_prefix(files))
else:
@@ -554,4 +548,4 @@ def respond_to_get_raw_files(upload_id, files, compress=False, strip=False):
) for filename in files],
zipfile_name='%s.zip' % upload_id, compress=compress)
finally:
upload_files.close_zipfile_cache()
upload_files.close()
@@ -51,7 +51,7 @@ class TOCPacker(Packer):
super().__init__(*args, **kwargs)
def pack(self, obj, *args, **kwargs):
assert isinstance(obj, dict), 'TOC packer can only pack dicts'
assert isinstance(obj, dict), 'TOC packer can only pack dicts, %s' % obj.__class__
self._depth = 0
self._buffer = StringIO()
result = super().pack(obj, *args, **kwargs)
@@ -291,17 +291,19 @@ class ArchiveReader(ArchiveObject):
super().__init__(None, f)
self._toc_entry = None
# this number is determined by the msgpack encoding of the file beginning:
# { 'toc_pos': <...>
# ^11
self._f.seek(11)
toc_position = ArchiveReader._decode_position(self._f.read(10))
self.toc_position = ArchiveReader._decode_position(self._f.read(10))
self.use_blocked_toc = use_blocked_toc
if use_blocked_toc:
self._f.seek(11)
self._toc: Dict[str, Any] = {}
toc_start = toc_position[0]
toc_start = self.toc_position[0]
self._f.seek(toc_start)
b = self._f.read(1)[0]
if b & 0b11110000 == 0b10000000:
@@ -317,7 +319,7 @@ class ArchiveReader(ArchiveObject):
raise ArchiveError('Archive top-level TOC is not a msgpack map (dictionary).')
else:
self.toc_entry = self._read(toc_position)
self.toc_entry = self._read(self.toc_position)
def __enter__(self):
return self
@@ -354,7 +356,9 @@ class ArchiveReader(ArchiveObject):
return first, last
def __getitem__(self, key):
if self.use_blocked_toc:
key = adjust_uuid_size(key)
if self.use_blocked_toc and self.toc_entry is None:
positions = self._toc.get(key)
# TODO use hash algo instead of binary search
if positions is None:
@@ -385,9 +389,17 @@ class ArchiveReader(ArchiveObject):
return ArchiveObject(toc, self._f, data_position[0])
def __iter__(self):
if self.toc_entry is None:
# not necessarily read yet when using the blocked TOC
self.toc_entry = self._read(self.toc_position)
return self.toc_entry.__iter__()
def __len__(self):
if self.toc_entry is None:
# not necessarily read yet when using the blocked TOC
self.toc_entry = self._read(self.toc_position)
return self.toc_entry.__len__()
def close(self):
@@ -402,6 +414,9 @@ class ArchiveReader(ArchiveObject):
return int.from_bytes(position[0:5], byteorder='little', signed=False), \
int.from_bytes(position[5:], byteorder='little', signed=False)
def is_closed(self):
return self._f.closed
def write_archive(
path_or_file: Union[str, BytesIO], n_entries: int, data: Iterable[Tuple[str, Any]],
......
@@ -256,11 +256,11 @@ def msgpack(ctx, uploads):
yield (calc_id, json.load(f))
for access in ['public', 'restricted']:
with upload_files.open_zip_file('archive', access, upload_files._archive_ext) as zf:
archive_name = zf.filename.replace('.zip', '.msg')
with upload_files._open_zip_file('archive', access, 'json') as zf:
archive_path = upload_files._file_object('archive', access, 'msg', 'msg').os_path
names = [name for name in zf.namelist() if name.endswith('json')]
archive.write_archive(archive_name, len(names), iterator(zf, names))
print('wrote msgpack archive %s' % archive_name)
archive.write_archive(archive_path, len(names), iterator(zf, names))
print('wrote msgpack archive %s' % archive_path)
@uploads.command(help='Reprocess selected uploads.')
......
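
Outside of the CLI, the same conversion boils down to feeding (calc_id, data) pairs into write_archive. A sketch assuming plain per-calc JSON files in a directory rather than the zip files used above; paths and file layout are placeholders:

import json
import os
from nomad.archive import write_archive

json_dir, msg_path = '/path/to/archive-json', '/path/to/archive.msg'
names = [name for name in os.listdir(json_dir) if name.endswith('.json')]

def iterator():
    # yields (calc_id, archive data) tuples, as write_archive expects
    for name in names:
        with open(os.path.join(json_dir, name), 'rt') as f:
            yield name[:-len('.json')], json.load(f)

write_archive(msg_path, len(names), iterator())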
@@ -20,6 +20,7 @@ exist to facilitate testing, :py:mod:`nomad.migration`, aspects of :py:mod:`noma
'''
import os.path
import os
import shutil
from elasticsearch.exceptions import RequestError
from elasticsearch_dsl import connections
@@ -56,6 +57,7 @@ def setup():
can be used.
'''
setup_logging()
setup_files()
setup_mongo()
setup_elastic()
@@ -74,6 +76,12 @@ def setup_logging():
logstash_level=config.logstash.level)
def setup_files():
for directory in [config.fs.public, config.fs.staging, config.fs.tmp]:
if not os.path.exists(directory):
os.makedirs(directory)
def setup_mongo():
''' Creates connection to mongodb. '''
global mongo_client
......
@@ -24,7 +24,7 @@ calculations, and files
'''
from typing import cast, List, Any, ContextManager, Tuple, Generator, Dict, cast, Iterable
from typing import cast, List, Any, Tuple, Generator, Dict, cast, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
@@ -34,7 +34,6 @@ from datetime import datetime
from pymongo import UpdateOne
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import json
from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
@@ -109,8 +108,7 @@ class Calc(Proc):
self._parser_backend: Backend = None
self._upload: Upload = None
self._upload_files: ArchiveBasedStagingUploadFiles = None
self._calc_proc_logwriter = None
self._calc_proc_logwriter_ctx: ContextManager = None
self._calc_proc_logs: List[Any] = None
@classmethod
def from_entry_metadata(cls, entry_metadata):
@@ -153,32 +151,22 @@ class Calc(Proc):
logger = logger.bind(
upload_id=self.upload_id, mainfile=self.mainfile, calc_id=self.calc_id, **kwargs)
if self._calc_proc_logwriter_ctx is None:
try:
self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_id, 'wt')
self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__() # pylint: disable=E1101
except KeyError:
# cannot open log file
pass
if self._calc_proc_logs is None:
self._calc_proc_logs = []
if self._calc_proc_logwriter_ctx is None:
return logger
else:
def save_to_calc_log(logger, method_name, event_dict):
if self._calc_proc_logwriter is not None:
try:
dump_dict = dict(event_dict)
dump_dict.update(level=method_name.upper())
json.dump(dump_dict, self._calc_proc_logwriter, sort_keys=True)
self._calc_proc_logwriter.write('\n')
def save_to_calc_log(logger, method_name, event_dict):
try:
dump_dict = dict(event_dict)
dump_dict.update(level=method_name.upper())
self._calc_proc_logs.append(dump_dict)
except Exception:
# Exceptions here will cause an indefinite loop
pass
except Exception:
# Exceptions here will cause an indefinite loop
pass
return event_dict
return event_dict
return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])
return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])
@process
def re_process_calc(self):
@@ -190,17 +178,6 @@ class Calc(Proc):
parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
if parser is None and not config.reprocess_unmatched:
# Remove the log file and set a fake logwriter to avoid creating a new log file,
# because we will probably remove this calc and don't want to have ghost logfiles.
if self._calc_proc_logwriter_ctx is not None:
self._calc_proc_logwriter_ctx.__exit__(None, None, None)
self.upload_files.archive_log_file_object(self.calc_id).delete()
self._calc_proc_logwriter_ctx = open('/dev/null', 'wt')
self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()
self.get_logger().error(
'no parser matches during re-process, will not re-process this calc')
self.errors = ['no parser matches during re-process, will not re-process this calc']
# mock the steps of actual processing
@@ -243,13 +220,6 @@ class Calc(Proc):
except Exception as e:
logger.error('could not unload processing results', exc_info=e)
try:
if self._calc_proc_logwriter is not None:
self._calc_proc_logwriter.close()
self._calc_proc_logwriter = None
except Exception as e:
logger.error('could not close calculation proc log', exc_info=e)
@process
def process_calc(self):
'''
@@ -295,13 +265,6 @@ class Calc(Proc):
except Exception as e:
logger.error('could not unload processing results', exc_info=e)
try:
if self._calc_proc_logwriter is not None:
self._calc_proc_logwriter.close()
self._calc_proc_logwriter = None
except Exception as e:
logger.error('could not close calculation proc log', exc_info=e)
def fail(self, *errors, log_level=logging.ERROR, **kwargs):
# in case of failure, index a minimum set of metadata and mark
# processing failure
@@ -318,6 +281,11 @@ class Calc(Proc):
except Exception as e:
self.get_logger().error('could not index after processing failure', exc_info=e)
try:
self.upload_files.write_archive(self.calc_id, {'processing_logs': self._calc_proc_logs})
except Exception as e:
self.get_logger().error('could not write archive (logs) after processing failure', exc_info=e)
super().fail(*errors, log_level=log_level, **kwargs)
def on_process_complete(self, process_name):
@@ -450,21 +418,14 @@ class Calc(Proc):
logger, 'archived', step='archive',
input_size=self.mainfile_file.size) as log_data:
with self.upload_files.archive_file(self.calc_id, 'wt') as out:
json.dump(self._parser_backend.resource.m_to_dict(
lambda section: section.m_def.name in datamodel.root_sections), out, indent=2)
archive_data = self._parser_backend.resource.m_to_dict(
lambda section: section.m_def.name in datamodel.root_sections)
log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)
# close loghandler
if self._calc_proc_logwriter is not None:
with utils.timer(
logger, 'archived log', step='logs',
input_size=self.mainfile_file.size) as log_data:
self._calc_proc_logwriter_ctx.__exit__(None, None, None) # pylint: disable=E1101
self._calc_proc_logwriter = None
archive_data['processing_logs'] = self._calc_proc_logs
self._calc_proc_logs = None
log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_id).size)
archive_size = self.upload_files.write_archive(self.calc_id, archive_data)
log_data.update(archive_size=archive_size)
def __str__(self):
return 'calc %s calc_id=%s upload_id=%s' % (super().__str__(), self.calc_id, self.upload_id)
......
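
The net effect of the processing changes above: log events are collected in memory as plain dicts by the save_to_calc_log structlog processor and written into the archive under 'processing_logs', together with the parsed sections, in a single write_archive call. Reading them back looks roughly like this, with upload_files and calc_id given as in the API code (a sketch using the read_archive/to_dict calls from the hunks above):

with upload_files.read_archive(calc_id) as archive:
    calc_archive = archive[calc_id]  # holds section_run, section_entry_info, processing_logs
    for entry in calc_archive['processing_logs']:
        log_entry = entry.to_dict()  # e.g. {'event': ..., 'calc_id': ..., 'level': 'INFO'}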
@@ -1419,8 +1419,8 @@ def test_edit_lift_embargo(api, published, other_test_user_auth):
Upload.get(published.upload_id).block_until_complete()
# should not raise Restricted anymore
with files.UploadFiles.get(published.upload_id).archive_file(example_calc.calc_id) as f:
f.read()
with files.UploadFiles.get(published.upload_id).read_archive(example_calc.calc_id) as archive:
archive[example_calc.calc_id].to_dict()
@pytest.mark.timeout(config.tests.default_timeout)
......
@@ -31,9 +31,9 @@ def api(session_client):
def test_get_entry(published: Upload):
calc_id = list(published.calcs)[0].calc_id
with published.upload_files.archive_file(calc_id) as f:
data = json.load(f)
assert 'OptimadeEntry' in data, data.keys()
with published.upload_files.read_archive(calc_id) as archive:
data = archive[calc_id]
assert 'OptimadeEntry' in data
search_result = search.SearchRequest().search_parameter('calc_id', calc_id).execute_paginated()['results'][0]
assert 'dft.optimade.chemical_formula_hill' in search.flat(search_result)
......
@@ -16,7 +16,6 @@ from typing import Generator, Tuple
import pytest
from datetime import datetime
import os.path
import json
import re
import shutil
@@ -84,18 +83,18 @@ def assert_processing(upload: Upload, published: bool = False):
assert calc.tasks_status == SUCCESS
assert calc.metadata['published'] == published
with upload_files.archive_file(calc.calc_id) as archive_json:
archive = json.load(archive_json)
assert 'section_run' in archive
assert 'section_entry_info' in archive
with upload_files.read_archive(calc.calc_id) as archive:
calc_archive = archive[calc.calc_id]
assert 'section_run' in calc_archive
assert 'section_entry_info' in calc_archive
assert 'processing_logs' in calc_archive
with upload_files.archive_log_file(calc.calc_id, 'rt') as f:
has_test_event = False
for line in f.readlines():
log_data = json.loads(line)
for log_data in calc_archive['processing_logs']:
for key in ['event', 'calc_id', 'level']:
assert key in log_data
has_test_event = has_test_event or log_data['event'] == 'a test log entry'
assert has_test_event
assert len(calc.errors) == 0
@@ -228,12 +227,14 @@ def test_re_processing(published: Upload, internal_example_user_metadata, monkey
first_calc = published.all_calcs(0, 1).first()
old_calc_time = first_calc.metadata['last_processing']
with published.upload_files.archive_log_file(first_calc.calc_id) as f:
old_log_lines = f.readlines()
with published.upload_files.read_archive(first_calc.calc_id) as archive:
old_logs = archive[first_calc.calc_id]['processing_logs']
old_archive_files = list(
archive_file
for archive_file in os.listdir(published.upload_files.os_path)
if 'archive' in archive_file)
for archive_file in old_archive_files:
with open(published.upload_files.join_file(archive_file).os_path, 'wt') as f:
f.write('')
@@ -274,21 +275,26 @@ def test_re_processing(published: Upload, internal_example_user_metadata, monkey
assert first_calc.metadata['nomad_commit'] == 're_process_test_commit'
# assert changed archive files
if with_failure in ['after', 'not-matched']:
with pytest.raises(Exception):
published.upload_files.archive_file(first_calc.calc_id)
if with_failure == 'after':
with published.upload_files.read_archive(first_calc.calc_id) as archive:
assert list(archive[first_calc.calc_id].keys()) == ['processing_logs']
elif with_failure == 'not-matched':
with published.upload_files.read_archive(first_calc.calc_id) as archive:
assert len(archive[first_calc.calc_id]) == 0
else:
with published.upload_files.archive_file(first_calc.calc_id) as f:
assert len(f.readlines()) > 0
with published.upload_files.read_archive(first_calc.calc_id) as archive:
assert len(archive[first_calc.calc_id]) > 1  # contains more than just the logs
# assert changed archive log files
if with_failure in ['not-matched']:
with pytest.raises(Exception):
published.upload_files.archive_log_file(first_calc.calc_id)
with published.upload_files.read_archive(first_calc.calc_id) as archive:
assert len(archive[first_calc.calc_id]) == 0
else:
with published.upload_files.archive_log_file(first_calc.calc_id) as f:
new_log_lines = f.readlines()
assert old_log_lines != new_log_lines
with published.upload_files.read_archive(first_calc.calc_id) as archive:
assert archive[first_calc.calc_id]['processing_logs'] != old_logs