import json
import os
import re
import time
from typing import Dict, List, Optional

from groq import Groq
from pydantic import ValidationError

from ..utils.logger import setup_logger
from .schemas import SummarySchema

logger = setup_logger(__name__)

# ─────────────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ─────────────────────────────────────────────────────────────────────────────

# Token threshold: below this, a single API call is used.
_SINGLE_PASS_TOKEN_LIMIT = 8_000

# Target chunk size for the MAP phase (tokens). Kept small so that
# prompt + chunk + response stays well under the 12K TPM free-tier limit.
_CHUNK_TARGET_TOKENS = 2_500

# Model — unified for both MAP and REDUCE phases.
# llama-3.3-70b-versatile has 12K TPM on the free tier (the highest).
_MODEL_PRIMARY = "llama-3.3-70b-versatile"

# Maximum retries when a rate limit (413 / 429) is hit.
_RATE_LIMIT_MAX_RETRIES = 3
_RATE_LIMIT_SLEEP_SECONDS = 60
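# Rough per-call budget behind the numbers above (an estimate, not a measured
# figure): one MAP call is ~2,500 chunk tokens + a few hundred tokens of
# system/user prompt + up to 2,048 response tokens, i.e. roughly 5K tokens
# total, which leaves headroom inside a 12K TPM window even with estimation
# error.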
# ─────────────────────────────────────────────────────────────────────────────
# PROMPT TEMPLATES — SINGLE-PASS (unchanged)
# ─────────────────────────────────────────────────────────────────────────────

_SUMMARY_SYSTEM = """
You are an expert educational content analyst and structured note-taking specialist.
Transform raw video transcripts into clean, structured chronological JSON summaries.

LANGUAGE RULE — CRITICAL, NEVER VIOLATE:
- Detect the primary language of the transcript.
- Every content field (title, summary, segments, conclusion) MUST be written
  entirely in that SAME detected language.
- Do NOT mix languages. Arabic transcript -> everything in Arabic.
- Only the "detected_language" and "suggested_category" fields are stated in English.

TIMELINE RULES — STRICTLY ENFORCED:
- Divide the transcript into chronological segments that follow its natural progression.
- Produce a MINIMUM of 3 and a MAXIMUM of 7 segments.
- Each segment MUST cover a distinct phase or theme; do NOT repeat the same topic.
- Segments must be ordered chronologically as they appear in the transcript.
- Each segment must include:
  * title: a short descriptive title
  * summary: concise summary of that section (2-3 sentences)
  * key_insight: the single most important takeaway from that section
  * why_it_matters: brief explanation of value/importance (1-2 sentences)

TOPICS RULE:
- Extract the actual topics discussed in the video dynamically.
- Topics should be specific and descriptive (e.g. "Python", "Machine Learning", "Neural Networks").
- Do NOT use generic fixed categories.

CATEGORY RULE:
- Provide a single, concise category label (1-2 words max) in English.
- This should be the most accurate high-level category for the video content.
- Examples: "Programming", "Finance", "History", "Psychology", "Mathematics", "Cooking".
- The suggested_category MUST always be in English regardless of the transcript language.

CRITICAL: RETURN A JSON OBJECT EXACTLY MATCHING THIS STRUCTURE.
DO NOT CHANGE, OMIT, OR RENAME ANY KEYS.

{
  "title": "Inferred video title in transcript language",
  "detected_language": "English (or Arabic, etc.)",
  "summary": "Concise overall summary (3-5 sentences)",
  "segments": [
    {
      "title": "Segment title",
      "summary": "What this section covers (2-3 sentences)",
      "key_insight": "Most important point from this section",
      "why_it_matters": "Why this is valuable (1-2 sentences)"
    }
  ],
  "conclusion": "Final overall takeaway / closing conclusion",
  "topics": ["Topic1", "Topic2", "Topic3"],
  "suggested_category": "Programming"
}

OUTPUT: Return ONLY a valid JSON object. No markdown fences, no extra text.
""".strip()

_SUMMARY_USER = """
Video Title: {video_title}

TRANSCRIPT:
{transcript}

Analyze thoroughly. Detect the language. Divide the content into 3-7 chronological segments.
For each segment provide: title, summary, key_insight, why_it_matters.
Return ONLY the exact JSON structure requested.
""".strip()

# ─────────────────────────────────────────────────────────────────────────────
# PROMPT TEMPLATES — MAP PHASE
# ─────────────────────────────────────────────────────────────────────────────

_MAP_SYSTEM = """
You are an expert educational content analyst.
You will receive ONE CHUNK of a longer video transcript.
Extract the key information from this chunk ONLY.

LANGUAGE RULE — CRITICAL:
- Detect the primary language of the text.
- Write ALL content fields in that SAME detected language.
- Only "detected_language" is stated in English.

Return a JSON object with this EXACT structure:

{
  "detected_language": "English (or Arabic, etc.)",
  "chunk_summary": "Concise summary of this chunk (3-5 sentences)",
  "key_points": [
    {
      "title": "Short title for this point",
      "detail": "1-2 sentence explanation",
      "insight": "Key takeaway"
    }
  ],
  "topics": ["Topic1", "Topic2"]
}

RULES:
- Extract 2-4 key points from this chunk.
- Topics should be specific (e.g. "Python", "Neural Networks"), not generic.
- OUTPUT: Return ONLY a valid JSON object. No markdown fences, no extra text.
""".strip()

_MAP_USER = """
Video Title: {video_title}

Chunk {chunk_index} of {total_chunks}:

{chunk_text}

Extract the key information from this chunk. Return ONLY the JSON.
""".strip()

# ─────────────────────────────────────────────────────────────────────────────
# PROMPT TEMPLATES — REDUCE PHASE
# ─────────────────────────────────────────────────────────────────────────────

_REDUCE_SYSTEM = """
You are an expert educational content analyst and structured note-taking specialist.
You will receive INTERMEDIATE SUMMARIES from multiple chunks of a single video transcript.
Your job is to MERGE them into ONE final, cohesive, structured summary.

LANGUAGE RULE — CRITICAL, NEVER VIOLATE:
- Use the detected language from the intermediate summaries.
- Every content field MUST be in that SAME language.
- Only "detected_language" and "suggested_category" are stated in English.

TIMELINE RULES — STRICTLY ENFORCED:
- Merge the chunk summaries into 3-7 chronological segments.
- Each segment MUST cover a distinct phase or theme; do NOT repeat topics.
- Segments must follow the natural progression of the video.
- Each segment must include: title, summary, key_insight, why_it_matters.

CATEGORY RULE:
- Provide a single, concise category label (1-2 words max) in English.
- This should be the most accurate high-level category for the video content.
- Examples: "Programming", "Finance", "History", "Psychology", "Mathematics", "Cooking".
- The suggested_category MUST always be in English regardless of the transcript language.

CRITICAL: RETURN A JSON OBJECT EXACTLY MATCHING THIS STRUCTURE.

{
  "title": "Inferred video title in transcript language",
  "detected_language": "English (or Arabic, etc.)",
  "summary": "Concise overall summary (3-5 sentences)",
  "segments": [
    {
      "title": "Segment title",
      "summary": "What this section covers (2-3 sentences)",
      "key_insight": "Most important point from this section",
      "why_it_matters": "Why this is valuable (1-2 sentences)"
    }
  ],
  "conclusion": "Final overall takeaway / closing conclusion",
  "topics": ["Topic1", "Topic2", "Topic3"],
  "suggested_category": "Programming"
}

OUTPUT: Return ONLY a valid JSON object. No markdown fences, no extra text.
""".strip()

_REDUCE_USER = """
Video Title: {video_title}

The following are intermediate summaries extracted from {total_chunks} consecutive
chunks of the video transcript. Merge them into ONE cohesive final summary.

{merged_summaries}

Merge into 3-7 chronological segments. Return ONLY the final JSON structure.
""".strip()
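# For reference, the {merged_summaries} placeholder above is filled by
# _map_reduce() with blocks shaped like this (values invented for
# illustration):
#
#   --- Chunk 1 ---
#   Summary: Introduces decorators and why they exist.
#   - Closures: Inner functions capture enclosing state (Insight: reusable wrappers)
#   Topics: Python, Decorators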
{ "title": "Inferred video title in transcript language", "detected_language": "English (or Arabic, etc.)", "summary": "Concise overall summary (3-5 sentences)", "segments": [ { "title": "Segment title", "summary": "What this section covers (2-3 sentences)", "key_insight": "Most important point from this section", "why_it_matters": "Why this is valuable (1-2 sentences)" } ], "conclusion": "Final overall takeaway / closing conclusion", "topics": ["Topic1", "Topic2", "Topic3"], "suggested_category": "Programming" } OUTPUT: Return ONLY a valid JSON object. No markdown fences, no extra text. """.strip() _REDUCE_USER = """ Video Title: {video_title} The following are intermediate summaries extracted from {total_chunks} consecutive chunks of the video transcript. Merge them into ONE cohesive final summary. {merged_summaries} Merge into 3-7 chronological segments. Return ONLY the final JSON structure. """.strip() # ───────────────────────────────────────────────────────────────────────────── # LANGUAGE LABELS (simplified) # ───────────────────────────────────────────────────────────────────────────── _LABELS = { "Arabic": { "source": "المصدر", "duration": "المدة", "summary": "الملخص العام", "timeline": "التسلسل الزمني", "insight": "أهم نقطة", "why": "لماذا يهم؟", "conclusion": "الخلاصة", }, "English": { "source": "Source", "duration": "Duration", "summary": "Overall Summary", "timeline": "Timeline", "insight": "Key Insight", "why": "Why It Matters", "conclusion": "Conclusion", }, } def _labels(language: str) -> dict: return _LABELS.get(language, _LABELS["English"]) # ───────────────────────────────────────────────────────────────────────────── # TOKEN UTILITIES # ───────────────────────────────────────────────────────────────────────────── def _estimate_tokens(text: str) -> int: """ Lightweight token estimation using a word-count heuristic. Production logs show that Groq's tokenizer produces ~2.5 tokens per whitespace-delimited word for Arabic / mixed-script transcripts. Using 2.5× as a conservative multiplier to avoid underestimation. """ word_count = len(text.split()) return int(word_count * 2.5) def _split_into_chunks(text: str, target_tokens: int = _CHUNK_TARGET_TOKENS) -> List[str]: """ Split text into chunks of approximately `target_tokens` tokens each. Splits on sentence boundaries (period + space, newline) to avoid cutting mid-sentence. Falls back to word-level splitting if no sentence boundaries are found within a chunk. """ # Split into sentences (on ". 
" or newline) sentences = re.split(r'(?<=[.!?])\s+|\n+', text) sentences = [s.strip() for s in sentences if s.strip()] chunks: List[str] = [] current_chunk: List[str] = [] current_tokens = 0 for sentence in sentences: sentence_tokens = _estimate_tokens(sentence) # If a single sentence exceeds the target, split by words if sentence_tokens > target_tokens: # Flush current chunk first if current_chunk: chunks.append(" ".join(current_chunk)) current_chunk = [] current_tokens = 0 words = sentence.split() word_buffer: List[str] = [] buffer_tokens = 0 for word in words: wt = _estimate_tokens(word) if buffer_tokens + wt > target_tokens and word_buffer: chunks.append(" ".join(word_buffer)) word_buffer = [word] buffer_tokens = wt else: word_buffer.append(word) buffer_tokens += wt if word_buffer: chunks.append(" ".join(word_buffer)) continue if current_tokens + sentence_tokens > target_tokens and current_chunk: chunks.append(" ".join(current_chunk)) current_chunk = [sentence] current_tokens = sentence_tokens else: current_chunk.append(sentence) current_tokens += sentence_tokens # Don't forget the last chunk if current_chunk: chunks.append(" ".join(current_chunk)) return chunks # ───────────────────────────────────────────────────────────────────────────── # NOTE GENERATOR # ───────────────────────────────────────────────────────────────────────────── class NoteGenerator: """ Generates structured study notes using Groq. Automatically selects between: - **Single-pass**: for short transcripts (< 8K tokens) - **Map-Reduce**: for long transcripts (≥ 8K tokens), splitting into chunks, summarizing each individually, then merging in a REDUCE pass. Uses a single model (llama-3.3-70b-versatile) for all phases and includes adaptive rate-limit retry (60s backoff on 413/429). 
""" def __init__(self): self.api_key = os.environ.get("GROQ_API_KEY", "").strip() self.client = Groq(api_key=self.api_key) if self.api_key else None self.model = _MODEL_PRIMARY self.chunk_delay = float( os.environ.get("GROQ_CHUNK_DELAY_SECONDS", "3") ) logger.info( "🚀 NoteGenerator v5.1 initialized — model: %s, delay: %.1fs", self.model, self.chunk_delay, ) # ── Low-level API call ────────────────────────────────────────────── def _chat( self, system: str, user: str, max_tokens: int = 4096, ) -> Optional[str]: """Send a chat completion request to Groq.""" try: response = self.client.chat.completions.create( model=self.model, max_tokens=max_tokens, temperature=0.3, response_format={"type": "json_object"}, messages=[ {"role": "system", "content": system}, {"role": "user", "content": user}, ], ) return response.choices[0].message.content except Exception as e: logger.error("❌ Groq API call failed (model=%s): %s", self.model, e) return None # ── Error fallback ────────────────────────────────────────────────── def _get_error_json(self, error_msg: str) -> Dict: return { "title": "Error in Generation", "detected_language": "English", "summary": f"Could not generate notes: {error_msg}", "segments": [], "conclusion": "", "topics": [], "suggested_category": "", } # ── Single-pass summarization (short transcripts) ─────────────────── def _single_pass(self, transcript_text: str, video_title: str) -> Dict: """Process the entire transcript in one API call.""" logger.info("📝 Single-pass summarization via %s", self.model) user_prompt = _SUMMARY_USER.format( video_title=video_title, transcript=transcript_text, ) raw = self._chat(_SUMMARY_SYSTEM, user_prompt, max_tokens=4096) if raw is None: return self._get_error_json("Groq API call failed (single-pass).") return self._parse_and_validate(raw) # ── Map-Reduce summarization (long transcripts) ───────────────────── def _map_reduce(self, transcript_text: str, video_title: str) -> Dict: """ Split transcript into chunks, summarize each (MAP), then merge (REDUCE). """ chunks = _split_into_chunks(transcript_text) total = len(chunks) logger.info( "🗺️ Map-Reduce activated: %d chunks (delay=%.1fs between calls)", total, self.chunk_delay, ) # ── MAP PHASE ─────────────────────────────────────────────────── intermediate_results: List[Dict] = [] for i, chunk in enumerate(chunks, start=1): chunk_tokens = _estimate_tokens(chunk) logger.info( " 📦 MAP chunk %d/%d (~%d est. tokens)...", i, total, chunk_tokens, ) user_prompt = _MAP_USER.format( video_title=video_title, chunk_index=i, total_chunks=total, chunk_text=chunk, ) # Retry loop with adaptive backoff on rate-limit errors raw = None for attempt in range(1, _RATE_LIMIT_MAX_RETRIES + 1): raw = self._chat( _MAP_SYSTEM, user_prompt, max_tokens=2048, ) if raw is not None: break # success # _chat() returns None on any exception. Check if it was a # rate-limit error (413 / 429) by inspecting the last # exception. We re-try with a 60s sleep. logger.warning( " ⚠️ MAP chunk %d/%d attempt %d/%d failed. " "Sleeping %ds for TPM window reset...", i, total, attempt, _RATE_LIMIT_MAX_RETRIES, _RATE_LIMIT_SLEEP_SECONDS, ) time.sleep(_RATE_LIMIT_SLEEP_SECONDS) if raw: try: parsed = json.loads(raw) intermediate_results.append(parsed) logger.info(" ✅ MAP chunk %d/%d done.", i, total) except json.JSONDecodeError as e: logger.warning( " ⚠️ MAP chunk %d/%d returned invalid JSON: %s", i, total, e, ) else: logger.error( " ❌ MAP chunk %d/%d failed after %d retries. 
Skipping.", i, total, _RATE_LIMIT_MAX_RETRIES, ) # Respect TPM limits — delay between consecutive API calls if i < total and self.chunk_delay > 0: logger.info(" ⏳ Sleeping %.1fs (TPM cooldown)...", self.chunk_delay) time.sleep(self.chunk_delay) if not intermediate_results: return self._get_error_json( "Map-Reduce failed: no chunks were successfully summarized." ) # ── REDUCE PHASE ──────────────────────────────────────────────── logger.info("🔗 REDUCE phase: merging %d intermediate summaries...", len(intermediate_results)) # Build a readable merged text for the reduce prompt merged_parts: List[str] = [] all_topics: List[str] = [] detected_lang = "English" for idx, result in enumerate(intermediate_results, start=1): detected_lang = result.get("detected_language", detected_lang) chunk_summary = result.get("chunk_summary", "") key_points = result.get("key_points", []) topics = result.get("topics", []) all_topics.extend(topics) part = f"--- Chunk {idx} ---\n" part += f"Summary: {chunk_summary}\n" for kp in key_points: if isinstance(kp, dict): part += f"- {kp.get('title', '')}: {kp.get('detail', '')} " part += f"(Insight: {kp.get('insight', '')})\n" part += f"Topics: {', '.join(topics)}\n" merged_parts.append(part) merged_text = "\n".join(merged_parts) # Check if the merged text itself is within single-pass limits reduce_tokens = _estimate_tokens(merged_text) logger.info("🔗 REDUCE input: ~%d tokens", reduce_tokens) user_prompt = _REDUCE_USER.format( video_title=video_title, total_chunks=len(intermediate_results), merged_summaries=merged_text, ) # Sleep before REDUCE to ensure TPM cooldown from last MAP call if self.chunk_delay > 0: logger.info(" ⏳ Sleeping %.1fs before REDUCE call...", self.chunk_delay) time.sleep(self.chunk_delay) # REDUCE with retry on rate-limit raw = None for attempt in range(1, _RATE_LIMIT_MAX_RETRIES + 1): raw = self._chat(_REDUCE_SYSTEM, user_prompt, max_tokens=4096) if raw is not None: break logger.warning( " ⚠️ REDUCE attempt %d/%d failed. Sleeping %ds...", attempt, _RATE_LIMIT_MAX_RETRIES, _RATE_LIMIT_SLEEP_SECONDS, ) time.sleep(_RATE_LIMIT_SLEEP_SECONDS) if raw is None: return self._get_error_json("Groq API call failed (REDUCE phase after retries).") return self._parse_and_validate(raw) # ── JSON parsing + schema validation ──────────────────────────────── def _parse_and_validate(self, raw_json: str) -> Dict: """Parse raw JSON string and validate against SummarySchema.""" try: data = json.loads(raw_json) validated = SummarySchema(**data) return validated.model_dump() except (json.JSONDecodeError, ValidationError) as e: logger.error("❌ Schema validation failed: %s", e) return self._get_error_json(f"Validation Error: {str(e)}") # ── Public API (unchanged signature) ──────────────────────────────── def generateSummary(self, transcript_text: str, video_title: str) -> Dict: """ Generate structured JSON summary from transcript. Automatically selects single-pass or Map-Reduce based on estimated token count. The return type is always a Dict matching SummarySchema. 
""" if not self.client: return self._get_error_json("Groq API Key missing.") # Estimate total tokens for the full prompt full_prompt = _SUMMARY_USER.format( video_title=video_title, transcript=transcript_text, ) total_tokens = _estimate_tokens(_SUMMARY_SYSTEM + full_prompt) logger.info( "📊 Token estimate: ~%d tokens (threshold: %d)", total_tokens, _SINGLE_PASS_TOKEN_LIMIT, ) if total_tokens < _SINGLE_PASS_TOKEN_LIMIT: return self._single_pass(transcript_text, video_title) else: logger.info( "⚡ Transcript too large for single-pass (%d ≥ %d). " "Activating Map-Reduce pipeline...", total_tokens, _SINGLE_PASS_TOKEN_LIMIT, ) return self._map_reduce(transcript_text, video_title) # ── Markdown formatting (unchanged) ───────────────────────────────── def format_notes_to_markdown(self, json_notes: Dict) -> str: """Convert JSON notes to clean Markdown — Summary → Timeline → Conclusion.""" lang = json_notes.get("detected_language", "English") L = _labels(lang) lines: list[str] = [] def add(text: str = ""): lines.append(text) def blank(): lines.append("") def divider(): lines.append("") lines.append("---") lines.append("") # ── OVERALL SUMMARY ── summary = json_notes.get("summary", "") if summary: add(f"## 📋 {L['summary']}") blank() add(summary) divider() # ── TIMELINE ── segments = json_notes.get("segments", []) if segments: add(f"## 🕐 {L['timeline']}") blank() for i, seg in enumerate(segments, start=1): s_title = seg.get("title", "") if isinstance(seg, dict) else seg.title s_summary = seg.get("summary", "") if isinstance(seg, dict) else seg.summary s_insight = seg.get("key_insight", "") if isinstance(seg, dict) else seg.key_insight s_why = seg.get("why_it_matters", "") if isinstance(seg, dict) else seg.why_it_matters add(f"### {i}. {s_title}") blank() add(s_summary) blank() if s_insight: add(f"> **💎 {L['insight']}:** {s_insight}") blank() if s_why: add(f"> **{L['why']}** {s_why}") blank() divider() # ── CONCLUSION ── conclusion = json_notes.get("conclusion", "") if conclusion: add(f"## 🔖 {L['conclusion']}") blank() add(f"> {conclusion}") blank() return "\n".join(lines) def format_final_notes( self, notes: str, video_title: str, video_url: str, duration: int, detected_language: str = "English", ) -> str: """ Wrap the formatted Markdown body with Source + Duration header. """ L = _labels(detected_language) if duration and duration > 0: hours = int(duration // 3600) minutes = int((duration % 3600) // 60) secs = int(duration % 60) if hours > 0: duration_str = f"{hours}:{minutes:02d}:{secs:02d}" else: duration_str = f"{minutes:02d}:{secs:02d}" else: duration_str = "N/A (Auto-generated)" header = ( f"# {video_title}\n\n" f"---\n\n" f"> **{L['source']}:** {video_url} \n" f"> **{L['duration']}:** {duration_str}\n\n" f"---\n\n" ) return header + notes