Spaces:
Sleeping
Sleeping
| """User Memory Service - Manages user identification, likes, and play history""" | |
| import json | |
| import os | |
| import uuid | |
| import random | |
| from datetime import datetime | |
| from typing import Dict, Any, List, Optional | |
# Project root = the directory that contains the directory holding this file
# (i.e. the parent of src/).
_module_dir = os.path.split(os.path.abspath(__file__))[0]
PROJECT_ROOT = os.path.split(_module_dir)[0]

# Vocabulary used to build memorable passphrases. Each category is written as
# a whitespace-separated string and split into words; the resulting list keeps
# the categories in the order shown.
PASSPHRASE_WORDS = (
    # Animals
    "tiger eagle dolphin panda wolf fox bear hawk owl lion "
    "falcon phoenix dragon raven shark whale cobra jaguar panther".split()
    # Colors
    + "crimson azure golden silver violet emerald coral amber jade".split()
    # Nature
    + "storm river mountain ocean forest thunder crystal flame frost "
      "shadow lunar solar cosmic stellar aurora meadow canyon glacier".split()
    # Actions
    + "swift brave noble mighty bold wild silent rapid fierce".split()
    # Music-related
    + "melody rhythm harmony sonic echo tempo jazz blues rock".split()
    # Tech
    + "cyber pixel quantum neon turbo nova spark laser pulse".split()
)
class UserMemoryService:
    """Service for managing user memory, likes, and play history.

    All state is held in ``self.users`` (a dict keyed by user ID) and is
    written back to a JSON file after every mutation. User records loaded
    from disk may be missing keys, so every accessor and mutator tolerates
    sparse records.
    """

    # Counter names kept in each user's "stats" dict.
    _STAT_KEYS = ("total_plays", "total_likes", "total_dislikes")

    def __init__(self, data_file: Optional[str] = None):
        """Initialize user memory service

        Args:
            data_file: Path to JSON file for storing user data. Defaults to
                ``user_memory.json`` in the project root.
        """
        self.data_file = data_file or os.path.join(PROJECT_ROOT, "user_memory.json")
        self.users: Dict[str, Dict[str, Any]] = {}
        self._load_data()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def _load_data(self):
        """Load user data from file.

        A missing, unreadable, malformed, or wrongly-shaped file leaves
        ``self.users`` empty instead of raising.
        """
        self.users = {}
        if not os.path.exists(self.data_file):
            return

        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSON and text-decoding errors.
            print(f"Error loading user memory: {e}")
            return

        # Every other method assumes a dict of user records; reject any other
        # top-level JSON value (list, number, ...) up front.
        if not isinstance(loaded, dict):
            print(
                "Error loading user memory: expected a JSON object, "
                f"got {type(loaded).__name__}"
            )
            return

        self.users = loaded
        print(f"✅ Loaded {len(self.users)} users from memory")

    def _save_data(self):
        """Save user data to file.

        The data is written to a sibling temporary file and then moved over
        the real file, so a serialization error or a crash mid-write never
        truncates the existing data. Failures are reported, not raised.
        """
        tmp_path = f"{self.data_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.users, f, indent=2)
            os.replace(tmp_path, self.data_file)
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving user memory: {e}")
            # Best-effort cleanup of the partial temp file.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _track_key(track: Dict[str, Any]) -> str:
        """Return the ``title|artist`` identifier for a track dict."""
        return f"{track.get('title', '')}|{track.get('artist', '')}"

    @classmethod
    def _match_keys(cls, track: Dict[str, Any]) -> tuple:
        """Return every identifier under which ``track`` may be stored.

        Dislikes are stored under the raw key (missing fields become ``""``)
        while liked entries store missing fields as ``"Unknown"``; both forms
        are returned so lookups agree with what was persisted.
        """
        stored_form = {
            "title": track.get("title", "Unknown"),
            "artist": track.get("artist", "Unknown"),
        }
        return (cls._track_key(track), cls._track_key(stored_form))

    @classmethod
    def _stats_for(cls, user: Dict[str, Any]) -> Dict[str, Any]:
        """Return the user's stats dict, creating it / missing counters."""
        stats = user.get("stats")
        if not isinstance(stats, dict):
            stats = user["stats"] = {}
        for key in cls._STAT_KEYS:
            stats.setdefault(key, 0)
        return stats

    # ------------------------------------------------------------------
    # Passphrases / identification
    # ------------------------------------------------------------------

    def _generate_passphrase(self, num_words: int = 3) -> str:
        """Generate a unique memorable passphrase

        Args:
            num_words: Number of words in passphrase (2-3)

        Returns:
            Unique passphrase like "swift-tiger-aurora"

        Raises:
            ValueError: If ``num_words`` is not between 1 and the size of
                the word list.
        """
        if not 1 <= num_words <= len(PASSPHRASE_WORDS):
            raise ValueError(
                f"num_words must be between 1 and {len(PASSPHRASE_WORDS)}"
            )

        # The passphrase is the user's only credential, so draw words from
        # the OS entropy source rather than the default predictable PRNG.
        rng = random.SystemRandom()

        max_attempts = 100
        for _ in range(max_attempts):
            passphrase = "-".join(rng.sample(PASSPHRASE_WORDS, num_words))
            if not self._passphrase_exists(passphrase):
                return passphrase

        # Fallback: plain combinations keep colliding, so append a number
        # and keep drawing until the result is actually unused.
        while True:
            words = rng.sample(PASSPHRASE_WORDS, num_words)
            passphrase = f"{'-'.join(words)}-{rng.randint(100, 999)}"
            if not self._passphrase_exists(passphrase):
                return passphrase

    def _passphrase_exists(self, passphrase: str) -> bool:
        """Check if a passphrase is already in use"""
        return any(
            user_data.get("passphrase") == passphrase
            for user_data in self.users.values()
        )

    def get_user_by_passphrase(self, passphrase: str) -> Optional[str]:
        """Find user ID by passphrase

        The comparison ignores case and surrounding whitespace. On a match
        the user's ``last_seen`` timestamp is refreshed and saved.

        Args:
            passphrase: User's passphrase

        Returns:
            User ID if found, None otherwise
        """
        if not isinstance(passphrase, str):
            return None
        wanted = passphrase.strip().lower()
        # An empty passphrase must never authenticate anyone; otherwise it
        # would match records that have no passphrase stored.
        if not wanted:
            return None

        for user_id, user_data in self.users.items():
            stored = user_data.get("passphrase")
            if isinstance(stored, str) and stored.lower() == wanted:
                # Update last seen
                user_data["last_seen"] = datetime.now().isoformat()
                self._save_data()
                print(f"✅ User authenticated via passphrase: {user_id}")
                return user_id
        return None

    def get_user_passphrase(self, user_id: str) -> Optional[str]:
        """Get passphrase for a user

        Args:
            user_id: User ID

        Returns:
            Passphrase if user exists, None otherwise
        """
        if user_id in self.users:
            return self.users[user_id].get("passphrase")
        return None

    def get_or_create_user(self, user_id: Optional[str] = None) -> tuple:
        """Get existing user or create new one

        Args:
            user_id: Optional existing user ID

        Returns:
            Tuple of (user_id, passphrase, is_new_user)
        """
        if user_id and user_id in self.users:
            # Update last seen
            self.users[user_id]["last_seen"] = datetime.now().isoformat()
            self._save_data()
            passphrase = self.users[user_id].get("passphrase", "")
            return user_id, passphrase, False

        # Short ID for convenience. Eight hex characters can collide, so
        # re-draw until the ID is unused rather than overwrite a user.
        new_user_id = str(uuid.uuid4())[:8]
        while new_user_id in self.users:
            new_user_id = str(uuid.uuid4())[:8]

        passphrase = self._generate_passphrase(num_words=3)
        now = datetime.now().isoformat()
        self.users[new_user_id] = {
            "created_at": now,
            "last_seen": now,
            "passphrase": passphrase,
            "preferences": {},
            "liked_tracks": [],  # List of track dicts
            "disliked_tracks": [],  # List of track identifiers (title|artist)
            "play_history": [],  # Recent play history
            "stats": {
                "total_plays": 0,
                "total_likes": 0,
                "total_dislikes": 0
            }
        }
        self._save_data()
        # The passphrase is a credential: it is returned to the caller but
        # deliberately kept out of the log output.
        print(f"✅ Created new user: {new_user_id}")
        return new_user_id, passphrase, True

    # ------------------------------------------------------------------
    # Likes / dislikes / history
    # ------------------------------------------------------------------

    def like_track(self, user_id: str, track: Dict[str, Any]) -> bool:
        """Like a track

        Liking an already-liked track is a no-op. Liking a disliked track
        removes it from the dislikes.

        Args:
            user_id: User ID
            track: Track data (title, artist, url, etc.)

        Returns:
            Success status (False only when the user is unknown)
        """
        if user_id not in self.users:
            return False
        user = self.users[user_id]
        match_keys = self._match_keys(track)

        # Check if already liked (against both raw and stored key forms, so
        # tracks with a missing title/artist are not added repeatedly).
        liked = user.setdefault("liked_tracks", [])
        if any(self._track_key(t) in match_keys for t in liked):
            return True  # Already liked

        # Remove from dislikes if present
        user["disliked_tracks"] = [
            t for t in user.get("disliked_tracks", []) if t not in match_keys
        ]

        # Add to likes with timestamp
        liked.append({
            "title": track.get("title", "Unknown"),
            "artist": track.get("artist", "Unknown"),
            "url": track.get("url", ""),
            "youtube_id": track.get("youtube_id", ""),
            "source": track.get("source", "youtube"),
            "genre": track.get("genre", ""),
            "liked_at": datetime.now().isoformat()
        })
        self._stats_for(user)["total_likes"] += 1
        self._save_data()
        print(f"👍 User {user_id} liked: {track.get('title', 'Unknown')}")
        return True

    def dislike_track(self, user_id: str, track: Dict[str, Any]) -> bool:
        """Dislike a track

        Removes the track from the user's likes and records its
        ``title|artist`` identifier in the dislikes (once).

        Args:
            user_id: User ID
            track: Track data

        Returns:
            Success status (False only when the user is unknown)
        """
        if user_id not in self.users:
            return False
        user = self.users[user_id]
        track_id = self._track_key(track)
        match_keys = self._match_keys(track)

        # Remove from likes if present
        user["liked_tracks"] = [
            t for t in user.get("liked_tracks", [])
            if self._track_key(t) not in match_keys
        ]

        # Add to dislikes if not already there
        disliked = user.setdefault("disliked_tracks", [])
        if track_id not in disliked:
            disliked.append(track_id)
            self._stats_for(user)["total_dislikes"] += 1

        self._save_data()
        print(f"👎 User {user_id} disliked: {track.get('title', 'Unknown')}")
        return True

    def add_to_history(self, user_id: str, track: Dict[str, Any]):
        """Add track to play history

        Only the 100 most recent plays are kept.

        Args:
            user_id: User ID
            track: Track data
        """
        if user_id not in self.users:
            return
        user = self.users[user_id]

        history = user.setdefault("play_history", [])
        history.append({
            "title": track.get("title", "Unknown"),
            "artist": track.get("artist", "Unknown"),
            "played_at": datetime.now().isoformat()
        })
        # Keep last 100 plays
        if len(history) > 100:
            user["play_history"] = history[-100:]

        self._stats_for(user)["total_plays"] += 1
        self._save_data()

    # ------------------------------------------------------------------
    # Read accessors
    # ------------------------------------------------------------------

    def get_liked_tracks(self, user_id: str) -> List[Dict[str, Any]]:
        """Get user's liked tracks

        Args:
            user_id: User ID

        Returns:
            List of liked track dicts (empty for an unknown user)
        """
        if user_id not in self.users:
            return []
        return self.users[user_id].get("liked_tracks", [])

    def get_disliked_tracks(self, user_id: str) -> List[str]:
        """Get user's disliked track identifiers

        Args:
            user_id: User ID

        Returns:
            List of disliked track identifiers (title|artist)
        """
        if user_id not in self.users:
            return []
        return self.users[user_id].get("disliked_tracks", [])

    def get_play_history(self, user_id: str) -> List[Dict[str, Any]]:
        """Get user's play history

        Args:
            user_id: User ID

        Returns:
            List of played track dicts (empty for an unknown user)
        """
        if user_id not in self.users:
            return []
        return self.users[user_id].get("play_history", [])

    def get_random_liked_track(self, user_id: str, genre: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get a random liked track, optionally filtered by genre

        The genre filter is a case-insensitive substring match. If no liked
        track matches the genre, the choice falls back to all liked tracks.

        Args:
            user_id: User ID
            genre: Optional genre filter

        Returns:
            Random liked track or None
        """
        liked = self.get_liked_tracks(user_id)
        if not liked:
            return None

        # Filter by genre if specified
        if genre:
            genre_lower = genre.lower()
            # `or ""` guards against a genre stored as null.
            filtered = [
                t for t in liked
                if genre_lower in (t.get("genre") or "").lower()
            ]
            if filtered:
                liked = filtered

        return random.choice(liked)

    def is_track_disliked(self, user_id: str, track: Dict[str, Any]) -> bool:
        """Check if track is disliked by user

        Args:
            user_id: User ID
            track: Track data

        Returns:
            True if disliked
        """
        if user_id not in self.users:
            return False
        return self._track_key(track) in self.users[user_id].get("disliked_tracks", [])

    def get_user_stats(self, user_id: str) -> Dict[str, Any]:
        """Get user statistics

        Args:
            user_id: User ID

        Returns:
            Stats dict that always contains total_plays, total_likes and
            total_dislikes (zero when unknown or not yet recorded)
        """
        stats: Dict[str, Any] = dict.fromkeys(self._STAT_KEYS, 0)
        if user_id not in self.users:
            return stats
        stored = self.users[user_id].get("stats")
        if isinstance(stored, dict):
            stats.update(stored)
        return stats

    # ------------------------------------------------------------------
    # Preferences
    # ------------------------------------------------------------------

    def save_user_preferences(self, user_id: str, preferences: Dict[str, Any]):
        """Save user preferences

        Replaces the stored preferences wholesale; unknown users are ignored.

        Args:
            user_id: User ID
            preferences: Preferences dict
        """
        if user_id not in self.users:
            return
        self.users[user_id]["preferences"] = preferences
        self._save_data()

    def get_user_preferences(self, user_id: str) -> Dict[str, Any]:
        """Get user preferences

        Args:
            user_id: User ID

        Returns:
            Preferences dict (empty for an unknown user)
        """
        if user_id not in self.users:
            return {}
        return self.users[user_id].get("preferences", {})

    def should_play_liked_track(self, user_id: str, probability: float = 0.3) -> bool:
        """Determine if we should play a liked track based on probability

        Args:
            user_id: User ID
            probability: Probability of playing liked track (0.0 - 1.0)

        Returns:
            True if should play liked track; always False when the user
            has no liked tracks
        """
        if not self.get_liked_tracks(user_id):
            return False
        return random.random() < probability