Allow actions to request user input for human-in-the-loop interactions
Closes #2484 (closed)
User Input Support for Workflows
- Action State Tracking: Added user_input_requests to the MongoDB ActionDocument and API models so workflows can track when they are waiting for human input.
- Workflow Pausing: Centralized a new request_user_input_activity within the core actions manager that any Temporal workflow can use to register that it needs input.
- Schema Generation: Overhauled schema extraction to dynamically inspect @workflow.signal definitions, automatically generating JSON Schemas for the expected inputs.
- Input Submission API: Exposed a new /{action_instance_id}/user-input HTTP POST endpoint that takes a payload and relays it as a signal to the Temporal workflow.
UI is still wip, but rough idea of how it works:
Example workflow:
Only step 2 is NOMAD specific code, everything else is standard Temporal features.
class UserInputExampleWorkflow:
def __init__(self) -> None:
self._user_input: UserInputData | None = None
@workflow.signal
def provide_input(self, data: UserInputData) -> None:
self._user_input = data
@workflow.run
async def run(self, data: UserInputExampleWorkflowInput) -> dict:
# 1. Generate a random number between the specified bounds
random_number = await workflow.execute_activity(
generate_random_number_activity,
args=[data.lower_bound, data.upper_bound],
start_to_close_timeout=timedelta(seconds=10),
)
markdown_content = f"""### Random Number Generated
We have successfully generated a random number for you!
**Details:**
- Generated value: `{random_number}`
- Requested range: `[{data.lower_bound}, {data.upper_bound}]`
Please review this number and **approve** or **reject** it below.
"""
# 2. Ask the backend to log that we are waiting for user input
await request_user_input(
action_instance_id=workflow.info().workflow_id,
user_id=data.user_id,
signal_fn_name='provide_input',
title='Review Required',
description=f'Please approve or reject the randomly generated number: {random_number}.',
content=markdown_content,
)
# 3. Suspend workflow execution until the signal is received
await workflow.wait_condition(lambda: self._user_input is not None)
# 4. Resume and return the final decision regarding the random number
decision_str = "approved" if self._user_input.decision.lower() == "approve" else "rejected"
return {
'status': 'success',
'message': f"user {decision_str} {random_number}",
'user_notes': self._user_input.notes,
}
Edited by Ahmed Ilyas



