Spaces:
Sleeping
Sleeping
"""
Advanced User Learning System

This module implements a sophisticated user learning system with:
- Machine learning-based user type classification
- Preference inference from interaction patterns
- Contextual personalization
- Privacy-preserving user profiling
- Real-time adaptation
"""
| import json | |
| import logging | |
| import asyncio | |
| from datetime import datetime, timezone, timedelta | |
| from typing import Dict, List, Any, Optional, Tuple, Set | |
| from dataclasses import dataclass, asdict, field | |
| from enum import Enum | |
| from pathlib import Path | |
| import hashlib | |
| import uuid | |
| from ..utils.logging import get_logger | |
| logger = get_logger(__name__) | |
class UserType(Enum):
    """Behavioral traveler categories a user profile can be assigned to.

    Each member's value is its lowercase string name; that string is what
    gets written to and read back from stored profiles.
    """

    # Default for a profile that has not been classified yet.
    NEW_USER = "new_user"

    # Spending-oriented categories.
    BUDGET_TRAVELER = "budget_traveler"
    LUXURY_SEEKER = "luxury_seeker"

    # Purpose-oriented categories.
    FAMILY_TRAVELER = "family_traveler"
    BUSINESS_TRAVELER = "business_traveler"
    ADVENTURE_SEEKER = "adventure_seeker"
    FREQUENT_TRAVELER = "frequent_traveler"

    # Party-size categories.
    GROUP_TRAVELER = "group_traveler"
    SOLO_TRAVELER = "solo_traveler"

    # Booking-lead-time categories.
    LAST_MINUTE_TRAVELER = "last_minute_traveler"
    PLANNED_TRAVELER = "planned_traveler"
class InteractionType(Enum):
    """Kinds of user events that can be recorded for analysis.

    Each member's value is its lowercase string name.
    """

    # Search flow.
    SEARCH_INITIATED = "search_initiated"
    SEARCH_MODIFIED = "search_modified"
    RESULTS_VIEWED = "results_viewed"
    RESULT_SELECTED = "result_selected"

    # Booking flow.
    BOOKING_STARTED = "booking_started"
    BOOKING_COMPLETED = "booking_completed"

    # Conversation and explicit user input.
    QUESTION_ASKED = "question_asked"
    FEEDBACK_PROVIDED = "feedback_provided"
    PREFERENCE_UPDATED = "preference_updated"

    # Session lifecycle and failures.
    SESSION_STARTED = "session_started"
    SESSION_ENDED = "session_ended"
    ERROR_ENCOUNTERED = "error_encountered"
@dataclass
class UserPreferences:
    """Comprehensive user preferences with confidence scoring.

    Every learned preference is paired with a ``*_confidence`` float that
    starts at 0.0. The class must be a dataclass: it declares its defaults
    with ``field(default_factory=...)`` and is both built from keyword
    arguments and serialised with ``dataclasses.asdict`` elsewhere in this
    module.
    """

    budget_range: Optional[Tuple[float, float]] = None  # (low, high)
    budget_confidence: float = 0.0
    preferred_airlines: List[str] = field(default_factory=list)
    airline_confidence: float = 0.0
    accommodation_types: List[str] = field(default_factory=list)
    accommodation_confidence: float = 0.0
    activity_preferences: List[str] = field(default_factory=list)
    activity_confidence: float = 0.0
    travel_style: Optional[str] = None
    style_confidence: float = 0.0
    booking_lead_time: Optional[int] = None  # days in advance
    lead_time_confidence: float = 0.0
    group_size_preference: Optional[int] = None
    group_confidence: float = 0.0

    def get_confidence_score(self) -> float:
        """Calculate overall confidence in user preferences.

        Returns:
            The unweighted mean of the seven per-preference confidence
            values.
        """
        scores = (
            self.budget_confidence,
            self.airline_confidence,
            self.accommodation_confidence,
            self.activity_confidence,
            self.style_confidence,
            self.lead_time_confidence,
            self.group_confidence,
        )
        # The tuple always has seven entries, so no empty-input guard is needed.
        return sum(scores) / len(scores)
