Spaces:
Sleeping
Sleeping
| """ | |
| Coordinator agent that manages and routes queries to specialized agents. | |
| """ | |
| import re | |
| from typing import List, Dict, Any | |
| from smolagents import LiteLLMModel | |
| from .base_agent import BaseAgent | |
| class CoordinatorAgent: | |
| """Coordinator that manages multiple specialized agents and routes queries appropriately.""" | |
| def __init__(self, specialized_agents: List[BaseAgent], model: LiteLLMModel): | |
| self.agents = specialized_agents | |
| self.model = model | |
| def _decompose_query(self, query: str) -> List[str]: | |
| """Decompose a complex query into subtasks.""" | |
| if not query.strip(): | |
| return ["Please provide a query to process"] | |
| query_lower = query.lower() | |
| tasks = [] | |
| # Extract location if present | |
| location_pattern = r"(?:in|at|near|around|close to)\s+([a-zA-Z\s,]+)(?:\.|$|\s)" | |
| location_match = re.search(location_pattern, query_lower) | |
| location = location_match.group(1).strip() if location_match else "" | |
| # Direct task identification patterns | |
| patterns = { | |
| "sentiment": ["sentiment", "feeling", "opinion", "review", "analyze", "how is", "what do you think"], | |
| "places": ["hotel", "hotels", "stay", "accommodation", "accommodations", "lodging", "motel", "resort"], | |
| "restaurants": ["restaurant", "restaurants", "food", "dining", "eat", "dinner", "lunch", "breakfast", "cafe"], | |
| "hiking": ["hike", "hikes", "hiking", "trail", "trails", "trek", "trekking", "outdoor", "mountain", "walk", "walking", "nature"], | |
| "web_search": ["search", "web search", "find", "lookup", "google", "bing", "weather", "news", "current", "latest", "today", "information", "what is", "definition", "meaning", "stock", "price", "market", "finance", "recipe", "movie", "film", "tv show"] | |
| } | |
| # Check for direct task matches | |
| found_match = False | |
| for category, keywords in patterns.items(): | |
| if any(keyword in query_lower for keyword in keywords): | |
| found_match = True | |
| if category == "sentiment": | |
| tasks.append(query) # Keep original query for sentiment analysis | |
| elif category == "places" and location: | |
| tasks.append(f"Find hotels in {location}") | |
| elif category == "restaurants" and location: | |
| tasks.append(f"Find restaurants in {location}") | |
| elif category == "hiking" and location: | |
| tasks.append(f"Find hiking trails in {location}") | |
| elif category == "web_search": | |
| tasks.append(query) # Keep original query for web search | |
| elif category == "places": | |
| tasks.append("Find hotels " + query) | |
| elif category == "restaurants": | |
| tasks.append("Find restaurants " + query) | |
| elif category == "hiking": | |
| tasks.append("Find hiking trails " + query) | |
| # If no direct matches but location is found | |
| if not found_match and location: | |
| tasks.extend([ | |
| f"Find hotels in {location}", | |
| f"Find restaurants in {location}", | |
| f"Find hiking trails in {location}" | |
| ]) | |
| # If still no tasks, default to web search | |
| if not tasks: | |
| tasks.append(query) # Let web agent handle it | |
| return tasks | |
| def _assign_tasks(self, tasks: List[str]) -> Dict[str, List[str]]: | |
| """Assign tasks to specialized agents based on their capabilities.""" | |
| assignments = {agent.name: [] for agent in self.agents} | |
| unassigned_tasks = [] | |
| for task in tasks: | |
| best_agent = None | |
| best_confidence = 0.1 # Lowered threshold to catch more cases | |
| # First, try to assign to specialized agents (excluding web agent) | |
| specialized_agents = [agent for agent in self.agents if agent.name != "web_agent"] | |
| for agent in specialized_agents: | |
| confidence = agent.can_handle(task) | |
| if confidence > best_confidence: | |
| best_confidence = confidence | |
| best_agent = agent | |
| if best_agent: | |
| assignments[best_agent.name].append(task) | |
| else: | |
| unassigned_tasks.append(task) | |
| # Route all unassigned tasks to web agent as fallback | |
| if unassigned_tasks: | |
| for task in unassigned_tasks: | |
| assignments["web_agent"].append(task) | |
| return assignments | |
| def process_query(self, query: str) -> str: | |
| """Process a complex query by coordinating multiple specialized agents.""" | |
| try: | |
| # Decompose the query into subtasks | |
| tasks = self._decompose_query(query) | |
| if not tasks: | |
| return "β I couldn't understand how to help with that request. Please try rephrasing it." | |
| # Assign tasks to specialized agents | |
| assignments = self._assign_tasks(tasks) | |
| # Execute tasks and collect results | |
| results = [] | |
| has_meaningful_assignment = False | |
| for agent in self.agents: | |
| agent_tasks = assignments[agent.name] | |
| if agent_tasks: | |
| for task in agent_tasks: | |
| try: | |
| # For web agent, always execute (it's our fallback) | |
| # For other agents, check confidence first | |
| if agent.name == "web_agent": | |
| has_meaningful_assignment = True | |
| result = agent.execute(task) | |
| if result and len(result.strip()) > 20: | |
| results.append(result) | |
| else: | |
| # Check if this task was actually assigned based on agent capability | |
| confidence = agent.can_handle(task) | |
| if confidence > 0.1: # Only proceed if agent has some confidence | |
| has_meaningful_assignment = True | |
| result = agent.execute(task) | |
| # Only add meaningful results (skip simple confirmations) | |
| if result and len(result.strip()) > 20: | |
| results.append(result) | |
| except Exception as e: | |
| results.append(f"β Error processing '{task}': {str(e)}") | |
| # If no meaningful assignments were made or no results, provide helpful message | |
| if not has_meaningful_assignment or not results: | |
| return """ | |
| β **Unable to Process Query** | |
| I couldn't find any agents to handle your request properly. This shouldn't happen with the web agent as fallback. | |
| π― **Try these specific formats:** | |
| β’ π¨ **Hotels:** "Find hotels in [city]" | |
| β’ π½οΈ **Restaurants:** "Find restaurants in [city]" | |
| β’ ποΈ **Hiking:** "Find hiking trails near [location]" | |
| β’ π **Sentiment:** "Analyze sentiment of: [text]" | |
| β’ π **Web Search:** "Search for [anything]" or just ask any question | |
| π‘ **Example:** "What's the weather in Tokyo?" or "Find hotels in Paris" | |
| """.strip() | |
| # If only one result, return it directly | |
| if len(results) == 1: | |
| return results[0] | |
| # If multiple results, combine them nicely | |
| combined_output = "π― **Here's what I found for you:**\n\n" | |
| for i, result in enumerate(results, 1): | |
| combined_output += f"**Result #{i}:**\n" | |
| combined_output += result + "\n\n" | |
| if i < len(results): | |
| combined_output += "β" * 60 + "\n\n" | |
| return combined_output | |
| except Exception as e: | |
| return f"β Error processing query: {str(e)}" |