Commit 45cbbaf8 authored by Markus Scheidgen

Remove zipfile cache. Changed all nows to utcnows. Added timestamp to archive logs.

parent 3185a10f
Pipeline #52498 passed with stages in 18 minutes and 16 seconds
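The substantive change swaps datetime.now() for datetime.utcnow() throughout: both return naive datetime objects, but utcnow() does not depend on the server's timezone, so stored and reported timestamps stay comparable across machines. A minimal illustration, not part of the commit:

    from datetime import datetime, timezone

    local = datetime.now()   # naive local time; value depends on the server's TZ
    utc = datetime.utcnow()  # naive UTC time; same on every server
    print(local.isoformat(), utc.isoformat())

    # Timezone-aware alternative, if naive datetimes are not required:
    aware = datetime.now(timezone.utc)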
@@ -44,7 +44,7 @@
             "cwd": "${workspaceFolder}",
             "program": "${workspaceFolder}/.pyenv/bin/pytest",
             "args": [
-                "-sv", "tests/test_api.py::TestRaw::test_raw_file_from_calc"
+                "-sv", "tests/test_cli.py::TestClient::test_mirror"
             ]
         },
         {
@@ -216,7 +216,7 @@ class UserResource(Resource):
         user = coe_repo.User.create_user(
             email=data['email'], password=data.get('password', None), crypted=True,
             first_name=data['first_name'], last_name=data['last_name'],
-            created=data.get('created', datetime.now()),
+            created=data.get('created', datetime.utcnow()),
             affiliation=data.get('affiliation', None), token=data.get('token', None),
             user_id=data.get('user_id', None))
@@ -216,7 +216,7 @@ class RepoCalcsResource(Resource):
             from_time = rfc3339DateTime.parse(request.args.get('from_time', '2000-01-01'))
             until_time_str = request.args.get('until_time', None)
-            until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.now()
+            until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.utcnow()
             time_range = (from_time, until_time)
 
         metrics = [
@@ -314,7 +314,7 @@ class RepoQuantityResource(Resource):
             from_time = rfc3339DateTime.parse(request.args.get('from_time', '2000-01-01'))
             until_time_str = request.args.get('until_time', None)
-            until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.now()
+            until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.utcnow()
             time_range = (from_time, until_time)
         except Exception:
             abort(400, message='bad parameter types')
@@ -314,7 +314,7 @@ class UploadListResource(Resource):
                 upload_id=upload_id,
                 user=g.user,
                 name=upload_name,
-                upload_time=datetime.now(),
+                upload_time=datetime.utcnow(),
                 upload_path=upload_path,
                 temporary=local_path != upload_path)
@@ -224,7 +224,7 @@ class Calc(Base):
         elif self.upload is not None and self.upload.upload_time is not None:
             added_time = self.upload.upload_time
         else:
-            added_time = datetime.now()
+            added_time = datetime.utcnow()
 
         upload_id = context.upload_id
         upload_files = context.upload_files
@@ -58,7 +58,6 @@ import tarfile
 import hashlib
 import io
 import pickle
-import cachetools
 
 from nomad import config, utils
 from nomad.datamodel import UploadWithMetadata
@@ -690,30 +689,25 @@ class PublicUploadFilesBasedStagingUploadFiles(StagingUploadFiles):
         super().pack(upload, target_dir=self.public_upload_files, skip_raw=True)
 
 
-class LRUZipFileCache(cachetools.LRUCache):
-    """ Specialized cache that closes the cached zipfiles on eviction """
-
-    def __init__(self, maxsize):
-        super().__init__(maxsize)
-
-    def popitem(self):
-        key, val = super().popitem()
-        val.close()
-        return key, val
-
-
 class PublicUploadFiles(UploadFiles):
-    __zip_file_cache = LRUZipFileCache(maxsize=128)
-
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(config.fs.public, *args, **kwargs)
+        self._zipfile_cache: Dict[str, zipfile.ZipFile] = {}
 
     def get_zip_file(self, prefix: str, access: str, ext: str) -> PathObject:
         return self.join_file('%s-%s.%s.zip' % (prefix, access, ext))
 
-    @cachetools.cached(cache=__zip_file_cache)
     def open_zip_file(self, prefix: str, access: str, ext: str) -> zipfile.ZipFile:
-        zip_file = self.get_zip_file(prefix, access, ext)
-        return zipfile.ZipFile(zip_file.os_path)
+        zip_path = self.get_zip_file(prefix, access, ext).os_path
+        return zipfile.ZipFile(zip_path)
+
+        # if zip_path in self._zipfile_cache:
+        #     f = self._zipfile_cache[zip_path]
+        # else:
+        #     f = zipfile.ZipFile(zip_path)
+        #     self._zipfile_cache[zip_path] = f
+        # return f
 
     def _file(self, prefix: str, ext: str, path: str, *args, **kwargs) -> IO:
         mode = kwargs.get('mode') if len(args) == 0 else args[0]
@@ -822,3 +816,8 @@ class PublicUploadFiles(UploadFiles):
         the restrictions on calculations. This is potentially a long running operation.
         """
         raise NotImplementedError()
+
+    # TODO with zipfile cache ...
+    # def close():
+    #     for zip_file in self._zipfile_cache.values():
+    #         zip_file.close()
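For reference, the removed cache's eviction behavior in isolation: cachetools.LRUCache.popitem() evicts the least recently used entry, and the deleted LRUZipFileCache overrode it to close the evicted zipfile handle. A minimal runnable sketch; the name ClosingLRUCache is illustrative, not from the commit:

    import cachetools

    class ClosingLRUCache(cachetools.LRUCache):
        # Close evicted file-like values on eviction, as LRUZipFileCache did.
        def popitem(self):
            key, val = super().popitem()
            val.close()
            return key, val

The commented-out replacement above sketches a plain per-instance dict instead; as the TODO notes, that variant would need an explicit close() on PublicUploadFiles, since nothing ever evicts, and thus closes, the cached handles.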
@@ -177,7 +177,7 @@ class Proc(Document, metaclass=ProcMetaclass):
         assert 'tasks_status' not in kwargs, \
             """ do not set the status manually, its managed """
 
-        kwargs.setdefault('create_time', datetime.now())
+        kwargs.setdefault('create_time', datetime.utcnow())
         self = cls(**kwargs)
         if len(cls.tasks) == 0:
             self.tasks_status = SUCCESS
@@ -245,7 +245,7 @@ class Proc(Document, metaclass=ProcMetaclass):
                 exc_info=error, error=str(error))
 
         self.errors = [str(error) for error in errors]
-        self.complete_time = datetime.now()
+        self.complete_time = datetime.utcnow()
 
         if not failed_with_exception:
             errors_str = "; ".join([str(error) for error in errors])
@@ -302,7 +302,7 @@ class Proc(Document, metaclass=ProcMetaclass):
         if self.tasks_status != FAILURE:
             assert self.tasks_status == RUNNING, 'Can only complete a running process, process is %s' % self.tasks_status
             self.tasks_status = SUCCESS
-            self.complete_time = datetime.now()
+            self.complete_time = datetime.utcnow()
             self.on_tasks_complete()
             self.save()
             self.get_logger().info('completed process')
@@ -132,9 +132,10 @@ class Calc(Proc):
             if self._calc_proc_logwriter is not None:
                 program = event_dict.get('normalizer', 'parser')
                 event = event_dict.get('event', '')
-                entry = '[%s] %s: %s' % (method_name, program, event)
-                if len(entry) > 120:
-                    self._calc_proc_logwriter.write(entry[:120])
+
+                entry = '[%s] %s, %s: %s' % (method_name, datetime.utcnow().isoformat(), program, event)
+                if len(entry) > 140:
+                    self._calc_proc_logwriter.write(entry[:140])
                     self._calc_proc_logwriter.write('...')
                 else:
                     self._calc_proc_logwriter.write(entry)
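The logging change above prepends an ISO 8601 UTC timestamp to every archive log entry and raises the truncation limit from 120 to 140 characters. The same logic in isolation, with illustrative values:

    from datetime import datetime

    method_name, program, event = 'info', 'parser', 'parser ran successfully'
    entry = '[%s] %s, %s: %s' % (method_name, datetime.utcnow().isoformat(), program, event)
    # e.g. [info] 2019-04-29T12:00:00.000000, parser: parser ran successfully
    if len(entry) > 140:
        entry = entry[:140] + '...'
    print(entry)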
@@ -161,7 +162,7 @@ class Calc(Proc):
         calc_with_metadata.mainfile = self.mainfile
         calc_with_metadata.nomad_version = config.version
         calc_with_metadata.nomad_commit = config.commit
-        calc_with_metadata.last_processing = datetime.now()
+        calc_with_metadata.last_processing = datetime.utcnow()
         calc_with_metadata.files = self.upload_files.calc_files(self.mainfile)
         self.metadata = calc_with_metadata.to_dict()
@@ -200,7 +201,7 @@ class Calc(Proc):
         calc_with_metadata.upload_time = self.upload.upload_time
         calc_with_metadata.nomad_version = config.version
         calc_with_metadata.nomad_commit = config.commit
-        calc_with_metadata.last_processing = datetime.now()
+        calc_with_metadata.last_processing = datetime.utcnow()
         calc_with_metadata.files = self.upload_files.calc_files(self.mainfile)
         self.metadata = calc_with_metadata.to_dict()
@@ -602,11 +603,11 @@ class Upload(Proc):
                 upload_size=self.upload_files.size):
             self.upload_files.delete()
             self.published = True
-            self.publish_time = datetime.now()
-            self.last_update = datetime.now()
+            self.publish_time = datetime.utcnow()
+            self.last_update = datetime.utcnow()
             self.save()
         else:
-            self.last_update = datetime.now()
+            self.last_update = datetime.utcnow()
             self.save()
 
     @process
@@ -855,7 +856,7 @@ class Upload(Proc):
                 upload_size=self.upload_files.size):
             staging_upload_files.delete()
 
-            self.last_update = datetime.now()
+            self.last_update = datetime.utcnow()
             self.save()
 
     @task
@@ -512,7 +512,7 @@ def example_user_metadata(other_test_user, test_user) -> dict:
         'references': ['http://external.ref/one', 'http://external.ref/two'],
         '_uploader': other_test_user.user_id,
         'coauthors': [test_user.user_id],
-        '_upload_time': datetime.datetime.now(),
+        '_upload_time': datetime.datetime.utcnow(),
         '_pid': 256
     }
@@ -54,7 +54,7 @@ def run_processing(uploaded: Tuple[str, str], test_user) -> Upload:
     uploaded_id, uploaded_path = uploaded
     upload = Upload.create(
         upload_id=uploaded_id, user=test_user, upload_path=uploaded_path)
-    upload.upload_time = datetime.now()
+    upload.upload_time = datetime.utcnow()
 
     assert upload.tasks_status == 'RUNNING'
     assert upload.current_task == 'uploading'
@@ -259,6 +259,9 @@ def test_re_processing(published: Upload, example_user_metadata, monkeypatch, wi
     first_calc = published.all_calcs(0, 1).first()
     old_calc_time = first_calc.metadata['last_processing']
 
+    if not with_failure:
+        with published.upload_files.archive_log_file(first_calc.calc_id) as f:
+            old_log_line = f.readline()
+
     old_archive_files = list(
         archive_file
         for archive_file in os.listdir(published.upload_files.os_path)
@@ -302,6 +305,12 @@ def test_re_processing(published: Upload, example_user_metadata, monkeypatch, wi
     for archive_file in old_archive_files:
         assert os.path.getsize(published.upload_files.join_file(archive_file).os_path) > 0
 
+    # assert changed archive log files
+    if not with_failure:
+        with published.upload_files.archive_log_file(first_calc.calc_id) as f:
+            new_log_line = f.readline()
+        assert old_log_line != new_log_line
+
     # assert maintained user metadata (mongo+es)
     assert_upload_files(upload, PublicUploadFiles, published=True)
     assert_search_upload(upload, published=True)
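This assertion can only hold because of the timestamp now written into every archive log entry: without it, reprocessing the same calculation would reproduce a byte-identical first log line.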
@@ -422,7 +422,7 @@ class TestUploads:
         upload = self.assert_upload(rv.data)
         self.assert_processing(client, test_user_auth, upload['upload_id'])
         metadata = dict(**example_user_metadata)
-        metadata['_upload_time'] = datetime.datetime.now().isoformat()
+        metadata['_upload_time'] = datetime.datetime.utcnow().isoformat()
         self.assert_published(client, admin_user_auth, upload['upload_id'], proc_infra, metadata)
 
     def test_post_metadata_forbidden(self, client, proc_infra, test_user_auth, no_warn):
@@ -443,7 +443,7 @@ class TestUploads:
         upload = self.assert_upload(rv.data)
         self.assert_processing(client, test_user_auth, upload['upload_id'])
         metadata = dict(**example_user_metadata)
-        metadata['_upload_time'] = datetime.datetime.now().isoformat()
+        metadata['_upload_time'] = datetime.datetime.utcnow().isoformat()
         self.assert_published(client, admin_user_auth, upload['upload_id'], proc_infra, metadata)
         self.assert_published(client, admin_user_auth, upload['upload_id'], proc_infra, metadata, publish_with_metadata=False)
@@ -134,6 +134,8 @@ class TestClient:
             cli, ['client', 'mirror'], catch_exceptions=False, obj=utils.POPO())
 
         assert result.exit_code == 0
         assert published.upload_id in result.output
+        assert published.upload_files.os_path in result.output
+
         assert proc.Upload.objects(upload_id=published.upload_id).count() == 1
         assert proc.Calc.objects(upload_id=published.upload_id).count() == 1
 
         new_search = search.entry_search(search_parameters=dict(upload_id=published.upload_id))
@@ -133,7 +133,7 @@ def test_add_normalized_calc_with_metadata(
     coe_upload = Upload(
         upload_name='test_upload',
-        created=datetime.now(),
+        created=datetime.utcnow(),
         user_id=0,
         is_processed=True)
 
     coe_calc = Calc(coe_calc_id=calc_with_metadata.pid, upload=coe_upload)
@@ -71,7 +71,7 @@ def generate_calc(pid: int = 0, calc_id: str = None, upload_id: str = None) -> d
         self.upload_id = upload_id if upload_id is not None else utils.create_uuid()
         self.calc_id = calc_id if calc_id is not None else utils.create_uuid()
 
-        self.upload_time = datetime.datetime.now()
+        self.upload_time = datetime.datetime.utcnow()
         self.calc_hash = utils.create_uuid()
         self.pid = pid
         self.mainfile = random.choice(filepaths)