@dataclass
class Interaction:
    """Rich interaction data with context and metadata.

    One instance describes a single recorded user event. The class must be
    a dataclass: it is constructed with keyword arguments and relies on
    ``field(default_factory=dict)`` for a per-instance ``metadata`` dict.
    """

    interaction_id: str
    user_id: str
    session_id: str
    # Forward reference to the InteractionType enum defined in this module.
    interaction_type: "InteractionType"
    timestamp: datetime
    context: Dict[str, Any]
    data: Dict[str, Any]
    outcome: Optional[str] = None
    satisfaction_score: Optional[float] = None
    duration_seconds: Optional[float] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class UserProfile:
    """Advanced user profile with learning capabilities.

    Holds everything the learning system tracks for one user. The class
    must be a dataclass: it is constructed with keyword arguments
    (``UserProfile(user_id=...)``) and its mutable defaults are declared
    with ``field(default_factory=...)`` so every profile gets its own
    containers and its own creation timestamps.
    """

    user_id: str
    profile_version: int = 1
    user_type: UserType = UserType.NEW_USER
    user_type_confidence: float = 0.0
    preferences: UserPreferences = field(default_factory=UserPreferences)

    # Interaction tracking
    total_interactions: int = 0
    successful_interactions: int = 0
    average_satisfaction: float = 0.0

    # Temporal patterns
    preferred_times: Dict[str, List[int]] = field(default_factory=dict)  # hour of day
    preferred_days: Set[int] = field(default_factory=set)  # day of week
    seasonal_patterns: Dict[str, int] = field(default_factory=dict)  # season -> count

    # Learning metrics
    learning_velocity: float = 0.0  # how quickly preferences are learned
    profile_stability: float = 1.0  # how stable the profile is

    # Privacy and consent
    data_retention_days: int = 365
    consent_level: str = "basic"  # basic, enhanced, full

    # Timestamps are timezone-aware UTC.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    last_updated: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    last_interaction: Optional[datetime] = None
