Spaces:
Sleeping
Sleeping
| """RAG System for User Preferences and History using LlamaIndex""" | |
| import json | |
| import os | |
| import logging | |
| from typing import Dict, Any, List | |
| from datetime import datetime | |
| from llama_index.core import VectorStoreIndex, Document, Settings | |
| from llama_index.core.storage.storage_context import StorageContext | |
| from llama_index.core.vector_stores import SimpleVectorStore | |
| from llama_index.core.node_parser import SimpleNodeParser | |
| from llama_index.llms.openai import OpenAI as LlamaOpenAI | |
| from llama_index.embeddings.openai import OpenAIEmbedding | |
# Get project root directory (parent of src/)
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(PROJECT_ROOT, "logs")
os.makedirs(LOG_DIR, exist_ok=True)
# Setup RAG logger
rag_logger = logging.getLogger("ai_radio.rag")
# Guard on .handlers so re-importing this module does not attach a second
# FileHandler (which would duplicate every log line).
if not rag_logger.handlers:
    rag_logger.setLevel(logging.INFO)
    # All RAG activity goes to logs/rag_system.log next to the project root.
    _fh = logging.FileHandler(os.path.join(LOG_DIR, "rag_system.log"), encoding="utf-8")
    _fmt = logging.Formatter("%(asctime)s [RAG] %(levelname)s: %(message)s")
    _fh.setFormatter(_fmt)
    rag_logger.addHandler(_fh)
