AIdea-Server / src /summarization /note_generator.py
Ali Hashhash
feat: implement structured summarization schemas and notes generation API endpoints
74fb373
import json
import os
import re
import time
from typing import Dict, List, Optional
from groq import Groq
from pydantic import ValidationError
from ..utils.logger import setup_logger
from .schemas import SummarySchema
logger = setup_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ─────────────────────────────────────────────────────────────────────────────
# Token threshold: below this, a single API call is used.
# The value is compared against the *estimated* token count of the complete
# single-pass prompt (system + user); at or above it, Map-Reduce is used.
_SINGLE_PASS_TOKEN_LIMIT = 8_000
# Target chunk size for MAP phase (tokens). Kept small so that
# prompt + chunk + response stays well under the 12K TPM free-tier limit.
_CHUNK_TARGET_TOKENS = 2_500
# Model — unified for both MAP and REDUCE phases.
# llama-3.3-70b-versatile has 12K TPM on the free tier (the highest).
_MODEL_PRIMARY = "llama-3.3-70b-versatile"
# Maximum number of attempts per MAP / REDUCE call. Intended for rate-limit
# (413 / 429) responses, but the retry loops fire whenever the API wrapper
# returns None, i.e. on any failed call.
_RATE_LIMIT_MAX_RETRIES = 3
# Seconds to sleep between attempts so the tokens-per-minute window can reset.
_RATE_LIMIT_SLEEP_SECONDS = 60
# ─────────────────────────────────────────────────────────────────────────────
# PROMPT TEMPLATES — SINGLE-PASS (unchanged)
# ─────────────────────────────────────────────────────────────────────────────
# System prompt for the single-pass path. It contains literal JSON braces and
# is sent verbatim (never passed through str.format() in this file), so the
# braces must stay unescaped.
_SUMMARY_SYSTEM = """
You are an expert educational content analyst and structured note-taking specialist.
Transform raw video transcripts into clean, structured chronological JSON summaries.
LANGUAGE RULE — CRITICAL, NEVER VIOLATE:
- Detect the primary language of the transcript.
- Every content field (title, summary, segments, conclusion) MUST be written entirely in that SAME detected language.
- Do NOT mix languages. Arabic transcript -> everything in Arabic.
- Only the "detected_language" and "suggested_category" fields are stated in English.
TIMELINE RULES — STRICTLY ENFORCED:
- Divide the transcript into chronological segments that follow its natural progression.
- Produce a MINIMUM of 3 and a MAXIMUM of 7 segments.
- Each segment MUST cover a distinct phase or theme; do NOT repeat the same topic.
- Segments must be ordered chronologically as they appear in the transcript.
- Each segment must include:
* title: a short descriptive title
* summary: concise summary of that section (2-3 sentences)
* key_insight: the single most important takeaway from that section
* why_it_matters: brief explanation of value/importance (1-2 sentences)
TOPICS RULE:
- Extract the actual topics discussed in the video dynamically.
- Topics should be specific and descriptive (e.g. "Python", "Machine Learning", "Neural Networks").
- Do NOT use generic fixed categories.
CATEGORY RULE:
- Provide a single, concise category label (1-2 words max) in English.
- This should be the most accurate high-level category for the video content.
- Examples: "Programming", "Finance", "History", "Psychology", "Mathematics", "Cooking".
- The suggested_category MUST always be in English regardless of the transcript language.
CRITICAL: RETURN A JSON OBJECT EXACTLY MATCHING THIS STRUCTURE.
DO NOT CHANGE, OMIT, OR RENAME ANY KEYS.
{
"title": "Inferred video title in transcript language",
"detected_language": "English (or Arabic, etc.)",
"summary": "Concise overall summary (3-5 sentences)",
"segments": [
{
"title": "Segment title",
"summary": "What this section covers (2-3 sentences)",
"key_insight": "Most important point from this section",
"why_it_matters": "Why this is valuable (1-2 sentences)"
}
],
"conclusion": "Final overall takeaway / closing conclusion",
"topics": ["Topic1", "Topic2", "Topic3"],
"suggested_category": "Programming"
}
OUTPUT: Return ONLY a valid JSON object. No markdown fences, no extra text.
""".strip()
# User prompt for the single-pass path.
# str.format() placeholders: {video_title}, {transcript}.
_SUMMARY_USER = """
Video Title: {video_title}
TRANSCRIPT:
{transcript}
Analyze thoroughly. Detect the language.
Divide the content into 3-7 chronological segments.
For each segment provide: title, summary, key_insight, why_it_matters.
Return ONLY the exact JSON structure requested.
""".strip()
# ─────────────────────────────────────────────────────────────────────────────
# PROMPT TEMPLATES — MAP PHASE
# ─────────────────────────────────────────────────────────────────────────────
# System prompt for each MAP call (one transcript chunk per call). Contains
# literal JSON braces and is sent verbatim (never passed through str.format()
# in this file). The keys it requests — "chunk_summary", "key_points",
# "topics" — are the ones the REDUCE step reads back from each chunk result.
_MAP_SYSTEM = """
You are an expert educational content analyst.
You will receive ONE CHUNK of a longer video transcript.
Extract the key information from this chunk ONLY.
LANGUAGE RULE — CRITICAL:
- Detect the primary language of the text.
- Write ALL content fields in that SAME detected language.
- Only "detected_language" is stated in English.
Return a JSON object with this EXACT structure:
{
"detected_language": "English (or Arabic, etc.)",
"chunk_summary": "Concise summary of this chunk (3-5 sentences)",
"key_points": [
{
"title": "Short title for this point",
"detail": "1-2 sentence explanation",
"insight": "Key takeaway"
}
],
"topics": ["Topic1", "Topic2"]
}
RULES:
- Extract 2-4 key points from this chunk.
- Topics should be specific (e.g. "Python", "Neural Networks"), not generic.
- OUTPUT: Return ONLY a valid JSON object. No markdown fences, no extra text.
""".strip()
# User prompt for each MAP call.
# str.format() placeholders: {video_title}, {chunk_index}, {total_chunks},
# {chunk_text}.
_MAP_USER = """
Video Title: {video_title}
Chunk {chunk_index} of {total_chunks}:
{chunk_text}
Extract the key information from this chunk. Return ONLY the JSON.
""".strip()
# ─────────────────────────────────────────────────────────────────────────────
# PROMPT TEMPLATES — REDUCE PHASE
# ─────────────────────────────────────────────────────────────────────────────
# System prompt for the REDUCE call that merges the per-chunk summaries.
# Requests the same final JSON structure as the single-pass system prompt.
# Contains literal JSON braces and is sent verbatim (never passed through
# str.format() in this file).
_REDUCE_SYSTEM = """
You are an expert educational content analyst and structured note-taking specialist.
You will receive INTERMEDIATE SUMMARIES from multiple chunks of a single video transcript.
Your job is to MERGE them into ONE final, cohesive, structured summary.
LANGUAGE RULE — CRITICAL, NEVER VIOLATE:
- Use the detected language from the intermediate summaries.
- Every content field MUST be in that SAME language.
- Only "detected_language" and "suggested_category" are stated in English.
TIMELINE RULES — STRICTLY ENFORCED:
- Merge the chunk summaries into 3-7 chronological segments.
- Each segment MUST cover a distinct phase or theme; do NOT repeat topics.
- Segments must follow the natural progression of the video.
- Each segment must include: title, summary, key_insight, why_it_matters.
CATEGORY RULE:
- Provide a single, concise category label (1-2 words max) in English.
- This should be the most accurate high-level category for the video content.
- Examples: "Programming", "Finance", "History", "Psychology", "Mathematics", "Cooking".
- The suggested_category MUST always be in English regardless of the transcript language.
CRITICAL: RETURN A JSON OBJECT EXACTLY MATCHING THIS STRUCTURE.
{
"title": "Inferred video title in transcript language",
"detected_language": "English (or Arabic, etc.)",
"summary": "Concise overall summary (3-5 sentences)",
"segments": [
{
"title": "Segment title",
"summary": "What this section covers (2-3 sentences)",
"key_insight": "Most important point from this section",
"why_it_matters": "Why this is valuable (1-2 sentences)"
}
],
"conclusion": "Final overall takeaway / closing conclusion",
"topics": ["Topic1", "Topic2", "Topic3"],
"suggested_category": "Programming"
}
OUTPUT: Return ONLY a valid JSON object. No markdown fences, no extra text.
""".strip()
# User prompt for the REDUCE call.
# str.format() placeholders: {video_title}, {total_chunks},
# {merged_summaries}.
_REDUCE_USER = """
Video Title: {video_title}
The following are intermediate summaries extracted from {total_chunks} consecutive chunks
of the video transcript. Merge them into ONE cohesive final summary.
{merged_summaries}
Merge into 3-7 chronological segments. Return ONLY the final JSON structure.
""".strip()
# ─────────────────────────────────────────────────────────────────────────────
# LANGUAGE LABELS (simplified)
# ─────────────────────────────────────────────────────────────────────────────
_LABELS = {
"Arabic": {
"source": "المصدر",
"duration": "المدة",
"summary": "الملخص العام",
"timeline": "التسلسل الزمني",
"insight": "أهم نقطة",
"why": "لماذا يهم؟",
"conclusion": "الخلاصة",
},
"English": {
"source": "Source",
"duration": "Duration",
"summary": "Overall Summary",
"timeline": "Timeline",
"insight": "Key Insight",
"why": "Why It Matters",
"conclusion": "Conclusion",
},
}
def _labels(language: str) -> dict:
return _LABELS.get(language, _LABELS["English"])
# ─────────────────────────────────────────────────────────────────────────────
# TOKEN UTILITIES
# ─────────────────────────────────────────────────────────────────────────────
def _estimate_tokens(text: str) -> int:
"""
Lightweight token estimation using a word-count heuristic.
Production logs show that Groq's tokenizer produces ~2.5 tokens per
whitespace-delimited word for Arabic / mixed-script transcripts.
Using 2.5× as a conservative multiplier to avoid underestimation.
"""
word_count = len(text.split())
return int(word_count * 2.5)
def _split_into_chunks(text: str, target_tokens: int = _CHUNK_TARGET_TOKENS) -> List[str]:
    """
    Split ``text`` into chunks of approximately ``target_tokens`` tokens each.

    The text is first cut into sentences: after ".", "!", "?" or the Arabic
    question mark "؟" followed by whitespace, and at newlines. Sentences are
    then packed greedily into chunks whose estimated size stays within
    ``target_tokens``.

    A single sentence that is larger than the target on its own (typical for
    unpunctuated auto-generated transcripts) is cut into fixed-size groups of
    words. The group size is derived from the sentence's own estimate so that
    each group also stays within the target when measured by
    ``_estimate_tokens``. Estimating one word at a time would truncate 2.5
    down to 2 and let such chunks overshoot the target by about 25%.

    Returns the chunks in original order; sentences inside a chunk are joined
    with a single space. Empty or whitespace-only text yields an empty list.
    """
    pieces = re.split(r'(?<=[.!?؟])\s+|\n+', text)
    sentences = [piece.strip() for piece in pieces if piece.strip()]

    chunks: List[str] = []
    pending: List[str] = []
    pending_tokens = 0

    for sentence in sentences:
        sentence_tokens = _estimate_tokens(sentence)

        if sentence_tokens > target_tokens:
            # Flush what has been collected so far so chunk order is kept.
            if pending:
                chunks.append(" ".join(pending))
                pending = []
                pending_tokens = 0

            words = sentence.split()
            # Average estimated cost of one word in this sentence; using the
            # sentence-level estimate keeps word groups consistent with the
            # way whole sentences are measured.
            tokens_per_word = sentence_tokens / len(words)
            words_per_chunk = max(1, int(target_tokens / tokens_per_word))
            chunks.extend(
                " ".join(words[start:start + words_per_chunk])
                for start in range(0, len(words), words_per_chunk)
            )
            continue

        if pending and pending_tokens + sentence_tokens > target_tokens:
            chunks.append(" ".join(pending))
            pending = [sentence]
            pending_tokens = sentence_tokens
        else:
            pending.append(sentence)
            pending_tokens += sentence_tokens

    # Emit whatever is left after the last sentence.
    if pending:
        chunks.append(" ".join(pending))
    return chunks
