"""RAG System for User Preferences and History using LlamaIndex"""
import json
import logging
import os
import re
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional

from llama_index.core import VectorStoreIndex, Document, Settings
from llama_index.core.storage.storage_context import StorageContext
from llama_index.core.vector_stores import MetadataFilter, MetadataFilters, SimpleVectorStore
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.llms.openai import OpenAI as LlamaOpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

# Get project root directory (parent of src/)
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(PROJECT_ROOT, "logs")
os.makedirs(LOG_DIR, exist_ok=True)

# Setup RAG logger
rag_logger = logging.getLogger("ai_radio.rag")
if not rag_logger.handlers:
    rag_logger.setLevel(logging.INFO)
    _fh = logging.FileHandler(os.path.join(LOG_DIR, "rag_system.log"), encoding="utf-8")
    _fmt = logging.Formatter("%(asctime)s [RAG] %(levelname)s: %(message)s")
    _fh.setFormatter(_fmt)
    rag_logger.addHandler(_fh)

# Metadata key holding a document's full JSON record. It is excluded from the text
# seen by the embedder and the LLM, so it neither counts against the chunk size nor
# skews similarity search, but it is kept for persistence and exact lookups.
RAW_DATA_KEY = "raw_data"

# Local sentence-transformers model tried first for embeddings (no API key needed).
LOCAL_EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

# Genres recognised when extracting structured hints from an LLM answer. Matched as
# whole words so that e.g. "popular" does not count as "pop" or "therapy" as "rap".
_GENRE_PATTERNS = {
    genre: re.compile(rf"\b{re.escape(genre)}\b")
    for genre in (
        "pop", "rock", "jazz", "classical", "electronic", "hip-hop",
        "country", "indie", "rap", "blues", "folk",
    )
}