class UserLearningSystem:
    """
    Advanced user learning system with ML-based personalization.

    Features:
    - Real-time preference learning
    - User type classification
    - Contextual adaptation
    - Privacy preservation
    - Performance optimization

    Profiles are kept in an in-memory cache and persisted as one JSON file
    per user under ``storage_path``; the file name is a truncated SHA-256
    of the user id. User type classification is currently rule-based; the
    ``learning_models`` slots are placeholders.
    """

    # Number of buffered interactions that triggers a batch run.
    _BATCH_SIZE = 50
    # A cached profile whose last_updated is older than this is re-read from disk.
    _REFRESH_AFTER_SECONDS = 3600
    # Upper bound on the stored hour-of-day samples per profile.
    _MAX_HOURLY_SAMPLES = 1000
    # User-type classification tuning.
    _TYPE_SWITCH_THRESHOLD = 0.7     # below this a differing classification wins
    _TYPE_INITIAL_CONFIDENCE = 0.6   # confidence right after a type switch
    _TYPE_CONFIRM_STEP = 0.05        # gain when the classification is confirmed
    _TYPE_DECAY = 0.95               # decay when a confident type is contradicted

    def __init__(self,
                 storage_path: str = "user_profiles",
                 max_profiles_in_memory: int = 1000,
                 learning_rate: float = 0.1):
        """Create the system and make sure the storage directory exists.

        Args:
            storage_path: Directory that holds the per-user JSON files.
            max_profiles_in_memory: Cache size limit used by ``_add_to_cache``.
            learning_rate: Weight of a new score in the satisfaction
                moving average.
        """
        self.storage_path = Path(storage_path)
        # parents=True so a nested storage path can be created on first start.
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.max_profiles_in_memory = max_profiles_in_memory
        self.learning_rate = learning_rate

        # In-memory cache for performance
        self.profiles_cache: Dict[str, "UserProfile"] = {}
        self.interaction_buffer: List["Interaction"] = []
        self.learning_models = self._initialize_learning_models()

        # Performance tracking
        self.cache_hits = 0
        self.cache_misses = 0

        logger.info(f"UserLearningSystem initialized with storage at {self.storage_path}")

    def _initialize_learning_models(self) -> Dict[str, Any]:
        """Return the model registry; every slot is ``None`` (placeholder)."""
        # In a real implementation, this would load trained ML models
        return {
            "user_type_classifier": None,
            "preference_predictor": None,
            "satisfaction_predictor": None,
            "churn_predictor": None,
        }

    async def get_user_profile(self, user_id: str) -> "UserProfile":
        """Return the profile for ``user_id``, loading or creating it if needed.

        A cache hit whose ``last_updated`` is older than
        ``_REFRESH_AFTER_SECONDS`` is re-read from storage first. A cache
        miss loads the profile from storage (a new profile is created when
        no file exists) and inserts it into the cache.
        """
        cached = self.profiles_cache.get(user_id)
        if cached is not None:
            self.cache_hits += 1
            if self._should_refresh_profile(cached):
                await self._refresh_profile(cached)
                # _refresh_profile replaces the cache entry; hand back the
                # replacement so callers do not mutate a detached object.
                return self.profiles_cache.get(user_id, cached)
            return cached

        self.cache_misses += 1
        profile = await self._load_profile_from_storage(user_id)
        await self._add_to_cache(profile)
        return profile

    async def record_interaction(self,
                                 user_id: str,
                                 session_id: str,
                                 interaction_type: "InteractionType",
                                 context: Dict[str, Any],
                                 data: Dict[str, Any],
                                 outcome: Optional[str] = None,
                                 satisfaction_score: Optional[float] = None,
                                 duration_seconds: Optional[float] = None) -> str:
        """Record a user interaction and return its generated id.

        ``BOOKING_COMPLETED`` and ``FEEDBACK_PROVIDED`` interactions are
        applied to the profile immediately. All other interactions are
        buffered and applied once ``_BATCH_SIZE`` of them have accumulated.
        """
        interaction_id = self._generate_interaction_id()
        interaction = Interaction(
            interaction_id=interaction_id,
            user_id=user_id,
            session_id=session_id,
            interaction_type=interaction_type,
            timestamp=datetime.now(timezone.utc),
            context=context,
            data=data,
            outcome=outcome,
            satisfaction_score=satisfaction_score,
            duration_seconds=duration_seconds,
        )

        critical = interaction_type in (
            InteractionType.BOOKING_COMPLETED,
            InteractionType.FEEDBACK_PROVIDED,
        )
        if critical:
            # Applied right away, so it must not also sit in the buffer:
            # the next batch run would apply it to the profile a second time.
            await self._process_interaction_immediately(interaction)
        else:
            self.interaction_buffer.append(interaction)

        # Batch process if buffer is full
        if len(self.interaction_buffer) >= self._BATCH_SIZE:
            await self._process_interaction_batch()

        return interaction_id

    async def _process_interaction_immediately(self, interaction: "Interaction"):
        """Apply one interaction to its user's profile and save the profile."""
        profile = await self.get_user_profile(interaction.user_id)
        await self._update_profile_from_interaction(profile, interaction)
        await self._save_profile(profile)

    async def _process_interaction_batch(self):
        """Apply all buffered interactions, grouped by user, then save."""
        if not self.interaction_buffer:
            return

        # Take ownership of the pending items before the first await. If
        # processing raises, the same batch is not replayed on every later
        # call, and anything appended while this coroutine is suspended
        # stays queued for the next run instead of being cleared.
        pending, self.interaction_buffer = self.interaction_buffer, []

        # Group by user so each profile is loaded and saved once.
        user_interactions: Dict[str, list] = {}
        for interaction in pending:
            user_interactions.setdefault(interaction.user_id, []).append(interaction)

        for user_id, interactions in user_interactions.items():
            profile = await self.get_user_profile(user_id)
            for interaction in interactions:
                await self._update_profile_from_interaction(profile, interaction)
            await self._save_profile(profile)

        logger.info(f"Processed batch of {len(user_interactions)} users")

    async def _update_profile_from_interaction(self, profile: "UserProfile", interaction: "Interaction"):
        """Fold one interaction into ``profile`` (counters, preferences, type, patterns)."""
        # Basic metrics. The counter is bumped first because
        # _update_satisfaction_score checks for total_interactions == 1.
        profile.total_interactions += 1
        profile.last_interaction = interaction.timestamp
        profile.last_updated = datetime.now(timezone.utc)
        if interaction.outcome == "success":
            profile.successful_interactions += 1

        if interaction.satisfaction_score is not None:
            self._update_satisfaction_score(profile, interaction.satisfaction_score)

        await self._learn_preferences_from_interaction(profile, interaction)
        await self._update_user_type_classification(profile, interaction)
        self._update_temporal_patterns(profile, interaction)
        self._update_learning_metrics(profile, interaction)

    def _update_satisfaction_score(self, profile: "UserProfile", new_score: float):
        """Blend ``new_score`` into the profile's running satisfaction.

        The score is taken as-is on the profile's first interaction;
        afterwards an exponential moving average with weight
        ``self.learning_rate`` is used.

        NOTE(review): "first" is decided by ``total_interactions == 1``, so
        a first score that arrives on a later interaction is blended with
        the 0.0 default. Fixing that needs a separate rating counter.
        """
        if profile.total_interactions == 1:
            profile.average_satisfaction = new_score
            return
        alpha = self.learning_rate
        profile.average_satisfaction = (
            alpha * new_score + (1 - alpha) * profile.average_satisfaction
        )

    async def _learn_preferences_from_interaction(self, profile: "UserProfile", interaction: "Interaction"):
        """Learn budget, airline, accommodation and activity preferences.

        Reads the optional keys ``budget``, ``airline``,
        ``accommodation_type`` and ``activities`` from ``interaction.data``.
        All confidence values are capped at 1.0.
        """
        data = interaction.data or {}
        prefs = profile.preferences

        # Budget learning
        if "budget" in data:
            try:
                budget = float(data["budget"])
            except (TypeError, ValueError):
                # A malformed value must not abort the whole profile update.
                logger.warning(f"Ignoring non-numeric budget value: {data['budget']!r}")
            else:
                if prefs.budget_range is None:
                    prefs.budget_range = (budget * 0.8, budget * 1.2)
                    prefs.budget_confidence = 0.5
                else:
                    # Widen the known range to include +/-20% of the new value.
                    low, high = prefs.budget_range
                    prefs.budget_range = (min(low, budget * 0.8), max(high, budget * 1.2))
                    prefs.budget_confidence = min(1.0, prefs.budget_confidence + 0.1)

        # Airline preferences
        if "airline" in data:
            airline = data["airline"]
            if airline not in prefs.preferred_airlines:
                prefs.preferred_airlines.append(airline)
                prefs.airline_confidence = min(1.0, prefs.airline_confidence + 0.05)

        # Accommodation preferences
        if "accommodation_type" in data:
            acc_type = data["accommodation_type"]
            if acc_type not in prefs.accommodation_types:
                prefs.accommodation_types.append(acc_type)
                prefs.accommodation_confidence = min(1.0, prefs.accommodation_confidence + 0.05)

        # Activity preferences (only a list value is understood)
        activities = data.get("activities")
        if isinstance(activities, list):
            for activity in activities:
                if activity not in prefs.activity_preferences:
                    prefs.activity_preferences.append(activity)
                    prefs.activity_confidence = min(1.0, prefs.activity_confidence + 0.03)

    async def _update_user_type_classification(self, profile: "UserProfile", interaction: "Interaction"):
        """Re-classify the user and reconcile the result with the stored type.

        - Same type as stored: confidence grows by ``_TYPE_CONFIRM_STEP``
          (capped at 1.0).
        - Different type, stored confidence below ``_TYPE_SWITCH_THRESHOLD``:
          switch to the new type at ``_TYPE_INITIAL_CONFIDENCE``.
        - Different type, stored confidence at or above the threshold: keep
          the stored type and decay its confidence by ``_TYPE_DECAY``, so
          repeated contradictions are needed before a switch.
        """
        features = self._extract_classification_features(profile, interaction)

        # In a real implementation, this would use a trained ML model
        # For now, use rule-based classification
        new_user_type = self._classify_user_type(features)

        if new_user_type == profile.user_type:
            # Without this reinforcement the confidence could never reach
            # the threshold and the decay branch below would be dead code.
            profile.user_type_confidence = min(
                1.0, profile.user_type_confidence + self._TYPE_CONFIRM_STEP
            )
        elif profile.user_type_confidence < self._TYPE_SWITCH_THRESHOLD:
            profile.user_type = new_user_type
            profile.user_type_confidence = self._TYPE_INITIAL_CONFIDENCE
        else:
            profile.user_type_confidence *= self._TYPE_DECAY

    def _extract_classification_features(self, profile: "UserProfile", interaction: "Interaction") -> Dict[str, Any]:
        """Collect the profile and interaction values used for classification."""
        prefs = profile.preferences
        when = interaction.timestamp
        return {
            "budget_range": prefs.budget_range,
            "booking_lead_time": prefs.booking_lead_time,
            "group_size": prefs.group_size_preference,
            "interaction_frequency": profile.total_interactions,
            "satisfaction_score": profile.average_satisfaction,
            "preferred_airlines_count": len(prefs.preferred_airlines),
            "activity_preferences_count": len(prefs.activity_preferences),
            "session_duration": interaction.duration_seconds,
            "time_of_day": when.hour,
            "day_of_week": when.weekday(),
        }

    def _classify_user_type(self, features: Dict[str, Any]) -> "UserType":
        """Classify the user with fixed rules, checked in priority order.

        Budget is checked first, then group size, then booking lead time;
        ``FREQUENT_TRAVELER`` is the fallback when no rule matches.
        """
        # Rule-based classification (in production, use ML model)
        budget_range = features.get("budget_range")
        group_size = features.get("group_size")
        lead_time = features.get("booking_lead_time")

        if budget_range and budget_range[1] < 1000:
            return UserType.BUDGET_TRAVELER
        if budget_range and budget_range[0] > 5000:
            return UserType.LUXURY_SEEKER
        if group_size is not None and group_size > 2:
            return UserType.GROUP_TRAVELER
        if group_size == 1:
            return UserType.SOLO_TRAVELER
        # Compare against None explicitly: a lead time of 0 days (same-day
        # booking) is a valid value, not a missing one.
        if lead_time is not None and lead_time < 7:
            return UserType.LAST_MINUTE_TRAVELER
        if lead_time is not None and lead_time > 30:
            return UserType.PLANNED_TRAVELER
        return UserType.FREQUENT_TRAVELER

    def _update_temporal_patterns(self, profile: "UserProfile", interaction: "Interaction"):
        """Record the hour, weekday and season of ``interaction``.

        Hours are appended to ``preferred_times["hourly"]``; only the most
        recent ``_MAX_HOURLY_SAMPLES`` entries are kept so the list (and the
        stored profile) cannot grow without bound.
        """
        when = interaction.timestamp

        hourly = profile.preferred_times.setdefault("hourly", [])
        hourly.append(when.hour)
        overflow = len(hourly) - self._MAX_HOURLY_SAMPLES
        if overflow > 0:
            # Samples carry no timestamp, so age-based pruning is not
            # possible; drop the oldest entries instead.
            del hourly[:overflow]

        profile.preferred_days.add(when.weekday())

        season = self._get_season(when.month)
        profile.seasonal_patterns[season] = profile.seasonal_patterns.get(season, 0) + 1

    def _get_season(self, month: int) -> str:
        """Map a month number to "winter", "spring", "summer" or "fall".

        December-February is winter, March-May spring, June-August summer;
        every other value maps to "fall".
        """
        if month in (12, 1, 2):
            return "winter"
        if month in (3, 4, 5):
            return "spring"
        if month in (6, 7, 8):
            return "summer"
        return "fall"

    def _update_learning_metrics(self, profile: "UserProfile", interaction: "Interaction"):
        """Update ``learning_velocity`` and ``profile_stability``.

        Velocity is set to the overall preference confidence (capped at
        1.0) once the profile has at least 10 interactions. Stability is
        multiplied by 0.999 on every interaction after the first.
        """
        recent_interactions = 10  # minimum history before velocity is reported
        if profile.total_interactions >= recent_interactions:
            profile.learning_velocity = min(1.0, profile.preferences.get_confidence_score())

        if profile.total_interactions > 1:
            # Decrease stability slightly with each interaction
            profile.profile_stability *= 0.999
        else:
            profile.profile_stability = 1.0

    async def get_personalized_recommendations(self, user_id: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Build the personalization payload for ``user_id``.

        Returns:
            A dict with the keys ``user_type``, ``confidence``,
            ``preferences``, ``personalization_level`` (interactions / 20,
            capped at 1.0), ``recommended_approach``,
            ``contextual_suggestions`` and ``risk_factors``.
        """
        profile = await self.get_user_profile(user_id)
        return {
            "user_type": profile.user_type.value,
            "confidence": profile.user_type_confidence,
            "preferences": asdict(profile.preferences),
            "personalization_level": min(profile.total_interactions / 20, 1.0),
            "recommended_approach": self._get_conversation_approach(profile),
            "contextual_suggestions": await self._get_contextual_suggestions(profile, context),
            "risk_factors": self._identify_risk_factors(profile),
        }

    def _get_conversation_approach(self, profile: "UserProfile") -> str:
        """Return the conversation style for the profile's user type.

        Types without an entry in the map fall back to "balanced".
        """
        approach_map = {
            UserType.NEW_USER: "guided_discovery",
            UserType.BUDGET_TRAVELER: "value_focused",
            UserType.LUXURY_SEEKER: "premium_experience",
            UserType.FAMILY_TRAVELER: "family_friendly",
            UserType.BUSINESS_TRAVELER: "efficient_direct",
            UserType.ADVENTURE_SEEKER: "adventure_focused",
            UserType.GROUP_TRAVELER: "collaborative",
            UserType.SOLO_TRAVELER: "flexible_independent",
            UserType.LAST_MINUTE_TRAVELER: "urgent_options",
            UserType.PLANNED_TRAVELER: "detailed_planning",
        }
        return approach_map.get(profile.user_type, "balanced")

    async def _get_contextual_suggestions(self, profile: "UserProfile", context: Dict[str, Any]) -> List[str]:
        """Return suggestion strings based on the current time and the profile.

        ``context`` is accepted for interface compatibility but is not read.
        """
        suggestions = []

        # One timezone-aware reading. Interaction timestamps (and therefore
        # the seasonal_patterns keys) are recorded in UTC, so the lookup
        # below must use the same clock.
        now = datetime.now(timezone.utc)

        # Time-based suggestions
        if now.hour < 9 or now.hour > 18:
            suggestions.append("Consider booking outside business hours for better deals")

        # Seasonal suggestions
        season = self._get_season(now.month)
        if season in profile.seasonal_patterns:
            suggestions.append(f"Based on your {season} travel history, here are some options")

        # Budget-based suggestions
        budget_range = profile.preferences.budget_range
        if budget_range:
            low, high = budget_range[0], budget_range[1]
            suggestions.append(f"Options within your typical budget range of ${low:.0f}-${high:.0f}")

        return suggestions

    def _identify_risk_factors(self, profile: "UserProfile") -> List[str]:
        """List warning strings for low satisfaction, failures and instability."""
        risks = []

        # A profile with no interactions has no history; its 0.0 default
        # average must not be reported as low satisfaction.
        if profile.total_interactions > 0 and profile.average_satisfaction < 3.0:
            risks.append("Low historical satisfaction - needs attention")

        if profile.total_interactions > 50 and profile.successful_interactions / profile.total_interactions < 0.7:
            risks.append("High interaction failure rate")

        if profile.profile_stability < 0.5:
            risks.append("Unstable preferences - user may be exploring")

        return risks

    @staticmethod
    def _parse_timestamp(raw: Optional[str], fallback: Optional[datetime]) -> Optional[datetime]:
        """Parse a stored ISO-8601 timestamp.

        Returns ``fallback`` when ``raw`` is empty or missing. A value
        without timezone information is treated as UTC so it can be
        compared with the timezone-aware datetimes used in this module.
        """
        if not raw:
            return fallback
        parsed = datetime.fromisoformat(raw)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    async def _load_profile_from_storage(self, user_id: str) -> "UserProfile":
        """Load the stored profile for ``user_id``.

        Returns a new default profile when no file exists, or when the file
        cannot be read or parsed (the error is logged in that case).
        Fields missing from the file fall back to their defaults.
        """
        profile_file = self.storage_path / f"{self._hash_user_id(user_id)}.json"

        try:
            with open(profile_file, 'r', encoding="utf-8") as f:
                data = json.load(f)

            preferences_data = dict(data.get('preferences') or {})
            # JSON has no tuple type; restore the (low, high) tuple.
            if preferences_data.get('budget_range') is not None:
                preferences_data['budget_range'] = tuple(preferences_data['budget_range'])
            preferences = UserPreferences(**preferences_data)

            now = datetime.now(timezone.utc)
            return UserProfile(
                user_id=user_id,
                profile_version=data.get('profile_version', 1),
                user_type=UserType(data.get('user_type', 'new_user')),
                user_type_confidence=data.get('user_type_confidence', 0.0),
                preferences=preferences,
                total_interactions=data.get('total_interactions', 0),
                successful_interactions=data.get('successful_interactions', 0),
                average_satisfaction=data.get('average_satisfaction', 0.0),
                preferred_times=dict(data.get('preferred_times') or {}),
                preferred_days=set(data.get('preferred_days') or ()),
                seasonal_patterns=dict(data.get('seasonal_patterns') or {}),
                learning_velocity=data.get('learning_velocity', 0.0),
                profile_stability=data.get('profile_stability', 1.0),
                data_retention_days=data.get('data_retention_days', 365),
                consent_level=data.get('consent_level', 'basic'),
                # A missing timestamp must not discard the whole profile.
                created_at=self._parse_timestamp(data.get('created_at'), now),
                last_updated=self._parse_timestamp(data.get('last_updated'), now),
                last_interaction=self._parse_timestamp(data.get('last_interaction'), None),
            )
        except FileNotFoundError:
            return self._create_new_profile(user_id)
        except Exception as e:
            # Best effort: an unreadable file yields a fresh profile.
            logger.error(f"Error loading profile for {user_id}: {e}")
            return self._create_new_profile(user_id)

    async def _save_profile(self, profile: "UserProfile"):
        """Write ``profile`` to its JSON file.

        The data is written to a temporary sibling file and then moved over
        the real file, so an interrupted write cannot leave a truncated
        profile behind. Errors are logged and not raised.
        """
        profile_file = self.storage_path / f"{self._hash_user_id(profile.user_id)}.json"
        # ".json.tmp" does not match the "*.json" glob used for metrics.
        temp_file = profile_file.with_name(profile_file.name + ".tmp")

        try:
            data = {
                'profile_version': profile.profile_version,
                'user_type': profile.user_type.value,
                'user_type_confidence': profile.user_type_confidence,
                'preferences': asdict(profile.preferences),
                'total_interactions': profile.total_interactions,
                'successful_interactions': profile.successful_interactions,
                'average_satisfaction': profile.average_satisfaction,
                # Temporal patterns are persisted so a cache refresh or a
                # restart does not silently drop them.
                'preferred_times': profile.preferred_times,
                'preferred_days': sorted(profile.preferred_days),
                'seasonal_patterns': profile.seasonal_patterns,
                'learning_velocity': profile.learning_velocity,
                'profile_stability': profile.profile_stability,
                'data_retention_days': profile.data_retention_days,
                'consent_level': profile.consent_level,
                'created_at': profile.created_at.isoformat(),
                'last_updated': profile.last_updated.isoformat(),
                'last_interaction': profile.last_interaction.isoformat() if profile.last_interaction else None,
            }

            with open(temp_file, 'w', encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            temp_file.replace(profile_file)
        except Exception as e:
            logger.error(f"Error saving profile for {profile.user_id}: {e}")
            try:
                temp_file.unlink()
            except OSError:
                pass  # nothing was written, or cleanup is not possible

    def _create_new_profile(self, user_id: str) -> "UserProfile":
        """Create a profile with default values for ``user_id``."""
        return UserProfile(user_id=user_id)

    def _hash_user_id(self, user_id: str) -> str:
        """Return the first 16 hex characters of SHA-256(``user_id``).

        Used as the profile file name so the raw id does not appear on disk.
        """
        return hashlib.sha256(user_id.encode()).hexdigest()[:16]

    def _generate_interaction_id(self) -> str:
        """Return ``"int_"`` followed by 12 hex characters of a random UUID."""
        return f"int_{uuid.uuid4().hex[:12]}"

    async def _add_to_cache(self, profile: "UserProfile"):
        """Insert ``profile`` into the cache, evicting when the cache is full.

        The evicted entry is the one with the oldest ``last_updated`` (not
        the least recently read one). Replacing an entry that is already
        cached never evicts another user.
        """
        cache = self.profiles_cache
        if profile.user_id not in cache:
            # "cache and" keeps min() off an empty dict when the limit is <= 0.
            while cache and len(cache) >= self.max_profiles_in_memory:
                stalest = min(cache, key=lambda key: cache[key].last_updated)
                del cache[stalest]
        cache[profile.user_id] = profile

    def _should_refresh_profile(self, profile: "UserProfile") -> bool:
        """Return True when ``last_updated`` is older than the refresh window.

        NOTE(review): staleness is measured from ``last_updated``, not from
        the last cache access, so an idle profile is re-read from storage
        on every access until it is updated again.
        """
        age = datetime.now(timezone.utc) - profile.last_updated
        return age.total_seconds() > self._REFRESH_AFTER_SECONDS

    async def _refresh_profile(self, profile: "UserProfile"):
        """Reload the profile from storage and replace its cache entry."""
        fresh_profile = await self._load_profile_from_storage(profile.user_id)
        self.profiles_cache[profile.user_id] = fresh_profile

    def get_system_metrics(self) -> Dict[str, Any]:
        """Return cache, buffer and storage counters.

        ``cache_hit_rate`` is 0 when no lookups have happened yet;
        ``total_profiles_stored`` counts the ``*.json`` files in the
        storage directory.
        """
        lookups = self.cache_hits + self.cache_misses
        return {
            "cache_hit_rate": self.cache_hits / lookups if lookups > 0 else 0,
            "profiles_in_cache": len(self.profiles_cache),
            "interactions_in_buffer": len(self.interaction_buffer),
            "total_profiles_stored": sum(1 for _ in self.storage_path.glob("*.json")),
        }