class RadioRAGSystem:
    """RAG system for storing and retrieving user preferences and listening history"""

    def __init__(self, nebius_api_key: str, nebius_api_base: str, nebius_model: str):
        """Initialize RAG system with LlamaIndex using Nebius/OpenAI.

        Args:
            nebius_api_key: key for the Nebius OpenAI-compatible endpoint; falsy
                value skips LLM setup entirely (fallback mode).
            nebius_api_base: base URL of that endpoint.
            nebius_model: model name used for LLM-backed queries.

        Side effects: mutates the process-global llama_index ``Settings``
        (llm, embed_model, chunk_size/chunk_overlap) and loads any existing
        ``user_data.json`` from the project root via ``_load_user_data``.
        """
        self.nebius_api_key = nebius_api_key
        self.nebius_api_base = nebius_api_base
        self.nebius_model = nebius_model
        # Capability flags; when False the class degrades to "fallback mode".
        self.llm_available = False
        self.embedding_available = False
        # Configure LlamaIndex settings with Nebius/OpenAI
        if nebius_api_key:
            try:
                Settings.llm = LlamaOpenAI(
                    api_key=nebius_api_key,
                    api_base=nebius_api_base,
                    model=nebius_model,
                    temperature=0.7
                )
                self.llm_available = True
                print("✅ RAG LLM initialized (Nebius GPT-OSS-120B)")
            except Exception as e:
                print(f"Warning: Could not initialize Nebius/OpenAI LLM: {e}")
                print("RAG system will work in fallback mode without LLM features")
                self.llm_available = False
        # Enable embeddings - try local model first, then OpenAI
        # Note: Nebius doesn't support embeddings, so we use local or OpenAI
        self.embedding_available = False
        try:
            # First, try local sentence-transformers (no API key needed)
            try:
                from llama_index.embeddings.huggingface import HuggingFaceEmbedding
                Settings.embed_model = HuggingFaceEmbedding(
                    model_name="sentence-transformers/all-MiniLM-L6-v2"
                )
                self.embedding_available = True
                print("✅ RAG Embeddings enabled (local sentence-transformers/all-MiniLM-L6-v2)")
                rag_logger.info("✅ Using local HuggingFace embeddings: sentence-transformers/all-MiniLM-L6-v2")
            except ImportError:
                # Fallback: Try OpenAI embeddings endpoint (requires OpenAI API key)
                openai_key = os.environ.get("OPENAI_API_KEY")
                if openai_key:
                    try:
                        Settings.embed_model = OpenAIEmbedding(
                            api_key=openai_key,
                            api_base="https://api.openai.com/v1",
                            model="text-embedding-3-small"
                        )
                        self.embedding_available = True
                        print("✅ RAG Embeddings enabled (OpenAI text-embedding-3-small via OPENAI_API_KEY)")
                        rag_logger.info("✅ Using OpenAI embeddings: text-embedding-3-small")
                    except Exception as e:
                        print(f"⚠️ OpenAI embeddings failed: {e}")
                        print("ℹ️ Embeddings disabled. RAG will use fallback mode.")
                        rag_logger.warning(f"⚠️ OpenAI embeddings failed: {e}")
                        self.embedding_available = False
                else:
                    print("ℹ️ Embeddings disabled: No local model or OPENAI_API_KEY found.")
                    print(" Install: pip install sentence-transformers")
                    print(" Or set OPENAI_API_KEY environment variable.")
                    rag_logger.warning("⚠️ No embeddings available - install sentence-transformers or set OPENAI_API_KEY")
                    self.embedding_available = False
            except Exception as e:
                # sentence-transformers imported but failed at runtime
                # (e.g. model download error) — try OpenAI as last resort.
                print(f"⚠️ Local embeddings failed: {e}")
                # Try OpenAI as last resort
                openai_key = os.environ.get("OPENAI_API_KEY")
                if openai_key:
                    try:
                        Settings.embed_model = OpenAIEmbedding(
                            api_key=openai_key,
                            api_base="https://api.openai.com/v1",
                            model="text-embedding-ada-002"  # Try older model
                        )
                        self.embedding_available = True
                        print("✅ RAG Embeddings enabled (OpenAI text-embedding-ada-002)")
                        rag_logger.info("✅ Using OpenAI embeddings: text-embedding-ada-002")
                    # NOTE(review): bare except — consider narrowing to Exception.
                    except:
                        print("ℹ️ All embedding options failed. RAG will use fallback mode.")
                        rag_logger.warning("⚠️ All embedding options failed")
                        self.embedding_available = False
                else:
                    print("ℹ️ Embeddings disabled. RAG will use fallback mode.")
                    rag_logger.warning("⚠️ Embeddings disabled - no fallback available")
                    self.embedding_available = False
        except Exception as e:
            # Catch-all for any unexpected failure in the embedding setup above.
            print(f"Warning: Could not initialize embeddings: {e}")
            print("RAG will work without vector search (fallback mode)")
            rag_logger.error(f"❌ Embedding initialization error: {e}")
            self.embedding_available = False
        # Initialize vector store
        self.vector_store = SimpleVectorStore()
        self.storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
        # Configure chunk size to handle larger metadata
        # Increase chunk size to 4096 to accommodate metadata (metadata can be up to 1438 chars)
        Settings.chunk_size = 4096
        Settings.chunk_overlap = 400
        rag_logger.info(f"📏 RAG chunk size set to {Settings.chunk_size} (overlap: {Settings.chunk_overlap})")
        print(f"📏 [RAG] Chunk size: {Settings.chunk_size}, Overlap: {Settings.chunk_overlap}")
        # Load existing index or create new one
        self.index = None          # VectorStoreIndex, built lazily when embeddings work
        self.documents = []        # in-memory list of llama_index Documents
        self.user_data_file = os.path.join(PROJECT_ROOT, "user_data.json")
        self._load_user_data()
