Spaces:
Sleeping
Sleeping
| """ | |
| Hybrid Executor module for the User Interaction Layer. | |
| This module supports both autonomous and human-assisted operation modes, | |
| allowing users to pause, override, or provide input during task execution. | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import time | |
| import uuid | |
| from typing import Dict, List, Any, Optional, Union, Callable | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class HybridExecutor:
    """
    Enables hybrid operation with human assistance.

    This class manages the execution of tasks with optional human
    intervention and records feedback on completed tasks.

    Human interaction is request/response based: when approval or input is
    needed, a pending request is created under a fresh request ID, every
    callback registered via ``register_user_input_callback`` is notified with
    that ID, and the waiting coroutine resumes once ``provide_user_input`` or
    ``provide_user_approval`` is called with the same ID (or the timeout
    expires).
    """

    # Modes accepted by set_assistance_mode().
    VALID_MODES = ("review", "approval", "manual", "autonomous")

    # Default number of seconds to wait for a human response.
    DEFAULT_RESPONSE_TIMEOUT = 60

    def __init__(self, action_executor):
        """
        Initialize the HybridExecutor.

        Args:
            action_executor: ActionExecutor instance for executing actions.
                Only its awaitable ``execute_action(task)`` method is used.
        """
        self.action_executor = action_executor
        self.pending_user_inputs = {}  # Request ID -> future object
        self.user_feedback = {}  # Action ID -> feedback
        self.human_assistance_mode = "review"  # One of VALID_MODES
        self._input_callbacks = {}  # Registration ID -> callback
        logger.info("HybridExecutor instance created")

    async def initialize(self):
        """Initialize resources. Currently only logs and returns True."""
        logger.info("HybridExecutor initialized successfully")
        return True

    async def execute_task(self, task: Dict, human_assist: bool = False) -> Dict:
        """
        Execute a task with optional human assistance.

        Args:
            task: Task configuration. ``task["human_assist_mode"]``, when
                present, overrides the executor-wide assistance mode.
            human_assist: Whether to use human-assisted mode.

        Returns:
            Dict: Task execution result. In human-assisted mode, errors are
            reported as ``{"success": False, "error": ..., "elapsed_time": ...}``
            instead of being raised.
        """
        start_time = time.time()

        # If human assistance is not required, execute autonomously.
        # Exceptions on this path propagate to the caller unchanged.
        if not human_assist:
            return await self.execute_autonomous_action(task)

        # Determine the mode of human assistance
        mode = task.get("human_assist_mode", self.human_assistance_mode)

        try:
            if mode == "review":
                # Execute autonomously but allow human review after execution
                result = await self.execute_autonomous_action(task)
                # NOTE(review): when the task has no "id", the generated key
                # is not returned to the caller, so feedback can only be
                # attached by inspecting user_feedback.
                task_id = task.get("id", str(uuid.uuid4()))
                self.user_feedback[task_id] = {
                    "task": task,
                    "result": result,
                    "feedback": None,
                    "timestamp": time.time(),
                }
                return result

            if mode == "approval":
                # Get approval before execution
                approval = await self.get_user_approval(task)
                if not approval.get("approved", False):
                    return self._failure_result("Task rejected by user", start_time)
                # provide_user_approval() always stores the "modified_task"
                # key (None when the user changed nothing), so a .get()
                # default would never apply; fall back explicitly.
                modified_task = approval.get("modified_task")
                if modified_task is None:
                    modified_task = task
                return await self.execute_autonomous_action(modified_task)

            if mode == "manual":
                # Let user specify the exact action to take
                user_action = await self.get_user_input(task)
                if not user_action:
                    return self._failure_result("No user input provided", start_time)
                return await self.execute_user_action(user_action)

            # "autonomous" or any other value
            return await self.execute_autonomous_action(task)
        except Exception as e:
            # Boundary of the human-assisted path: log with traceback and
            # convert the error into a failure result.
            logger.exception("Error in hybrid execution: %s", e)
            return self._failure_result(str(e), start_time)

    async def execute_autonomous_action(self, task: Dict) -> Dict:
        """
        Execute an action autonomously without human intervention.

        Args:
            task: Task configuration

        Returns:
            Dict: Action execution result from the action executor
        """
        return await self.action_executor.execute_action(task)

    async def execute_user_action(self, action: Dict) -> Dict:
        """
        Execute an action specified by the user.

        Args:
            action: User-specified action configuration. It is not modified;
                a copy tagged with ``"source": "user"`` is executed.

        Returns:
            Dict: Action execution result from the action executor
        """
        # Copy so the caller's dictionary is not mutated.
        tagged_action = {**action, "source": "user"}
        return await self.action_executor.execute_action(tagged_action)

    async def get_user_approval(
        self, task: Dict, timeout: Optional[float] = DEFAULT_RESPONSE_TIMEOUT
    ) -> Dict:
        """
        Get user approval for a task.

        Args:
            task: Task configuration to get approval for
            timeout: Seconds to wait for a response; None waits indefinitely

        Returns:
            Dict: Approval result with potential task modifications. On
            timeout, ``{"approved": False, "reason": ..., "request_id": ...}``.
        """
        request_id = str(uuid.uuid4())
        try:
            return await self._await_user_response(request_id, task, timeout)
        except asyncio.TimeoutError:
            # Handle timeout (no user response)
            return {
                "approved": False,
                "reason": "User did not respond within the timeout period",
                "request_id": request_id,
            }

    async def get_user_input(
        self, context: Dict, timeout: Optional[float] = DEFAULT_RESPONSE_TIMEOUT
    ) -> Optional[Dict]:
        """
        Get input from the user for an action.

        Args:
            context: Context information for the user input
            timeout: Seconds to wait for a response; None waits indefinitely

        Returns:
            Optional[Dict]: User input or None if not provided in time
        """
        request_id = str(uuid.uuid4())
        try:
            return await self._await_user_response(request_id, context, timeout)
        except asyncio.TimeoutError:
            # Handle timeout (no user response)
            return None

    def provide_user_input(self, request_id: str, input_data: Dict) -> bool:
        """
        Provide user input for a pending request.

        This method is called externally when user input is received.
        NOTE: asyncio futures are not thread-safe, so call this from the
        event loop thread.

        Args:
            request_id: ID of the request
            input_data: User input data

        Returns:
            bool: True if the input was provided to a pending request,
            False otherwise
        """
        return self._resolve_pending(request_id, input_data)

    def provide_user_approval(
        self, request_id: str, approved: bool, modified_task: Optional[Dict] = None
    ) -> bool:
        """
        Provide user approval for a pending request.

        This method is called externally when user approval is received.
        NOTE: asyncio futures are not thread-safe, so call this from the
        event loop thread.

        Args:
            request_id: ID of the approval request
            approved: Whether the task is approved
            modified_task: Optional modified task configuration

        Returns:
            bool: True if the approval was provided to a pending request,
            False otherwise
        """
        # Cheap pre-check so no payload is built for unknown requests.
        if request_id not in self.pending_user_inputs:
            return False
        return self._resolve_pending(
            request_id,
            {
                "approved": approved,
                "modified_task": modified_task,
                "timestamp": time.time(),
                "request_id": request_id,
            },
        )

    def provide_feedback(self, action_id: str, feedback: Dict) -> bool:
        """
        Provide feedback for a completed action.

        Args:
            action_id: ID of the action (the key used in ``user_feedback``)
            feedback: Feedback data

        Returns:
            bool: True if feedback was recorded, False if the ID is unknown
        """
        entry = self.user_feedback.get(action_id)
        if entry is None:
            return False
        entry["feedback"] = feedback
        entry["feedback_timestamp"] = time.time()
        return True

    def set_assistance_mode(self, mode: str) -> bool:
        """
        Set the human assistance mode.

        Args:
            mode: Assistance mode ("review", "approval", "manual",
                "autonomous"); matched case-insensitively

        Returns:
            bool: True if the mode was set successfully, False otherwise
        """
        # Non-string values (e.g. None) are rejected instead of raising.
        normalized = mode.lower() if isinstance(mode, str) else None
        if normalized in self.VALID_MODES:
            self.human_assistance_mode = normalized
            logger.info("Human assistance mode set to %s", normalized)
            return True
        logger.warning("Invalid human assistance mode: %s", mode)
        return False

    def get_assistance_mode(self) -> str:
        """
        Get the current human assistance mode.

        Returns:
            str: Current assistance mode
        """
        return self.human_assistance_mode

    def register_user_input_callback(self, callback: Callable[[str, Dict], None]) -> str:
        """
        Register a callback for when user input is needed.

        The callback is invoked synchronously with ``(request_id, context)``
        each time ``get_user_approval`` or ``get_user_input`` starts waiting.
        It may answer at once through ``provide_user_input`` or
        ``provide_user_approval``, or forward the request ID to a UI.

        Args:
            callback: Callback function taking request_id and context

        Returns:
            str: Registration ID, usable with
            ``unregister_user_input_callback``
        """
        registration_id = str(uuid.uuid4())
        self._input_callbacks[registration_id] = callback
        return registration_id

    def unregister_user_input_callback(self, registration_id: str) -> bool:
        """
        Remove a previously registered user-input callback.

        Args:
            registration_id: ID returned by ``register_user_input_callback``

        Returns:
            bool: True if a callback was removed, False if the ID is unknown
        """
        return self._input_callbacks.pop(registration_id, None) is not None

    @staticmethod
    def _failure_result(error: str, start_time: float) -> Dict:
        """Build the failure dictionary returned by execute_task."""
        return {
            "success": False,
            "error": error,
            "elapsed_time": time.time() - start_time,
        }

    def _resolve_pending(self, request_id: str, payload: Any) -> bool:
        """Complete the pending future for request_id; False if none is waiting."""
        future = self.pending_user_inputs.get(request_id)
        if future is None or future.done():
            return False
        future.set_result(payload)
        return True

    def _notify_input_callbacks(self, request_id: str, context: Dict) -> None:
        """Invoke every registered callback; a failing callback is logged and skipped."""
        # Iterate over a snapshot so callbacks may register or unregister.
        for registration_id, callback in list(self._input_callbacks.items()):
            try:
                callback(request_id, context)
            except Exception:
                logger.exception("User input callback %s failed", registration_id)

    async def _await_user_response(
        self, request_id: str, context: Dict, timeout: Optional[float]
    ) -> Any:
        """
        Register a pending request, notify callbacks and wait for the answer.

        Raises:
            asyncio.TimeoutError: if no response arrives within ``timeout``
        """
        future = asyncio.get_running_loop().create_future()
        self.pending_user_inputs[request_id] = future
        try:
            # Notify only after the future is registered so a callback can
            # answer immediately.
            self._notify_input_callbacks(request_id, context)
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            # Runs on success, timeout, cancellation and errors alike, so
            # no request is left behind in pending_user_inputs.
            self.pending_user_inputs.pop(request_id, None)