""" Memory Agent - Manages user preferences and historical data """ import asyncio from typing import Dict, Any, List from core.base_agent import BaseAgent, AgentMessage, TaskResult, TaskStatus class MemoryAgent(BaseAgent): """Agent specialized in managing user data and preferences""" def __init__(self): super().__init__("memory_agent", ["memory_lookup", "preference_management", "history"]) self.user_profiles = self._initialize_user_data() self.interaction_history = [] def _initialize_user_data(self) -> Dict[str, Any]: """Initialize mock user data""" return { "user_001": { "preferences": { "preferred_airlines": ["Delta", "United"], "seat_preference": "aisle", "budget_range": {"min": 200, "max": 500}, "travel_class": "economy", "notification_method": "email" }, "travel_history": [ {"destination": "NYC", "date": "2023-12-01", "airline": "Delta", "satisfaction": 4.5}, {"destination": "LAX", "date": "2023-10-15", "airline": "United", "satisfaction": 4.2} ], "loyalty_programs": ["Delta SkyMiles", "United MileagePlus"] }, "default": { "preferences": { "preferred_airlines": [], "seat_preference": "any", "budget_range": {"min": 0, "max": 1000}, "travel_class": "economy", "notification_method": "email" }, "travel_history": [], "loyalty_programs": [] } } async def process_task(self, message: AgentMessage) -> TaskResult: """Process memory-related tasks""" start_time = asyncio.get_event_loop().time() try: if message.message_type == "memory_lookup": result = await self._lookup_user_data(message.data) elif message.message_type == "preference_management": result = await self._manage_preferences(message.data) elif message.message_type == "history": result = await self._get_history(message.data) else: raise ValueError(f"Unknown task type: {message.message_type}") return TaskResult( task_id=message.task_id, agent_id=self.agent_id, status=TaskStatus.COMPLETED, result=result, execution_time=asyncio.get_event_loop().time() - start_time ) except Exception as e: return TaskResult( task_id=message.task_id, agent_id=self.agent_id, status=TaskStatus.FAILED, result={}, error_message=str(e), execution_time=asyncio.get_event_loop().time() - start_time ) async def _lookup_user_data(self, data: Dict[str, Any]) -> Dict[str, Any]: """Look up user preferences and history""" user_id = data.get("user_id", "default") request = data.get("request", "").lower() # Get user profile profile = self.user_profiles.get(user_id, self.user_profiles["default"]) # Analyze request for relevant preferences relevant_prefs = self._extract_relevant_preferences(request, profile) # Get travel patterns travel_patterns = self._analyze_travel_patterns(profile["travel_history"]) return { "findings": [ f"User profile found for {user_id}", f"Travel history: {len(profile['travel_history'])} trips", f"Loyalty programs: {len(profile['loyalty_programs'])}" ], "user_preferences": relevant_prefs, "travel_patterns": travel_patterns, "recommendation": self._generate_personalized_recommendation(profile, request) } def _extract_relevant_preferences(self, request: str, profile: Dict[str, Any]) -> Dict[str, Any]: """Extract preferences relevant to the current request""" prefs = profile["preferences"].copy() # Filter preferences based on request context if "flight" in request: return { "preferred_airlines": prefs["preferred_airlines"], "seat_preference": prefs["seat_preference"], "budget_range": prefs["budget_range"], "travel_class": prefs["travel_class"] } return prefs def _analyze_travel_patterns(self, history: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze user's travel patterns""" if not history: return {"pattern": "no_history", "insights": []} # Calculate average satisfaction avg_satisfaction = sum(trip["satisfaction"] for trip in history) / len(history) # Find most visited destinations destinations = {} airlines = {} for trip in history: dest = trip["destination"] airline = trip["airline"] destinations[dest] = destinations.get(dest, 0) + 1 airlines[airline] = airlines.get(airline, 0) + 1 most_visited = max(destinations, key=destinations.get) if destinations else None preferred_airline = max(airlines, key=airlines.get) if airlines else None return { "pattern": "frequent_traveler" if len(history) > 3 else "occasional_traveler", "insights": [ f"Average satisfaction: {avg_satisfaction:.1f}/5.0", f"Most visited: {most_visited}" if most_visited else "No pattern found", f"Preferred airline: {preferred_airline}" if preferred_airline else "No preference" ], "statistics": { "total_trips": len(history), "destinations": destinations, "airlines": airlines } } def _generate_personalized_recommendation(self, profile: Dict[str, Any], request: str) -> str: """Generate personalized recommendation based on user profile""" prefs = profile["preferences"] history = profile["travel_history"] recommendations = [] # Budget-based recommendation budget = prefs["budget_range"] recommendations.append(f"Consider flights in your budget range: ${budget['min']}-${budget['max']}") # Airline preference if prefs["preferred_airlines"]: airlines = ", ".join(prefs["preferred_airlines"]) recommendations.append(f"Prioritize your preferred airlines: {airlines}") # Historical satisfaction if history: high_satisfaction_airlines = [ trip["airline"] for trip in history if trip["satisfaction"] >= 4.0 ] if high_satisfaction_airlines: recommendations.append(f"Consider airlines with your high satisfaction history") return " | ".join(recommendations) if recommendations else "No specific recommendations based on profile" async def _manage_preferences(self, data: Dict[str, Any]) -> Dict[str, Any]: """Update user preferences""" user_id = data.get("user_id", "default") new_preferences = data.get("preferences", {}) if user_id not in self.user_profiles: self.user_profiles[user_id] = self.user_profiles["default"].copy() # Update preferences self.user_profiles[user_id]["preferences"].update(new_preferences) return { "findings": [ f"Updated preferences for user {user_id}", f"Modified {len(new_preferences)} preference settings" ], "updated_preferences": self.user_profiles[user_id]["preferences"], "recommendation": "Preferences updated successfully" } async def _get_history(self, data: Dict[str, Any]) -> Dict[str, Any]: """Retrieve user history""" user_id = data.get("user_id", "default") history_type = data.get("type", "travel") profile = self.user_profiles.get(user_id, self.user_profiles["default"]) if history_type == "travel": history = profile["travel_history"] else: history = self.interaction_history return { "findings": [ f"Retrieved {len(history)} {history_type} history records", f"Date range: {self._get_date_range(history)}" ], "history": history, "recommendation": f"History data available for analysis" } def _get_date_range(self, history: List[Dict[str, Any]]) -> str: """Get date range from history""" if not history: return "No dates available" dates = [trip.get("date", "") for trip in history if trip.get("date")] if not dates: return "No dates available" return f"{min(dates)} to {max(dates)}" def get_agent_info(self) -> Dict[str, Any]: """Return memory agent information""" return { "agent_id": self.agent_id, "type": "memory", "capabilities": self.capabilities, "specialization": "User preference management and history tracking", "total_users": len(self.user_profiles), "interaction_records": len(self.interaction_history) }