# multi-ai-agentic-system/agents/planning_agent.py
# CHKIM79's picture
# Deploy Multi-AI Agentic System v1.0.0 - Production Ready
# d4b6ffc
"""
Planning Agent - Analyzes options and creates strategies
"""
import asyncio
import re
from typing import Any, Dict, Optional

from core.base_agent import BaseAgent, AgentMessage, TaskResult, TaskStatus
class PlanningAgent(BaseAgent):
    """Agent specialized in planning and strategy creation.

    ``process_task`` dispatches on ``message.message_type``:

    * ``"planning"`` - look up flights to the destination named in the
      request text, using the in-memory mock flight database.
    * ``"analysis"`` - keyword-based classification of the request text.
    """

    # (airport code, lower-case phrases naming it). The order is the
    # tie-break priority used when the request has no "to <place>" phrase.
    _DESTINATION_ALIASES = (
        ("NYC", ("new york", "nyc")),
        ("LAX", ("los angeles", "lax")),
        ("SFO", ("san francisco", "sfo")),
    )

    def __init__(self):
        super().__init__("planning_agent", ["planning", "analysis", "strategy"])
        self.flight_database = self._initialize_flight_data()

    def _initialize_flight_data(self) -> Dict[str, Any]:
        """Return the mock flight database used for demonstration.

        Layout: ``{"routes": {origin: {destination: [flight, ...]}}}`` where
        each flight has ``airline``, ``price``, ``duration`` and ``stops``.
        """
        return {
            "routes": {
                "NYC": {
                    "LAX": [
                        {"airline": "Delta", "price": 350, "duration": "6h 15m", "stops": 0},
                        {"airline": "United", "price": 320, "duration": "6h 30m", "stops": 0},
                        {"airline": "JetBlue", "price": 280, "duration": "7h 45m", "stops": 1},
                    ],
                    "SFO": [
                        {"airline": "American", "price": 380, "duration": "6h 45m", "stops": 0},
                        {"airline": "United", "price": 340, "duration": "6h 20m", "stops": 0},
                    ],
                }
            }
        }

    async def process_task(self, message: AgentMessage) -> TaskResult:
        """Process a planning-related task and report the outcome.

        Args:
            message: Task message; ``message_type`` selects the handler and
                ``data`` is passed to it.

        Returns:
            A ``TaskResult`` with status ``COMPLETED`` and the handler's
            result, or status ``FAILED`` with ``error_message`` set when the
            message type is unknown or the handler raises.
        """
        # We are inside a coroutine, so a running loop always exists;
        # its clock is used to measure execution time.
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        try:
            if message.message_type == "planning":
                result = await self._plan_flight_options(message.data)
            elif message.message_type == "analysis":
                result = await self._analyze_request(message.data)
            else:
                raise ValueError(f"Unknown task type: {message.message_type}")
            return TaskResult(
                task_id=message.task_id,
                agent_id=self.agent_id,
                status=TaskStatus.COMPLETED,
                result=result,
                execution_time=loop.time() - start_time,
            )
        except Exception as e:
            # Agent boundary: failures are reported in the result rather
            # than propagated to the caller.
            return TaskResult(
                task_id=message.task_id,
                agent_id=self.agent_id,
                status=TaskStatus.FAILED,
                result={},
                error_message=str(e),
                execution_time=loop.time() - start_time,
            )

    @staticmethod
    def _request_text(data: Dict[str, Any]) -> str:
        """Return ``data["request"]`` as text, or ``""`` when missing or None."""
        return str((data or {}).get("request") or "")

    def _extract_destination(self, request: str) -> Optional[str]:
        """Return the airport code the request travels to, or ``None``.

        Aliases are matched as whole words, so "relax" does not count as
        "lax". A place introduced by "to" (e.g. "from NYC to LAX") wins over
        any other mention; otherwise the first code in
        ``_DESTINATION_ALIASES`` order that is mentioned is used.
        """
        text = request.lower()
        explicit = []   # (position of "to <place>", code)
        mentioned = []  # codes named anywhere, in priority order
        for code, aliases in self._DESTINATION_ALIASES:
            names = "|".join(re.escape(alias) for alias in aliases)
            to_hit = re.search(rf"\bto\s+(?:{names})\b", text)
            if to_hit:
                explicit.append((to_hit.start(), code))
            if re.search(rf"\b(?:{names})\b", text):
                mentioned.append(code)
        if explicit:
            # Earliest "to <place>" phrase in the text decides.
            return min(explicit)[1]
        return mentioned[0] if mentioned else None

    async def _plan_flight_options(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Plan flight options for the destination named in the request.

        Args:
            data: Task payload; the ``"request"`` entry holds the user text.

        Returns:
            A dict with ``findings`` (list of summary strings), ``options``
            (up to five flights, cheapest first) and ``recommendation``.
            ``recommendation`` is a dict with ``best_value``, ``fastest``
            and ``reasoning``, or a plain string asking for a destination
            when none could be determined.
        """
        destination = self._extract_destination(self._request_text(data))
        if not destination:
            return {
                "findings": ["Could not determine destination from request"],
                "options": [],
                "recommendation": "Please specify a clear destination",
            }

        # Find available routes (mock logic): every origin that serves the
        # destination contributes its flights, tagged with the route.
        available_routes = []
        for origin, destinations in self.flight_database["routes"].items():
            for flight in destinations.get(destination, ()):
                available_routes.append({"route": f"{origin}{destination}", **flight})

        # Sort by price (could implement more sophisticated ranking).
        available_routes.sort(key=lambda x: x["price"])

        if available_routes:
            best_value = available_routes[0]
            fastest = min(
                available_routes,
                key=lambda x: self._parse_duration(x["duration"]),
            )
            # The list is price-sorted, so its ends are the price extremes.
            price_finding = (
                f"Price range: ${available_routes[0]['price']} - "
                f"${available_routes[-1]['price']}"
            )
        else:
            best_value = fastest = None
            price_finding = "No flights found"

        return {
            "findings": [
                f"Found {len(available_routes)} flight options to {destination}",
                price_finding,
            ],
            "options": available_routes[:5],  # Top 5 options
            "recommendation": {
                "best_value": best_value,
                "fastest": fastest,
                "reasoning": "Recommendations based on price and duration optimization",
            },
        }

    async def _analyze_request(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze the user request for planning purposes.

        Classification is plain substring matching on the lower-cased
        request text.

        Args:
            data: Task payload; the ``"request"`` entry holds the user text.

        Returns:
            A dict with ``findings`` (summary strings), ``analysis``
            (``request_type``, ``urgency``, ``budget_conscious`` and
            ``extracted_entities``) and a ``recommendation`` string.
        """
        lowered = self._request_text(data).lower()

        def mentions_any(words) -> bool:
            return any(word in lowered for word in words)

        # Simple entity extraction: every known city named in the text.
        cities = ("new york", "los angeles", "san francisco", "chicago", "miami")
        analysis = {
            "request_type": "travel" if mentions_any(("flight", "travel", "book")) else "general",
            "urgency": "high" if mentions_any(("urgent", "asap", "immediately")) else "normal",
            "budget_conscious": "yes" if mentions_any(("cheap", "budget", "affordable")) else "unknown",
            "extracted_entities": [
                {"type": "destination", "value": city}
                for city in cities
                if city in lowered
            ],
        }
        return {
            "findings": [
                f"Request type: {analysis['request_type']}",
                f"Urgency level: {analysis['urgency']}",
                f"Entities found: {len(analysis['extracted_entities'])}",
            ],
            "analysis": analysis,
            "recommendation": "Proceed with specialized planning based on request type",
        }

    def _parse_duration(self, duration_str: str) -> int:
        """Parse a duration string such as ``"6h 15m"`` into minutes.

        Unit-tagged components are honoured wherever they appear, so
        ``"45m"`` is 45 minutes and ``"6h15m"`` is 375. A string without
        any ``h``/``m`` marker is read positionally as
        ``"<hours> [<minutes>]"``.

        Raises:
            ValueError: If an untagged string contains non-numeric parts.
        """
        tagged = re.findall(r"(\d+)\s*([hm])", duration_str.lower())
        if tagged:
            return sum(int(amount) * (60 if unit == "h" else 1) for amount, unit in tagged)
        # No unit markers: positional reading; an empty string yields 0.
        parts = duration_str.split()
        hours = int(parts[0]) if len(parts) > 0 else 0
        minutes = int(parts[1]) if len(parts) > 1 else 0
        return hours * 60 + minutes

    def get_agent_info(self) -> Dict[str, Any]:
        """Return planning agent information.

        ``database_size`` is the number of origin entries in the mock
        route table.
        """
        return {
            "agent_id": self.agent_id,
            "type": "planning",
            "capabilities": self.capabilities,
            "specialization": "Flight planning and travel optimization",
            "database_size": len(self.flight_database["routes"]),
        }