#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import functools
import hashlib
import os.path
import signal
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from typing import Iterable, Callable
from nomad.config import config
from nomad.archive import to_json, read_archive
from nomad.archive.storage_v2 import ArchiveWriter as ArchiveWriterNew
from nomad.files import StagingUploadFiles, PublicUploadFiles
from nomad.infrastructure import setup
from nomad.processing import Upload


def flush(*args, **kwargs):
    print(*args, flush=True, **kwargs)


class Counter:
    """Process-safe progress counter shared across workers via a multiprocessing Manager."""

    def __init__(self, total: int):
        manager = Manager()
        self.counter = manager.Value('i', 0)
        self.lock = manager.Lock()
        self.total = total

    def increment(self) -> str:
        with self.lock:
            self.counter.value += 1
            return f'[{self.counter.value}/{self.total}]'
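
# A minimal usage sketch (not part of the module) showing what Counter provides:
# it is created once in the parent process and shared with worker processes, which
# call increment() to get a '[done/total]' prefix for their log messages.
#
#     counter = Counter(total=3)
#     counter.increment()  # -> '[1/3]'
#     counter.increment()  # -> '[2/3]'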


def convert_archive(
    original_path: str,
    *,
    transform: Callable | None = None,
    overwrite: bool = False,
    delete_old: bool = False,
    counter: Counter | None = None,
    force_repack: bool = False,
    size_limit: int = 4,
):
    """
    Convert an archive file from the old format to the new format.

    The function checks that the file exists and is a msgpack archive, skips files that
    are already in the new format (unless `force_repack` is set), then reads the archive
    and rewrites it with the new writer. On failure, the temporary output is removed and
    the original file is left untouched.

    `transform` takes the original file path and returns the target file path. If it is
    not provided, the original file is overwritten with the converted version.

    Args:
        original_path (str): The path to the original archive file.
        transform (Callable, optional): A function that maps the original file path to the target path. Defaults to None.
        overwrite (bool, optional): Whether to overwrite the target file if it already exists. Defaults to False.
        delete_old (bool, optional): Whether to delete the old file after conversion. Defaults to False.
        counter (Counter, optional): A counter to track the progress of the conversion. Defaults to None.
        force_repack (bool, optional): Force repacking an archive that is already in the new format. Defaults to False.
        size_limit (int, optional): The size limit in GB; larger archives are skipped. Defaults to 4.
    """
prefix: str = counter.increment() if counter else ''
if not os.path.exists(original_path):
flush(f'{prefix} [ERROR] File not found: {original_path}')
return
if not original_path.endswith('.msg'):
flush(f'{prefix} [ERROR] Not a msgpack file: {original_path}')
return
original_path = os.path.abspath(original_path)
if not force_repack:
with open(original_path, 'rb') as f:
magic_bytes = f.read(ArchiveWriterNew.magic_len)
if magic_bytes == ArchiveWriterNew.magic:
flush(
f'{prefix} [INFO] Skipping as already in the new format: {original_path}'
)
return
new_path: str = transform(original_path) if transform else original_path
if os.path.exists(new_path) and not overwrite:
flush(f'{prefix} [ERROR] File already exists: {new_path}')
return
original_size = os.path.getsize(original_path)
if original_size > size_limit * 1024**3:
flush(
f'{prefix} [WARNING] File size exceeds limit {size_limit} GB: {original_path}'
)
return
def safe_remove(path: str):
if not path:
return
try:
os.remove(path)
except OSError:
pass
try:
tmp_path = ''
with read_archive(original_path, use_blocked_toc=False) as reader:
flush(f'{prefix} [INFO] Converting: {original_path}')
tmp_path = (
f'{original_path}.{hashlib.md5(original_path.encode()).hexdigest()}'
)
with ArchiveWriterNew(
tmp_path, len(reader), config.archive.toc_depth
) as writer:
for uuid, entry in reader.items():
writer.add(uuid, to_json(entry))
except Exception as e:
flush(f'{prefix} [ERROR] Failed to convert {original_path}: {e}')
safe_remove(tmp_path)
else:
backup_int = signal.getsignal(signal.SIGINT)
backup_term = signal.getsignal(signal.SIGTERM)
# override SIGINT and SIGTERM to ensure no data loss
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
if os.path.exists(new_path): # overwrite=True
safe_remove(new_path)
# the old path and the new path could be the same
if delete_old and os.path.exists(original_path):
safe_remove(original_path)
os.rename(tmp_path, new_path)
signal.signal(signal.SIGINT, backup_int)
signal.signal(signal.SIGTERM, backup_term)
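
# Illustrative only (the path and rename rule are hypothetical, mirroring the commented
# examples in the __main__ block below): convert a single archive and write the result
# under a new name instead of overwriting the original.
#
#     convert_archive(
#         '/path/to/upload/archive-v1.msg',
#         transform=lambda p: p.replace('v1.msg', 'v3.msg'),
#     )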


def convert_folder(
    folders: str | list[str],
    *,
    processes: int = os.cpu_count(),
    transform: Callable | None = None,
    if_include: Callable | None = None,
    overwrite: bool = False,
    delete_old: bool = False,
    force_repack: bool = False,
    size_limit: int = 4,
):
    """
    Convert all archives in the given folder(s) to the new format using parallel processing.

    Expected callable signatures:
        transform(file_path: str) -> str
        if_include(file_path: str) -> bool

    Args:
        folders (str | list[str]): The path(s) to the folder(s) containing the archives.
        processes (int): The number of parallel processes to use. Defaults to os.cpu_count().
        transform (Callable): A function that maps the original file path to the target path. Defaults to None.
        if_include (Callable): A predicate that filters the files to be converted. Defaults to None.
        overwrite (bool): Whether to overwrite existing files. Defaults to False.
        delete_old (bool): Whether to delete the old file after conversion. Defaults to False.
        force_repack (bool): Force repacking archives that are already in the new format. Defaults to False.
        size_limit (int): Size limit in GB; larger archives are skipped. Defaults to 4.
    """
file_list: list = []
if isinstance(folders, str):
folders = [folders]
if if_include is None:
def _if_include(_):
return True
if_include = _if_include
for folder in folders:
if not os.path.exists(folder):
flush(f'[ERROR] Folder not found: {folder}')
continue
for root, _, files in os.walk(folder):
for file in files:
full_name = os.path.join(root, file)
if file.endswith('.msg') and if_include(full_name):
file_list.append(full_name)
if not file_list:
return
counter = Counter(len(file_list))
_converter = functools.partial(
convert_archive,
transform=transform,
overwrite=overwrite,
delete_old=delete_old,
counter=counter,
force_repack=force_repack,
size_limit=size_limit,
)
with ProcessPoolExecutor(max_workers=processes) as executor:
try:
futures = [executor.submit(_converter, file) for file in file_list]
for index, future in enumerate(futures):
try:
future.result()
                except Exception:  # noqa
                    # convert_archive handles its own errors, so an exception here means
                    # the worker process itself died, most likely killed by the OOM killer
                    flush(f'[ERROR] (OOM): {file_list[index]}')
except KeyboardInterrupt:
for pid in executor._processes: # noqa
os.kill(pid, signal.SIGTERM)
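
# A hedged sketch of a typical convert_folder call; the folder path is a placeholder
# and only_new is assumed to be a user-supplied predicate like the one defined in the
# __main__ block below.
#
#     convert_folder(
#         '/path/to/.volumes/fs/staging',
#         processes=4,
#         if_include=only_new,
#         delete_old=True,
#     )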


def convert_upload(
    uploads: Upload | Iterable[Upload] | str | Iterable[str],
    *,
    processes: int = os.cpu_count(),
    transform: Callable | None = None,
    if_include: Callable | None = None,
    overwrite: bool = False,
    delete_old: bool = False,
    force_repack: bool = False,
    size_limit: int = 4,
):
    """
    Convert the archives of the given upload(s) to the new format.

    Expected callable signatures:
        transform(file_path: str) -> str
        if_include(file_path: str) -> bool

    Args:
        uploads (Upload | Iterable[Upload] | str | Iterable[str]): The upload(s) or upload id(s) to be converted.
        processes (int, optional): The number of processes to use for conversion. Defaults to os.cpu_count().
        transform (Callable, optional): A function that maps the original file path to the target path. Defaults to None.
        if_include (Callable, optional): A predicate that filters the files to be converted. Defaults to None.
        overwrite (bool, optional): Whether to overwrite existing files. Defaults to False.
        delete_old (bool, optional): Whether to delete the old file after conversion. Defaults to False.
        force_repack (bool, optional): Force repacking archives that are already in the new format. Defaults to False.
        size_limit (int, optional): Size limit in GB; larger archives are skipped. Defaults to 4.
    """
if isinstance(uploads, (str, Upload)):
uploads = [uploads]
if not uploads:
return
if isinstance(uploads[0], str): # type: ignore
setup()
uploads = Upload.objects(upload_id__in=uploads)
all_folders: list = []
for upload in uploads:
assert isinstance(upload, Upload)
upload_class = PublicUploadFiles if upload.published else StagingUploadFiles
base_folder = upload_class.base_folder_for(upload.upload_id)
if not os.path.exists(base_folder):
flush(f'[ERROR] Base folder not found for upload: {upload.upload_id}')
continue
all_folders.append(os.path.abspath(base_folder))
convert_folder(
all_folders,
processes=processes,
transform=transform,
if_include=if_include,
overwrite=overwrite,
delete_old=delete_old,
force_repack=force_repack,
size_limit=size_limit,
)
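
# A hedged sketch (the upload ids are placeholders): convert the archives of specific
# uploads by id; convert_upload resolves ids via setup() and Upload.objects, then
# delegates to convert_folder on each upload's base folder.
#
#     convert_upload(
#         ['upload_id_1', 'upload_id_2'],
#         processes=2,
#         overwrite=True,
#     )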


if __name__ == '__main__':

    def rename(path):
        return path.replace('v1.msg', 'v3.msg')

    def only_new(path):
        return 'old' not in path

    # filter files to be converted
# convert_folder(
# '/home/theodore/PycharmProjects/nomad-FAIR/.volumes/fs/staging',
# transform=rename,
# if_include=only_new,
# )
#
# # delete old archives after successful conversion
# convert_folder(
# '/home/theodore/PycharmProjects/nomad-FAIR/.volumes/fs/staging',
# transform=rename,
# delete_old=True,
# )
#
# # overwrite any existing files
# convert_folder(
# '/home/theodore/PycharmProjects/nomad-FAIR/.volumes/fs/staging',
# transform=rename,
# overwrite=True,
# )
# overwrite existing archive
convert_folder(
'/home/theodore/PycharmProjects/nomad-FAIR/.volumes/fs/staging',
overwrite=True,
)
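
    # An additional hedged example (the upload id is a placeholder): convert specific
    # uploads by id instead of walking a whole staging folder.
    # convert_upload(
    #     'some_upload_id',
    #     overwrite=True,
    # )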