| """ |
| Memory Agent - Manages user preferences and historical data |
| """ |
| import asyncio |
| from typing import Dict, Any, List |
| from core.base_agent import BaseAgent, AgentMessage, TaskResult, TaskStatus |
|
|
| class MemoryAgent(BaseAgent): |
| """Agent specialized in managing user data and preferences""" |
| |
| def __init__(self): |
| super().__init__("memory_agent", ["memory_lookup", "preference_management", "history"]) |
| self.user_profiles = self._initialize_user_data() |
| self.interaction_history = [] |
| |
| def _initialize_user_data(self) -> Dict[str, Any]: |
| """Initialize mock user data""" |
| return { |
| "user_001": { |
| "preferences": { |
| "preferred_airlines": ["Delta", "United"], |
| "seat_preference": "aisle", |
| "budget_range": {"min": 200, "max": 500}, |
| "travel_class": "economy", |
| "notification_method": "email" |
| }, |
| "travel_history": [ |
| {"destination": "NYC", "date": "2023-12-01", "airline": "Delta", "satisfaction": 4.5}, |
| {"destination": "LAX", "date": "2023-10-15", "airline": "United", "satisfaction": 4.2} |
| ], |
| "loyalty_programs": ["Delta SkyMiles", "United MileagePlus"] |
| }, |
| "default": { |
| "preferences": { |
| "preferred_airlines": [], |
| "seat_preference": "any", |
| "budget_range": {"min": 0, "max": 1000}, |
| "travel_class": "economy", |
| "notification_method": "email" |
| }, |
| "travel_history": [], |
| "loyalty_programs": [] |
| } |
| } |
| |
| async def process_task(self, message: AgentMessage) -> TaskResult: |
| """Process memory-related tasks""" |
| start_time = asyncio.get_event_loop().time() |
| |
| try: |
| if message.message_type == "memory_lookup": |
| result = await self._lookup_user_data(message.data) |
| elif message.message_type == "preference_management": |
| result = await self._manage_preferences(message.data) |
| elif message.message_type == "history": |
| result = await self._get_history(message.data) |
| else: |
| raise ValueError(f"Unknown task type: {message.message_type}") |
| |
| return TaskResult( |
| task_id=message.task_id, |
| agent_id=self.agent_id, |
| status=TaskStatus.COMPLETED, |
| result=result, |
| execution_time=asyncio.get_event_loop().time() - start_time |
| ) |
| |
| except Exception as e: |
| return TaskResult( |
| task_id=message.task_id, |
| agent_id=self.agent_id, |
| status=TaskStatus.FAILED, |
| result={}, |
| error_message=str(e), |
| execution_time=asyncio.get_event_loop().time() - start_time |
| ) |
| |
| async def _lookup_user_data(self, data: Dict[str, Any]) -> Dict[str, Any]: |
| """Look up user preferences and history""" |
| user_id = data.get("user_id", "default") |
| request = data.get("request", "").lower() |
| |
| |
| profile = self.user_profiles.get(user_id, self.user_profiles["default"]) |
| |
| |
| relevant_prefs = self._extract_relevant_preferences(request, profile) |
| |
| |
| travel_patterns = self._analyze_travel_patterns(profile["travel_history"]) |
| |
| return { |
| "findings": [ |
| f"User profile found for {user_id}", |
| f"Travel history: {len(profile['travel_history'])} trips", |
| f"Loyalty programs: {len(profile['loyalty_programs'])}" |
| ], |
| "user_preferences": relevant_prefs, |
| "travel_patterns": travel_patterns, |
| "recommendation": self._generate_personalized_recommendation(profile, request) |
| } |
| |
| def _extract_relevant_preferences(self, request: str, profile: Dict[str, Any]) -> Dict[str, Any]: |
| """Extract preferences relevant to the current request""" |
| prefs = profile["preferences"].copy() |
| |
| |
| if "flight" in request: |
| return { |
| "preferred_airlines": prefs["preferred_airlines"], |
| "seat_preference": prefs["seat_preference"], |
| "budget_range": prefs["budget_range"], |
| "travel_class": prefs["travel_class"] |
| } |
| |
| return prefs |
| |
| def _analyze_travel_patterns(self, history: List[Dict[str, Any]]) -> Dict[str, Any]: |
| """Analyze user's travel patterns""" |
| if not history: |
| return {"pattern": "no_history", "insights": []} |
| |
| |
| avg_satisfaction = sum(trip["satisfaction"] for trip in history) / len(history) |
| |
| |
| destinations = {} |
| airlines = {} |
| |
| for trip in history: |
| dest = trip["destination"] |
| airline = trip["airline"] |
| |
| destinations[dest] = destinations.get(dest, 0) + 1 |
| airlines[airline] = airlines.get(airline, 0) + 1 |
| |
| most_visited = max(destinations, key=destinations.get) if destinations else None |
| preferred_airline = max(airlines, key=airlines.get) if airlines else None |
| |
| return { |
| "pattern": "frequent_traveler" if len(history) > 3 else "occasional_traveler", |
| "insights": [ |
| f"Average satisfaction: {avg_satisfaction:.1f}/5.0", |
| f"Most visited: {most_visited}" if most_visited else "No pattern found", |
| f"Preferred airline: {preferred_airline}" if preferred_airline else "No preference" |
| ], |
| "statistics": { |
| "total_trips": len(history), |
| "destinations": destinations, |
| "airlines": airlines |
| } |
| } |
| |
| def _generate_personalized_recommendation(self, profile: Dict[str, Any], request: str) -> str: |
| """Generate personalized recommendation based on user profile""" |
| prefs = profile["preferences"] |
| history = profile["travel_history"] |
| |
| recommendations = [] |
| |
| |
| budget = prefs["budget_range"] |
| recommendations.append(f"Consider flights in your budget range: ${budget['min']}-${budget['max']}") |
| |
| |
| if prefs["preferred_airlines"]: |
| airlines = ", ".join(prefs["preferred_airlines"]) |
| recommendations.append(f"Prioritize your preferred airlines: {airlines}") |
| |
| |
| if history: |
| high_satisfaction_airlines = [ |
| trip["airline"] for trip in history |
| if trip["satisfaction"] >= 4.0 |
| ] |
| if high_satisfaction_airlines: |
| recommendations.append(f"Consider airlines with your high satisfaction history") |
| |
| return " | ".join(recommendations) if recommendations else "No specific recommendations based on profile" |
| |
| async def _manage_preferences(self, data: Dict[str, Any]) -> Dict[str, Any]: |
| """Update user preferences""" |
| user_id = data.get("user_id", "default") |
| new_preferences = data.get("preferences", {}) |
| |
| if user_id not in self.user_profiles: |
| self.user_profiles[user_id] = self.user_profiles["default"].copy() |
| |
| |
| self.user_profiles[user_id]["preferences"].update(new_preferences) |
| |
| return { |
| "findings": [ |
| f"Updated preferences for user {user_id}", |
| f"Modified {len(new_preferences)} preference settings" |
| ], |
| "updated_preferences": self.user_profiles[user_id]["preferences"], |
| "recommendation": "Preferences updated successfully" |
| } |
| |
| async def _get_history(self, data: Dict[str, Any]) -> Dict[str, Any]: |
| """Retrieve user history""" |
| user_id = data.get("user_id", "default") |
| history_type = data.get("type", "travel") |
| |
| profile = self.user_profiles.get(user_id, self.user_profiles["default"]) |
| |
| if history_type == "travel": |
| history = profile["travel_history"] |
| else: |
| history = self.interaction_history |
| |
| return { |
| "findings": [ |
| f"Retrieved {len(history)} {history_type} history records", |
| f"Date range: {self._get_date_range(history)}" |
| ], |
| "history": history, |
| "recommendation": f"History data available for analysis" |
| } |
| |
| def _get_date_range(self, history: List[Dict[str, Any]]) -> str: |
| """Get date range from history""" |
| if not history: |
| return "No dates available" |
| |
| dates = [trip.get("date", "") for trip in history if trip.get("date")] |
| if not dates: |
| return "No dates available" |
| |
| return f"{min(dates)} to {max(dates)}" |
| |
| def get_agent_info(self) -> Dict[str, Any]: |
| """Return memory agent information""" |
| return { |
| "agent_id": self.agent_id, |
| "type": "memory", |
| "capabilities": self.capabilities, |
| "specialization": "User preference management and history tracking", |
| "total_users": len(self.user_profiles), |
| "interaction_records": len(self.interaction_history) |
| } |
|
|