selfevolveagent / evoagentx /hitl /interceptor_agent.py
iLOVE2D's picture
Upload 2846 files
5374a2d verified
# evoagentx/hitl/interceptor_agent.py
import asyncio
import sys
from typing import Tuple
from ..agents.agent import Agent
from ..actions.action import Action
from .approval_manager import HITLManager
from .hitl import HITLInteractionType, HITLMode, HITLDecision
from ..core.registry import MODULE_REGISTRY
from ..core.logging import logger
class HITLInterceptorAction(Action):
    """Action that asks a human to approve or reject another agent's action.

    The action forwards the pending inputs to ``hitl_manager.request_approval``
    and reports the decision and feedback carried by the response.
    """

    def __init__(
        self,
        target_agent_name: str,
        target_action_name: str,
        name: str = None,
        description: str = "A pre-defined action to proceed the Human-In-The-Loop",
        interaction_type: HITLInteractionType = HITLInteractionType.APPROVE_REJECT,
        mode: HITLMode = HITLMode.PRE_EXECUTION,
        **kwargs
    ):
        """Create the interceptor action.

        Args:
            target_agent_name: Name of the agent whose action is intercepted.
            target_action_name: Name of the intercepted action.
            name: Action name. When falsy, a name is derived from the target
                agent, the target action and ``mode.value``.
            description: Description forwarded to the base ``Action``.
            interaction_type: Interaction type passed to the HITL manager.
            mode: HITL mode passed to the HITL manager.
            **kwargs: Forwarded unchanged to the base ``Action`` constructor.
        """
        if not name:
            name = f"hitl_intercept_{target_agent_name}_{target_action_name}_mode_{mode.value}_action"
        super().__init__(
            name=name,
            description=description,
            **kwargs
        )
        self.target_agent_name = target_agent_name
        self.target_action_name = target_action_name
        self.interaction_type = interaction_type
        self.mode = mode

    def execute(self, llm, inputs: dict, hitl_manager: HITLManager, sys_msg: str = None, **kwargs) -> Tuple[dict, str]:
        """Run :meth:`async_execute` synchronously.

        Args:
            llm: Forwarded to ``async_execute`` (not used by it).
            inputs: Inputs of the intercepted action.
            hitl_manager: Manager used to request the human decision.
            sys_msg: Forwarded to ``async_execute`` (not used by it).
            **kwargs: Forwarded to ``async_execute``.

        Returns:
            Whatever ``async_execute`` returns.

        Raises:
            RuntimeError: If called while an event loop is already running in
                this thread; callers must await ``async_execute`` instead.
        """
        # get_running_loop() raises RuntimeError when no loop is running. The
        # probe is kept separate from the raise below so that our own error is
        # not caught by the handler meant for the "no loop" case.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            in_async_context = False
        else:
            in_async_context = True

        if in_async_context:
            raise RuntimeError("Cannot use asyncio.run() in async context. Use async_execute directly.")

        return asyncio.run(self.async_execute(llm, inputs, hitl_manager, sys_msg=sys_msg, **kwargs))

    async def async_execute(self, llm, inputs: dict, hitl_manager: HITLManager, sys_msg: str = None, **kwargs) -> Tuple[dict, str]:
        """Request a human decision for the target action.

        Args:
            llm: Unused.
            inputs: Inputs of the intercepted action; sent to the manager as
                ``action_inputs_data`` and used to fill the declared outputs.
            hitl_manager: Manager providing ``request_approval`` and
                ``hitl_input_output_mapping``.
            sys_msg: Unused.
            **kwargs: ``wf_task`` (defaults to ``'Unknown Task'``) and
                ``wf_goal`` (defaults to ``None``) are read from here.

        Returns:
            ``(result, prompt)`` when the decision is ``HITLDecision.APPROVE``.
            For a decision other than APPROVE or REJECT the method falls
            through and returns ``None``.

        Raises:
            SystemExit: When the decision is ``HITLDecision.REJECT``.
        """
        task_name = kwargs.get('wf_task', 'Unknown Task')
        workflow_goal = kwargs.get('wf_goal', None)

        # request HITL approval
        response = await hitl_manager.request_approval(
            task_name=task_name,
            agent_name=self.target_agent_name,
            action_name=self.target_action_name,
            interaction_type=self.interaction_type,
            mode=self.mode,
            action_inputs_data=inputs,
            workflow_goal=workflow_goal
        )

        result = {
            "hitl_decision": response.decision,
            "target_agent": self.target_agent_name,
            "target_action": self.target_action_name,
            "hitl_feedback": response.feedback
        }

        # Pass the mapped input values through as this action's outputs.
        # Guarded like the user-input collector action so that an action
        # without an outputs format does not fail on ``None.get_attrs()``.
        if self.outputs_format:
            for output_name in self.outputs_format.get_attrs():
                try:
                    result[output_name] = inputs[hitl_manager.hitl_input_output_mapping[output_name]]
                except Exception as e:
                    # Best effort: a missing mapping or input is logged and
                    # that output is simply left out of the result.
                    logger.exception(e)

        prompt = f"HITL Interceptor executed for {self.target_agent_name}.{self.target_action_name}"
        if result["hitl_decision"] == HITLDecision.APPROVE:
            prompt += "\nHITL approved, the action will be executed"
            return result, prompt
        elif result["hitl_decision"] == HITLDecision.REJECT:
            prompt += "\nHITL rejected, the action will not be executed"
            # NOTE(review): a rejection raises SystemExit instead of returning
            # the result; kept as-is because callers may rely on it.
            sys.exit()
        # NOTE(review): any other decision returns None implicitly here.
