AI-RADIO / src /user_memory.py
Nikita Makarov
works cool
0df1705
"""User Memory Service - Manages user identification, likes, and play history"""
import json
import os
import uuid
import random
from datetime import datetime
from typing import Dict, Any, List, Optional
# Get project root directory (parent of src/)
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Word list for generating memorable passphrases
PASSPHRASE_WORDS = [
# Animals
"tiger", "eagle", "dolphin", "panda", "wolf", "fox", "bear", "hawk", "owl", "lion",
"falcon", "phoenix", "dragon", "raven", "shark", "whale", "cobra", "jaguar", "panther",
# Colors
"crimson", "azure", "golden", "silver", "violet", "emerald", "coral", "amber", "jade",
# Nature
"storm", "river", "mountain", "ocean", "forest", "thunder", "crystal", "flame", "frost",
"shadow", "lunar", "solar", "cosmic", "stellar", "aurora", "meadow", "canyon", "glacier",
# Actions
"swift", "brave", "noble", "mighty", "bold", "wild", "silent", "rapid", "fierce",
# Music-related
"melody", "rhythm", "harmony", "sonic", "echo", "tempo", "jazz", "blues", "rock",
# Tech
"cyber", "pixel", "quantum", "neon", "turbo", "nova", "spark", "laser", "pulse"
]
class UserMemoryService:
"""Service for managing user memory, likes, and play history"""
def __init__(self, data_file: str = None):
"""Initialize user memory service
Args:
data_file: Path to JSON file for storing user data
"""
self.data_file = data_file or os.path.join(PROJECT_ROOT, "user_memory.json")
self.users: Dict[str, Dict[str, Any]] = {}
self._load_data()
def _load_data(self):
"""Load user data from file"""
if os.path.exists(self.data_file):
try:
with open(self.data_file, 'r') as f:
self.users = json.load(f)
print(f"✅ Loaded {len(self.users)} users from memory")
except Exception as e:
print(f"Error loading user memory: {e}")
self.users = {}
else:
self.users = {}
def _save_data(self):
"""Save user data to file"""
try:
with open(self.data_file, 'w') as f:
json.dump(self.users, f, indent=2)
except Exception as e:
print(f"Error saving user memory: {e}")
def _generate_passphrase(self, num_words: int = 3) -> str:
"""Generate a unique memorable passphrase
Args:
num_words: Number of words in passphrase (2-3)
Returns:
Unique passphrase like "swift-tiger-aurora"
"""
max_attempts = 100
for _ in range(max_attempts):
words = random.sample(PASSPHRASE_WORDS, num_words)
passphrase = "-".join(words)
# Check if passphrase already exists
if not self._passphrase_exists(passphrase):
return passphrase
# Fallback: add random number if all combinations taken
words = random.sample(PASSPHRASE_WORDS, num_words)
return f"{'-'.join(words)}-{random.randint(100, 999)}"
def _passphrase_exists(self, passphrase: str) -> bool:
"""Check if a passphrase is already in use"""
for user_data in self.users.values():
if user_data.get("passphrase") == passphrase:
return True
return False
def get_user_by_passphrase(self, passphrase: str) -> Optional[str]:
"""Find user ID by passphrase
Args:
passphrase: User's passphrase
Returns:
User ID if found, None otherwise
"""
passphrase = passphrase.strip().lower()
for user_id, user_data in self.users.items():
if user_data.get("passphrase", "").lower() == passphrase:
# Update last seen
self.users[user_id]["last_seen"] = datetime.now().isoformat()
self._save_data()
print(f"✅ User authenticated via passphrase: {user_id}")
return user_id
return None
def get_user_passphrase(self, user_id: str) -> Optional[str]:
"""Get passphrase for a user
Args:
user_id: User ID
Returns:
Passphrase if user exists, None otherwise
"""
if user_id in self.users:
return self.users[user_id].get("passphrase")
return None
def get_or_create_user(self, user_id: str = None) -> tuple:
"""Get existing user or create new one
Args:
user_id: Optional existing user ID
Returns:
Tuple of (user_id, passphrase, is_new_user)
"""
if user_id and user_id in self.users:
# Update last seen
self.users[user_id]["last_seen"] = datetime.now().isoformat()
self._save_data()
passphrase = self.users[user_id].get("passphrase", "")
return user_id, passphrase, False
# Create new user with passphrase
new_user_id = str(uuid.uuid4())[:8] # Short ID for convenience
passphrase = self._generate_passphrase(num_words=3)
self.users[new_user_id] = {
"created_at": datetime.now().isoformat(),
"last_seen": datetime.now().isoformat(),
"passphrase": passphrase,
"preferences": {},
"liked_tracks": [], # List of track dicts
"disliked_tracks": [], # List of track identifiers (title+artist)
"play_history": [], # Recent play history
"stats": {
"total_plays": 0,
"total_likes": 0,
"total_dislikes": 0
}
}
self._save_data()
print(f"✅ Created new user: {new_user_id} with passphrase: {passphrase}")
return new_user_id, passphrase, True
def like_track(self, user_id: str, track: Dict[str, Any]) -> bool:
"""Like a track
Args:
user_id: User ID
track: Track data (title, artist, url, etc.)
Returns:
Success status
"""
if user_id not in self.users:
return False
user = self.users[user_id]
track_id = f"{track.get('title', '')}|{track.get('artist', '')}"
# Check if already liked
existing_ids = [f"{t.get('title', '')}|{t.get('artist', '')}" for t in user["liked_tracks"]]
if track_id in existing_ids:
return True # Already liked
# Remove from dislikes if present
user["disliked_tracks"] = [t for t in user["disliked_tracks"] if t != track_id]
# Add to likes with timestamp
liked_track = {
"title": track.get("title", "Unknown"),
"artist": track.get("artist", "Unknown"),
"url": track.get("url", ""),
"youtube_id": track.get("youtube_id", ""),
"source": track.get("source", "youtube"),
"genre": track.get("genre", ""),
"liked_at": datetime.now().isoformat()
}
user["liked_tracks"].append(liked_track)
user["stats"]["total_likes"] += 1
self._save_data()
print(f"👍 User {user_id} liked: {track.get('title', 'Unknown')}")
return True
def dislike_track(self, user_id: str, track: Dict[str, Any]) -> bool:
"""Dislike a track
Args:
user_id: User ID
track: Track data
Returns:
Success status
"""
if user_id not in self.users:
return False
user = self.users[user_id]
track_id = f"{track.get('title', '')}|{track.get('artist', '')}"
# Remove from likes if present
user["liked_tracks"] = [t for t in user["liked_tracks"]
if f"{t.get('title', '')}|{t.get('artist', '')}" != track_id]
# Add to dislikes if not already there
if track_id not in user["disliked_tracks"]:
user["disliked_tracks"].append(track_id)
user["stats"]["total_dislikes"] += 1
self._save_data()
print(f"👎 User {user_id} disliked: {track.get('title', 'Unknown')}")
return True
def add_to_history(self, user_id: str, track: Dict[str, Any]):
"""Add track to play history
Args:
user_id: User ID
track: Track data
"""
if user_id not in self.users:
return
user = self.users[user_id]
history_entry = {
"title": track.get("title", "Unknown"),
"artist": track.get("artist", "Unknown"),
"played_at": datetime.now().isoformat()
}
# Keep last 100 plays
user["play_history"].append(history_entry)
if len(user["play_history"]) > 100:
user["play_history"] = user["play_history"][-100:]
user["stats"]["total_plays"] += 1
self._save_data()
def get_liked_tracks(self, user_id: str) -> List[Dict[str, Any]]:
"""Get user's liked tracks
Args:
user_id: User ID
Returns:
List of liked track dicts
"""
if user_id not in self.users:
return []
return self.users[user_id].get("liked_tracks", [])
def get_disliked_tracks(self, user_id: str) -> List[str]:
"""Get user's disliked track identifiers
Args:
user_id: User ID
Returns:
List of disliked track identifiers (title|artist)
"""
if user_id not in self.users:
return []
return self.users[user_id].get("disliked_tracks", [])
def get_play_history(self, user_id: str) -> List[Dict[str, Any]]:
"""Get user's play history
Args:
user_id: User ID
Returns:
List of played track dicts
"""
if user_id not in self.users:
return []
return self.users[user_id].get("play_history", [])
def get_random_liked_track(self, user_id: str, genre: str = None) -> Optional[Dict[str, Any]]:
"""Get a random liked track, optionally filtered by genre
Args:
user_id: User ID
genre: Optional genre filter
Returns:
Random liked track or None
"""
liked = self.get_liked_tracks(user_id)
if not liked:
return None
# Filter by genre if specified
if genre:
genre_lower = genre.lower()
filtered = [t for t in liked if genre_lower in t.get("genre", "").lower()]
if filtered:
liked = filtered
return random.choice(liked)
def is_track_disliked(self, user_id: str, track: Dict[str, Any]) -> bool:
"""Check if track is disliked by user
Args:
user_id: User ID
track: Track data
Returns:
True if disliked
"""
if user_id not in self.users:
return False
track_id = f"{track.get('title', '')}|{track.get('artist', '')}"
return track_id in self.users[user_id].get("disliked_tracks", [])
def get_user_stats(self, user_id: str) -> Dict[str, Any]:
"""Get user statistics
Args:
user_id: User ID
Returns:
User stats dict
"""
if user_id not in self.users:
return {"total_plays": 0, "total_likes": 0, "total_dislikes": 0}
return self.users[user_id].get("stats", {})
def save_user_preferences(self, user_id: str, preferences: Dict[str, Any]):
"""Save user preferences
Args:
user_id: User ID
preferences: Preferences dict
"""
if user_id not in self.users:
return
self.users[user_id]["preferences"] = preferences
self._save_data()
def get_user_preferences(self, user_id: str) -> Dict[str, Any]:
"""Get user preferences
Args:
user_id: User ID
Returns:
Preferences dict
"""
if user_id not in self.users:
return {}
return self.users[user_id].get("preferences", {})
def should_play_liked_track(self, user_id: str, probability: float = 0.3) -> bool:
"""Determine if we should play a liked track based on probability
Args:
user_id: User ID
probability: Probability of playing liked track (0.0 - 1.0)
Returns:
True if should play liked track
"""
liked = self.get_liked_tracks(user_id)
if not liked:
return False
return random.random() < probability