mcp_client / agents /coordinator.py
SrikanthNagelli's picture
initial commit
100c46f
"""
Coordinator agent that manages and routes queries to specialized agents.
"""
import re
from typing import List, Dict, Any
from smolagents import LiteLLMModel
from .base_agent import BaseAgent
class CoordinatorAgent:
    """Coordinator that manages multiple specialized agents and routes queries appropriately.

    A query is first decomposed into task strings using keyword and location
    heuristics.  Each task is offered to every non-web agent through
    ``can_handle``; the agent reporting the highest confidence above
    ``MIN_CONFIDENCE`` gets the task, and anything left over goes to the
    agent named ``WEB_AGENT_NAME`` when one is configured.

    Agents are used through three members only: ``name``, ``can_handle(task)``
    (compared numerically against ``MIN_CONFIDENCE``) and ``execute(task)``
    (whose result is treated as text).
    """

    # Name of the agent that receives every task no specialist claims.
    WEB_AGENT_NAME = "web_agent"
    # A specialist must report strictly more than this confidence to get a task.
    MIN_CONFIDENCE = 0.1
    # Results whose stripped length is not above this are dropped as noise.
    MIN_RESULT_LENGTH = 20
    # Returned for blank queries.
    EMPTY_QUERY_MESSAGE = "Please provide a query to process"

    # The leading \b keeps the preposition from matching inside another word
    # ("explain", "what").  The capture simply runs until the first character
    # that is not a letter, whitespace or comma, so trailing punctuation such
    # as "?" no longer prevents or truncates a match.
    _LOCATION_RE = re.compile(
        r"\b(?:in|at|near|around|close to)\s+([a-zA-Z\s,]+)",
        re.IGNORECASE,
    )

    # Category -> trigger keywords.  Dict order defines the order of the tasks
    # that _decompose_query emits.
    _TASK_KEYWORDS = {
        "sentiment": ["sentiment", "feeling", "opinion", "review", "analyze", "how is", "what do you think"],
        "places": ["hotel", "hotels", "stay", "accommodation", "accommodations", "lodging", "motel", "resort"],
        "restaurants": ["restaurant", "restaurants", "food", "dining", "eat", "dinner", "lunch", "breakfast", "cafe"],
        "hiking": ["hike", "hikes", "hiking", "trail", "trails", "trek", "trekking", "outdoor", "mountain", "walk", "walking", "nature"],
        "web_search": ["search", "web search", "find", "lookup", "google", "bing", "weather", "news", "current", "latest", "today", "information", "what is", "definition", "meaning", "stock", "price", "market", "finance", "recipe", "movie", "film", "tv show"],
    }

    # Keywords are anchored at a word start so that, for example, "eat" does
    # not fire on "weather" while inflections such as "eating" still match.
    _KEYWORD_PATTERNS = {
        category: re.compile(r"\b(?:" + "|".join(re.escape(word) for word in keywords) + ")")
        for category, keywords in _TASK_KEYWORDS.items()
    }

    # Categories that are rewritten into a canonical "Find <noun> ..." task.
    # Categories missing here (sentiment, web_search) pass the query through.
    _SEARCH_NOUNS = {
        "places": "hotels",
        "restaurants": "restaurants",
        "hiking": "hiking trails",
    }

    _UNABLE_TO_PROCESS_MESSAGE = "\n".join([
        "❓ **Unable to Process Query**",
        "I couldn't find any agents to handle your request properly. This shouldn't happen with the web agent as fallback.",
        "🎯 **Try these specific formats:**",
        "• 🏨 **Hotels:** \"Find hotels in [city]\"",
        "• 🍽️ **Restaurants:** \"Find restaurants in [city]\"",
        "• 🏔️ **Hiking:** \"Find hiking trails near [location]\"",
        "• 😊 **Sentiment:** \"Analyze sentiment of: [text]\"",
        "• 🌐 **Web Search:** \"Search for [anything]\" or just ask any question",
        "💡 **Example:** \"What's the weather in Tokyo?\" or \"Find hotels in Paris\"",
    ])

    def __init__(self, specialized_agents: "List[BaseAgent]", model: "LiteLLMModel"):
        """Store the agents to coordinate and the model handle.

        Args:
            specialized_agents: Agents to route tasks to, in execution order.
            model: Model handle; stored on the instance, not used by this class.
        """
        self.agents = specialized_agents
        self.model = model

    def _decompose_query(self, query: str) -> List[str]:
        """Decompose a complex query into subtasks.

        Args:
            query: Raw user query.

        Returns:
            A non-empty list of task strings without duplicates.  A blank
            query yields a single prompt asking for input.
        """
        if not query or not query.strip():
            return [self.EMPTY_QUERY_MESSAGE]

        query_lower = query.lower()

        # Search the original text so the location keeps the user's casing.
        location_match = self._LOCATION_RE.search(query)
        location = location_match.group(1).strip(" ,\t\r\n") if location_match else ""

        tasks: List[str] = []
        seen = set()

        def add_task(task: str) -> None:
            # Compare ignoring case and spacing so that a generated task equal
            # to the query itself is not executed twice.
            key = " ".join(task.split()).casefold()
            if key not in seen:
                seen.add(key)
                tasks.append(task)

        found_match = False
        for category, pattern in self._KEYWORD_PATTERNS.items():
            if not pattern.search(query_lower):
                continue
            found_match = True
            noun = self._SEARCH_NOUNS.get(category)
            if noun is None:
                add_task(query)  # sentiment / web search keep the original query
            elif location:
                add_task(f"Find {noun} in {location}")
            else:
                add_task(f"Find {noun} {query}")

        # A bare location with no recognizable intent: search every category.
        if not found_match and location:
            for noun in self._SEARCH_NOUNS.values():
                add_task(f"Find {noun} in {location}")

        # Nothing recognized at all: hand the query over unchanged.
        if not tasks:
            add_task(query)
        return tasks

    def _assign_tasks(self, tasks: List[str]) -> Dict[str, List[str]]:
        """Assign tasks to specialized agents based on their capabilities.

        Args:
            tasks: Task strings produced by ``_decompose_query``.

        Returns:
            Mapping of every configured agent name to the tasks it received.
            Tasks no specialist claims go to the web agent; when no web agent
            is configured they are left unassigned instead of raising.
        """
        assignments: Dict[str, List[str]] = {agent.name: [] for agent in self.agents}
        specialists = [agent for agent in self.agents if agent.name != self.WEB_AGENT_NAME]
        fallback_tasks = assignments.get(self.WEB_AGENT_NAME)

        for task in tasks:
            best_agent = None
            best_confidence = self.MIN_CONFIDENCE
            for agent in specialists:
                confidence = agent.can_handle(task)
                if confidence > best_confidence:
                    best_confidence = confidence
                    best_agent = agent

            if best_agent is not None:
                assignments[best_agent.name].append(task)
            elif fallback_tasks is not None:
                fallback_tasks.append(task)
        return assignments

    def process_query(self, query: str) -> str:
        """Process a complex query by coordinating multiple specialized agents.

        Args:
            query: Raw user query.

        Returns:
            The single agent result, a combined report when several results
            were produced, or a help/error message.  Exceptions are converted
            into an error string rather than propagated.
        """
        try:
            # Answer blank input directly instead of sending the prompt text
            # to an agent as if it were a task.
            if not query or not query.strip():
                return self.EMPTY_QUERY_MESSAGE

            tasks = self._decompose_query(query)
            if not tasks:
                return "❓ I couldn't understand how to help with that request. Please try rephrasing it."

            assignments = self._assign_tasks(tasks)

            results = []
            attempted = False
            for agent in self.agents:
                is_fallback = agent.name == self.WEB_AGENT_NAME
                for task in assignments.get(agent.name, []):
                    try:
                        # The web agent always runs; other agents are asked
                        # for their confidence once more right before running.
                        confident = is_fallback or agent.can_handle(task) > self.MIN_CONFIDENCE
                        if not confident:
                            continue
                        attempted = True
                        result = agent.execute(task)
                        # Skip empty answers and short confirmations.
                        if result and len(result.strip()) > self.MIN_RESULT_LENGTH:
                            results.append(result)
                    except Exception as e:
                        # Count failures as attempts so the error is reported
                        # instead of being replaced by the generic help text.
                        attempted = True
                        results.append(f"❌ Error processing '{task}': {str(e)}")

            if not attempted or not results:
                return self._UNABLE_TO_PROCESS_MESSAGE

            if len(results) == 1:
                return results[0]

            separator = "═" * 60 + "\n\n"
            sections = [
                f"**Result #{index}:**\n{result}\n\n"
                for index, result in enumerate(results, 1)
            ]
            return "🎯 **Here's what I found for you:**\n\n" + separator.join(sections)
        except Exception as e:
            return f"❌ Error processing query: {str(e)}"