class HITLPostExecutionAction(Action):
    """Placeholder action for post-execution interception.

    Adds nothing on top of ``Action``. ``HITLInterceptorAgent`` instantiates
    it when its mode is ``HITLMode.POST_EXECUTION``.
    """
class HITLBaseAgent(Agent):
    """Common base class for the agents used in HITL scenarios."""

    def _get_unique_class_name(self, candidate_name: str) -> str:
        """Return a name that is not yet present in ``MODULE_REGISTRY``.

        Args:
            candidate_name: Preferred name.

        Returns:
            ``candidate_name`` itself when it is free, otherwise the first
            free name of the form ``f"{candidate_name}V{i}"`` for i = 1, 2, ...
        """
        chosen = candidate_name
        version = 0
        # Keep bumping the version suffix until the registry reports no clash.
        while MODULE_REGISTRY.has_module(chosen):
            version += 1
            chosen = f"{candidate_name}V{version}"
        return chosen
class HITLInterceptorAgent(HITLBaseAgent):
    """Agent that intercepts the execution of another agent for human review."""

    def __init__(self,
                 target_agent_name: str,
                 target_action_name: str,
                 name: str = None,
                 interaction_type: HITLInteractionType = HITLInteractionType.APPROVE_REJECT,
                 mode: HITLMode = HITLMode.PRE_EXECUTION,
                 **kwargs):
        """Build the agent and register its single interception action.

        Args:
            target_agent_name: Name of the agent being intercepted.
            target_action_name: Name of the intercepted action. When falsy it
                is left out of the agent name and the action targets ``"any"``.
            name: Accepted but not used here; the agent name is always
                generated from the target names and ``mode.value``.
            interaction_type: Interaction type given to the action.
            mode: ``PRE_EXECUTION`` or ``POST_EXECUTION``.
            **kwargs: Forwarded to the base agent constructor.

        Raises:
            ValueError: If ``mode`` is neither of the two supported modes.
        """
        # The action segment only appears in the name when an action is given.
        action_segment = f"_{target_action_name}" if target_action_name else ""
        agent_name = f"HITL_Interceptor_{target_agent_name}{action_segment}_mode_{mode.value}"

        super().__init__(
            name=agent_name,
            description=f"HITL Interceptor - Intercept the execution of {target_agent_name}",
            is_human=True,
            **kwargs
        )

        self.target_agent_name = target_agent_name
        self.target_action_name = target_action_name
        self.interaction_type = interaction_type
        self.mode = mode

        # Arguments shared by both action flavours.
        shared_action_args = dict(
            target_agent_name=target_agent_name,
            target_action_name=target_action_name or "any",
            interaction_type=interaction_type,
        )
        if mode == HITLMode.PRE_EXECUTION:
            action = HITLInterceptorAction(mode=mode, **shared_action_args)
        elif mode == HITLMode.POST_EXECUTION:
            # The post-execution action is not given the mode.
            action = HITLPostExecutionAction(**shared_action_args)
        else:
            raise ValueError(f"Invalid mode: {mode}")

        self.add_action(action)

    def get_hitl_agent_name(self) -> str:
        """Return this agent's name, which is generated in ``__init__``."""
        return self.name
