import asyncio
import json
import os
import random
import re
from collections import Counter
from typing import Any, Dict, List, Optional, Tuple

from googleapiclient.discovery import build
from groq import Groq

from src.utils.logger import setup_logger

logger = setup_logger(__name__)

# Environment variables consulted when credentials are not passed explicitly.
# Credentials must never be committed to source control.
_YOUTUBE_API_KEY_ENV = "YOUTUBE_API_KEY"
_GROQ_API_KEY_ENV = "GROQ_API_KEY"

# search.list accepts maxResults in the range 0..50; larger values are rejected.
_YOUTUBE_MAX_RESULTS = 50

# Markdown code fences that LLMs sometimes wrap around a JSON answer.
_MARKDOWN_FENCE_RE = re.compile(r"```json|```")


class RecommendationService:
    """Suggest YouTube videos based on a user's saved notes.

    Pipeline:
      1. Pick the top 3 most-repeated categories across all user notes.
      2. Extract key keywords from the latest notes of each category
         (via an LLM served through the Groq client).
      3. Search YouTube once per category and merge the results.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        groq_api_key: Optional[str] = None,
    ):
        """Create the YouTube and Groq clients.

        Args:
            api_key: YouTube Data API key. When omitted, the
                ``YOUTUBE_API_KEY`` environment variable is used.
            groq_api_key: Groq API key. When omitted, the ``GROQ_API_KEY``
                environment variable is used. Without any Groq key the
                service still works, but keyword extraction falls back to
                the bare category names.

        Raises:
            ValueError: If no YouTube API key is available.
        """
        self.api_key = api_key or os.environ.get(_YOUTUBE_API_KEY_ENV)
        if not self.api_key:
            raise ValueError(
                "YouTube API key missing: pass api_key or set the "
                f"{_YOUTUBE_API_KEY_ENV} environment variable"
            )
        self.youtube = build("youtube", "v3", developerKey=self.api_key)

        groq_key = groq_api_key or os.environ.get(_GROQ_API_KEY_ENV)
        if groq_key:
            self.groq_client = Groq(api_key=groq_key)
        else:
            logger.warning(
                f"{_GROQ_API_KEY_ENV} is not set; keyword extraction will "
                "fall back to category names"
            )
            self.groq_client = None

    # ──────────────────────────────────────────────
    # Shared helpers
    # ──────────────────────────────────────────────
    @staticmethod
    def _note_categories(note: Dict) -> List:
        """Return the note's ``category`` field normalised to a list.

        The field may hold a single value or a list of values; a missing or
        empty field yields an empty list.
        """
        cat = note.get("category")
        if not cat:
            return []
        return list(cat) if isinstance(cat, list) else [cat]

    @staticmethod
    def _created_at_sort_key(note: Dict) -> Tuple[bool, Any]:
        """Sort key that orders notes by ``createdAt``, undated notes first.

        The leading flag decides the comparison whenever exactly one of two
        notes lacks a timestamp, so a placeholder is never compared against a
        real timestamp value (which would raise ``TypeError`` for datetimes).
        With ``reverse=True`` undated notes therefore end up last.
        """
        created_at = note.get("createdAt")
        if created_at is None:
            return (False, 0)
        return (True, created_at)

    @staticmethod
    def _fetch_user_notes(db, user_id: str) -> List[Dict]:
        """Synchronously read every note document whose ``userId`` matches."""
        docs = db.collection("notes").where("userId", "==", user_id).stream()
        return [doc.to_dict() for doc in docs]

    @staticmethod
    def _video_from_item(item: Dict) -> Optional[Dict]:
        """Convert one search result item into a recommendation dict.

        Returns ``None`` for items that carry no video id or snippet so that a
        single malformed item does not discard the whole result page.
        """
        item_id = item.get("id")
        video_id = item_id.get("videoId") if isinstance(item_id, dict) else None
        snippet = item.get("snippet") or {}
        if not video_id or not snippet:
            return None

        thumbnails = snippet.get("thumbnails") or {}
        # Prefer the medium thumbnail (original behaviour); degrade gracefully.
        thumbnail = (
            thumbnails.get("medium")
            or thumbnails.get("high")
            or thumbnails.get("default")
            or {}
        )
        return {
            "videoId": video_id,
            "title": snippet.get("title", ""),
            "description": snippet.get("description", ""),
            "thumbnail": thumbnail.get("url", ""),
            "channelTitle": snippet.get("channelTitle", ""),
            "url": f"https://www.youtube.com/watch?v={video_id}",
            "type": "youtube_video",
        }

    # ──────────────────────────────────────────────
    # Step 1: top 3 categories
    # ──────────────────────────────────────────────
    def _get_top_categories(self, notes: List[Dict], top_n: int = 3) -> List[str]:
        """Count category frequency across all notes and return the top N.

        Empty categories and the ``"Uncategorized"`` placeholder are ignored.
        """
        counter: Counter = Counter()
        for note in notes:
            for c in self._note_categories(note):
                if c and c != "Uncategorized":
                    counter[c] += 1

        top = [cat for cat, _ in counter.most_common(top_n)]
        logger.info(f"🏆 Top categories: {top}")
        return top

    # ──────────────────────────────────────────────
    # Step 2: keywords from latest notes per category
    # ──────────────────────────────────────────────
    def _latest_notes_per_category(
        self, notes: List[Dict], categories: List[str], top_n: int = 2
    ) -> Dict[str, List[Dict]]:
        """Map each requested category to its latest ``top_n`` notes.

        Notes are ordered newest-first by ``createdAt``; notes without a
        timestamp are treated as the oldest. Categories with no matching
        notes map to an empty list.
        """
        buckets: Dict[str, List[Dict]] = {cat: [] for cat in categories}

        for note in notes:
            for c in self._note_categories(note):
                if c in buckets:
                    buckets[c].append(note)

        return {
            cat: sorted(bucket, key=self._created_at_sort_key, reverse=True)[:top_n]
            for cat, bucket in buckets.items()
        }

    async def _extract_keywords_with_claude(
        self, notes: List[Dict], category: str
    ) -> List[str]:
        """Ask the LLM for 3-5 search keywords describing the given notes.

        Despite the historical name, the request goes through the Groq
        client. Any failure (no client, no note text, API error, unparsable
        or empty answer) falls back to ``[category]`` so the pipeline always
        has something to search for.

        Args:
            notes: Notes of one category; the first non-empty field among
                ``content``, ``text`` and ``videoTitle`` is used per note.
            category: Category name, used in the prompt and as the fallback.

        Returns:
            Up to five non-empty keyword strings, or ``[category]``.
        """
        # Drop empty notes BEFORE joining; otherwise the separators alone
        # would look like content and trigger a pointless LLM call.
        parts = []
        for note in notes:
            text = note.get("content") or note.get("text") or note.get("videoTitle")
            if text:
                text = str(text).strip()
                if text:
                    parts.append(text)
        combined_content = "\n---\n".join(parts)

        if not combined_content or self.groq_client is None:
            return [category]

        prompt = (
            f"You are a search-query assistant. "
            f"Given the notes below (category: {category}), "
            f"extract 3 to 5 concise English keywords or short phrases that best "
            f"represent the core topic for a YouTube educational search. "
            f"Reply with ONLY a JSON array of strings, no explanation.\n\n"
            f"Notes:\n{combined_content[:2000]}"
        )

        def _request_completion():
            return self.groq_client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=120,
            )

        try:
            # The Groq client is synchronous: run it off the event loop.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(None, _request_completion)
            raw = response.choices[0].message.content.strip()
            # Strip accidental markdown fences around the JSON payload.
            raw = _MARKDOWN_FENCE_RE.sub("", raw).strip()
            keywords = json.loads(raw)
        except Exception as e:
            # Best-effort step: never let keyword extraction break the pipeline.
            logger.warning(f"⚠️ Groq keyword extraction failed for '{category}': {e}")
            return [category]

        if isinstance(keywords, list):
            cleaned = []
            for k in keywords:
                if k is None:
                    continue
                keyword = str(k).strip()
                if keyword:
                    cleaned.append(keyword)
            cleaned = cleaned[:5]
            if cleaned:
                logger.info(f"🔑 Keywords for '{category}': {cleaned}")
                return cleaned

        logger.warning(f"⚠️ No usable keywords returned for '{category}'")
        return [category]  # fallback

    # ──────────────────────────────────────────────
    # Step 3: build query & search YouTube
    # ──────────────────────────────────────────────
    async def _build_search_query(
        self, category_keywords: Dict[str, List[str]]
    ) -> str:
        """Merge keywords from each category into one combined search query.

        Takes up to 2 keywords per category and at most 6 terms overall to
        keep the query focused. Declared ``async`` only for interface
        compatibility; it performs no awaiting.

        NOTE(review): terms are joined with the literal " OR "; the YouTube
        search documentation describes "|" as the OR operator — confirm this
        is intended before relying on this helper.
        """
        parts: List[str] = []
        for keywords in category_keywords.values():
            parts.extend(keywords[:2])

        query = " OR ".join(parts[:6])
        logger.info(f"🔍 Final search query: {query}")
        return query

    async def get_youtube_recommendations(
        self, query: str, limit: int = 5
    ) -> List[Dict]:
        """Search YouTube for educational videos matching the query.

        Fetches up to three times ``limit`` candidates (capped at the API
        maximum), shuffles them and returns at most ``limit`` of them.

        Args:
            query: Free-text search terms; an empty query returns ``[]``.
            limit: Maximum number of videos to return.

        Returns:
            A list of video dicts (``videoId``, ``title``, ``description``,
            ``thumbnail``, ``channelTitle``, ``url``, ``type``); empty when
            the search fails.
        """
        if not query or limit <= 0:
            return []

        enhanced_query = f"{query} tutorial "
        logger.info(f"🎬 Searching YouTube: {enhanced_query}")

        def _search() -> Dict:
            return (
                self.youtube.search()
                .list(
                    q=enhanced_query,
                    part="snippet",
                    # Over-fetch so the shuffle has variety, within API limits.
                    maxResults=min(limit * 3, _YOUTUBE_MAX_RESULTS),
                    type="video",
                    relevanceLanguage="en",
                    videoEmbeddable="true",
                    videoDuration="medium",
                )
                .execute()
            )

        try:
            # The Google API client is synchronous: run it off the event loop.
            loop = asyncio.get_running_loop()
            search_response = await loop.run_in_executor(None, _search)
        except Exception as e:
            logger.error(f"❌ YouTube search failed: {e}")
            return []

        videos = []
        for item in search_response.get("items", []):
            video = self._video_from_item(item)
            if video is not None:
                videos.append(video)

        random.shuffle(videos)
        result = videos[:limit]
        logger.info(f"✅ Returning {len(result)} recommendations")
        return result

    # ──────────────────────────────────────────────
    # Main entry point
    # ──────────────────────────────────────────────
    async def get_recommendations_for_user(
        self, db, user_id: str, limit: int = 5
    ) -> List[Dict]:
        """Build video recommendations from the notes stored for a user.

        Falls back to a generic "educational tutorials" search when the notes
        cannot be fetched, the user has none, or none carries a usable
        category.

        Args:
            db: Database client exposing ``collection(...).where(...).stream()``
                (presumably a synchronous Firestore client — confirm with the
                caller).
            user_id: Value matched against each note's ``userId`` field.
            limit: Controls the size of the result (see the note on the final
                slice below).

        Returns:
            A shuffled list of video dicts, each tagged with the ``category``
            it was found for.
        """
        logger.info(f"📚 Fetching notes for user: {user_id}")

        # ── Fetch notes ──────────────────────────
        try:
            # The query is blocking I/O; keep it off the event loop.
            loop = asyncio.get_running_loop()
            notes = await loop.run_in_executor(
                None, self._fetch_user_notes, db, user_id
            )
            logger.info(f"📝 Found {len(notes)} notes")
        except Exception as e:
            logger.error(f"❌ Firebase fetch failed: {e}")
            notes = []

        if not notes:
            logger.info("⚠️ No notes — falling back to general recommendations")
            return await self.get_youtube_recommendations("educational tutorials", limit)

        # ── Step 1: top 3 categories ─────────────
        top_categories = self._get_top_categories(notes, top_n=3)

        if not top_categories:
            logger.info("⚠️ No valid categories — falling back")
            return await self.get_youtube_recommendations("educational tutorials", limit)

        # ── Step 2: keywords via the LLM ─────────
        latest_notes = self._latest_notes_per_category(notes, top_categories, top_n=2)
        valid_categories = [cat for cat in top_categories if latest_notes.get(cat)]

        keyword_results = await asyncio.gather(
            *(
                self._extract_keywords_with_claude(latest_notes[cat], cat)
                for cat in valid_categories
            )
        )
        # gather() preserves order, so zipping with the same list is safe.
        category_keywords: Dict[str, List[str]] = dict(
            zip(valid_categories, keyword_results)
        )

        # ── Step 3: build query & recommend ──────
        # Searches run one after another on purpose: they share a single
        # Google API client object, which should not be used from several
        # threads at once.
        all_videos: List[Dict] = []
        seen_video_ids = set()
        for category, keywords in category_keywords.items():
            query = " ".join(keywords[:3])
            logger.info(f"🎯 Searching category: {category} | Query: {query}")
            videos = await self.get_youtube_recommendations(query, limit=2)
            for v in videos:
                # The same video can match several categories; keep it once.
                if v["videoId"] in seen_video_ids:
                    continue
                seen_video_ids.add(v["videoId"])
                v["category"] = category
                all_videos.append(v)

        random.shuffle(all_videos)
        # NOTE(review): the cap is limit * 2 rather than limit — kept as in the
        # original; confirm whether callers expect at most `limit` items.
        return all_videos[:limit * 2]