# multi-ai-agentic-system/agents/memory_agent.py
# CHKIM79's picture
# Deploy Multi-AI Agentic System v1.0.0 - Production Ready
# d4b6ffc
"""
Memory Agent - Manages user preferences and historical data
"""
import asyncio
import copy
from collections import Counter
from typing import Dict, Any, List

from core.base_agent import BaseAgent, AgentMessage, TaskResult, TaskStatus
class MemoryAgent(BaseAgent):
    """Agent specialized in managing user data and preferences.

    Profiles are kept in an in-memory dict keyed by user id. The
    ``"default"`` entry is used both as the fallback for unknown users on
    lookups and as the template from which new profiles are created.
    """

    def __init__(self):
        """Initialise the base agent and seed the in-memory stores."""
        super().__init__("memory_agent", ["memory_lookup", "preference_management", "history"])
        self.user_profiles: Dict[str, Any] = self._initialize_user_data()
        # Returned by `_get_history` for any non-"travel" history type.
        # Nothing in this class appends to it.
        self.interaction_history: List[Dict[str, Any]] = []

    def _initialize_user_data(self) -> Dict[str, Any]:
        """Return the mock profile store: one sample user plus ``"default"``."""
        return {
            "user_001": {
                "preferences": {
                    "preferred_airlines": ["Delta", "United"],
                    "seat_preference": "aisle",
                    "budget_range": {"min": 200, "max": 500},
                    "travel_class": "economy",
                    "notification_method": "email",
                },
                "travel_history": [
                    {"destination": "NYC", "date": "2023-12-01", "airline": "Delta", "satisfaction": 4.5},
                    {"destination": "LAX", "date": "2023-10-15", "airline": "United", "satisfaction": 4.2},
                ],
                "loyalty_programs": ["Delta SkyMiles", "United MileagePlus"],
            },
            "default": {
                "preferences": {
                    "preferred_airlines": [],
                    "seat_preference": "any",
                    "budget_range": {"min": 0, "max": 1000},
                    "travel_class": "economy",
                    "notification_method": "email",
                },
                "travel_history": [],
                "loyalty_programs": [],
            },
        }

    async def process_task(self, message: AgentMessage) -> TaskResult:
        """Dispatch a memory-related task to its handler.

        Args:
            message: Incoming task. ``message.message_type`` selects the
                handler and ``message.data`` is passed to it.

        Returns:
            A ``TaskResult`` with status ``COMPLETED`` carrying the handler's
            result, or status ``FAILED`` with an empty result and the error
            text when the task type is unknown or the handler raises.
        """
        # get_running_loop() is the documented way to reach the loop from
        # inside a coroutine; its clock is monotonic.
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        handlers = {
            "memory_lookup": self._lookup_user_data,
            "preference_management": self._manage_preferences,
            "history": self._get_history,
        }
        try:
            handler = handlers.get(message.message_type)
            if handler is None:
                raise ValueError(f"Unknown task type: {message.message_type}")
            result = await handler(message.data)
            return TaskResult(
                task_id=message.task_id,
                agent_id=self.agent_id,
                status=TaskStatus.COMPLETED,
                result=result,
                execution_time=loop.time() - start_time,
            )
        except Exception as e:
            # Task boundary: every failure is reported through the result
            # object instead of propagating to the caller.
            return TaskResult(
                task_id=message.task_id,
                agent_id=self.agent_id,
                status=TaskStatus.FAILED,
                result={},
                error_message=str(e),
                execution_time=loop.time() - start_time,
            )

    async def _lookup_user_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Look up a user's preferences, travel patterns and a recommendation.

        Args:
            data: May contain ``"user_id"`` (defaults to ``"default"``) and
                ``"request"`` (free text, matched case-insensitively).

        Returns:
            Dict with ``findings``, ``user_preferences``, ``travel_patterns``
            and ``recommendation``.
        """
        user_id = data.get("user_id", "default")
        # `or ""` tolerates an explicit ``"request": None``.
        request = str(data.get("request") or "").lower()

        # Unknown users fall back to the default profile.
        # NOTE: the first finding still reads "found" in that case.
        profile = self.user_profiles.get(user_id, self.user_profiles["default"])

        relevant_prefs = self._extract_relevant_preferences(request, profile)
        travel_patterns = self._analyze_travel_patterns(profile["travel_history"])

        return {
            "findings": [
                f"User profile found for {user_id}",
                f"Travel history: {len(profile['travel_history'])} trips",
                f"Loyalty programs: {len(profile['loyalty_programs'])}",
            ],
            "user_preferences": relevant_prefs,
            "travel_patterns": travel_patterns,
            "recommendation": self._generate_personalized_recommendation(profile, request),
        }

    def _extract_relevant_preferences(self, request: str, profile: Dict[str, Any]) -> Dict[str, Any]:
        """Return the preferences relevant to ``request``.

        When the request mentions "flight" only the flight-related keys are
        returned; otherwise all preferences are. The result is a deep copy,
        so callers cannot mutate the stored profile through it.

        Raises:
            KeyError: If a flight-related key is missing from the profile.
        """
        prefs = copy.deepcopy(profile["preferences"])
        if "flight" in request:
            flight_keys = ("preferred_airlines", "seat_preference", "budget_range", "travel_class")
            return {key: prefs[key] for key in flight_keys}
        return prefs

    def _analyze_travel_patterns(self, history: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarise a travel history.

        Args:
            history: Trip dicts, each with ``"destination"``, ``"airline"``
                and a numeric ``"satisfaction"``.

        Returns:
            ``{"pattern": "no_history", "insights": []}`` for an empty
            history; otherwise a dict with ``pattern`` (more than three trips
            counts as ``"frequent_traveler"``), ``insights`` and
            ``statistics``.
        """
        if not history:
            return {"pattern": "no_history", "insights": []}

        avg_satisfaction = sum(trip["satisfaction"] for trip in history) / len(history)

        # Counter keeps first-seen order, so ties resolve to the earliest
        # entry, as with a hand-built dict.
        destinations = Counter(trip["destination"] for trip in history)
        airlines = Counter(trip["airline"] for trip in history)
        most_visited = max(destinations, key=destinations.get)
        preferred_airline = max(airlines, key=airlines.get)

        return {
            "pattern": "frequent_traveler" if len(history) > 3 else "occasional_traveler",
            "insights": [
                f"Average satisfaction: {avg_satisfaction:.1f}/5.0",
                f"Most visited: {most_visited}" if most_visited else "No pattern found",
                f"Preferred airline: {preferred_airline}" if preferred_airline else "No preference",
            ],
            "statistics": {
                "total_trips": len(history),
                # Plain dicts keep the result shape independent of Counter.
                "destinations": dict(destinations),
                "airlines": dict(airlines),
            },
        }

    def _generate_personalized_recommendation(self, profile: Dict[str, Any], request: str) -> str:
        """Build a " | "-separated recommendation string from a profile.

        Args:
            profile: Profile dict with ``"preferences"`` and
                ``"travel_history"``.
            request: Currently unused; kept for interface compatibility.

        Returns:
            The budget hint, plus the preferred-airline hint and the
            high-satisfaction hint when they apply.
        """
        prefs = profile["preferences"]
        history = profile["travel_history"]

        budget = prefs["budget_range"]
        # The budget line is unconditional, so the list is never empty.
        recommendations = [
            f"Consider flights in your budget range: ${budget['min']}-${budget['max']}"
        ]

        if prefs["preferred_airlines"]:
            airlines = ", ".join(prefs["preferred_airlines"])
            recommendations.append(f"Prioritize your preferred airlines: {airlines}")

        # Added once if any past trip was rated 4.0 or higher.
        if any(trip["satisfaction"] >= 4.0 for trip in history):
            recommendations.append("Consider airlines with your high satisfaction history")

        return " | ".join(recommendations)

    async def _manage_preferences(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Merge new preference values into a user's profile.

        Args:
            data: May contain ``"user_id"`` (defaults to ``"default"``) and
                ``"preferences"`` (mapping of settings to overwrite; a
                missing or ``None`` value means no changes).

        Returns:
            Dict with ``findings``, a copy of the ``updated_preferences``
            and a ``recommendation`` message.
        """
        user_id = data.get("user_id", "default")
        new_preferences = data.get("preferences") or {}

        if user_id not in self.user_profiles:
            # Deep copy: a shallow copy would share the nested "preferences"
            # dict with the default template, so the update below would leak
            # into "default" and into every profile cloned from it.
            self.user_profiles[user_id] = copy.deepcopy(self.user_profiles["default"])

        preferences = self.user_profiles[user_id]["preferences"]
        # Copy incoming values so later mutation of the caller's nested
        # objects cannot alter stored state.
        preferences.update(copy.deepcopy(new_preferences))

        return {
            "findings": [
                f"Updated preferences for user {user_id}",
                f"Modified {len(new_preferences)} preference settings",
            ],
            "updated_preferences": copy.deepcopy(preferences),
            "recommendation": "Preferences updated successfully",
        }

    async def _get_history(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Retrieve a user's travel history or the interaction history.

        Args:
            data: May contain ``"user_id"`` (defaults to ``"default"``) and
                ``"type"`` (defaults to ``"travel"``; any other value selects
                the agent-wide interaction history).

        Returns:
            Dict with ``findings``, a copy of the selected ``history`` and a
            ``recommendation`` message.
        """
        user_id = data.get("user_id", "default")
        history_type = data.get("type", "travel")
        profile = self.user_profiles.get(user_id, self.user_profiles["default"])

        if history_type == "travel":
            history = profile["travel_history"]
        else:
            history = self.interaction_history

        return {
            "findings": [
                f"Retrieved {len(history)} {history_type} history records",
                f"Date range: {self._get_date_range(history)}",
            ],
            # Copy so callers cannot mutate the stored records.
            "history": copy.deepcopy(history),
            "recommendation": "History data available for analysis",
        }

    def _get_date_range(self, history: List[Dict[str, Any]]) -> str:
        """Return ``"<earliest> to <latest>"`` for the dated entries.

        Entries without a truthy ``"date"`` are ignored. Dates are compared
        as strings, which orders correctly for the ISO ``YYYY-MM-DD`` values
        used in the mock data.

        Returns:
            The range string, or ``"No dates available"`` when no entry has
            a date.
        """
        dates = [entry["date"] for entry in history if entry.get("date")]
        if not dates:
            return "No dates available"
        return f"{min(dates)} to {max(dates)}"

    def get_agent_info(self) -> Dict[str, Any]:
        """Return descriptive metadata and simple counters for this agent."""
        return {
            "agent_id": self.agent_id,
            "type": "memory",
            "capabilities": self.capabilities,
            "specialization": "User preference management and history tracking",
            # Includes the "default" template entry.
            "total_users": len(self.user_profiles),
            "interaction_records": len(self.interaction_history),
        }