| def _load_user_data(self): | |
| """Load user data from file and build vector index""" | |
| if os.path.exists(self.user_data_file): | |
| try: | |
| with open(self.user_data_file, 'r') as f: | |
| data = json.load(f) | |
| # Convert old format (JSON in text) to new format (descriptive text + raw_data in metadata) | |
| self.documents = [] | |
| for d in data: | |
| try: | |
| # Create descriptive text for RAG retrieval | |
| if d.get("type") == "preferences": | |
| prefs = d.get("data", {}) | |
| pref_text = f"""User Preferences: | |
| Name: {prefs.get('name', 'Unknown')} | |
| Favorite Genres: {', '.join(prefs.get('favorite_genres', []))} | |
| Interests: {', '.join(prefs.get('interests', []))} | |
| Podcast Interests: {', '.join(prefs.get('podcast_interests', []))} | |
| Mood: {prefs.get('mood', 'neutral')} | |
| Content Filter: Music={prefs.get('content_filter', {}).get('music', True)}, News={prefs.get('content_filter', {}).get('news', True)}, Podcasts={prefs.get('content_filter', {}).get('podcasts', True)}, Stories={prefs.get('content_filter', {}).get('stories', True)} | |
| """ | |
| doc = Document( | |
| text=pref_text, | |
| metadata={ | |
| "type": "preferences", | |
| "user_id": d.get("user_id"), | |
| "timestamp": d.get("timestamp", datetime.now().isoformat()), | |
| # Removed raw_data to reduce metadata size - essential fields stored separately | |
| } | |
| ) | |
| elif d.get("type") == "history": | |
| item_type = d.get("item_type", "") | |
| item_data = d.get("data", {}) | |
| user_feedback = d.get("feedback") | |
| if item_type == "music": | |
| track = item_data.get("track", {}) | |
| history_text = f"""Music Listening History: | |
| Title: {track.get('title', 'Unknown')} | |
| Artist: {track.get('artist', 'Unknown')} | |
| Genre: {track.get('genre', 'Unknown')} | |
| Source: {track.get('source', 'Unknown')} | |
| Feedback: {user_feedback or 'No feedback'} | |
| """ | |
| elif item_type == "news": | |
| history_text = f"""News Listening History: | |
| Items: {len(item_data.get('news_items', []))} news items | |
| Topics: {', '.join([item.get('category', '') for item in item_data.get('news_items', [])[:3]])} | |
| Feedback: {user_feedback or 'No feedback'} | |
| """ | |
| elif item_type == "podcast": | |
| podcast = item_data.get("podcast", {}) | |
| history_text = f"""Podcast Listening History: | |
| Title: {podcast.get('title', 'Unknown')} | |
| Host: {podcast.get('host', 'Unknown')} | |
| Category: {podcast.get('category', 'Unknown')} | |
| Feedback: {user_feedback or 'No feedback'} | |
| """ | |
| else: | |
| history_text = f"""Story Listening History: | |
| Type: {item_type} | |
| Feedback: {user_feedback or 'No feedback'} | |
| """ | |
| # Store minimal metadata to avoid chunk size issues | |
| doc = Document( | |
| text=history_text, | |
| metadata={ | |
| "type": "history", | |
| "user_id": d.get("user_id"), | |
| "item_type": item_type, | |
| "timestamp": d.get("timestamp", datetime.now().isoformat()), | |
| # Store only essential fields, not full raw_data | |
| "feedback": user_feedback or "" | |
| } | |
| ) | |
| else: | |
| # Unknown type, create basic document | |
| doc = Document( | |
| text=json.dumps(d), | |
| metadata={ | |
| "type": d.get("type", "unknown"), | |
| "user_id": d.get("user_id"), | |
| "timestamp": d.get("timestamp", datetime.now().isoformat()), | |
| # Removed raw_data to reduce metadata size - essential fields stored separately | |
| } | |
| ) | |
| self.documents.append(doc) | |
| except Exception as e: | |
| rag_logger.warning(f"⚠️ Skipping invalid document during load: {e}") | |
| continue | |
| rag_logger.info(f"📂 Loaded {len(self.documents)} documents from RAG storage") | |
| print(f"📂 [RAG] Loaded {len(self.documents)} documents from storage") | |
| # Build vector index if embeddings are available | |
| if self.documents and self.embedding_available: | |
| try: | |
| rag_logger.info(f"🔨 Building vector index from {len(self.documents)} documents...") | |
| print(f"🔨 [RAG] Building vector index from {len(self.documents)} documents...") | |
| self.index = VectorStoreIndex.from_documents( | |
| self.documents, | |
| storage_context=self.storage_context | |
| ) | |
| rag_logger.info(f"✅ Vector index built successfully with {len(self.documents)} documents") | |
| print(f"✅ [RAG] Vector index built with {len(self.documents)} documents") | |
| except Exception as e: | |
| rag_logger.error(f"❌ Failed to build vector index: {e}") | |
| print(f"Warning: Could not build vector index: {e}") | |
| self.index = None | |
| elif self.documents: | |
| rag_logger.info(f"ℹ️ {len(self.documents)} documents loaded but embeddings disabled - using fallback mode") | |
| print(f"ℹ️ [RAG] {len(self.documents)} documents loaded but embeddings disabled - using fallback mode") | |
| except Exception as e: | |
| rag_logger.error(f"❌ Error loading user data: {e}") | |
| print(f"Error loading user data: {e}") | |
| def _save_user_data(self): | |
| """Save user data to file""" | |
| try: | |
| data = [] | |
| for doc in self.documents: | |
| try: | |
| # Try to get raw_data from metadata first (new format) | |
| raw_data = doc.metadata.get("raw_data") | |
| if raw_data: | |
| data.append(json.loads(raw_data)) | |
| else: | |
| # Fallback: try to parse doc.text as JSON (old format) | |
| data.append(json.loads(doc.text)) | |
| except (json.JSONDecodeError, KeyError) as e: | |
| # Skip documents that can't be parsed | |
| rag_logger.warning(f"⚠️ Skipping document that couldn't be parsed: {e}") | |
| continue | |
| with open(self.user_data_file, 'w') as f: | |
| json.dump(data, f, indent=2) | |
| rag_logger.info(f"💾 Saved {len(data)} documents to {self.user_data_file}") | |
| except Exception as e: | |
| rag_logger.error(f"❌ Error saving user data: {e}") | |
| import traceback | |
| rag_logger.error(traceback.format_exc()) | |
| print(f"Error saving user data: {e}") | |
    def store_user_preferences(self, preferences: Dict[str, Any], user_id: str = None) -> None:
        """Store user preferences in the RAG system, keyed by user ID.

        Args:
            preferences: dict with keys like name, favorite_genres, interests,
                podcast_interests, mood, content_filter (missing keys default).
            user_id: owner of the preferences; without it the data cannot be
                filtered per-user later.

        Side effects: appends a Document, rebuilds the vector index when
        embeddings are available, and persists everything to disk.
        """
        if not user_id:
            rag_logger.warning("⚠️ Storing preferences without user_id - data will not be user-specific")
            print("⚠️ [RAG] Warning: Storing preferences without user_id")
        # Full record; round-tripped to disk via metadata["raw_data"] below.
        pref_doc = {
            "type": "preferences",
            "user_id": user_id,
            "timestamp": datetime.now().isoformat(),
            "data": preferences
        }
        # Create a more descriptive document for better RAG retrieval
        pref_text = f"""User Preferences:
Name: {preferences.get('name', 'Unknown')}
Favorite Genres: {', '.join(preferences.get('favorite_genres', []))}
Interests: {', '.join(preferences.get('interests', []))}
Podcast Interests: {', '.join(preferences.get('podcast_interests', []))}
Mood: {preferences.get('mood', 'neutral')}
Content Filter: Music={preferences.get('content_filter', {}).get('music', True)}, News={preferences.get('content_filter', {}).get('news', True)}, Podcasts={preferences.get('content_filter', {}).get('podcasts', True)}, Stories={preferences.get('content_filter', {}).get('stories', True)}
"""
        doc = Document(
            text=pref_text,
            metadata={
                "type": "preferences",
                "user_id": user_id,
                "timestamp": datetime.now().isoformat(),
                # _save_user_data reads this back when persisting to disk.
                "raw_data": json.dumps(pref_doc)
            }
        )
        self.documents.append(doc)
        rag_logger.info(f"📝 STORING PREFERENCES: user_id={user_id}, Name={preferences.get('name')}, Genres={preferences.get('favorite_genres')}, Mood={preferences.get('mood')}")
        print(f"📝 [RAG] Storing preferences for user {user_id} ({preferences.get('name', 'user')})")
        # Rebuild index if embeddings are available
        if self.embedding_available:
            try:
                # Full rebuild (SimpleVectorStore has no incremental update here).
                self.index = VectorStoreIndex.from_documents(
                    self.documents,
                    storage_context=self.storage_context
                )
                rag_logger.info(f"✅ Vector index rebuilt with {len(self.documents)} documents (embeddings enabled)")
                print(f"✅ [RAG] Index updated with {len(self.documents)} documents")
            except Exception as e:
                rag_logger.error(f"❌ Failed to rebuild index: {e}")
                print(f"Warning: Could not rebuild index: {e}")
                self.index = None
        else:
            rag_logger.info(f"ℹ️ Preferences stored (embeddings disabled, {len(self.documents)} total documents)")
            print(f"ℹ️ [RAG] Preferences stored (embeddings disabled)")
        self._save_user_data()
    def store_listening_history(self, item_type: str, item_data: Dict[str, Any], user_id: str = None, user_feedback: str = None) -> None:
        """Store one listening-history record with optional feedback and user ID.

        Args:
            item_type: one of "music", "news", "podcast"; anything else is
                rendered as a story-style record.
            item_data: payload for that type (e.g. {"track": {...}} for music,
                {"news_items": [...]} for news, {"podcast": {...}} for podcast).
            user_id: owner of the record; without it per-user filtering fails.
            user_feedback: free-text feedback, stored alongside the record.

        Side effects: appends a Document, periodically rebuilds the vector
        index, and persists all records to disk.
        """
        if not user_id:
            rag_logger.warning(f"⚠️ Storing {item_type} history without user_id - data will not be user-specific")
            print(f"⚠️ [RAG] Warning: Storing {item_type} history without user_id")
        # Full record; round-tripped to disk via metadata["raw_data"] below.
        history_doc = {
            "type": "history",
            "user_id": user_id,
            "item_type": item_type,  # music, news, podcast, story
            "timestamp": datetime.now().isoformat(),
            "data": item_data,
            "feedback": user_feedback
        }
        # Create descriptive text for better RAG retrieval
        if item_type == "music":
            track = item_data.get("track", {})
            history_text = f"""Music Listening History:
Title: {track.get('title', 'Unknown')}
Artist: {track.get('artist', 'Unknown')}
Genre: {track.get('genre', 'Unknown')}
Source: {track.get('source', 'Unknown')}
Feedback: {user_feedback or 'No feedback'}
"""
            rag_logger.info(f"🎵 STORING MUSIC HISTORY: user_id={user_id}, {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')} ({track.get('genre', 'Unknown')}) - Feedback: {user_feedback or 'None'}")
            print(f"🎵 [RAG] Storing music for user {user_id}: {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')}")
        elif item_type == "news":
            history_text = f"""News Listening History:
Items: {len(item_data.get('news_items', []))} news items
Topics: {', '.join([item.get('category', '') for item in item_data.get('news_items', [])[:3]])}
Feedback: {user_feedback or 'No feedback'}
"""
            rag_logger.info(f"📰 STORING NEWS HISTORY: user_id={user_id}, {len(item_data.get('news_items', []))} items - Feedback: {user_feedback or 'None'}")
            print(f"📰 [RAG] Storing news history for user {user_id}: {len(item_data.get('news_items', []))} items")
        elif item_type == "podcast":
            podcast = item_data.get("podcast", {})
            history_text = f"""Podcast Listening History:
Title: {podcast.get('title', 'Unknown')}
Host: {podcast.get('host', 'Unknown')}
Category: {podcast.get('category', 'Unknown')}
Feedback: {user_feedback or 'No feedback'}
"""
            rag_logger.info(f"🎙️ STORING PODCAST HISTORY: user_id={user_id}, {podcast.get('title', 'Unknown')} - Feedback: {user_feedback or 'None'}")
            print(f"🎙️ [RAG] Storing podcast for user {user_id}: {podcast.get('title', 'Unknown')}")
        else:
            # Fallback rendering for stories and any unrecognized item_type.
            history_text = f"""Story Listening History:
Type: {item_type}
Feedback: {user_feedback or 'No feedback'}
"""
            rag_logger.info(f"📖 STORING STORY HISTORY: user_id={user_id}, {item_type} - Feedback: {user_feedback or 'None'}")
            print(f"📖 [RAG] Storing story history for user {user_id}: {item_type}")
        doc = Document(
            text=history_text,
            metadata={
                "type": "history",
                "user_id": user_id,
                "item_type": item_type,
                "timestamp": datetime.now().isoformat(),
                # _save_user_data reads this back when persisting to disk.
                "raw_data": json.dumps(history_doc)
            }
        )
        self.documents.append(doc)
        # Rebuild index if embeddings are available (but only periodically to avoid too many rebuilds)
        # Rebuild every 5 documents or if index doesn't exist
        # NOTE(review): with the modulo-5 gate, up to 4 new documents may be
        # missing from the index until the next rebuild.
        if self.embedding_available and (not self.index or len(self.documents) % 5 == 0):
            try:
                self.index = VectorStoreIndex.from_documents(
                    self.documents,
                    storage_context=self.storage_context
                )
                if len(self.documents) % 5 == 0:
                    rag_logger.info(f"✅ Vector index rebuilt (total documents: {len(self.documents)})")
                    print(f"✅ [RAG] Index updated (total documents: {len(self.documents)})")
            except Exception as e:
                rag_logger.error(f"❌ Failed to rebuild index: {e}")
                print(f"Warning: Could not rebuild index: {e}")
                self.index = None
        self._save_user_data()
