Spaces:
Sleeping
Sleeping
| """ | |
| Task Planner module for the Planning & Reasoning Layer. | |
| This module implements task decomposition and decision-making capabilities, | |
| using chain-of-thought or tree-of-thought reasoning for complex tasks. | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import time | |
| from typing import Dict, List, Any, Optional, Union | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class TaskPlanner:
    """
    Decomposes tasks and makes dynamic decisions.

    This class uses LFMs for chain-of-thought or tree-of-thought reasoning
    to break down high-level goals into actionable steps.
    """

    # Maximum number of cached plans retained (FIFO eviction) so the cache
    # cannot grow without bound in a long-running process.
    _MAX_CACHE_SIZE = 128

    def __init__(self):
        """Initialize the TaskPlanner from environment configuration."""
        # Set by initialize(); while None, methods either raise or fall back
        # to a degraded non-LLM behaviour, depending on the operation.
        self.llm_client = None
        self.planning_model = os.environ.get("PLANNING_MODEL", "gpt-4-turbo")
        self.planning_approach = os.environ.get("PLANNING_APPROACH", "chain-of-thought")
        # Cache of previously generated plans, keyed by _generate_cache_key().
        self.plan_cache = {}
        logger.info("TaskPlanner instance created")

    async def initialize(self):
        """Create the async LLM client.

        Returns:
            bool: True on success, False if the client could not be created
            (e.g. openai not installed or no API key configured).
        """
        try:
            import openai
            self.llm_client = openai.AsyncClient(
                api_key=os.environ.get("OPENAI_API_KEY")
            )
            logger.info("TaskPlanner initialized successfully")
            return True
        except Exception as e:
            logger.error(f"Error initializing task planner: {str(e)}")
            return False

    async def decompose_task(self, high_level_goal: str, current_state: Dict = None, urls: List[str] = None) -> Dict:
        """
        Decompose a high-level goal into actionable steps.

        Args:
            high_level_goal: The high-level task description
            current_state: Current state of the system (optional)
            urls: Relevant URLs for the task (optional)

        Returns:
            Dict: Decomposed task plan with actionable steps. On failure a
            minimal plan is returned containing an "error" field, an empty
            "steps" list, and the elapsed time.
        """
        start_time = time.time()
        try:
            # Reuse a cached plan for a similar goal/URL combination if present.
            cache_key = self._generate_cache_key(high_level_goal, urls)
            if cache_key in self.plan_cache:
                # Adapt the cached plan to the current task
                return await self._adapt_cached_plan(self.plan_cache[cache_key], high_level_goal, current_state)
            # Generate a plan based on the configured planning approach.
            # Any unrecognized value falls back to chain-of-thought, which
            # also covers the explicit "chain-of-thought" setting.
            if self.planning_approach == "tree-of-thought":
                plan = await self._tree_of_thought_planning(high_level_goal, current_state, urls)
            else:
                plan = await self._chain_of_thought_planning(high_level_goal, current_state, urls)
            # Cache the plan for future reference, evicting the oldest entry
            # once the cache is full (dicts preserve insertion order).
            self.plan_cache[cache_key] = plan
            if len(self.plan_cache) > self._MAX_CACHE_SIZE:
                self.plan_cache.pop(next(iter(self.plan_cache)))
            elapsed_time = time.time() - start_time
            logger.info(f"Task decomposition completed in {elapsed_time:.2f} seconds")
            return plan
        except Exception as e:
            elapsed_time = time.time() - start_time
            logger.error(f"Error decomposing task: {str(e)}")
            # Return a minimal plan with the error
            return {
                "task": high_level_goal,
                "error": str(e),
                "steps": [],
                "elapsed_time": elapsed_time
            }

    @staticmethod
    def _build_context(current_state: Dict = None, urls: List[str] = None):
        """Build the (url_context, state_context) prompt fragments shared by
        the planning methods.

        Returns:
            tuple[str, str]: URL context line and current-state context line.
        """
        url_context = "No specific URLs provided."
        if urls:
            url_context = f"Task involves the following URLs: {', '.join(urls)}"
        state_context = "No specific current state information provided."
        if current_state:
            state_context = f"Current state: {json.dumps(current_state, indent=2)}"
        return url_context, state_context

    @staticmethod
    def _ensure_plan_fields(plan: Dict, goal: str, include_decision_points: bool = False) -> Dict:
        """Fill in any required plan fields the model omitted with safe defaults.

        Args:
            plan: Parsed plan dict from the model (mutated in place).
            goal: Original goal, used as the default "task" value.
            include_decision_points: Also guarantee a "decision_points" list
                (used by tree-of-thought plans).

        Returns:
            Dict: The same plan object, with all required fields present.
        """
        plan.setdefault("task", goal)
        plan.setdefault("steps", [])
        if include_decision_points:
            plan.setdefault("decision_points", [])
        plan.setdefault("estimated_time_seconds", 60)  # Default estimate
        plan.setdefault("potential_issues", [])
        return plan

    async def _request_json(self, system_message: str, prompt: str) -> Dict:
        """Send a JSON-mode chat completion request and parse the reply.

        Args:
            system_message: System-role instruction for the model.
            prompt: User-role prompt.

        Returns:
            Dict: The parsed JSON object from the model's reply.

        Raises:
            ValueError: If the LLM client has not been initialized.
            json.JSONDecodeError: If the model returns malformed JSON.
        """
        if not self.llm_client:
            raise ValueError("LLM client not initialized")
        response = await self.llm_client.chat.completions.create(
            model=self.planning_model,
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}
        )
        return json.loads(response.choices[0].message.content)

    async def _chain_of_thought_planning(self, goal: str, current_state: Dict = None, urls: List[str] = None) -> Dict:
        """
        Use chain-of-thought reasoning to decompose a task.

        Args:
            goal: High-level goal description
            current_state: Current state of the system (optional)
            urls: Relevant URLs for the task (optional)

        Returns:
            Dict: Decomposed task plan
        """
        url_context, state_context = self._build_context(current_state, urls)
        prompt = f"""
I need to decompose this high-level task into a sequence of executable steps:
Task: {goal}
{url_context}
{state_context}
Please think step by step and break this task down into:
1. A sequence of actionable steps that a browser automation agent can execute
2. Each step should have a specific action type (navigate, click, type, etc.)
3. Include necessary parameters for each action
4. Consider potential error cases and decision points
For each step, provide:
- A clear description of what the step does
- The action type (navigate, click, type, select, wait, extract, api_call, etc.)
- All required parameters for that action type
- Any conditional logic or decision points
Structure your response as a valid JSON object with fields:
- task: the original task
- steps: array of step objects with action parameters
- estimated_time_seconds: estimated time to complete all steps
- potential_issues: array of potential issues that might arise
"""
        plan = await self._request_json(
            "You are a specialized AI task planner that decomposes high-level tasks into precise, executable steps for a browser automation agent. You excel at translating goals into structured action plans.",
            prompt
        )
        return self._ensure_plan_fields(plan, goal)

    async def _tree_of_thought_planning(self, goal: str, current_state: Dict = None, urls: List[str] = None) -> Dict:
        """
        Use tree-of-thought reasoning to decompose a task with alternatives.

        Args:
            goal: High-level goal description
            current_state: Current state of the system (optional)
            urls: Relevant URLs for the task (optional)

        Returns:
            Dict: Decomposed task plan with alternatives
        """
        url_context, state_context = self._build_context(current_state, urls)
        prompt = f"""
I need to decompose this high-level task into a sequence of executable steps with alternatives for complex decision points:
Task: {goal}
{url_context}
{state_context}
Please use tree-of-thought reasoning to:
1. Break this task down into a primary sequence of actionable steps
2. For complex steps, provide alternative approaches (creating a tree structure)
3. Each step should have a specific action type and parameters
4. Include decision logic to choose between alternatives based on runtime conditions
For each step, provide:
- A clear description of what the step does
- The action type (navigate, click, type, select, wait, extract, api_call, etc.)
- All required parameters for that action type
- For complex steps, alternative approaches with conditions for choosing each approach
Structure your response as a valid JSON object with fields:
- task: the original task
- steps: array of step objects with action parameters and alternatives
- decision_points: array of points where runtime decisions must be made
- estimated_time_seconds: estimated time to complete
- potential_issues: array of potential issues
"""
        plan = await self._request_json(
            "You are a specialized AI task planner that decomposes complex tasks using tree-of-thought reasoning, considering multiple approaches and decision points. You create structured plans with alternatives for a browser automation agent.",
            prompt
        )
        return self._ensure_plan_fields(plan, goal, include_decision_points=True)

    def _generate_cache_key(self, goal: str, urls: List[str] = None) -> str:
        """
        Generate a cache key for plan caching.

        Args:
            goal: High-level goal description
            urls: Relevant URLs for the task (optional)

        Returns:
            str: Cache key
        """
        # Simple cache key generation, could be enhanced with embedding-based similarity
        key_parts = [goal.lower().strip()]
        if urls:
            key_parts.append(",".join(sorted(urls)))
        return "_".join(key_parts)

    async def _adapt_cached_plan(self, cached_plan: Dict, new_goal: str, current_state: Dict = None) -> Dict:
        """
        Adapt a cached plan to a new similar task.

        Args:
            cached_plan: Previously generated plan
            new_goal: New high-level goal
            current_state: Current state of the system (optional)

        Returns:
            Dict: Adapted plan for new task
        """
        if not self.llm_client:
            # No LLM available: return a deep copy of the cached plan with an
            # updated task field. A JSON round-trip gives a deep copy (plans
            # are JSON-serializable by construction) so later mutations by the
            # caller cannot corrupt the cached entry, which a shallow .copy()
            # would allow through shared nested lists/dicts.
            adapted_plan = json.loads(json.dumps(cached_plan))
            adapted_plan["task"] = new_goal
            return adapted_plan
        prompt = f"""
I have a previously generated plan for a similar task. Please adapt this plan to fit the new task:
Original Plan: {json.dumps(cached_plan, indent=2)}
New Task: {new_goal}
Current State: {json.dumps(current_state, indent=2) if current_state else "No specific state information provided."}
Modify the plan as needed while maintaining its structure. You can:
1. Update step descriptions and parameters
2. Add or remove steps as necessary
3. Adjust decision points and alternatives
4. Update time estimates and potential issues
Return the adapted plan as a valid JSON object with the same structure as the original plan.
"""
        adapted_plan = await self._request_json(
            "You are a specialized AI task planner that can adapt existing plans to new but similar tasks. You maintain the structure while making appropriate adjustments.",
            prompt
        )
        # Ensure plan has required fields
        adapted_plan.setdefault("task", new_goal)
        return adapted_plan

    async def replan_on_failure(self, original_plan: Dict, failed_step: int, failure_reason: str, current_state: Dict = None) -> Dict:
        """
        Generate a new plan when a step fails.

        Args:
            original_plan: The original plan that failed
            failed_step: Index of the step that failed
            failure_reason: Reason for the failure
            current_state: Current state of the system (optional)

        Returns:
            Dict: Updated plan to handle the failure
        """
        steps = original_plan.get("steps", [])
        # Bounds-check both ends: a negative index would otherwise silently
        # select a step from the END of the list via Python's negative indexing.
        failed_step_desc = (
            json.dumps(steps[failed_step], indent=2)
            if 0 <= failed_step < len(steps)
            else "Unknown step"
        )
        prompt = f"""
A step in our task execution plan has failed. Please generate a revised plan to handle this failure:
Original Plan: {json.dumps(original_plan, indent=2)}
Failed Step: Step #{failed_step + 1} - {failed_step_desc}
Failure Reason: {failure_reason}
Current State: {json.dumps(current_state, indent=2) if current_state else "No specific state information provided."}
Please:
1. Analyze the failure and determine if we need to retry the step with modifications, try an alternative approach, or skip the failed step
2. Create a revised plan that adapts to this failure
3. Include any additional error handling steps that may be needed
Return the revised plan as a valid JSON object with the same structure as the original plan.
"""
        revised_plan = await self._request_json(
            "You are a specialized AI task planner that can revise plans when steps fail. You adapt plans to handle failures and continue making progress toward the goal.",
            prompt
        )
        # Add metadata about the replanning so callers can trace failure history.
        revised_plan["replanned"] = True
        revised_plan["original_failed_step"] = failed_step
        revised_plan["failure_reason"] = failure_reason
        return revised_plan

    async def analyze_decision_point(self, decision_point: Dict, current_state: Dict) -> Dict:
        """
        Analyze a decision point and recommend the best option.

        Args:
            decision_point: Decision point configuration
            current_state: Current state of the system

        Returns:
            Dict: Decision analysis with recommended option
        """
        def _first_option_id() -> str:
            # `or [{}]` also guards against an *empty* options list, which
            # previously raised IndexError (a .get() default only covers a
            # missing key, not an empty value).
            return (decision_point.get("options") or [{}])[0].get("id", "default")

        if not self.llm_client:
            # Return first option as default if no LLM is available
            return {
                "decision_id": decision_point.get("id", "unknown"),
                "recommended_option": _first_option_id(),
                "confidence": 0.5,
                "reasoning": "Default selection due to LLM unavailability."
            }
        prompt = f"""
I need to analyze this decision point based on the current state:
Decision Point: {json.dumps(decision_point, indent=2)}
Current State: {json.dumps(current_state, indent=2)}
Please:
1. Analyze each option in the decision point
2. Consider the current state and how it affects the decision
3. Provide reasoning for your recommendation
4. Return your analysis with a confidence score
Structure your response as a valid JSON object with fields:
- decision_id: the ID from the decision point
- recommended_option: the ID of your recommended option
- confidence: a number between 0 and 1 indicating confidence
- reasoning: explanation of your recommendation
- alternative_options: ranked list of other options in order of preference
"""
        analysis = await self._request_json(
            "You are a specialized AI decision analyzer that evaluates options at decision points based on current state. You provide clear recommendations with confidence scores and reasoning.",
            prompt
        )
        # Ensure analysis has required fields
        analysis.setdefault("decision_id", decision_point.get("id", "unknown"))
        analysis.setdefault("recommended_option", _first_option_id())
        analysis.setdefault("confidence", 0.5)  # Default confidence
        analysis.setdefault("reasoning", "No specific reasoning provided.")
        return analysis