class HITLUserInputCollectorAction(Action):
    """Action that collects user input through the HITL manager."""

    def __init__(
        self,
        name: str = None,
        agent_name: str = None,
        description: str = "A pre-defined action to collect user input for the HITL Interceptor",
        interaction_type: HITLInteractionType = HITLInteractionType.COLLECT_USER_INPUT,
        input_fields: dict = None,
        **kwargs
    ):
        """Create the collector action.

        Args:
            name: Action name, forwarded to the base ``Action`` as given.
            agent_name: Name of the owning agent; reported to the HITL manager.
            description: Description forwarded to the base ``Action``.
            interaction_type: Interaction type stored on the action.
            input_fields: Field definitions to request; ``None`` becomes ``{}``.
            **kwargs: Forwarded unchanged to the base ``Action`` constructor.
        """
        if not name:
            pass  # TODO: generate name
        super().__init__(name=name, description=description, **kwargs)
        self.interaction_type = interaction_type
        self.input_fields = input_fields or {}
        self.agent_name = agent_name

    def execute(self, llm, inputs: dict, hitl_manager: HITLManager, sys_msg: str = None, **kwargs) -> Tuple[dict, str]:
        """Run :meth:`async_execute` synchronously.

        Args:
            llm: Forwarded to ``async_execute`` (not used by it).
            inputs: Forwarded to ``async_execute`` (not used by it).
            hitl_manager: Manager used to request the user input.
            sys_msg: Forwarded to ``async_execute`` (not used by it).
            **kwargs: Forwarded to ``async_execute``.

        Returns:
            Whatever ``async_execute`` returns.

        Raises:
            RuntimeError: If called while an event loop is already running in
                this thread; callers must await ``async_execute`` instead.
        """
        # get_running_loop() raises RuntimeError when no loop is running. The
        # probe is kept separate from the raise below so that our own error is
        # not caught by the handler meant for the "no loop" case.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            in_async_context = False
        else:
            in_async_context = True

        if in_async_context:
            raise RuntimeError("Cannot use asyncio.run() in async context. Use async_execute directly.")

        return asyncio.run(self.async_execute(llm, inputs, hitl_manager, sys_msg=sys_msg, **kwargs))

    async def async_execute(self, llm, inputs: dict, hitl_manager: HITLManager, sys_msg: str = None, **kwargs) -> Tuple[dict, str]:
        """Ask the HITL manager for the configured input fields.

        Args:
            llm: Unused.
            inputs: Unused.
            hitl_manager: Manager providing ``request_user_input``.
            sys_msg: Unused.
            **kwargs: ``wf_task`` (defaults to ``'Unknown Task'``) and
                ``wf_goal`` (defaults to ``None``) are read from here.

        Returns:
            ``(result, prompt)`` when the decision is ``HITLDecision.CONTINUE``.
            For a decision other than CONTINUE or REJECT the method falls
            through and returns ``None``.

        Raises:
            SystemExit: When the decision is ``HITLDecision.REJECT``.
        """
        task_name = kwargs.get('wf_task', 'Unknown Task')
        workflow_goal = kwargs.get('wf_goal', None)

        # request user input from HITL manager
        response = await hitl_manager.request_user_input(
            task_name=task_name,
            agent_name=self.agent_name,
            action_name=self.name,
            input_fields=self.input_fields,
            workflow_goal=workflow_goal
        )

        # Normalise once: a response without content is treated as empty, so
        # the membership test below cannot fail on ``None``.
        collected = response.modified_content or {}

        result = {
            "hitl_decision": response.decision,
            "collected_user_input": collected,
            "hitl_feedback": response.feedback
        }

        # Map collected user input to outputs if output format is defined
        if self.outputs_format:
            for output_name in self.outputs_format.get_attrs():
                if output_name in collected:
                    result[output_name] = collected[output_name]

        prompt = f"HITL User Input Collector executed: {self.name}"
        if result["hitl_decision"] == HITLDecision.CONTINUE:
            prompt += f"\nUser input collection completed: {result['collected_user_input']}"
            return result, prompt
        elif result["hitl_decision"] == HITLDecision.REJECT:
            prompt += "\nUser cancelled input or error occurred"
            # NOTE(review): a rejection raises SystemExit instead of returning
            # the result; kept as-is because callers may rely on it.
            sys.exit()
        # NOTE(review): any other decision returns None implicitly here.