class RadioRAGSystem:
    """RAG system for storing and retrieving user preferences and listening history.

    Every stored record (a preferences snapshot or one history item) becomes one
    LlamaIndex ``Document``:

    - its text is a readable summary used for vector retrieval;
    - its ``raw_data`` metadata keeps the original JSON record. That record is what
      gets persisted to ``user_data.json`` and parsed back for exact lookups.
    """

    def __init__(self, nebius_api_key: str, nebius_api_base: str, nebius_model: str):
        """Initialize RAG system with LlamaIndex using Nebius/OpenAI.

        Configures the global LlamaIndex ``Settings`` (LLM, embedding model, chunking),
        then loads previously persisted records and builds the vector index.

        Args:
            nebius_api_key: Key for the OpenAI-compatible Nebius endpoint. When empty,
                no LLM is configured and LLM-backed recommendations use defaults.
            nebius_api_base: Base URL of the OpenAI-compatible endpoint.
            nebius_model: Model name passed to the LLM client.
        """
        self.nebius_api_key = nebius_api_key
        self.nebius_api_base = nebius_api_base
        self.nebius_model = nebius_model
        self.llm_available = False
        self.embedding_available = False

        if nebius_api_key:
            self._init_llm()
        # Nebius doesn't support embeddings, so we use a local model or OpenAI.
        self._init_embeddings()

        # Initial (empty) vector store; a fresh one replaces it on every full rebuild.
        self.vector_store = SimpleVectorStore()
        self.storage_context = StorageContext.from_defaults(vector_store=self.vector_store)

        # Large chunks so the short summary documents built below are normally not split.
        Settings.chunk_size = 4096
        Settings.chunk_overlap = 400
        rag_logger.info(f"📏 RAG chunk size set to {Settings.chunk_size} (overlap: {Settings.chunk_overlap})")
        print(f"📏 [RAG] Chunk size: {Settings.chunk_size}, Overlap: {Settings.chunk_overlap}")

        # Load existing records and build the index
        self.index = None
        self.documents: List[Document] = []
        self.user_data_file = os.path.join(PROJECT_ROOT, "user_data.json")
        self._load_user_data()

    # ------------------------------------------------------------------ setup

    def _init_llm(self) -> None:
        """Configure ``Settings.llm`` for the Nebius endpoint; sets ``self.llm_available``."""
        try:
            Settings.llm = LlamaOpenAI(
                api_key=self.nebius_api_key,
                api_base=self.nebius_api_base,
                model=self.nebius_model,
                temperature=0.7,
            )
        except Exception as e:
            print(f"Warning: Could not initialize Nebius/OpenAI LLM: {e}")
            print("RAG system will work in fallback mode without LLM features")
            self.llm_available = False
            return
        self.llm_available = True
        print(f"✅ RAG LLM initialized (Nebius {self.nebius_model})")

    def _init_embeddings(self) -> None:
        """Configure ``Settings.embed_model`` and set ``self.embedding_available``.

        Tries local sentence-transformers first. If that package is missing or the
        model fails to load, it falls back to OpenAI embeddings when
        ``OPENAI_API_KEY`` is set. Otherwise embeddings stay disabled and retrieval
        runs in fallback mode.
        """
        self.embedding_available = False
        try:
            from llama_index.embeddings.huggingface import HuggingFaceEmbedding
        except ImportError:
            HuggingFaceEmbedding = None

        if HuggingFaceEmbedding is not None:
            try:
                Settings.embed_model = HuggingFaceEmbedding(model_name=LOCAL_EMBED_MODEL)
            except Exception as e:
                print(f"⚠️ Local embeddings failed: {e}")
                rag_logger.warning(f"⚠️ Local embeddings failed: {e}")
            else:
                self.embedding_available = True
                print(f"✅ RAG Embeddings enabled (local {LOCAL_EMBED_MODEL})")
                rag_logger.info(f"✅ Using local HuggingFace embeddings: {LOCAL_EMBED_MODEL}")
                return

        openai_key = os.environ.get("OPENAI_API_KEY")
        if openai_key:
            self._try_openai_embeddings(openai_key)
        else:
            print("ℹ️ Embeddings disabled: No local model or OPENAI_API_KEY found.")
            print("   Install: pip install sentence-transformers")
            print("   Or set OPENAI_API_KEY environment variable.")
            rag_logger.warning("⚠️ No embeddings available - install sentence-transformers or set OPENAI_API_KEY")

    def _try_openai_embeddings(self, api_key: str, model: str = "text-embedding-3-small") -> bool:
        """Configure OpenAI embeddings; return True and set the flag on success."""
        try:
            Settings.embed_model = OpenAIEmbedding(
                api_key=api_key,
                api_base="https://api.openai.com/v1",
                model=model,
            )
        except Exception as e:
            print(f"⚠️ OpenAI embeddings failed: {e}")
            print("ℹ️ Embeddings disabled. RAG will use fallback mode.")
            rag_logger.warning(f"⚠️ OpenAI embeddings failed: {e}")
            self.embedding_available = False
            return False
        self.embedding_available = True
        print(f"✅ RAG Embeddings enabled (OpenAI {model} via OPENAI_API_KEY)")
        rag_logger.info(f"✅ Using OpenAI embeddings: {model}")
        return True

    # ------------------------------------------------------- record <-> document

    @staticmethod
    def _preferences_text(prefs: Dict[str, Any]) -> str:
        """Render a preferences dict as readable text for embedding and retrieval."""
        content_filter = prefs.get("content_filter") or {}
        return (
            "User Preferences:\n"
            f"Name: {prefs.get('name', 'Unknown')}\n"
            f"Favorite Genres: {', '.join(map(str, prefs.get('favorite_genres') or []))}\n"
            f"Interests: {', '.join(map(str, prefs.get('interests') or []))}\n"
            f"Podcast Interests: {', '.join(map(str, prefs.get('podcast_interests') or []))}\n"
            f"Mood: {prefs.get('mood', 'neutral')}\n"
            "Content Filter: "
            f"Music={content_filter.get('music', True)}, "
            f"News={content_filter.get('news', True)}, "
            f"Podcasts={content_filter.get('podcasts', True)}, "
            f"Stories={content_filter.get('stories', True)}\n"
        )

    @staticmethod
    def _history_text(item_type: str, item_data: Dict[str, Any], user_feedback: Optional[str]) -> str:
        """Render one listening-history item as readable text for embedding and retrieval."""
        feedback = user_feedback or "No feedback"
        if item_type == "music":
            track = item_data.get("track") or {}
            return (
                "Music Listening History:\n"
                f"Title: {track.get('title', 'Unknown')}\n"
                f"Artist: {track.get('artist', 'Unknown')}\n"
                f"Genre: {track.get('genre', 'Unknown')}\n"
                f"Source: {track.get('source', 'Unknown')}\n"
                f"Feedback: {feedback}\n"
            )
        if item_type == "news":
            news_items = item_data.get("news_items") or []
            topics = ", ".join(str(item.get("category", "")) for item in news_items[:3])
            return (
                "News Listening History:\n"
                f"Items: {len(news_items)} news items\n"
                f"Topics: {topics}\n"
                f"Feedback: {feedback}\n"
            )
        if item_type == "podcast":
            podcast = item_data.get("podcast") or {}
            return (
                "Podcast Listening History:\n"
                f"Title: {podcast.get('title', 'Unknown')}\n"
                f"Host: {podcast.get('host', 'Unknown')}\n"
                f"Category: {podcast.get('category', 'Unknown')}\n"
                f"Feedback: {feedback}\n"
            )
        return (
            "Story Listening History:\n"
            f"Type: {item_type}\n"
            f"Feedback: {feedback}\n"
        )

    def _record_to_document(self, record: Dict[str, Any]) -> Document:
        """Convert a persisted/stored record into a Document.

        The text is a readable summary of the record. The metadata holds the filterable
        fields plus the full record under ``raw_data``, which is excluded from
        embedding/LLM text so it does not inflate the chunk metadata.

        Raises:
            TypeError: if ``record`` is not a dict.
        """
        if not isinstance(record, dict):
            raise TypeError(f"expected a dict record, got {type(record).__name__}")

        record_type = record.get("type")
        user_id = record.get("user_id")
        timestamp = record.get("timestamp", datetime.now().isoformat())

        if record_type == "preferences":
            text = self._preferences_text(record.get("data") or {})
            metadata: Dict[str, Any] = {"type": "preferences", "user_id": user_id, "timestamp": timestamp}
        elif record_type == "history":
            item_type = record.get("item_type", "")
            feedback = record.get("feedback")
            text = self._history_text(item_type, record.get("data") or {}, feedback)
            metadata = {
                "type": "history",
                "user_id": user_id,
                "item_type": item_type,
                "timestamp": timestamp,
                "feedback": feedback or "",
            }
        else:
            # Unknown type: keep the raw JSON as the searchable text.
            text = json.dumps(record)
            metadata = {"type": record.get("type", "unknown"), "user_id": user_id, "timestamp": timestamp}

        metadata[RAW_DATA_KEY] = json.dumps(record)
        return Document(
            text=text,
            metadata=metadata,
            excluded_embed_metadata_keys=[RAW_DATA_KEY],
            excluded_llm_metadata_keys=[RAW_DATA_KEY],
        )

    # ------------------------------------------------------------ index upkeep

    def _rebuild_index(self) -> None:
        """Build a fresh vector index over all of ``self.documents``.

        A new vector store and storage context are created every time. Reusing the old
        store would append a second copy of every node (nodes get new ids on each
        build), and retrieval would then return duplicates. Attributes are only
        replaced once the build succeeds.
        """
        vector_store = SimpleVectorStore()
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        index = VectorStoreIndex.from_documents(self.documents, storage_context=storage_context)
        self.vector_store, self.storage_context, self.index = vector_store, storage_context, index

    def _index_new_document(self, doc: Document) -> bool:
        """Add ``doc`` to the vector index; return True when the index is up to date.

        When an index exists, only ``doc`` is inserted (and embedded); otherwise the
        index is built from all documents. On failure the index is dropped, so the
        next call rebuilds it from scratch. Returns False when embeddings are disabled.
        """
        if not self.embedding_available:
            return False
        try:
            if self.index is None:
                self._rebuild_index()
            else:
                self.index.insert(doc)
        except Exception as e:
            rag_logger.error(f"❌ Failed to rebuild index: {e}")
            print(f"Warning: Could not rebuild index: {e}")
            self.index = None
            return False
        return True

    # -------------------------------------------------------------- persistence

    def _load_user_data(self):
        """Load records from ``self.user_data_file`` and build the vector index.

        Each record keeps its original JSON in ``raw_data`` metadata, so a later save
        writes it back unchanged. Invalid records are skipped with a warning. Load
        errors are logged, not raised.
        """
        if not os.path.exists(self.user_data_file):
            return
        try:
            with open(self.user_data_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            if not isinstance(data, list):
                raise ValueError(f"expected a JSON list, got {type(data).__name__}")

            self.documents = []
            for record in data:
                try:
                    self.documents.append(self._record_to_document(record))
                except Exception as e:
                    rag_logger.warning(f"⚠️ Skipping invalid document during load: {e}")

            rag_logger.info(f"📂 Loaded {len(self.documents)} documents from RAG storage")
            print(f"📂 [RAG] Loaded {len(self.documents)} documents from storage")

            if self.documents and self.embedding_available:
                try:
                    rag_logger.info(f"🔨 Building vector index from {len(self.documents)} documents...")
                    print(f"🔨 [RAG] Building vector index from {len(self.documents)} documents...")
                    self._rebuild_index()
                    rag_logger.info(f"✅ Vector index built successfully with {len(self.documents)} documents")
                    print(f"✅ [RAG] Vector index built with {len(self.documents)} documents")
                except Exception as e:
                    rag_logger.error(f"❌ Failed to build vector index: {e}")
                    print(f"Warning: Could not build vector index: {e}")
                    self.index = None
            elif self.documents:
                rag_logger.info(f"ℹ️ {len(self.documents)} documents loaded but embeddings disabled - using fallback mode")
                print(f"ℹ️ [RAG] {len(self.documents)} documents loaded but embeddings disabled - using fallback mode")
        except Exception as e:
            rag_logger.error(f"❌ Error loading user data: {e}")
            print(f"Error loading user data: {e}")

    def _save_user_data(self):
        """Save all records to ``self.user_data_file`` as a JSON list.

        Each document's ``raw_data`` record is written. Documents without it are
        assumed to hold their JSON record as text, and unparseable ones are skipped
        with a warning.

        The list is written to a temporary sibling file and swapped in with
        ``os.replace``, so a failure mid-write leaves the previous file intact.
        Errors are logged, not raised.
        """
        try:
            data = []
            for doc in self.documents:
                raw_data = doc.metadata.get(RAW_DATA_KEY)
                try:
                    data.append(json.loads(raw_data) if raw_data else json.loads(doc.text))
                except (json.JSONDecodeError, TypeError) as e:
                    rag_logger.warning(f"⚠️ Skipping document that couldn't be parsed: {e}")

            tmp_path = f"{self.user_data_file}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, self.user_data_file)
            rag_logger.info(f"💾 Saved {len(data)} documents to {self.user_data_file}")
        except Exception as e:
            rag_logger.error(f"❌ Error saving user data: {e}")
            rag_logger.error(traceback.format_exc())
            print(f"Error saving user data: {e}")

    # ------------------------------------------------------------------ storing

    def store_user_preferences(self, preferences: Dict[str, Any], user_id: Optional[str] = None):
        """Store user preferences in RAG system with user ID.

        Appends a new preferences record. Older records are kept, and the newest one
        wins in ``get_user_preferences``. The record is indexed when embeddings are
        available, and all records are then persisted to disk.
        """
        if not user_id:
            rag_logger.warning("⚠️ Storing preferences without user_id - data will not be user-specific")
            print("⚠️ [RAG] Warning: Storing preferences without user_id")

        pref_doc = {
            "type": "preferences",
            "user_id": user_id,
            "timestamp": datetime.now().isoformat(),
            "data": preferences,
        }
        doc = self._record_to_document(pref_doc)
        self.documents.append(doc)
        rag_logger.info(f"📝 STORING PREFERENCES: user_id={user_id}, Name={preferences.get('name')}, Genres={preferences.get('favorite_genres')}, Mood={preferences.get('mood')}")
        print(f"📝 [RAG] Storing preferences for user {user_id} ({preferences.get('name', 'user')})")

        if self._index_new_document(doc):
            rag_logger.info(f"✅ Vector index updated with {len(self.documents)} documents (embeddings enabled)")
            print(f"✅ [RAG] Index updated with {len(self.documents)} documents")
        elif not self.embedding_available:
            rag_logger.info(f"ℹ️ Preferences stored (embeddings disabled, {len(self.documents)} total documents)")
            print("ℹ️ [RAG] Preferences stored (embeddings disabled)")

        self._save_user_data()

    def store_listening_history(self, item_type: str, item_data: Dict[str, Any],
                                user_id: Optional[str] = None, user_feedback: Optional[str] = None):
        """Store listening history with optional feedback and user ID.

        Args:
            item_type: "music", "news" or "podcast"; any other value is summarised as a story.
            item_data: Item payload (e.g. ``{"track": {...}}`` for music).
            user_id: Owner of the history entry.
            user_feedback: Optional free-form feedback stored with the entry.
        """
        if not user_id:
            rag_logger.warning(f"⚠️ Storing {item_type} history without user_id - data will not be user-specific")
            print(f"⚠️ [RAG] Warning: Storing {item_type} history without user_id")

        history_doc = {
            "type": "history",
            "user_id": user_id,
            "item_type": item_type,  # music, news, podcast, story
            "timestamp": datetime.now().isoformat(),
            "data": item_data,
            "feedback": user_feedback,
        }
        doc = self._record_to_document(history_doc)
        self.documents.append(doc)

        if item_type == "music":
            track = item_data.get("track") or {}
            rag_logger.info(f"🎵 STORING MUSIC HISTORY: user_id={user_id}, {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')} ({track.get('genre', 'Unknown')}) - Feedback: {user_feedback or 'None'}")
            print(f"🎵 [RAG] Storing music for user {user_id}: {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')}")
        elif item_type == "news":
            news_count = len(item_data.get("news_items") or [])
            rag_logger.info(f"📰 STORING NEWS HISTORY: user_id={user_id}, {news_count} items - Feedback: {user_feedback or 'None'}")
            print(f"📰 [RAG] Storing news history for user {user_id}: {news_count} items")
        elif item_type == "podcast":
            podcast = item_data.get("podcast") or {}
            rag_logger.info(f"🎙️ STORING PODCAST HISTORY: user_id={user_id}, {podcast.get('title', 'Unknown')} - Feedback: {user_feedback or 'None'}")
            print(f"🎙️ [RAG] Storing podcast for user {user_id}: {podcast.get('title', 'Unknown')}")
        else:
            rag_logger.info(f"📖 STORING STORY HISTORY: user_id={user_id}, {item_type} - Feedback: {user_feedback or 'None'}")
            print(f"📖 [RAG] Storing story history for user {user_id}: {item_type}")

        # Incremental insert embeds only the new document, so no periodic batching is needed.
        if self._index_new_document(doc):
            rag_logger.info(f"✅ Vector index updated (total documents: {len(self.documents)})")

        self._save_user_data()

    # ---------------------------------------------------------------- retrieval

    def get_user_preferences(self, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Retrieve latest user preferences for a specific user.

        Args:
            user_id: When given, only that user's preference records are considered.
                When omitted, the newest preference record of any user is returned.

        Returns:
            The ``data`` dict of the newest matching preference record, or ``{}``.
        """
        preferences: Dict[str, Any] = {}
        for doc in reversed(self.documents):
            metadata = doc.metadata
            if metadata.get("type") != "preferences":
                continue
            if user_id and metadata.get("user_id") != user_id:
                continue  # Skip documents from other users
            try:
                raw_data = metadata.get(RAW_DATA_KEY)
                record = json.loads(raw_data) if raw_data else json.loads(doc.text)
            except (json.JSONDecodeError, TypeError) as e:
                rag_logger.debug(f"Skipping document in get_user_preferences: {e}")
                continue
            if not isinstance(record, dict) or record.get("type") != "preferences":
                continue
            if user_id and record.get("user_id") != user_id:
                continue
            preferences = record.get("data") or {}
            break

        if user_id and not preferences:
            rag_logger.warning(f"⚠️ No preferences found for user_id={user_id}")
        return preferences

    @staticmethod
    def _user_filters(user_id: str) -> MetadataFilters:
        """Return a metadata filter matching only nodes whose ``user_id`` equals ``user_id``."""
        return MetadataFilters(filters=[MetadataFilter(key="user_id", value=user_id)])

    @staticmethod
    def _format_score(score: Any) -> str:
        """Format a retrieval score for logging ("N/A" when missing)."""
        if score is None:
            return "N/A"
        try:
            return f"{float(score):.4f}"
        except (TypeError, ValueError):
            return str(score)

    def get_recommendations(self, query: str, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Get personalized recommendations based on user history and preferences using RAG.

        Args:
            query: Natural-language request sent to the query engine.
            user_id: When given, retrieval is restricted to this user's documents, and
                the fallback uses this user's preferences.

        Returns:
            On success, ``{"recommendations": <LLM answer>, "source": "RAG", "query": query}``.
            A ``"suggested_genres"`` list is added when known genres appear in a
            music/genre answer. When RAG is unavailable or fails, returns the
            preference-based result of ``_get_default_recommendations``.
        """
        if not self.index or not self.llm_available:
            rag_logger.warning(f"⚠️ RAG RECOMMENDATIONS UNAVAILABLE: user_id={user_id}, query='{query}' (no index or LLM)")
            print("ℹ️ RAG recommendations unavailable (no index or LLM) - using defaults")
            return self._get_default_recommendations(user_id=user_id)

        try:
            rag_logger.info(f"🔍 RAG RECOMMENDATIONS QUERY: '{query}'")
            print(f"🔍 [RAG] Getting recommendations for: '{query[:60]}...'")

            engine_kwargs: Dict[str, Any] = {
                "similarity_top_k": 5,  # Get top 5 relevant documents
                "response_mode": "compact",  # Compact response
            }
            if user_id:
                # Only this user's own history/preferences may inform their recommendations.
                engine_kwargs["filters"] = self._user_filters(user_id)
            query_engine = self.index.as_query_engine(**engine_kwargs)
            response_text = str(query_engine.query(query))
            rag_logger.info(f"✅ RAG RECOMMENDATIONS RESPONSE: {response_text[:200]}...")
            print(f"✅ [RAG] LLM generated recommendations: {response_text[:150]}...")

            recommendations: Dict[str, Any] = {
                "recommendations": response_text,
                "source": "RAG",
                "query": query,
            }

            # Also try to extract structured genre hints from the free-text answer
            response_lower = response_text.lower()
            if "genre" in response_lower or "music" in response_lower:
                for genre, pattern in _GENRE_PATTERNS.items():
                    if pattern.search(response_lower):
                        recommendations.setdefault("suggested_genres", []).append(genre)
                        rag_logger.info(f"   🎵 Extracted genre from RAG: {genre}")
            return recommendations
        except Exception as e:
            rag_logger.error(f"❌ RAG RECOMMENDATIONS ERROR: {e}")
            rag_logger.error(traceback.format_exc())
            print(f"Error getting RAG recommendations: {e}")
            traceback.print_exc()
            return self._get_default_recommendations(user_id=user_id)

    def query_user_context(self, query: str, user_id: Optional[str] = None, top_k: int = 3) -> List[Dict[str, Any]]:
        """Query user context using vector search, filtered by ``user_id``.

        Args:
            query: Text to search for.
            user_id: When given, only that user's documents are returned.
            top_k: Maximum number of results.

        Returns:
            Up to ``top_k`` dicts with ``text``, ``score`` and ``metadata``. A ``data``
            key holds the parsed ``raw_data`` record and is omitted when that record
            cannot be parsed. Returns ``[]`` when the index or embeddings are
            unavailable, or on error.
        """
        if not self.index or not self.embedding_available:
            rag_logger.warning(f"⚠️ RAG QUERY SKIPPED (no index/embeddings): '{query}'")
            print("⚠️ [RAG] Query skipped - embeddings not available")
            return []

        try:
            rag_logger.info(f"🔍 RAG QUERY: user_id={user_id}, query='{query}' (top_k={top_k})")
            print(f"🔍 [RAG] Querying for user {user_id}: '{query[:60]}...'")

            retriever_kwargs: Dict[str, Any] = {"similarity_top_k": top_k}
            if user_id:
                # Filter inside the vector store so top_k results all belong to this user.
                retriever_kwargs["filters"] = self._user_filters(user_id)
            nodes = self.index.as_retriever(**retriever_kwargs).retrieve(query)

            results: List[Dict[str, Any]] = []
            for node in nodes:
                node_user_id = node.metadata.get("user_id")
                if user_id and node_user_id != user_id:
                    continue  # Defensive: the metadata filter should already exclude these
                score = getattr(node, "score", None)
                score_str = self._format_score(score)
                result: Dict[str, Any] = {"text": node.text, "score": score, "metadata": node.metadata}
                try:
                    result["data"] = json.loads(node.metadata.get(RAW_DATA_KEY) or "{}")
                except (json.JSONDecodeError, TypeError):
                    results.append(result)
                    rag_logger.warning(f"   ⚠️ Retrieved #{len(results)} (parse error): user_id={node_user_id}, score={score_str}, text_preview='{node.text[:50]}...'")
                    print(f"   ⚠️ [RAG] Retrieved #{len(results)} (parse error, user: {node_user_id}, score: {score_str})")
                else:
                    results.append(result)
                    node_type = node.metadata.get("type", "unknown")
                    item_type = node.metadata.get("item_type", "")
                    preview = node.text[:100].replace("\n", " ")
                    rag_logger.info(f"   📄 Retrieved #{len(results)}: user_id={node_user_id}, type={node_type}, item_type={item_type}, score={score_str}, preview='{preview}...'")
                    print(f"   📄 [RAG] Retrieved #{len(results)}: {node_type} (user: {node_user_id}, score: {score_str}) - {preview}...")
                if len(results) >= top_k:
                    break

            rag_logger.info(f"✅ RAG QUERY COMPLETE: Retrieved {len(results)} documents for user_id={user_id}")
            print(f"✅ [RAG] Query complete: {len(results)} documents retrieved for user {user_id}")
            return results
        except Exception as e:
            rag_logger.error(f"❌ RAG QUERY ERROR: {e}")
            rag_logger.error(traceback.format_exc())
            print(f"❌ [RAG] Query error: {e}")
            return []

    def _get_default_recommendations(self, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Return default recommendations when RAG is not available.

        Uses the stored preferences of ``user_id`` (or the newest preferences of any
        user when omitted), with hard-coded defaults for missing fields.
        """
        preferences = self.get_user_preferences(user_id=user_id)
        return {
            "favorite_genres": preferences.get("favorite_genres", ["pop", "rock"]),
            "interests": preferences.get("interests", ["technology", "world"]),
            "podcast_interests": preferences.get("podcast_interests", ["technology"]),
            "mood": preferences.get("mood", "happy"),
            "source": "preferences",
        }

    def get_listening_stats(self, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Get statistics about the listening history.

        Counts history documents by ``item_type`` using document metadata (the document
        text is a readable summary, not JSON). When ``user_id`` is given, only that
        user's history is counted.
        """
        stats = {
            "total_sessions": 0,
            "music_played": 0,
            "news_heard": 0,
            "podcasts_listened": 0,
            "stories_enjoyed": 0,
        }
        counter_for_item_type = {
            "music": "music_played",
            "news": "news_heard",
            "podcast": "podcasts_listened",
            "story": "stories_enjoyed",
        }
        for doc in self.documents:
            metadata = doc.metadata
            if metadata.get("type") != "history":
                continue
            if user_id and metadata.get("user_id") != user_id:
                continue
            stats["total_sessions"] += 1
            counter = counter_for_item_type.get(metadata.get("item_type", ""))
            if counter:
                stats[counter] += 1
        return stats