| def get_user_preferences(self, user_id: str = None) -> Dict[str, Any]: | |
| """Retrieve latest user preferences for a specific user""" | |
| preferences = {} | |
| for doc in reversed(self.documents): | |
| try: | |
| # Check metadata first (new format with user_id) | |
| doc_user_id = doc.metadata.get("user_id") | |
| if user_id and doc_user_id != user_id: | |
| continue # Skip documents from other users | |
| # Try to parse from metadata raw_data (new format) | |
| raw_data = doc.metadata.get("raw_data") | |
| if raw_data: | |
| data = json.loads(raw_data) | |
| if data.get("type") == "preferences": | |
| if not user_id or data.get("user_id") == user_id: | |
| preferences = data.get("data", {}) | |
| break | |
| else: | |
| # Fallback: try to parse doc.text as JSON (old format) | |
| data = json.loads(doc.text) | |
| if data.get("type") == "preferences": | |
| if not user_id or data.get("user_id") == user_id: | |
| preferences = data.get("data", {}) | |
| break | |
| except (json.JSONDecodeError, KeyError, AttributeError) as e: | |
| rag_logger.debug(f"Skipping document in get_user_preferences: {e}") | |
| continue | |
| if user_id and not preferences: | |
| rag_logger.warning(f"⚠️ No preferences found for user_id={user_id}") | |
| return preferences | |
| def get_recommendations(self, query: str, user_id: str = None) -> Dict[str, Any]: | |
| """Get personalized recommendations based on user history and preferences using RAG""" | |
| if not self.index or not self.llm_available: | |
| rag_logger.warning(f"⚠️ RAG RECOMMENDATIONS UNAVAILABLE: user_id={user_id}, query='{query}' (no index or LLM)") | |
| print("ℹ️ RAG recommendations unavailable (no index or LLM) - using defaults") | |
| return self._get_default_recommendations(user_id=user_id) | |
| try: | |
| rag_logger.info(f"🔍 RAG RECOMMENDATIONS QUERY: '{query}'") | |
| print(f"🔍 [RAG] Getting recommendations for: '{query[:60]}...'") | |
| # Use RAG to query user history and preferences | |
| query_engine = self.index.as_query_engine( | |
| similarity_top_k=5, # Get top 5 relevant documents | |
| response_mode="compact" # Compact response | |
| ) | |
| response = query_engine.query(query) | |
| response_text = str(response) | |
| rag_logger.info(f"✅ RAG RECOMMENDATIONS RESPONSE: {response_text[:200]}...") | |
| print(f"✅ [RAG] LLM generated recommendations: {response_text[:150]}...") | |
| # Extract recommendations from RAG response | |
| recommendations = { | |
| "recommendations": response_text, | |
| "source": "RAG", | |
| "query": query | |
| } | |
| # Also try to extract structured data from response | |
| response_lower = response_text.lower() | |
| if "genre" in response_lower or "music" in response_lower: | |
| # Try to extract genre preferences | |
| for genre in ["pop", "rock", "jazz", "classical", "electronic", "hip-hop", "country", "indie", "rap", "blues", "folk"]: | |
| if genre in response_lower: | |
| recommendations.setdefault("suggested_genres", []).append(genre) | |
| rag_logger.info(f" 🎵 Extracted genre from RAG: {genre}") | |
| return recommendations | |
| except Exception as e: | |
| rag_logger.error(f"❌ RAG RECOMMENDATIONS ERROR: {e}") | |
| import traceback | |
| rag_logger.error(traceback.format_exc()) | |
| print(f"Error getting RAG recommendations: {e}") | |
| traceback.print_exc() | |
| return self._get_default_recommendations(user_id=user_id) | |
    def query_user_context(self, query: str, user_id: str = None, top_k: int = 3) -> List[Dict[str, Any]]:
        """Query user context using vector search and return relevant documents.

        Args:
            query: natural-language retrieval query.
            user_id: when given, only documents whose metadata user_id matches
                are returned.
            top_k: maximum number of results to return.

        Returns:
            List of dicts with "text", "score", "metadata" and (when the
            stored record parses) "data" keys; [] when the index or
            embeddings are unavailable or on error.
        """
        if not self.index or not self.embedding_available:
            rag_logger.warning(f"⚠️ RAG QUERY SKIPPED (no index/embeddings): '{query}'")
            print(f"⚠️ [RAG] Query skipped - embeddings not available")
            return []
        try:
            rag_logger.info(f"🔍 RAG QUERY: user_id={user_id}, query='{query}' (top_k={top_k})")
            print(f"🔍 [RAG] Querying for user {user_id}: '{query[:60]}...'")
            # Retrieve more documents than needed, then filter by user_id
            # This ensures we get top_k results for the specific user
            retrieve_count = top_k * 3 if user_id else top_k  # Get more if filtering
            retriever = self.index.as_retriever(similarity_top_k=retrieve_count)
            nodes = retriever.retrieve(query)
            results = []
            for i, node in enumerate(nodes):  # NOTE(review): `i` is unused
                try:
                    # Filter by user_id if provided
                    node_user_id = node.metadata.get("user_id")
                    if user_id and node_user_id != user_id:
                        continue  # Skip documents from other users
                    # Retriever nodes may or may not carry a similarity score.
                    score = node.score if hasattr(node, 'score') else None
                    node_type = node.metadata.get("type", "unknown")
                    item_type = node.metadata.get("item_type", "")
                    # Original stored record; "{}" default so missing raw_data
                    # parses to an empty dict rather than raising.
                    data = json.loads(node.metadata.get("raw_data", "{}"))
                    result = {
                        "text": node.text,
                        "score": score,
                        "metadata": node.metadata,
                        "data": data
                    }
                    results.append(result)
                    # Log each retrieved document
                    preview = node.text[:100].replace('\n', ' ')
                    try:
                        score_str = f"{float(score):.4f}" if score is not None else "N/A"
                    except (TypeError, ValueError):
                        score_str = str(score) if score is not None else "N/A"
                    rag_logger.info(f" 📄 Retrieved #{len(results)}: user_id={node_user_id}, type={node_type}, item_type={item_type}, score={score_str}, preview='{preview}...'")
                    print(f" 📄 [RAG] Retrieved #{len(results)}: {node_type} (user: {node_user_id}, score: {score_str}) - {preview}...")
                    # Stop if we have enough results for this user
                    if len(results) >= top_k:
                        break
                except Exception as parse_error:
                    # Parsing failed: still return the node (without "data"),
                    # applying the same user filter and top_k cutoff.
                    # Still check user_id even if parsing fails
                    node_user_id = node.metadata.get("user_id")
                    if user_id and node_user_id != user_id:
                        continue
                    score = node.score if hasattr(node, 'score') else None
                    result = {
                        "text": node.text,
                        "score": score,
                        "metadata": node.metadata
                    }
                    results.append(result)
                    try:
                        score_str = f"{float(score):.4f}" if score is not None else "N/A"
                    except (TypeError, ValueError):
                        score_str = str(score) if score is not None else "N/A"
                    rag_logger.warning(f" ⚠️ Retrieved #{len(results)} (parse error): user_id={node_user_id}, score={score_str}, text_preview='{node.text[:50]}...'")
                    print(f" ⚠️ [RAG] Retrieved #{len(results)} (parse error, user: {node_user_id}, score: {score_str})")
                    if len(results) >= top_k:
                        break
            rag_logger.info(f"✅ RAG QUERY COMPLETE: Retrieved {len(results)} documents for user_id={user_id}")
            print(f"✅ [RAG] Query complete: {len(results)} documents retrieved for user {user_id}")
            return results
        except Exception as e:
            rag_logger.error(f"❌ RAG QUERY ERROR: {e}")
            import traceback
            rag_logger.error(traceback.format_exc())
            print(f"❌ [RAG] Query error: {e}")
            return []