# ─────────────────────────────────────────────────────────────────────────────
# NOTE GENERATOR
# ─────────────────────────────────────────────────────────────────────────────
class NoteGenerator:
"""
Generates structured study notes from video transcripts using Groq.
Automatically selects between:
- **Single-pass**: when the estimated token count of the full prompt is
below the single-pass limit (8K), the transcript is sent in one call.
- **Map-Reduce**: otherwise the transcript is split into chunks, each chunk
is summarized individually (MAP), and the chunk summaries are merged in
a final REDUCE call.
Uses a single model (llama-3.3-70b-versatile) for all phases. In the
Map-Reduce path every call is attempted up to 3 times with a fixed 60s
sleep between attempts; the retry triggers on any failed call, not only on
413/429 rate-limit responses.
All failures are reported as an error-shaped summary dict rather than as
exceptions raised by the API wrapper.
"""
def __init__(self):
self.api_key = os.environ.get("GROQ_API_KEY", "").strip()
self.client = Groq(api_key=self.api_key) if self.api_key else None
self.model = _MODEL_PRIMARY
self.chunk_delay = float(
os.environ.get("GROQ_CHUNK_DELAY_SECONDS", "3")
)
logger.info(
"🚀 NoteGenerator v5.1 initialized — model: %s, delay: %.1fs",
self.model, self.chunk_delay,
)
# ── Low-level API call ──────────────────────────────────────────────
def _chat(
    self,
    system: str,
    user: str,
    max_tokens: int = 4096,
) -> Optional[str]:
    """
    Send one chat-completion request to Groq in JSON-object mode.

    Returns the content of the first choice, or None when the request (or
    reading the response) raises. The failure is logged, never re-raised.
    """
    conversation = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    try:
        completion = self.client.chat.completions.create(
            model=self.model,
            max_tokens=max_tokens,
            temperature=0.3,
            response_format={"type": "json_object"},
            messages=conversation,
        )
        first_choice = completion.choices[0]
        return first_choice.message.content
    except Exception as exc:
        # Boundary with the external service: callers rely on None, not on
        # exceptions, to detect a failed call.
        logger.error("❌ Groq API call failed (model=%s): %s", self.model, exc)
        return None
# ── Error fallback ──────────────────────────────────────────────────
def _get_error_json(self, error_msg: str) -> Dict:
return {
"title": "Error in Generation",
"detected_language": "English",
"summary": f"Could not generate notes: {error_msg}",
"segments": [],
"conclusion": "",
"topics": [],
"suggested_category": "",
}
# ── Single-pass summarization (short transcripts) ───────────────────
def _single_pass(self, transcript_text: str, video_title: str) -> Dict:
    """
    Summarize the whole transcript with a single API call.

    Returns the validated summary dict, or the error-shaped dict when the
    API call fails.
    """
    logger.info("📝 Single-pass summarization via %s", self.model)

    prompt = _SUMMARY_USER.format(
        video_title=video_title,
        transcript=transcript_text,
    )
    response_text = self._chat(_SUMMARY_SYSTEM, prompt, max_tokens=4096)

    if response_text is not None:
        return self._parse_and_validate(response_text)
    return self._get_error_json("Groq API call failed (single-pass).")
# ── Map-Reduce summarization (long transcripts) ─────────────────────
def _map_reduce(self, transcript_text: str, video_title: str) -> Dict:
"""
Split transcript into chunks, summarize each (MAP), then merge (REDUCE).
"""
chunks = _split_into_chunks(transcript_text)
total = len(chunks)
logger.info(
"🗺️ Map-Reduce activated: %d chunks (delay=%.1fs between calls)",
total, self.chunk_delay,
)
# ── MAP PHASE ───────────────────────────────────────────────────
intermediate_results: List[Dict] = []
for i, chunk in enumerate(chunks, start=1):
chunk_tokens = _estimate_tokens(chunk)
logger.info(
" 📦 MAP chunk %d/%d (~%d est. tokens)...", i, total, chunk_tokens,
)
user_prompt = _MAP_USER.format(
video_title=video_title,
chunk_index=i,
total_chunks=total,
chunk_text=chunk,
)
# Retry loop with adaptive backoff on rate-limit errors
raw = None
for attempt in range(1, _RATE_LIMIT_MAX_RETRIES + 1):
raw = self._chat(
_MAP_SYSTEM, user_prompt,
max_tokens=2048,
)
if raw is not None:
break # success
# _chat() returns None on any exception. Check if it was a
# rate-limit error (413 / 429) by inspecting the last
# exception. We re-try with a 60s sleep.
logger.warning(
" ⚠️ MAP chunk %d/%d attempt %d/%d failed. "
"Sleeping %ds for TPM window reset...",
i, total, attempt, _RATE_LIMIT_MAX_RETRIES,
_RATE_LIMIT_SLEEP_SECONDS,
)
time.sleep(_RATE_LIMIT_SLEEP_SECONDS)
if raw:
try:
parsed = json.loads(raw)
intermediate_results.append(parsed)
logger.info(" ✅ MAP chunk %d/%d done.", i, total)
except json.JSONDecodeError as e:
logger.warning(
" ⚠️ MAP chunk %d/%d returned invalid JSON: %s", i, total, e,
)
else:
logger.error(
" ❌ MAP chunk %d/%d failed after %d retries. Skipping.",
i, total, _RATE_LIMIT_MAX_RETRIES,
)
# Respect TPM limits — delay between consecutive API calls
if i < total and self.chunk_delay > 0:
logger.info(" ⏳ Sleeping %.1fs (TPM cooldown)...", self.chunk_delay)
time.sleep(self.chunk_delay)
if not intermediate_results:
return self._get_error_json(
"Map-Reduce failed: no chunks were successfully summarized."
)
# ── REDUCE PHASE ────────────────────────────────────────────────
logger.info("🔗 REDUCE phase: merging %d intermediate summaries...", len(intermediate_results))
# Build a readable merged text for the reduce prompt
merged_parts: List[str] = []
all_topics: List[str] = []
detected_lang = "English"
for idx, result in enumerate(intermediate_results, start=1):
detected_lang = result.get("detected_language", detected_lang)
chunk_summary = result.get("chunk_summary", "")
key_points = result.get("key_points", [])
topics = result.get("topics", [])
all_topics.extend(topics)
part = f"--- Chunk {idx} ---\n"
part += f"Summary: {chunk_summary}\n"
for kp in key_points:
if isinstance(kp, dict):
part += f"- {kp.get('title', '')}: {kp.get('detail', '')} "
part += f"(Insight: {kp.get('insight', '')})\n"
part += f"Topics: {', '.join(topics)}\n"
merged_parts.append(part)
merged_text = "\n".join(merged_parts)
# Check if the merged text itself is within single-pass limits
reduce_tokens = _estimate_tokens(merged_text)
logger.info("🔗 REDUCE input: ~%d tokens", reduce_tokens)
user_prompt = _REDUCE_USER.format(
video_title=video_title,
total_chunks=len(intermediate_results),
merged_summaries=merged_text,
)
# Sleep before REDUCE to ensure TPM cooldown from last MAP call
if self.chunk_delay > 0:
logger.info(" ⏳ Sleeping %.1fs before REDUCE call...", self.chunk_delay)
time.sleep(self.chunk_delay)
# REDUCE with retry on rate-limit
raw = None
for attempt in range(1, _RATE_LIMIT_MAX_RETRIES + 1):
raw = self._chat(_REDUCE_SYSTEM, user_prompt, max_tokens=4096)
if raw is not None:
break
logger.warning(
" ⚠️ REDUCE attempt %d/%d failed. Sleeping %ds...",
attempt, _RATE_LIMIT_MAX_RETRIES, _RATE_LIMIT_SLEEP_SECONDS,
)
time.sleep(_RATE_LIMIT_SLEEP_SECONDS)
if raw is None:
return self._get_error_json("Groq API call failed (REDUCE phase after retries).")
return self._parse_and_validate(raw)
# ── JSON parsing + schema validation ────────────────────────────────
def _parse_and_validate(self, raw_json: str) -> Dict:
    """
    Parse ``raw_json`` and validate it against SummarySchema.

    Returns the validated data as a plain dict. When the text is not valid
    JSON, is valid JSON but not an object (e.g. a list), or fails schema
    validation, the problem is logged and the error-shaped dict is
    returned instead; nothing is raised for those cases.
    """
    def failure(reason) -> Dict:
        logger.error("❌ Schema validation failed: %s", reason)
        return self._get_error_json(f"Validation Error: {str(reason)}")

    try:
        data = json.loads(raw_json)
    except json.JSONDecodeError as e:
        return failure(e)

    # ``SummarySchema(**data)`` needs a mapping; unpacking a list or scalar
    # would raise TypeError, which is not a validation error callers expect.
    if not isinstance(data, dict):
        return failure(f"expected a JSON object, got {type(data).__name__}")

    try:
        validated = SummarySchema(**data)
    except ValidationError as e:
        return failure(e)
    return validated.model_dump()
# ── Public API (unchanged signature) ────────────────────────────────
def generateSummary(self, transcript_text: str, video_title: str) -> Dict:
"""
Generate structured JSON summary from transcript.
Automatically selects single-pass or Map-Reduce based on estimated
token count. The return type is always a Dict matching SummarySchema.
"""
if not self.client:
return self._get_error_json("Groq API Key missing.")
# Estimate total tokens for the full prompt
full_prompt = _SUMMARY_USER.format(
video_title=video_title,
transcript=transcript_text,
)
total_tokens = _estimate_tokens(_SUMMARY_SYSTEM + full_prompt)
logger.info(
"📊 Token estimate: ~%d tokens (threshold: %d)",
total_tokens, _SINGLE_PASS_TOKEN_LIMIT,
)
if total_tokens < _SINGLE_PASS_TOKEN_LIMIT:
return self._single_pass(transcript_text, video_title)
else:
logger.info(
"⚡ Transcript too large for single-pass (%d ≥ %d). "
"Activating Map-Reduce pipeline...",
total_tokens, _SINGLE_PASS_TOKEN_LIMIT,
)
return self._map_reduce(transcript_text, video_title)
# ── Markdown formatting (unchanged) ─────────────────────────────────
def format_notes_to_markdown(self, json_notes: Dict) -> str:
"""Convert JSON notes to clean Markdown — Summary → Timeline → Conclusion.

Reads the "detected_language", "summary", "segments" and "conclusion" keys
of ``json_notes``; every key is optional. A section whose value is missing
or empty is left out. Returns the collected lines joined with newlines.
"""
# Headings are localized by detected language; the label lookup falls back
# to the English labels when the language has no entry.
lang = json_notes.get("detected_language", "English")
L = _labels(lang)
lines: list[str] = []
# Small helpers that append to ``lines``: one text line, one empty line,
# or a horizontal rule ("---") surrounded by empty lines.
def add(text: str = ""):
lines.append(text)
def blank():
lines.append("")
def divider():
lines.append("")
lines.append("---")
lines.append("")
# ── OVERALL SUMMARY ──
summary = json_notes.get("summary", "")
if summary:
add(f"## 📋 {L['summary']}")
blank()
add(summary)
divider()
# ── TIMELINE ──
segments = json_notes.get("segments", [])
if segments:
add(f"## 🕐 {L['timeline']}")
blank()
for i, seg in enumerate(segments, start=1):
# A segment may be a plain dict or an object exposing the same four
# fields as attributes; both forms are read here.
s_title = seg.get("title", "") if isinstance(seg, dict) else seg.title
s_summary = seg.get("summary", "") if isinstance(seg, dict) else seg.summary
s_insight = seg.get("key_insight", "") if isinstance(seg, dict) else seg.key_insight
s_why = seg.get("why_it_matters", "") if isinstance(seg, dict) else seg.why_it_matters
# Segments are numbered from 1 in the order given.
add(f"### {i}. {s_title}")
blank()
add(s_summary)
blank()
# Insight and "why it matters" are rendered as blockquotes and only
# when they are non-empty.
if s_insight:
add(f"> **💎 {L['insight']}:** {s_insight}")
blank()
if s_why:
add(f"> **{L['why']}** {s_why}")
blank()
divider()
# ── CONCLUSION ──
conclusion = json_notes.get("conclusion", "")
if conclusion:
add(f"## 🔖 {L['conclusion']}")
blank()
# The conclusion is emitted as a single blockquote line.
add(f"> {conclusion}")
blank()
return "\n".join(lines)
def format_final_notes(
    self,
    notes: str,
    video_title: str,
    video_url: str,
    duration: int,
    detected_language: str = "English",
) -> str:
    """
    Prefix the Markdown body with a title, source and duration header.

    ``duration`` is a number of seconds and is shown as ``H:MM:SS`` when it
    spans at least one hour, otherwise as ``MM:SS``. A missing, zero or
    negative duration is shown as "N/A (Auto-generated)". The "Source" and
    "Duration" captions follow ``detected_language``.
    """
    captions = _labels(detected_language)

    duration_str = "N/A (Auto-generated)"
    if duration and duration > 0:
        whole_hours, remainder = divmod(duration, 3600)
        whole_minutes, leftover = divmod(remainder, 60)
        hours = int(whole_hours)
        minutes = int(whole_minutes)
        secs = int(leftover)
        clock = f"{minutes:02d}:{secs:02d}"
        duration_str = f"{hours}:{clock}" if hours > 0 else clock

    header_parts = [
        f"# {video_title}\n\n",
        "---\n\n",
        f"> **{captions['source']}:** {video_url} \n",
        f"> **{captions['duration']}:** {duration_str}\n\n",
        "---\n\n",
    ]
    return "".join(header_parts) + notes