Commit ca81038e authored by David Sikter's avatar David Sikter
Browse files

Adding arguments to process-functions and using them to set embargo data in the new API.

parent 44f7c60f
Pipeline #103294 passed with stages
in 28 minutes and 56 seconds
......@@ -1021,16 +1021,12 @@ async def post_upload_action_publish(
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='The upload is already published.')
metadata_dict: Dict[str, Any] = {'with_embargo': with_embargo}
if with_embargo:
if not embargo_length or not 0 < embargo_length <= 36:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='embargo_length needs to be between 1 and 36 months.')
metadata_dict.update(embargo_length=embargo_length)
if not 1 <= embargo_length <= 36 and not (embargo_length == 0 and not with_embargo):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Invalid embargo_length. Must be between 1 and 36 months.')
try:
upload.compress_and_set_metadata(metadata_dict)
upload.publish_upload()
upload.publish_upload(with_embargo=with_embargo, embargo_length=embargo_length)
except ProcessAlreadyRunning:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
......
......@@ -379,7 +379,9 @@ class Proc(Document, metaclass=ProcMetaclass):
self.reload()
@classmethod
def process_all(cls, func, query: Dict[str, Any], exclude: List[str] = []):
def process_all(
cls, func, query: Dict[str, Any], exclude: List[str] = [],
process_args: List[Any] = [], process_kwargs: Dict[str, Any] = {}):
'''
Allows to run process functions for all objects on the given query. Calling
process functions though the func:`process` wrapper might be slow, because
......@@ -397,9 +399,9 @@ class Proc(Document, metaclass=ProcMetaclass):
process_status=PROCESS_CALLED)})
for obj in cls.objects(**query).exclude(*exclude):
obj._run_process(func)
obj._run_process(func, process_args, process_kwargs)
def _run_process(self, func):
def _run_process(self, func, process_args, process_kwargs):
if hasattr(func, '__process_unwrapped'):
func = getattr(func, '__process_unwrapped')
......@@ -416,7 +418,7 @@ class Proc(Document, metaclass=ProcMetaclass):
logger.debug('calling process function', queue=queue, priority=priority)
return proc_task.apply_async(
args=[cls_name, self_id, func.__name__],
args=[cls_name, self_id, func.__name__, process_args, process_kwargs],
queue=queue, priority=priority)
def __str__(self):
......@@ -556,7 +558,7 @@ def unwarp_task(task, cls_name, self_id, *args, **kwargs):
bind=True, base=NomadCeleryTask, ignore_results=True, max_retries=3,
acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout,
time_limit=config.celery.timeout * 2)
def proc_task(task, cls_name, self_id, func_attr):
def proc_task(task, cls_name, self_id, func_attr, process_args, process_kwargs):
'''
The celery task that is used to execute async process functions.
It ignores results, since all results are handled via the self document.
......@@ -597,7 +599,7 @@ def proc_task(task, cls_name, self_id, func_attr):
self.process_status = PROCESS_RUNNING
os.chdir(config.fs.working_directory)
with utils.timer(logger, 'process executed on worker'):
deleted = func(self)
deleted = func(self, *process_args, **process_kwargs)
except SoftTimeLimitExceeded as e:
logger.error('exceeded the celery task soft time limit')
self.fail(e)
......@@ -623,7 +625,6 @@ def process(func):
'''
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
assert len(args) == 0 and len(kwargs) == 0, 'process functions must not have arguments'
if self.process_running:
raise ProcessAlreadyRunning('Tried to call a processing function on an already processing process.')
......@@ -631,7 +632,7 @@ def process(func):
self.process_status = PROCESS_CALLED
self.save()
self._run_process(func)
self._run_process(func, args, kwargs)
task = getattr(func, '__task_name', None)
if task is not None:
......
......@@ -908,7 +908,7 @@ class Upload(Proc):
return True # do not save the process status on the delete upload
@process
def publish_upload(self):
def publish_upload(self, with_embargo: bool = None, embargo_length: int = None):
'''
Moves the upload out of staging to the public area. It will
pack the staging upload files in to public upload files.
......@@ -924,7 +924,10 @@ class Upload(Proc):
with utils.timer(logger, 'upload metadata updated'):
def create_update(calc):
calc.published = True
calc.with_embargo = calc.with_embargo if calc.with_embargo is not None else False
if with_embargo is not None:
calc.with_embargo = with_embargo
elif calc.with_embargo is None:
calc.with_embargo = False
return UpdateOne(
{'_id': calc.calc_id},
{'$set': {'metadata': calc.m_to_dict(
......@@ -941,6 +944,8 @@ class Upload(Proc):
if isinstance(self.upload_files, StagingUploadFiles):
with utils.timer(logger, 'upload staging files deleted'):
if embargo_length is not None:
self.embargo_length = embargo_length
self.upload_files.delete()
self.published = True
self.publish_time = datetime.utcnow()
......
......@@ -169,7 +169,7 @@ def index(
# TODO this depends on how we merge section_metadata
def publish(entries: List[EntryMetadata], index: str = None) -> int:
def publish(entries: Iterable[EntryMetadata], index: str = None) -> int:
'''
Publishes the given entries based on their entry metadata. Sets publishes to true,
and updates most user provided metadata with a partial update. Returns the number
......@@ -180,7 +180,7 @@ def publish(entries: List[EntryMetadata], index: str = None) -> int:
def update_metadata(
entries: List[EntryMetadata], index: str = None,
entries: Iterable[EntryMetadata], index: str = None,
update_materials: bool = False, refresh: bool = False,
**kwargs) -> int:
'''
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment