| """ |
| Planning Agent - Analyzes options and creates strategies |
| """ |
| import asyncio |
| from typing import Dict, Any |
| from core.base_agent import BaseAgent, AgentMessage, TaskResult, TaskStatus |
|
|
| class PlanningAgent(BaseAgent): |
| """Agent specialized in planning and strategy creation""" |
| |
| def __init__(self): |
| super().__init__("planning_agent", ["planning", "analysis", "strategy"]) |
| self.flight_database = self._initialize_flight_data() |
| |
| def _initialize_flight_data(self) -> Dict[str, Any]: |
| """Mock flight database for demonstration""" |
| return { |
| "routes": { |
| "NYC": { |
| "LAX": [ |
| {"airline": "Delta", "price": 350, "duration": "6h 15m", "stops": 0}, |
| {"airline": "United", "price": 320, "duration": "6h 30m", "stops": 0}, |
| {"airline": "JetBlue", "price": 280, "duration": "7h 45m", "stops": 1} |
| ], |
| "SFO": [ |
| {"airline": "American", "price": 380, "duration": "6h 45m", "stops": 0}, |
| {"airline": "United", "price": 340, "duration": "6h 20m", "stops": 0} |
| ] |
| } |
| } |
| } |
| |
| async def process_task(self, message: AgentMessage) -> TaskResult: |
| """Process planning-related tasks""" |
| start_time = asyncio.get_event_loop().time() |
| |
| try: |
| if message.message_type == "planning": |
| result = await self._plan_flight_options(message.data) |
| elif message.message_type == "analysis": |
| result = await self._analyze_request(message.data) |
| else: |
| raise ValueError(f"Unknown task type: {message.message_type}") |
| |
| return TaskResult( |
| task_id=message.task_id, |
| agent_id=self.agent_id, |
| status=TaskStatus.COMPLETED, |
| result=result, |
| execution_time=asyncio.get_event_loop().time() - start_time |
| ) |
| |
| except Exception as e: |
| return TaskResult( |
| task_id=message.task_id, |
| agent_id=self.agent_id, |
| status=TaskStatus.FAILED, |
| result={}, |
| error_message=str(e), |
| execution_time=asyncio.get_event_loop().time() - start_time |
| ) |
| |
| async def _plan_flight_options(self, data: Dict[str, Any]) -> Dict[str, Any]: |
| """Plan optimal flight options based on request""" |
| request = data.get("request", "").lower() |
| |
| |
| destination = None |
| if "new york" in request or "nyc" in request: |
| destination = "NYC" |
| elif "los angeles" in request or "lax" in request: |
| destination = "LAX" |
| elif "san francisco" in request or "sfo" in request: |
| destination = "SFO" |
| |
| if not destination: |
| return { |
| "findings": ["Could not determine destination from request"], |
| "options": [], |
| "recommendation": "Please specify a clear destination" |
| } |
| |
| |
| available_routes = [] |
| for origin, destinations in self.flight_database["routes"].items(): |
| if destination in destinations: |
| for flight in destinations[destination]: |
| available_routes.append({ |
| "route": f"{origin} → {destination}", |
| **flight |
| }) |
| |
| |
| available_routes.sort(key=lambda x: x["price"]) |
| |
| |
| best_value = available_routes[0] if available_routes else None |
| fastest = min(available_routes, key=lambda x: self._parse_duration(x["duration"])) if available_routes else None |
| |
| return { |
| "findings": [ |
| f"Found {len(available_routes)} flight options to {destination}", |
| f"Price range: ${min(r['price'] for r in available_routes)} - ${max(r['price'] for r in available_routes)}" if available_routes else "No flights found" |
| ], |
| "options": available_routes[:5], |
| "recommendation": { |
| "best_value": best_value, |
| "fastest": fastest, |
| "reasoning": "Recommendations based on price and duration optimization" |
| } |
| } |
| |
| async def _analyze_request(self, data: Dict[str, Any]) -> Dict[str, Any]: |
| """Analyze user request for planning purposes""" |
| request = data.get("request", "") |
| |
| |
| analysis = { |
| "request_type": "travel" if any(word in request.lower() for word in ["flight", "travel", "book"]) else "general", |
| "urgency": "high" if any(word in request.lower() for word in ["urgent", "asap", "immediately"]) else "normal", |
| "budget_conscious": "yes" if any(word in request.lower() for word in ["cheap", "budget", "affordable"]) else "unknown", |
| "extracted_entities": [] |
| } |
| |
| |
| cities = ["new york", "los angeles", "san francisco", "chicago", "miami"] |
| for city in cities: |
| if city in request.lower(): |
| analysis["extracted_entities"].append({"type": "destination", "value": city}) |
| |
| return { |
| "findings": [ |
| f"Request type: {analysis['request_type']}", |
| f"Urgency level: {analysis['urgency']}", |
| f"Entities found: {len(analysis['extracted_entities'])}" |
| ], |
| "analysis": analysis, |
| "recommendation": "Proceed with specialized planning based on request type" |
| } |
| |
| def _parse_duration(self, duration_str: str) -> int: |
| """Parse duration string to minutes for comparison""" |
| |
| parts = duration_str.replace("h", "").replace("m", "").split() |
| hours = int(parts[0]) if len(parts) > 0 else 0 |
| minutes = int(parts[1]) if len(parts) > 1 else 0 |
| return hours * 60 + minutes |
| |
| def get_agent_info(self) -> Dict[str, Any]: |
| """Return planning agent information""" |
| return { |
| "agent_id": self.agent_id, |
| "type": "planning", |
| "capabilities": self.capabilities, |
| "specialization": "Flight planning and travel optimization", |
| "database_size": len(self.flight_database["routes"]) |
| } |
|
|