Spaces:
Running
Running
import asyncio
import json
import os
import random
import re
from collections import Counter
from typing import List, Dict, Optional

from googleapiclient.discovery import build
from groq import Groq
# import anthropic

from src.utils.logger import setup_logger

logger = setup_logger(__name__)
class RecommendationService:
    """
    Service for suggesting YouTube videos based on a user's saved notes.

    Pipeline:
        1. Find the top 3 most-repeated categories across all user notes.
        2. Extract key keywords from the latest notes of each category
           (via a Groq-hosted Llama chat model).
        3. Run one YouTube search per category and merge the results.

    Credentials are never stored in source: the YouTube key comes from the
    ``api_key`` argument or the ``YOUTUBE_API_KEY`` environment variable and
    the Groq key from the ``GROQ_API_KEY`` environment variable.
    """

    # Names of the environment variables consulted for credentials.
    YOUTUBE_API_KEY_ENV = "YOUTUBE_API_KEY"
    GROQ_API_KEY_ENV = "GROQ_API_KEY"

    _KEYWORD_MODEL = "llama-3.3-70b-versatile"
    _KEYWORD_MAX_TOKENS = 120
    _MAX_KEYWORDS = 5
    _MAX_PROMPT_CHARS = 2000
    _FALLBACK_QUERY = "educational tutorials"
    # Upper bound accepted by the YouTube search endpoint for ``maxResults``.
    _YOUTUBE_MAX_RESULTS = 50
    # Thumbnail sizes to try, in order of preference.
    _THUMBNAIL_SIZES = ("medium", "high", "default")
    # Markdown code fences the model sometimes wraps around its JSON answer.
    _FENCE_RE = re.compile(r"```json|```")

    def __init__(self, api_key: Optional[str] = None):
        """
        Create the YouTube and Groq API clients.

        Args:
            api_key: YouTube Data API key. When omitted, the value of the
                ``YOUTUBE_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no YouTube API key is available.
        """
        # NOTE: keys used to be hard-coded in this method. Any key that was
        # ever committed must be treated as leaked and rotated.
        self.api_key = api_key or os.environ.get(self.YOUTUBE_API_KEY_ENV)
        if not self.api_key:
            raise ValueError(
                "YouTube API key missing: pass api_key or set "
                f"{self.YOUTUBE_API_KEY_ENV}"
            )
        self.youtube = build("youtube", "v3", developerKey=self.api_key)

        # Keyword extraction is best-effort, so a missing Groq key only
        # disables it (category names are used as keywords instead).
        groq_api_key = os.environ.get(self.GROQ_API_KEY_ENV)
        if groq_api_key:
            self.groq_client = Groq(api_key=groq_api_key)
        else:
            logger.warning(
                f"{self.GROQ_API_KEY_ENV} is not set; keyword extraction is disabled"
            )
            self.groq_client = None

    # ----------------------------------------------
    # Shared helpers
    # ----------------------------------------------
    @staticmethod
    def _note_categories(note: Dict) -> List:
        """Return the note's ``category`` field normalised to a list."""
        cat = note.get("category")
        if not cat:
            return []
        return cat if isinstance(cat, list) else [cat]

    @staticmethod
    def _created_at_sort_key(note: Dict):
        """
        Build a sort key that orders notes by ``createdAt``.

        Notes without a timestamp sort before all others (so they end up last
        in a descending sort). The leading boolean guarantees a missing value
        is never compared against a real timestamp object.
        """
        created_at = note.get("createdAt")
        if created_at is None:
            return (False, 0)
        return (True, created_at)

    # ----------------------------------------------
    # Step 1: top 3 categories
    # ----------------------------------------------
    def _get_top_categories(self, notes: List[Dict], top_n: int = 3) -> List[str]:
        """
        Count category frequency across all notes and return the top N.

        Args:
            notes: Note dicts; ``category`` may be a single value or a list.
            top_n: Maximum number of categories to return.

        Returns:
            Category names ordered from most to least frequent. Empty values
            and the ``"Uncategorized"`` placeholder are ignored.
        """
        counter: Counter = Counter()
        for note in notes:
            for c in self._note_categories(note):
                if c and c != "Uncategorized":
                    counter[c] += 1
        top = [cat for cat, _ in counter.most_common(top_n)]
        logger.info(f"Top categories: {top}")
        return top

    # ----------------------------------------------
    # Step 2: keywords from latest notes per category
    # ----------------------------------------------
    def _latest_notes_per_category(
        self, notes: List[Dict], categories: List[str], top_n: int = 2
    ) -> Dict[str, List[Dict]]:
        """
        Map each requested category to its latest N notes.

        Args:
            notes: All notes of the user.
            categories: Categories to collect notes for.
            top_n: Number of notes to keep per category.

        Returns:
            ``{category: [notes...]}`` with notes sorted by ``createdAt``,
            newest first; notes without ``createdAt`` come last.
        """
        buckets: Dict[str, List[Dict]] = {cat: [] for cat in categories}
        for note in notes:
            for c in self._note_categories(note):
                if c in buckets:
                    buckets[c].append(note)
        return {
            cat: sorted(bucket, key=self._created_at_sort_key, reverse=True)[:top_n]
            for cat, bucket in buckets.items()
        }

    @classmethod
    def _parse_keywords(cls, raw: str) -> List[str]:
        """
        Parse the model reply into a list of keyword strings.

        Args:
            raw: Reply text, expected to be a JSON array, optionally wrapped
                in markdown code fences.

        Returns:
            Up to 5 non-blank keywords; an empty list if the reply is valid
            JSON but not an array.

        Raises:
            ValueError: If the reply is not valid JSON.
        """
        parsed = json.loads(cls._FENCE_RE.sub("", raw).strip())
        if not isinstance(parsed, list):
            return []
        keywords = [str(k).strip() for k in parsed if k is not None]
        return [k for k in keywords if k][: cls._MAX_KEYWORDS]

    async def _extract_keywords_with_claude(
        self, notes: List[Dict], category: str
    ) -> List[str]:
        """
        Ask the chat model for search keywords describing the given notes.

        The method name is historical; the request goes to the Groq client.

        Args:
            notes: Notes of one category (a list, not a single dict).
            category: Category name, used in the prompt and as the fallback.

        Returns:
            3 to 5 keywords on success, otherwise ``[category]``. This method
            does not raise on model or parsing failures.
        """
        # Drop empty notes *before* joining; otherwise the separators alone
        # would make the combined text look non-empty.
        texts = []
        for note in notes:
            text = note.get("content") or note.get("text") or note.get("videoTitle") or ""
            text = str(text).strip()
            if text:
                texts.append(text)
        combined_content = "\n---\n".join(texts)
        if not combined_content or self.groq_client is None:
            return [category]

        prompt = (
            "You are a search-query assistant. "
            f"Given the notes below (category: {category}), "
            "extract 3 to 5 concise English keywords or short phrases that best "
            "represent the core topic for a YouTube educational search. "
            "Reply with ONLY a JSON array of strings, no explanation.\n\n"
            f"Notes:\n{combined_content[:self._MAX_PROMPT_CHARS]}"
        )

        try:
            loop = asyncio.get_running_loop()
            # The Groq client is synchronous, so run it in a worker thread.
            response = await loop.run_in_executor(
                None,
                lambda: self.groq_client.chat.completions.create(
                    model=self._KEYWORD_MODEL,
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=self._KEYWORD_MAX_TOKENS,
                ),
            )
            keywords = self._parse_keywords(response.choices[0].message.content or "")
            if keywords:
                logger.info(f"Keywords for '{category}': {keywords}")
                return keywords
            logger.warning(f"No usable keywords returned for '{category}'")
        except Exception as e:
            # Best-effort step: any API or parsing failure falls back below.
            logger.warning(f"Keyword extraction failed for '{category}': {e}")
        return [category]  # fallback

    # ----------------------------------------------
    # Step 3: build query & search YouTube
    # ----------------------------------------------
    async def _build_search_query(
        self, category_keywords: Dict[str, List[str]]
    ) -> str:
        """
        Merge keywords from each top category into one balanced search query.

        Takes up to 2 keywords per category and at most 6 terms overall to
        keep the query focused.

        Args:
            category_keywords: ``{category: [keywords...]}``.

        Returns:
            The terms joined with ``" OR "``.
        """
        parts = []
        for keywords in category_keywords.values():
            parts.extend(keywords[:2])
        # NOTE(review): the YouTube ``q`` parameter documents ``|`` as its OR
        # operator; confirm that a literal " OR " gives the intended results.
        query = " OR ".join(parts[:6])
        logger.info(f"Final search query: {query}")
        return query

    @classmethod
    def _parse_search_item(cls, item: Dict) -> Optional[Dict]:
        """
        Convert one YouTube search result into a recommendation dict.

        Args:
            item: One element of the search response's ``items`` list.

        Returns:
            The recommendation dict, or ``None`` when the item has no video
            id. Other missing fields default to empty strings so one sparse
            item cannot discard the whole result set.
        """
        video_id = (item.get("id") or {}).get("videoId")
        if not video_id:
            return None
        snippet = item.get("snippet") or {}
        thumbnails = snippet.get("thumbnails") or {}
        thumbnail_url = ""
        for size in cls._THUMBNAIL_SIZES:
            url = (thumbnails.get(size) or {}).get("url")
            if url:
                thumbnail_url = url
                break
        return {
            "videoId": video_id,
            "title": snippet.get("title", ""),
            "description": snippet.get("description", ""),
            "thumbnail": thumbnail_url,
            "channelTitle": snippet.get("channelTitle", ""),
            "url": f"https://www.youtube.com/watch?v={video_id}",
            "type": "youtube_video",
        }

    async def get_youtube_recommendations(
        self, query: str, limit: int = 5
    ) -> List[Dict]:
        """
        Search YouTube for educational videos matching the query.

        Args:
            query: Search terms; ``" tutorial "`` is appended before searching.
            limit: Maximum number of videos to return.

        Returns:
            Up to ``limit`` recommendation dicts picked at random from the
            first results. Returns ``[]`` for an empty query, a non-positive
            limit, or when the search fails (the error is logged).
        """
        if not query or limit <= 0:
            return []

        enhanced_query = f"{query} tutorial "
        logger.info(f"Searching YouTube: {enhanced_query}")
        # Over-fetch so the random pick has variety, but stay within the
        # endpoint's maximum page size.
        max_results = min(limit * 3, self._YOUTUBE_MAX_RESULTS)

        try:
            loop = asyncio.get_running_loop()
            # The API client is synchronous, so run it in a worker thread.
            search_response = await loop.run_in_executor(
                None,
                lambda: self.youtube.search()
                .list(
                    q=enhanced_query,
                    part="snippet",
                    maxResults=max_results,
                    type="video",
                    relevanceLanguage="en",
                    videoEmbeddable="true",
                    videoDuration="medium",
                )
                .execute(),
            )
            videos = []
            for item in search_response.get("items", []):
                video = self._parse_search_item(item)
                if video is not None:
                    videos.append(video)
            random.shuffle(videos)
            result = videos[:limit]
            logger.info(f"Returning {len(result)} recommendations")
            return result
        except Exception as e:
            logger.error(f"YouTube search failed: {e}")
            return []

    # ----------------------------------------------
    # Main entry point
    # ----------------------------------------------
    async def _fetch_user_notes(self, db, user_id: str) -> List[Dict]:
        """
        Load all notes of a user from the ``notes`` collection.

        The query is executed in a worker thread because the client call is
        blocking. Returns ``[]`` (and logs the error) if the fetch fails.
        """
        def _load() -> List[Dict]:
            docs = db.collection("notes").where("userId", "==", user_id).stream()
            return [doc.to_dict() for doc in docs]

        try:
            loop = asyncio.get_running_loop()
            notes = await loop.run_in_executor(None, _load)
        except Exception as e:
            logger.error(f"Firebase fetch failed: {e}")
            return []
        logger.info(f"Found {len(notes)} notes")
        return notes

    async def get_recommendations_for_user(
        self, db, user_id: str, limit: int = 5
    ) -> List[Dict]:
        """
        Recommend videos for a user based on their saved notes.

        Args:
            db: Database client exposing ``collection(...).where(...).stream()``.
            user_id: Value matched against the notes' ``userId`` field.
            limit: Maximum number of videos to return.

        Returns:
            Up to ``limit`` recommendation dicts in random order, each tagged
            with the ``category`` it was found for. Falls back to a general
            educational search when the user has no notes or no usable
            categories.
        """
        if limit <= 0:
            return []

        logger.info(f"Fetching notes for user: {user_id}")
        notes = await self._fetch_user_notes(db, user_id)
        if not notes:
            logger.info("No notes - falling back to general recommendations")
            return await self.get_youtube_recommendations(self._FALLBACK_QUERY, limit)

        # Step 1: top 3 categories
        top_categories = self._get_top_categories(notes, top_n=3)
        if not top_categories:
            logger.info("No valid categories - falling back")
            return await self.get_youtube_recommendations(self._FALLBACK_QUERY, limit)

        # Step 2: keywords from the latest notes of each category
        latest_notes = self._latest_notes_per_category(notes, top_categories, top_n=2)
        valid_categories = [cat for cat in top_categories if latest_notes.get(cat)]
        if not valid_categories:
            logger.info("No valid categories - falling back")
            return await self.get_youtube_recommendations(self._FALLBACK_QUERY, limit)

        keyword_results = await asyncio.gather(
            *(
                self._extract_keywords_with_claude(latest_notes[cat], cat)
                for cat in valid_categories
            )
        )
        # Both sequences are built from valid_categories, so they line up.
        category_keywords: Dict[str, List[str]] = dict(
            zip(valid_categories, keyword_results)
        )

        # Step 3: one search per category.
        # Fetch enough per category to fill `limit` (never fewer than 2).
        per_category = max(2, -(-limit // len(category_keywords)))
        all_videos: List[Dict] = []
        seen_ids = set()
        # Searches run one at a time: the shared googleapiclient service
        # object is not safe to use from several threads at once.
        for category, keywords in category_keywords.items():
            query = " ".join(keywords[:3])
            logger.info(f"Searching category: {category} | Query: {query}")
            videos = await self.get_youtube_recommendations(query, limit=per_category)
            for v in videos:
                video_id = v.get("videoId")
                if video_id is not None and video_id in seen_ids:
                    continue  # the same video matched an earlier category
                seen_ids.add(video_id)
                v["category"] = category
                all_videos.append(v)

        random.shuffle(all_videos)
        return all_videos[:limit]