selfevolveagent / evoagentx /hitl /approval_manager.py
iLOVE2D's picture
Upload 2846 files
5374a2d verified
import asyncio
import json
import sys
from typing import Dict
from pydantic import Field
from ..core.logging import logger
from ..core import BaseModule
from .hitl import HITLRequest, HITLResponse, HITLDecision, HITLContext, HITLInteractionType, HITLMode
class HITLManager(BaseModule):
"""
HITL Manager - Manages Human-in-the-Loop interactions
It must be instancialized and add as a parameter to the WorkFlow instance like: workflow = WorkFlow(graph=graph, llm=llm, agent_manager=AgentManager(agents=agents), hitl_manager=hitl_manager)
"""
# Pydantic fields
active: bool = Field(default=False, description="Whether HITL is currently active")
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._pending_requests: Dict[str, asyncio.Future] = {}
self.hitl_input_output_mapping = {}
def init_module(self):
"""Module initialization"""
self._pending_requests: Dict[str, asyncio.Future] = {}
def activate(self):
"""activate HITL feature"""
self.active = True
logger.info("HITL feature activated")
def deactivate(self):
"""deactivate HITL feature"""
self.active = False
logger.info("HITL feature deactivated")
@property
def is_active(self) -> bool:
return self.active
async def request_approval(
self,
task_name: str,
agent_name: str,
action_name: str,
interaction_type: HITLInteractionType,
mode: HITLMode,
action_inputs_data: dict = None,
execution_result = None,
workflow_goal: str = None,
display_context: Dict = None,
timeout: float = 1800.0
) -> HITLResponse:
"""Request human approval"""
if not self.active:
# HITL is not active, auto-approved
return HITLResponse(
request_id="auto_approved",
decision=HITLDecision.APPROVE,
feedback="HITL not active, auto-approved"
)
# create HITL context
context = HITLContext(
task_name=task_name,
agent_name=agent_name,
action_name=action_name,
workflow_goal=workflow_goal,
action_inputs=action_inputs_data or {},
execution_result=execution_result,
display_context=display_context or {}
)
# generate prompt message
prompt_message = self._generate_prompt_message(interaction_type, mode, context)
# create request
request = HITLRequest(
interaction_type=interaction_type,
mode=mode,
context=context,
prompt_message=prompt_message
)
# create Future to wait for response
future = asyncio.Future()
self._pending_requests[request.request_id] = future
# display request and wait for response in CLI
try:
response = await self._handle_cli_interaction(request, timeout)
future.set_result(response)
return response
except asyncio.TimeoutError:
response = HITLResponse(
request_id=request.request_id,
decision=HITLDecision.REJECT,
feedback="Timeout: No human response received"
)
future.set_result(response)
return response
finally:
# clean up
self._pending_requests.pop(request.request_id, None)
async def _handle_cli_interaction(self, request: HITLRequest, timeout: float) -> HITLResponse:
"""handle cli interaction"""
print("\n" + "="*80)
print("🔔 Human-in-the-Loop approval request")
print("="*80)
print(request.prompt_message)
print("="*80)
try:
if request.interaction_type == HITLInteractionType.APPROVE_REJECT:
return await self._handle_approve_reject(request)
elif request.interaction_type == HITLInteractionType.REVIEW_EDIT_STATE:
return await self._handle_review_edit(request)
elif request.interaction_type == HITLInteractionType.REVIEW_TOOL_CALLS:
return await self._handle_tool_calls(request)
elif request.interaction_type == HITLInteractionType.MULTI_TURN_CONVERSATION:
return await self._handle_conversation(request)
else:
return HITLResponse(
request_id=request.request_id,
decision=HITLDecision.REJECT,
feedback="Unknown interaction type"
)
except Exception as e:
logger.error(f"CLI interaction error: {e}")
return HITLResponse(
request_id=request.request_id,
decision=HITLDecision.REJECT,
feedback=f"Error: {str(e)}"
)
async def _handle_approve_reject(self, request: HITLRequest) -> HITLResponse:
"""handle approve/reject"""
def get_user_input():
while True:
choice = input("\nPlease select [a]pprove / [r]eject: ").lower().strip()
if choice in ['a', 'approve']:
return HITLDecision.APPROVE
elif choice in ['r', 'reject']:
return HITLDecision.REJECT
print("Invalid input, please input 'a' or 'r'")
# run blocking input in event loop
loop = asyncio.get_event_loop()
decision = await loop.run_in_executor(None, get_user_input)
feedback = ""
if decision == HITLDecision.REJECT:
def get_feedback():
return input("Please provide the reason for rejection (optional): ").strip()
feedback = await loop.run_in_executor(None, get_feedback)
return HITLResponse(
request_id=request.request_id,
decision=decision,
feedback=feedback if feedback else None
)
async def _handle_review_edit(self, request: HITLRequest) -> HITLResponse:
"""handle review edit"""
# TODO: implement review edit
raise NotImplementedError("Not implemented HITL type: HITLInteractionType.REVIEW_EDIT_STATE")
# def get_user_input():
# print(f"\nCurrent execution result:")
# result = request.context.execution_result
# if isinstance(result, dict):
# print(json.dumps(result, ensure_ascii=False, indent=2))
# else:
# print(str(result))
# while True:
# choice = input("\nPlease select [a]pprove / [m]odify / [r]eject: ").lower().strip()
# if choice in ['a', 'approve']:
# return HITLDecision.APPROVE, None
# elif choice in ['r', 'reject']:
# return HITLDecision.REJECT, None
# elif choice in ['m', 'modify']:
# new_content = input("Please input the modified content (JSON format): ").strip()
# try:
# if new_content:
# modified = json.loads(new_content)
# return HITLDecision.MODIFY, modified
# else:
# return HITLDecision.APPROVE, None
# except json.JSONDecodeError:
# print("JSON format error, please input again")
# continue
# print("Invalid input, please input 'a', 'm' or 'r'")
# loop = asyncio.get_event_loop()
# decision, modified_content = await loop.run_in_executor(None, get_user_input)
# return HITLResponse(
# request_id=request.request_id,
# decision=decision,
# modified_content=modified_content
# )
async def _handle_tool_calls(self, request: HITLRequest) -> HITLResponse:
"""handle tool calls review"""
# TODO: implement tool calls review
raise NotImplementedError("Not implemented HITL type: HITLInteractionType.REVIEW_TOOL_CALLS")
# def get_user_input():
# tool_calls = request.context.action_inputs.get('tool_calls', [])
# print(f"\nTool calls:")
# print(json.dumps(tool_calls, ensure_ascii=False, indent=2))
# while True:
# choice = input("\nPlease select [a]pprove / [m]odify / [r]eject: ").lower().strip()
# if choice in ['a', 'approve']:
# return HITLDecision.APPROVE, None
# elif choice in ['r', 'reject']:
# return HITLDecision.REJECT, None
# elif choice in ['m', 'modify']:
# new_calls = input("Please input the modified tool calls (JSON format): ").strip()
# try:
# if new_calls:
# modified = json.loads(new_calls)
# return HITLDecision.MODIFY, modified
# else:
# return HITLDecision.APPROVE, None
# except json.JSONDecodeError:
# print("JSON format
# loop = asyncio.get_event_loop()
# decision, modified_content = await loop.run_in_executor(None, get_user_input)
# return HITLResponse(
# request_id=request.request_id,
# decision=decision,
# modified_content=modified_content
# )
async def _handle_conversation(self, request: HITLRequest) -> HITLResponse:
"""handle multi-turn conversation"""
# TODO: implement multi-turn conversation
raise NotImplementedError("Not implemented HITL type: HITLInteractionType.MULTI_TURN_CONVERSATION")
# def get_user_input():
# user_input = input("\nPlease input the guidance content (or 'continue' to continue): ").strip()
# if user_input.lower() == 'continue':
# return HITLDecision.CONTINUE, None
# elif user_input.lower() in ['reject', 'stop']:
# return HITLDecision.REJECT, None
# else:
# return HITLDecision.MODIFY, user_input
# loop = asyncio.get_event_loop()
# decision, content = await loop.run_in_executor(None, get_user_input)
# return HITLResponse(
# request_id=request.request_id,
# decision=decision,
# modified_content=content,
# feedback=content
# )
def _generate_prompt_message(
self,
interaction_type: HITLInteractionType,
mode: HITLMode,
context: HITLContext
) -> str:
"""generate prompt message"""
base_info = f"""
Task: {context.task_name}
Agent: {context.agent_name}
Action: {context.action_name}
Workflow Goal: {context.workflow_goal or 'N/A'}
Mode: {'Pre-Execution Approval' if mode == HITLMode.PRE_EXECUTION else 'Post-Execution Review'}
"""
if mode == HITLMode.PRE_EXECUTION:
base_info += f"\nparameters to be executed:\n{json.dumps(context.action_inputs, ensure_ascii=False, indent=2)}"
else:
base_info += f"\nexecution_result:\n{json.dumps(context.execution_result, ensure_ascii=False, indent=2) if context.execution_result else 'None'}"
return base_info
async def request_user_input(
self,
task_name: str,
agent_name: str,
action_name: str,
input_fields: dict,
workflow_goal: str = None,
display_context: dict = None,
timeout: float = 3600.0
) -> HITLResponse:
"""Request user input based on predefined fields"""
if not self.active:
# HITL is not active, return empty input
return HITLResponse(
request_id="auto_approved",
decision=HITLDecision.CONTINUE,
modified_content={},
feedback="HITL not active, returning empty input"
)
# create HITL context
context = HITLContext(
task_name=task_name,
agent_name=agent_name,
action_name=action_name,
workflow_goal=workflow_goal,
action_inputs={"input_fields": input_fields},
execution_result=None,
display_context=display_context or {}
)
# generate prompt message for user input collection
prompt_message = self._generate_user_input_prompt_message(context, input_fields)
# create request
request = HITLRequest(
interaction_type=HITLInteractionType.COLLECT_USER_INPUT,
mode=HITLMode.PRE_EXECUTION,
context=context,
prompt_message=prompt_message
)
# create Future to wait for response
future = asyncio.Future()
self._pending_requests[request.request_id] = future
# display request and wait for response in CLI
try:
response = await self._handle_user_input_collection(request, input_fields, timeout)
future.set_result(response)
return response
except asyncio.TimeoutError:
response = HITLResponse(
request_id=request.request_id,
decision=HITLDecision.REJECT,
feedback="Timeout: No user input received"
)
future.set_result(response)
return response
finally:
# clean up
self._pending_requests.pop(request.request_id, None)
async def _handle_user_input_collection(self, request: HITLRequest, input_fields: dict, timeout: float) -> HITLResponse:
"""Handle user input collection"""
print("\n" + "="*80)
print("📝 User input collection request")
print("="*80)
print(request.prompt_message)
print("="*80)
try:
def get_user_inputs():
collected_inputs = {}
print("\nPlease provide the following inputs:")
for field_name, field_info in input_fields.items():
field_type = field_info.get('type', 'string')
description = field_info.get('description', '')
required = field_info.get('required', True)
default_value = field_info.get('default', None)
while True:
prompt_text = f"\n{field_name}"
if description:
prompt_text += f" ({description})"
if not required:
prompt_text += " [optional]"
if default_value is not None:
prompt_text += f" [default: {default_value}]"
prompt_text += ": "
user_input = input(prompt_text).strip()
# Handle empty input
if not user_input:
if not required and default_value is not None:
user_input = str(default_value)
elif not required:
user_input = ""
else:
print(f"Field '{field_name}' is required, please provide input.")
continue
# Type conversion and validation
try:
if field_type == 'int':
# collected_inputs[field_name] = int(user_input) if user_input else None
collected_inputs[field_name] = str(user_input) if user_input else None # prepare_action_prompt do not accept int
elif field_type == 'float':
# collected_inputs[field_name] = float(user_input) if user_input else None
collected_inputs[field_name] = str(user_input) if user_input else None
elif field_type == 'bool':
collected_inputs[field_name] = user_input.lower() in ['true', '1', 'yes', 'y'] if user_input else None
else: # string or others
collected_inputs[field_name] = user_input
break
except ValueError:
print(f"Input format error, field '{field_name}' needs {field_type} type value.")
continue
# Confirmation
print("\nCollected inputs:")
for field_name, value in collected_inputs.items():
print(f" {field_name}: {value}")
while True:
confirm = input("\nConfirm these inputs? [y]es / [n]o / [r]etry: ").lower().strip()
if confirm in ['y', 'yes']:
return collected_inputs
elif confirm in ['n', 'no']:
sys.exit()
elif confirm in ['r', 'retry']:
return get_user_inputs() # Retry input collection
else:
print("Invalid input, please input 'y', 'n' or 'r'")
# Run blocking input in event loop
loop = asyncio.get_event_loop()
collected_data = await loop.run_in_executor(None, get_user_inputs)
if collected_data is not None:
return HITLResponse(
request_id=request.request_id,
decision=HITLDecision.CONTINUE,
modified_content=collected_data,
feedback="User input collection completed"
)
else:
return HITLResponse(
request_id=request.request_id,
decision=HITLDecision.REJECT,
feedback="User cancelled input"
)
except Exception as e:
logger.error(f"User input collection error: {e}")
return HITLResponse(
request_id=request.request_id,
decision=HITLDecision.REJECT,
feedback=f"Error: {str(e)}"
)
def _generate_user_input_prompt_message(self, context: HITLContext, input_fields: dict) -> str:
"""Generate prompt message for user input collection"""
base_info = f"""
Task: {context.task_name}
Agent: {context.agent_name}
Action: {context.action_name}
Workflow Goal: {context.workflow_goal or 'N/A'}
User input fields to be collected:
"""
for field_name, field_info in input_fields.items():
field_type = field_info.get('type', 'string')
description = field_info.get('description', '')
required = field_info.get('required', True)
default_value = field_info.get('default', None)
base_info += f"\n- {field_name} ({field_type})"
if description:
base_info += f": {description}"
if not required:
base_info += " [optional]"
if default_value is not None:
base_info += f" [default: {default_value}]"
return base_info