# AIdea-Server/src/recommendation/recommender.py
# Last commit by "botayla": update (c342700)
import asyncio
import json
import os
import random
import re
from collections import Counter
from typing import List, Dict, Optional

from googleapiclient.discovery import build
# import anthropic
from groq import Groq

from src.utils.logger import setup_logger
logger = setup_logger(__name__)
class RecommendationService:
    """
    Service for suggesting videos based on a user's saved notes.

    Pipeline:
    1. Find the top 3 most-repeated categories across all user notes.
    2. Extract keywords from the latest notes of each category (via a
       Groq-hosted Llama chat model).
    3. Search YouTube once per category and merge the recommendations.
    """

    # NOTE(review): these literals are credentials committed to source control.
    # They are kept ONLY as a last-resort fallback so existing deployments keep
    # working; rotate them and supply keys through the constructor argument or
    # the YOUTUBE_API_KEY / GROQ_API_KEY environment variables instead.
    _LEGACY_YOUTUBE_API_KEY = "AIzaSyA3erB-Lxd5SOoBOXaumOCVaEr3TcgYG60"
    _LEGACY_GROQ_API_KEY = "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"

    def __init__(self, api_key: Optional[str] = None):
        """
        Build the YouTube and Groq clients.

        Args:
            api_key: YouTube Data API key. When omitted, the YOUTUBE_API_KEY
                environment variable is used, then the legacy built-in key.
                (Previously this argument was accepted but silently ignored.)
        """
        self.api_key = api_key or os.environ.get("YOUTUBE_API_KEY")
        if not self.api_key:
            logger.warning("YOUTUBE_API_KEY not configured; using legacy built-in key")
            self.api_key = self._LEGACY_YOUTUBE_API_KEY
        self.youtube = build("youtube", "v3", developerKey=self.api_key)

        groq_key = os.environ.get("GROQ_API_KEY")
        if not groq_key:
            logger.warning("GROQ_API_KEY not configured; using legacy built-in key")
            groq_key = self._LEGACY_GROQ_API_KEY
        self.groq_client = Groq(api_key=groq_key)

    # ----------------------------------------------
    # Step 1: top 3 categories
    # ----------------------------------------------
    def _get_top_categories(self, notes: List[Dict], top_n: int = 3) -> List[str]:
        """
        Count category frequency across all notes and return the top N.

        A note's "category" may be a single value or a list of values. Empty
        values and the literal "Uncategorized" are not counted.
        """
        counter: Counter = Counter()
        for note in notes:
            cat = note.get("category")
            if not cat:
                continue
            cats = cat if isinstance(cat, list) else [cat]
            counter.update(c for c in cats if c and c != "Uncategorized")
        top = [cat for cat, _ in counter.most_common(top_n)]
        logger.info(f"πŸ† Top categories: {top}")
        return top

    # ----------------------------------------------
    # Step 2: keywords from latest notes per category
    # ----------------------------------------------
    def _latest_notes_per_category(
        self, notes: List[Dict], categories: List[str], top_n: int = 2
    ) -> Dict[str, List[Dict]]:
        """
        Map each requested category to its latest ``top_n`` notes.

        Notes are ordered by "createdAt", newest first. Notes whose
        "createdAt" is missing or None sort after every dated note, so a mix
        of dated and undated notes no longer raises TypeError. Timestamps of
        mutually incomparable types (e.g. str and int) still raise.

        Returns:
            A dict with one entry per category in ``categories``; categories
            with no matching notes map to an empty list.
        """

        def _recency_key(note: Dict):
            created = note.get("createdAt")
            # The leading flag decides the order between dated and undated
            # notes, so the timestamp is only compared with other timestamps.
            return (created is not None, created if created is not None else 0)

        buckets: Dict[str, List[Dict]] = {cat: [] for cat in categories}
        for note in notes:
            cat = note.get("category")
            if isinstance(cat, list):
                cats = cat
            else:
                cats = [cat] if cat else []
            for c in cats:
                if c in buckets:
                    buckets[c].append(note)

        return {
            cat: sorted(notes_list, key=_recency_key, reverse=True)[:top_n]
            for cat, notes_list in buckets.items()
        }

    async def _extract_keywords_with_claude(
        self, notes: List[Dict], category: str  # takes a list of notes, not a single dict
    ) -> List[str]:
        """
        Ask the chat model for 3-5 search keywords describing ``notes``.

        Despite the historical name, the request goes to the Groq client
        (model "llama-3.3-70b-versatile"), not to Claude.

        Returns:
            Up to five non-blank keyword strings. Falls back to ``[category]``
            when the notes carry no text, the request or JSON parsing fails,
            or the model returns no usable keywords.
        """
        # Collect the first non-empty text field of each note. Empty notes are
        # dropped BEFORE joining; otherwise the separators alone would look
        # like content and trigger a pointless model call.
        pieces: List[str] = []
        for note in notes:
            text = note.get("content") or note.get("text") or note.get("videoTitle")
            text = str(text).strip() if text else ""
            if text:
                pieces.append(text)
        combined_content = "\n---\n".join(pieces)
        if not combined_content:
            return [category]

        prompt = (
            "You are a search-query assistant. "
            f"Given the notes below (category: {category}), "
            "extract 3 to 5 concise English keywords or short phrases that best "
            "represent the core topic for a YouTube educational search. "
            "Reply with ONLY a JSON array of strings, no explanation.\n\n"
            f"Notes:\n{combined_content[:2000]}"  # cap the context sent to the model
        )

        def _request_completion():
            # Blocking SDK call; executed on a worker thread below.
            return self.groq_client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=120,
            )

        try:
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(None, _request_completion)
            raw = (response.choices[0].message.content or "").strip()
            # strip accidental markdown fences
            raw = re.sub(r"```json|```", "", raw).strip()
            keywords = json.loads(raw)
        except Exception as e:
            # Best-effort step: any API or parsing failure degrades to the
            # category name instead of failing the whole recommendation.
            logger.warning(f"⚠️ Claude keyword extraction failed for '{category}': {e}")
            return [category]

        if isinstance(keywords, list):
            cleaned = [str(k).strip() for k in keywords if k is not None]
            cleaned = [k for k in cleaned if k][:5]
            if cleaned:
                logger.info(f"πŸ”‘ Keywords for '{category}': {keywords}")
                return cleaned
        return [category]  # fallback

    # ----------------------------------------------
    # Step 3: build query & search YouTube
    # ----------------------------------------------
    async def _build_search_query(
        self, category_keywords: Dict[str, List[str]]
    ) -> str:
        """
        Merge keywords from each top category into one search query.

        Takes up to 2 keywords per category and at most 6 keywords overall,
        joined with " OR ".

        NOTE(review): the YouTube Data API documents ``|`` as its OR operator;
        confirm the literal " OR " behaves as intended before relying on it.
        """
        parts: List[str] = []
        for keywords in category_keywords.values():
            parts.extend(keywords[:2])
        query = " OR ".join(parts[:6])  # keep the query short and focused
        logger.info(f"πŸ” Final search query: {query}")
        return query

    async def get_youtube_recommendations(
        self, query: str, limit: int = 5
    ) -> List[Dict]:
        """
        Search YouTube for videos matching ``query``.

        Over-fetches three times ``limit`` results (capped at the API maximum
        of 50), shuffles them and returns at most ``limit`` entries.

        Returns:
            A list of dicts with keys "videoId", "title", "description",
            "thumbnail", "channelTitle", "url" and "type". Empty when the
            query is empty, ``limit`` is not positive, or the search fails.
        """
        if not query or limit <= 0:
            return []
        enhanced_query = f"{query} tutorial "
        logger.info(f"🎬 Searching YouTube: {enhanced_query}")
        # search.list accepts maxResults in 0..50; larger values are rejected.
        max_results = min(limit * 3, 50)

        def _search() -> Dict:
            # Blocking HTTP call; executed on a worker thread below.
            return (
                self.youtube.search()
                .list(
                    q=enhanced_query,
                    part="snippet",
                    maxResults=max_results,
                    type="video",
                    relevanceLanguage="en",
                    videoEmbeddable="true",
                    videoDuration="medium",
                )
                .execute()
            )

        try:
            loop = asyncio.get_running_loop()
            search_response = await loop.run_in_executor(None, _search)
        except Exception as e:
            logger.error(f"❌ YouTube search failed: {e}")
            return []

        videos: List[Dict] = []
        for item in search_response.get("items", []):
            try:
                video_id = item["id"]["videoId"]
                snippet = item["snippet"]
                videos.append(
                    {
                        "videoId": video_id,
                        "title": snippet["title"],
                        "description": snippet["description"],
                        "thumbnail": snippet["thumbnails"]["medium"]["url"],
                        "channelTitle": snippet["channelTitle"],
                        "url": f"https://www.youtube.com/watch?v={video_id}",
                        "type": "youtube_video",
                    }
                )
            except (KeyError, TypeError):
                # Skip a malformed item rather than discarding every result.
                continue

        random.shuffle(videos)
        result = videos[:limit]
        logger.info(f"βœ… Returning {len(result)} recommendations")
        return result

    # ----------------------------------------------
    # Main entry point
    # ----------------------------------------------
    async def get_recommendations_for_user(
        self, db, user_id: str, limit: int = 5
    ) -> List[Dict]:
        """
        Recommend videos for ``user_id`` based on their stored notes.

        Args:
            db: Database client exposing
                ``collection("notes").where(...).stream()``.
                NOTE(review): presumably a synchronous Firestore client that
                is safe to call from a worker thread - confirm.
            user_id: Value matched against each note's "userId" field.
            limit: Result budget. At most ``limit * 2`` videos are returned;
                each category contributes at most 2, so with 3 categories the
                practical ceiling is 6.

        Returns:
            Shuffled video dicts, each tagged with the "category" it was found
            for. Falls back to a generic "educational tutorials" search when
            the user has no notes or no usable categories.
        """
        logger.info(f"πŸ“š Fetching notes for user: {user_id}")

        def _fetch_notes() -> List[Dict]:
            docs = (
                db.collection("notes")
                .where("userId", "==", user_id)
                .stream()
            )
            return [doc.to_dict() for doc in docs]

        # -- Fetch notes --
        try:
            # Blocking query: run it off the event loop like the other
            # blocking calls in this class.
            loop = asyncio.get_running_loop()
            notes = await loop.run_in_executor(None, _fetch_notes)
            logger.info(f"πŸ“ Found {len(notes)} notes")
        except Exception as e:
            logger.error(f"❌ Firebase fetch failed: {e}")
            notes = []

        if not notes:
            logger.info("⚠️ No notes β€” falling back to general recommendations")
            return await self.get_youtube_recommendations("educational tutorials", limit)

        # -- Step 1: top 3 categories --
        top_categories = self._get_top_categories(notes, top_n=3)
        if not top_categories:
            logger.info("⚠️ No valid categories β€” falling back")
            return await self.get_youtube_recommendations("educational tutorials", limit)

        # -- Step 2: keywords via the chat model --
        latest_notes = self._latest_notes_per_category(notes, top_categories, top_n=2)
        valid_categories = [cat for cat in top_categories if latest_notes.get(cat)]
        keyword_results = await asyncio.gather(
            *(
                self._extract_keywords_with_claude(latest_notes[cat], cat)
                for cat in valid_categories
            )
        )
        # zip over the same list the tasks were built from so pairs line up
        category_keywords: Dict[str, List[str]] = dict(
            zip(valid_categories, keyword_results)
        )

        # -- Step 3: search per category and merge --
        # Searches stay sequential: they share one googleapiclient service
        # object, whose httplib2 transport is documented as not thread-safe.
        all_videos: List[Dict] = []
        seen_ids = set()
        for category, keywords in category_keywords.items():
            query = " ".join(keywords[:3])
            logger.info(f"🎯 Searching category: {category} | Query: {query}")
            videos = await self.get_youtube_recommendations(query, limit=2)
            for v in videos:
                # Overlapping keywords can surface the same video for two
                # categories; keep only its first occurrence.
                if v["videoId"] in seen_ids:
                    continue
                seen_ids.add(v["videoId"])
                v["category"] = category
                all_videos.append(v)

        random.shuffle(all_videos)
        return all_videos[: max(limit, 0) * 2]