"""User Memory Service - Manages user identification, likes, and play history"""

import json
import os
import uuid
import random
from datetime import datetime
from typing import Dict, Any, List, Optional

# Get project root directory (parent of src/)
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Word list for generating memorable passphrases
PASSPHRASE_WORDS = [
    # Animals
    "tiger", "eagle", "dolphin", "panda", "wolf", "fox", "bear", "hawk", "owl", "lion",
    "falcon", "phoenix", "dragon", "raven", "shark", "whale", "cobra", "jaguar", "panther",
    # Colors
    "crimson", "azure", "golden", "silver", "violet", "emerald", "coral", "amber", "jade",
    # Nature
    "storm", "river", "mountain", "ocean", "forest", "thunder", "crystal", "flame", "frost",
    "shadow", "lunar", "solar", "cosmic", "stellar", "aurora", "meadow", "canyon", "glacier",
    # Actions
    "swift", "brave", "noble", "mighty", "bold", "wild", "silent", "rapid", "fierce",
    # Music-related
    "melody", "rhythm", "harmony", "sonic", "echo", "tempo", "jazz", "blues", "rock",
    # Tech
    "cyber", "pixel", "quantum", "neon", "turbo", "nova", "spark", "laser", "pulse"
]


class UserMemoryService:
    """Service for managing user memory, likes, and play history

    All users live in one in-memory dict (``self.users``, keyed by user ID)
    that is written back to a JSON file after every mutation. Persistence is
    best-effort: I/O failures are printed, never raised.
    """

    # Maximum number of entries kept in each user's play history
    MAX_HISTORY_ENTRIES = 100

    def __init__(self, data_file: Optional[str] = None):
        """Initialize user memory service

        Args:
            data_file: Path to JSON file for storing user data. Defaults to
                "user_memory.json" in PROJECT_ROOT.
        """
        self.data_file = data_file or os.path.join(PROJECT_ROOT, "user_memory.json")
        self.users: Dict[str, Dict[str, Any]] = {}
        self._load_data()

    @staticmethod
    def _track_key(track: Dict[str, Any]) -> str:
        """Build the "title|artist" identifier used to compare tracks"""
        return f"{track.get('title', '')}|{track.get('artist', '')}"

    @staticmethod
    def _bump_stat(user: Dict[str, Any], key: str) -> None:
        """Increment a stats counter, creating the stats dict/counter if missing"""
        stats = user.setdefault("stats", {})
        stats[key] = stats.get(key, 0) + 1

    def _load_data(self):
        """Load user data from file

        Starts with an empty user table when the file is missing, unreadable,
        not valid JSON, or does not contain a JSON object.
        """
        self.users = {}
        if not os.path.exists(self.data_file):
            return

        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except Exception as e:
            # Deliberately best-effort: a broken store must not prevent startup
            print(f"Error loading user memory: {e}")
            return

        # Every other method indexes self.users by user ID, so anything other
        # than a JSON object (e.g. a top-level list) would break them later.
        if not isinstance(loaded, dict):
            print(
                "Error loading user memory: expected a JSON object, "
                f"got {type(loaded).__name__}"
            )
            return

        self.users = loaded
        print(f"✅ Loaded {len(self.users)} users from memory")

    def _save_data(self):
        """Save user data to file

        The data is written to a sibling temp file first and then swapped in
        with os.replace, so a failure part-way through serialization cannot
        truncate or corrupt the existing store.
        """
        tmp_path = f"{self.data_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.users, f, indent=2)
            os.replace(tmp_path, self.data_file)
        except Exception as e:
            # Deliberately best-effort: callers are not expected to handle I/O errors
            print(f"Error saving user memory: {e}")
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # temp file may never have been created

    def _generate_passphrase(self, num_words: int = 3) -> str:
        """Generate a unique memorable passphrase

        Args:
            num_words: Number of words in passphrase (2-3)

        Returns:
            Unique passphrase like "swift-tiger-aurora"

        Raises:
            ValueError: If num_words is negative or larger than the word list
                (raised by the underlying sample call).
        """
        # The passphrase doubles as a login credential, so draw the words from
        # the OS entropy source rather than the predictable default generator.
        rng = random.SystemRandom()
        max_attempts = 100

        for _ in range(max_attempts):
            passphrase = "-".join(rng.sample(PASSPHRASE_WORDS, num_words))
            if not self._passphrase_exists(passphrase):
                return passphrase

        # Fallback: add random number if plain combinations keep colliding
        for _ in range(max_attempts):
            words = rng.sample(PASSPHRASE_WORDS, num_words)
            passphrase = f"{'-'.join(words)}-{rng.randint(100, 999)}"
            if not self._passphrase_exists(passphrase):
                return passphrase

        # Last resort: a random hex suffix makes a collision practically impossible
        words = rng.sample(PASSPHRASE_WORDS, num_words)
        return f"{'-'.join(words)}-{uuid.uuid4().hex[:8]}"

    def _passphrase_exists(self, passphrase: str) -> bool:
        """Check if a passphrase is already in use

        The comparison is case-insensitive so it agrees with
        get_user_by_passphrase, which lower-cases its input.
        """
        wanted = passphrase.lower()
        for user_data in self.users.values():
            stored = user_data.get("passphrase")
            if isinstance(stored, str) and stored.lower() == wanted:
                return True
        return False

    def get_user_by_passphrase(self, passphrase: str) -> Optional[str]:
        """Find user ID by passphrase

        Matching ignores case and surrounding whitespace. On a match the
        user's last_seen timestamp is updated and persisted.

        Args:
            passphrase: User's passphrase

        Returns:
            User ID if found, None otherwise (including for empty, blank, or
            non-string input)
        """
        if not isinstance(passphrase, str):
            return None

        normalized = passphrase.strip().lower()
        # An empty passphrase must never authenticate anyone; without this
        # guard it would match every record that has no passphrase stored.
        if not normalized:
            return None

        for user_id, user_data in self.users.items():
            stored = user_data.get("passphrase")
            if isinstance(stored, str) and stored.lower() == normalized:
                # Update last seen
                user_data["last_seen"] = datetime.now().isoformat()
                self._save_data()
                print(f"✅ User authenticated via passphrase: {user_id}")
                return user_id

        return None

    def get_user_passphrase(self, user_id: str) -> Optional[str]:
        """Get passphrase for a user

        Args:
            user_id: User ID

        Returns:
            Passphrase if user exists, None otherwise
        """
        if user_id in self.users:
            return self.users[user_id].get("passphrase")
        return None

    def get_or_create_user(self, user_id: str = None) -> tuple:
        """Get existing user or create new one

        A user_id that is not present in the store is ignored: a brand-new
        user with a freshly generated ID is created instead.

        Args:
            user_id: Optional existing user ID

        Returns:
            Tuple of (user_id, passphrase, is_new_user)
        """
        if user_id and user_id in self.users:
            # Update last seen
            self.users[user_id]["last_seen"] = datetime.now().isoformat()
            self._save_data()
            passphrase = self.users[user_id].get("passphrase", "")
            return user_id, passphrase, False

        # Short ID for convenience. Eight hex characters can collide, and a
        # collision would overwrite an existing user, so retry until unused.
        new_user_id = str(uuid.uuid4())[:8]
        while new_user_id in self.users:
            new_user_id = str(uuid.uuid4())[:8]

        passphrase = self._generate_passphrase(num_words=3)
        now = datetime.now().isoformat()

        self.users[new_user_id] = {
            "created_at": now,
            "last_seen": now,
            "passphrase": passphrase,
            "preferences": {},
            "liked_tracks": [],  # List of track dicts
            "disliked_tracks": [],  # List of track identifiers (title+artist)
            "play_history": [],  # Recent play history
            "stats": {
                "total_plays": 0,
                "total_likes": 0,
                "total_dislikes": 0
            }
        }
        self._save_data()
        # NOTE(review): this prints the passphrase in plain text; reconsider
        # if this output ends up in shared logs.
        print(f"✅ Created new user: {new_user_id} with passphrase: {passphrase}")
        return new_user_id, passphrase, True

    def like_track(self, user_id: str, track: Dict[str, Any]) -> bool:
        """Like a track

        Liking a track that is already liked is a no-op. Liking a disliked
        track removes it from the dislikes.

        Args:
            user_id: User ID
            track: Track data (title, artist, url, etc.)

        Returns:
            Success status (False only when the user is unknown)
        """
        user = self.users.get(user_id)
        if user is None:
            return False

        track_id = self._track_key(track)
        # setdefault keeps records that lack these keys usable
        liked = user.setdefault("liked_tracks", [])

        # Check if already liked
        if any(self._track_key(t) == track_id for t in liked):
            return True  # Already liked

        # Remove from dislikes if present
        user["disliked_tracks"] = [
            t for t in user.get("disliked_tracks", []) if t != track_id
        ]

        # Add to likes with timestamp
        liked.append({
            "title": track.get("title", "Unknown"),
            "artist": track.get("artist", "Unknown"),
            "url": track.get("url", ""),
            "youtube_id": track.get("youtube_id", ""),
            "source": track.get("source", "youtube"),
            "genre": track.get("genre", ""),
            "liked_at": datetime.now().isoformat()
        })
        self._bump_stat(user, "total_likes")

        self._save_data()
        print(f"👍 User {user_id} liked: {track.get('title', 'Unknown')}")
        return True

    def dislike_track(self, user_id: str, track: Dict[str, Any]) -> bool:
        """Dislike a track

        Removes the track from the likes if present. The dislike counter is
        only incremented the first time a given track is disliked.

        Args:
            user_id: User ID
            track: Track data

        Returns:
            Success status (False only when the user is unknown)
        """
        user = self.users.get(user_id)
        if user is None:
            return False

        track_id = self._track_key(track)

        # Remove from likes if present
        user["liked_tracks"] = [
            t for t in user.get("liked_tracks", [])
            if self._track_key(t) != track_id
        ]

        # Add to dislikes if not already there
        disliked = user.setdefault("disliked_tracks", [])
        if track_id not in disliked:
            disliked.append(track_id)
            self._bump_stat(user, "total_dislikes")

        self._save_data()
        print(f"👎 User {user_id} disliked: {track.get('title', 'Unknown')}")
        return True

    def add_to_history(self, user_id: str, track: Dict[str, Any]):
        """Add track to play history

        Only the most recent MAX_HISTORY_ENTRIES plays are kept; the
        total_plays counter keeps counting beyond that. Unknown users are
        ignored.

        Args:
            user_id: User ID
            track: Track data
        """
        user = self.users.get(user_id)
        if user is None:
            return

        history = user.setdefault("play_history", [])
        history.append({
            "title": track.get("title", "Unknown"),
            "artist": track.get("artist", "Unknown"),
            "played_at": datetime.now().isoformat()
        })

        # Keep only the newest entries
        if len(history) > self.MAX_HISTORY_ENTRIES:
            user["play_history"] = history[-self.MAX_HISTORY_ENTRIES:]

        self._bump_stat(user, "total_plays")
        self._save_data()

    def get_liked_tracks(self, user_id: str) -> List[Dict[str, Any]]:
        """Get user's liked tracks

        Args:
            user_id: User ID

        Returns:
            List of liked track dicts (empty for unknown users)
        """
        if user_id not in self.users:
            return []
        return self.users[user_id].get("liked_tracks", [])

    def get_disliked_tracks(self, user_id: str) -> List[str]:
        """Get user's disliked track identifiers

        Args:
            user_id: User ID

        Returns:
            List of disliked track identifiers (title|artist)
        """
        if user_id not in self.users:
            return []
        return self.users[user_id].get("disliked_tracks", [])

    def get_play_history(self, user_id: str) -> List[Dict[str, Any]]:
        """Get user's play history

        Args:
            user_id: User ID

        Returns:
            List of played track dicts (empty for unknown users)
        """
        if user_id not in self.users:
            return []
        return self.users[user_id].get("play_history", [])

    def get_random_liked_track(self, user_id: str, genre: str = None) -> Optional[Dict[str, Any]]:
        """Get a random liked track, optionally filtered by genre

        The genre filter is a case-insensitive substring match. When no liked
        track matches the genre, the choice falls back to all liked tracks.

        Args:
            user_id: User ID
            genre: Optional genre filter

        Returns:
            Random liked track or None
        """
        liked = self.get_liked_tracks(user_id)
        if not liked:
            return None

        # Filter by genre if specified
        if genre:
            genre_lower = genre.lower()
            # A stored genre may be None (null in the JSON file); treat it as empty
            filtered = [
                t for t in liked
                if genre_lower in (t.get("genre") or "").lower()
            ]
            if filtered:
                liked = filtered

        return random.choice(liked)

    def is_track_disliked(self, user_id: str, track: Dict[str, Any]) -> bool:
        """Check if track is disliked by user

        Args:
            user_id: User ID
            track: Track data

        Returns:
            True if disliked
        """
        if user_id not in self.users:
            return False
        return self._track_key(track) in self.users[user_id].get("disliked_tracks", [])

    def get_user_stats(self, user_id: str) -> Dict[str, Any]:
        """Get user statistics

        Args:
            user_id: User ID

        Returns:
            User stats dict. The three counters (total_plays, total_likes,
            total_dislikes) are always present, defaulting to 0. The dict is
            a copy; changing it does not change the stored stats.
        """
        stats = {"total_plays": 0, "total_likes": 0, "total_dislikes": 0}
        user = self.users.get(user_id)
        if user is not None:
            stats.update(user.get("stats") or {})
        return stats

    def save_user_preferences(self, user_id: str, preferences: Dict[str, Any]):
        """Save user preferences

        Replaces the user's whole preferences dict. Unknown users are ignored.

        Args:
            user_id: User ID
            preferences: Preferences dict
        """
        if user_id not in self.users:
            return

        self.users[user_id]["preferences"] = preferences
        self._save_data()

    def get_user_preferences(self, user_id: str) -> Dict[str, Any]:
        """Get user preferences

        Args:
            user_id: User ID

        Returns:
            Preferences dict (empty for unknown users)
        """
        if user_id not in self.users:
            return {}
        return self.users[user_id].get("preferences", {})

    def should_play_liked_track(self, user_id: str, probability: float = 0.3) -> bool:
        """Determine if we should play a liked track based on probability

        Args:
            user_id: User ID
            probability: Probability of playing liked track (0.0 - 1.0)

        Returns:
            True if should play liked track; always False when the user has
            no liked tracks
        """
        liked = self.get_liked_tracks(user_id)
        if not liked:
            return False

        return random.random() < probability