class HITLUserInputCollectorAgent(HITLBaseAgent):
    """Agent that collects a predefined set of inputs from the user."""

    def __init__(self,
                 name: str = None,
                 input_fields: dict = None,
                 interaction_type: HITLInteractionType = HITLInteractionType.COLLECT_USER_INPUT,
                 **kwargs):
        """Build the agent and register its user-input collector action.

        Args:
            name: Suffix for the agent name. When truthy the agent is named
                ``HITL_User_Input_Collector_{name}``; otherwise the bare
                ``HITL_User_Input_Collector`` is used.
            input_fields: Field definitions to request; ``None`` becomes ``{}``.
            interaction_type: Interaction type given to the action.
            **kwargs: Forwarded to the base agent constructor.
        """
        # generate agent name
        if name:
            agent_name = f"HITL_User_Input_Collector_{name}"
        else:
            # Fall back to the bare prefix so agent_name is always bound;
            # previously this path failed with an unbound local variable.
            agent_name = "HITL_User_Input_Collector"

        super().__init__(
            name=agent_name,
            description="HITL User Input Collector - Collect predefined user inputs",
            is_human=True,
            **kwargs
        )

        self.interaction_type = interaction_type
        self.input_fields = input_fields or {}

        # Pick the first "HITLUserInputCollectorAction_<i>" that the registry
        # does not already know. The index must advance on every clash,
        # otherwise the search never terminates.
        name_i = 0
        action_name = f"HITLUserInputCollectorAction_{name_i}"
        while MODULE_REGISTRY.has_module(action_name):
            name_i += 1
            action_name = f"HITLUserInputCollectorAction_{name_i}"

        # add user input collector action
        action = HITLUserInputCollectorAction(
            name=action_name,
            agent_name=agent_name,
            interaction_type=interaction_type,
            input_fields=self.input_fields
        )
        self.add_action(action)

    def get_hitl_agent_name(self) -> str:
        """
        Get the name of the HITL agent. Useful when the name of HITL agent is generated dynamically.
        """
        return self.name

    def set_input_fields(self, input_fields: dict):
        """Set the input fields on the agent and on its collector action(s)."""
        self.input_fields = input_fields
        # Update the action's input fields as well
        for action in self.actions:
            if isinstance(action, HITLUserInputCollectorAction):
                action.input_fields = input_fields
class HITLConversationAgent(HITLBaseAgent):
    """Placeholder agent; adds nothing on top of ``HITLBaseAgent``."""
class HITLConversationAction(Action):
    """Placeholder action; adds nothing on top of ``Action``."""