| def _get_default_recommendations(self) -> Dict[str, Any]: | |
| """Return default recommendations when RAG is not available""" | |
| preferences = self.get_user_preferences() | |
| return { | |
| "favorite_genres": preferences.get("favorite_genres", ["pop", "rock"]), | |
| "interests": preferences.get("interests", ["technology", "world"]), | |
| "podcast_interests": preferences.get("podcast_interests", ["technology"]), | |
| "mood": preferences.get("mood", "happy"), | |
| "source": "preferences" | |
| } | |
| def get_listening_stats(self) -> Dict[str, Any]: | |
| """Get statistics about user's listening history""" | |
| stats = { | |
| "total_sessions": 0, | |
| "music_played": 0, | |
| "news_heard": 0, | |
| "podcasts_listened": 0, | |
| "stories_enjoyed": 0 | |
| } | |
| for doc in self.documents: | |
| try: | |
| data = json.loads(doc.text) | |
| if data.get("type") == "history": | |
| stats["total_sessions"] += 1 | |
| item_type = data.get("item_type", "") | |
| if item_type == "music": | |
| stats["music_played"] += 1 | |
| elif item_type == "news": | |
| stats["news_heard"] += 1 | |
| elif item_type == "podcast": | |
| stats["podcasts_listened"] += 1 | |
| elif item_type == "story": | |
| stats["stories_enjoyed"] += 1 | |
| except: | |
| continue | |
| return stats | |