"""
The Lecture Whisperer - app.py
A Hugging Face Space that transcribes lectures, extracts slide content,
syncs them together, and generates mock quizzes.
Uses the HF Inference API to keep Space RAM low.
"""
import os
import re
import json
import time
import tempfile
import textwrap
from pathlib import Path
import gradio as gr
import requests
from pdf2image import convert_from_path
from PIL import Image
import base64
from io import BytesIO
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
HF_TOKEN = os.environ.get("HF_TOKEN", "") # Set as a Space secret
HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"}
# Inference API endpoints
WHISPER_API = "https://api-inference.huggingface.co/models/openai/whisper-large-v3"
QWEN_API = "https://api-inference.huggingface.co/models/Qwen/Qwen2-VL-7B-Instruct"
LLAMA_API = "https://api-inference.huggingface.co/models/meta-llama/Meta-Llama-3-8B-Instruct"
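# NB: Meta-Llama-3 is a gated repo on the Hub; the account behind HF_TOKEN
# must have accepted its license for LLAMA_API calls to succeed.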
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def pil_to_base64(img: Image.Image, fmt: str = "PNG") -> str:
buf = BytesIO()
img.save(buf, format=fmt)
return base64.b64encode(buf.getvalue()).decode()
def hf_post(url: str, payload: dict, retries: int = 3, wait: int = 20) -> dict:
"""POST to a HF Inference endpoint with simple retry / loading handling."""
for attempt in range(retries):
resp = requests.post(url, headers=HEADERS, json=payload, timeout=120)
        if resp.status_code == 503:
            # Model is cold-loading; the API usually reports an estimated wait.
            try:
                secs = min(int(resp.json().get("estimated_time", wait)) + 5, 60)
            except ValueError:
                secs = wait
            time.sleep(secs)
            continue
resp.raise_for_status()
return resp.json()
raise RuntimeError(f"HF API unavailable after {retries} retries: {url}")
# ---------------------------------------------------------------------------
# 1. Audio → Transcription with timestamps
# ---------------------------------------------------------------------------
def transcribe_audio(audio_path: str) -> list[dict]:
"""
Returns a list of segments:
[{"start": float, "end": float, "text": str}, ...]
"""
if not audio_path:
return []
with open(audio_path, "rb") as f:
audio_bytes = f.read()
# Whisper via Inference API (binary upload)
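    # Unlike hf_post above, this raw-bytes request has no 503 retry, so the
    # first call after the model has gone cold may fail and need a re-run.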
resp = requests.post(
WHISPER_API,
headers={**HEADERS, "Content-Type": "audio/mpeg"},
data=audio_bytes,
params={"return_timestamps": "true"},
timeout=300,
)
resp.raise_for_status()
result = resp.json()
# Normalise output shape
segments = []
if "chunks" in result:
for chunk in result["chunks"]:
ts = chunk.get("timestamp", [0, 0]) or [0, 0]
segments.append({
"start": ts[0] if ts[0] is not None else 0,
"end": ts[1] if ts[1] is not None else 0,
"text": chunk.get("text", "").strip(),
})
elif "text" in result:
# Fallback: single block, no timestamps
segments.append({"start": 0.0, "end": 0.0, "text": result["text"].strip()})
return segments
def _mmss(seconds: float) -> str:
    """Format seconds as [MM:SS]; minutes keep counting past 59 for long lectures."""
    s = int(seconds)
    return f"[{s // 60:02d}:{s % 60:02d}]"
def format_transcript(segments: list[dict]) -> str:
    lines = []
    for seg in segments:
        lines.append(f"{_mmss(seg['start'])} {seg['text']}")
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# 2. PDF → slide images → text extraction
# ---------------------------------------------------------------------------
def pdf_to_images(pdf_path: str) -> list[Image.Image]:
"""Convert each PDF page to a PIL Image."""
if not pdf_path:
return []
images = convert_from_path(pdf_path, dpi=150)
return images
def extract_slide_text(img: Image.Image, slide_num: int) -> str:
"""Ask Qwen2-VL to extract key text/concepts from a slide image."""
b64 = pil_to_base64(img)
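    # Assumes the endpoint accepts the image+question (VQA-style) payload
    # below; if the hosted Qwen2-VL model expects a chat-style request
    # instead, this shape will need adjusting.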
payload = {
"inputs": {
"image": b64,
"question": (
"You are an academic assistant. "
"Extract ALL key text, equations, definitions, bullet points, "
"and concepts visible on this lecture slide. "
"Be thorough and concise. Output plain text only."
),
}
}
try:
result = hf_post(QWEN_API, payload)
if isinstance(result, list):
return result[0].get("generated_text", "").strip()
if isinstance(result, dict):
return result.get("generated_text", "").strip()
return str(result)
except Exception as e:
return f"[Slide {slide_num}: extraction error β€” {e}]"
# ---------------------------------------------------------------------------
# 3. Sync Logic
# ---------------------------------------------------------------------------
def build_keyword_index(slide_texts: list[str]) -> list[set]:
"""Build a simple keyword set per slide (lowercase words, len > 3)."""
stop = {
"this", "that", "with", "from", "have", "will", "been",
"they", "their", "there", "were", "what", "when", "which",
"also", "into", "over", "more", "some", "such", "than",
"then", "these", "those", "after", "about", "would",
}
index = []
for text in slide_texts:
words = re.findall(r"[a-z]{4,}", text.lower())
index.append({w for w in words if w not in stop})
return index
def sync_transcript_to_slides(
segments: list[dict],
slide_texts: list[str],
) -> list[dict]:
"""
For each transcript segment, find the most likely slide.
Returns enriched segments with a `slide_idx` key (0-based, or -1 if no match).
"""
if not slide_texts or not segments:
return segments
slide_kw_index = build_keyword_index(slide_texts)
enriched = []
for seg in segments:
seg_words = set(re.findall(r"[a-z]{4,}", seg["text"].lower()))
best_slide, best_score = -1, 0
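        # Highest keyword overlap wins; ties keep the earliest slide.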
for idx, kw_set in enumerate(slide_kw_index):
score = len(seg_words & kw_set)
if score > best_score:
best_score, best_slide = score, idx
seg = dict(seg)
seg["slide_idx"] = best_slide if best_score > 0 else -1
seg["match_score"] = best_score
enriched.append(seg)
return enriched
def format_sync_report(synced: list[dict]) -> str:
lines = ["## Transcript ↔ Slide Sync Report\n"]
for seg in synced:
ts = f"[{int(seg['start'])//60:02d}:{int(seg['start'])%60:02d}]"
slide_ref = (
f"β†’ **Slide {seg['slide_idx']+1}** (score: {seg['match_score']})"
if seg["slide_idx"] >= 0
else "β†’ no slide match"
)
lines.append(f"`{ts}` {seg['text']}\n {slide_ref}\n")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# 4. Mock Quiz Generation
# ---------------------------------------------------------------------------
def generate_quiz(transcript_text: str) -> str:
"""Ask Llama-3-8B to write 5-10 MCQs based strictly on the transcript."""
if not transcript_text.strip():
return "No transcript available. Please process audio first."
# Truncate to avoid token overflow
clipped = transcript_text[:6000]
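    # (~6000 chars is roughly 1.5k-2k tokens, leaving room for the prompt
    # scaffolding and up to 1200 generated tokens in Llama-3's 8k context.)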
    # Dedent the template *before* interpolating the transcript: once multi-line
    # user text is inside an f-string, textwrap.dedent finds no common leading
    # whitespace and the special tokens would keep their indentation.
    template = textwrap.dedent("""\
        <|begin_of_text|>
        <|start_header_id|>system<|end_header_id|>
        You are an expert educator. Generate a multiple-choice quiz based ONLY on
        the lecture transcript provided. Do not add external knowledge.
        Format each question as:
        Q1. <question>
        A) <option> B) <option> C) <option> D) <option>
        Answer: <letter>
        <|eot_id|>
        <|start_header_id|>user<|end_header_id|>
        Lecture transcript:
        {transcript}
        Create 7 multiple-choice questions with 4 options each. Include the correct answer.
        <|eot_id|>
        <|start_header_id|>assistant<|end_header_id|>
        """).strip()
    prompt = template.replace("{transcript}", clipped)
payload = {
"inputs": prompt,
"parameters": {
"max_new_tokens": 1200,
"temperature": 0.4,
"do_sample": True,
},
}
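    # The endpoint echoes the prompt by default (return_full_text=True),
    # so the response is split on the final assistant header below.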
result = hf_post(LLAMA_API, payload)
    if isinstance(result, list):
        raw = result[0].get("generated_text", "")
        return raw.split("<|start_header_id|>assistant<|end_header_id|>")[-1].strip()
    return str(result)
# ---------------------------------------------------------------------------
# 5. Chatbot Q&A
# ---------------------------------------------------------------------------
def answer_question(
message: str,
history: list,
transcript_text: str,
slide_texts_json: str,
) -> tuple[list, list]:
"""RAG-lite: stuff relevant context into Llama prompt."""
slide_texts = json.loads(slide_texts_json) if slide_texts_json else []
    question_words = set(re.findall(r"[a-z]{4,}", message.lower()))
    context_parts = []
    if transcript_text:
        # Grab transcript lines that share keywords with the question
        relevant_lines = [
            line for line in transcript_text.split("\n")
            if question_words & set(re.findall(r"[a-z]{4,}", line.lower()))
        ]
        context_parts.append("**Transcript excerpts:**\n" + "\n".join(relevant_lines[:15]))
    if slide_texts:
        kw_index = build_keyword_index(slide_texts)
        for i, (text, kw_set) in enumerate(zip(slide_texts, kw_index)):
if question_words & kw_set:
context_parts.append(f"**Slide {i+1}:**\n{text[:400]}")
if len(context_parts) >= 5:
break
context = "\n\n".join(context_parts) if context_parts else "No specific context found."
history_str = ""
for h in history[-4:]:
history_str += f"User: {h[0]}\nAssistant: {h[1]}\n"
    # As in generate_quiz: dedent before interpolating, otherwise the multi-line
    # context defeats textwrap.dedent and the template keeps its indentation.
    template = textwrap.dedent("""\
        <|begin_of_text|>
        <|start_header_id|>system<|end_header_id|>
        You are a helpful teaching assistant. Answer questions about the lecture
        using ONLY the provided context. If unsure, say so.
        <|eot_id|>
        <|start_header_id|>user<|end_header_id|>
        Context:
        {context}
        Conversation so far:
        {history}
        Question: {question}
        <|eot_id|>
        <|start_header_id|>assistant<|end_header_id|>
        """).strip()
    prompt = (
        template.replace("{context}", context)
        .replace("{history}", history_str)
        .replace("{question}", message)
    )
payload = {
"inputs": prompt,
"parameters": {"max_new_tokens": 512, "temperature": 0.3, "do_sample": True},
}
result = hf_post(LLAMA_API, payload)
    if isinstance(result, list):
        raw = result[0].get("generated_text", "")
        answer = raw.split("<|start_header_id|>assistant<|end_header_id|>")[-1].strip()
    else:
        answer = str(result)
history.append((message, answer))
return history, history
# ---------------------------------------------------------------------------
# 6. Main Processing Pipeline
# ---------------------------------------------------------------------------
def process_inputs(audio_file, pdf_file, progress=gr.Progress()):
"""
Master pipeline called by the 'Process' button.
Returns:
transcript_text (str) β€” for display / state
sync_report (str) β€” markdown
slide_images (list[PIL]) β€” for gallery
slide_texts_json (str) β€” JSON string for state
status (str) β€” status message
"""
if audio_file is None and pdf_file is None:
return "", "Please upload at least one file.", [], "[]", "⚠️ No files uploaded."
transcript_text = ""
segments: list[dict] = []
slide_images: list[Image.Image] = []
slide_texts: list[str] = []
# ---- Transcription ----
if audio_file is not None:
        progress(0.1, desc="🎙️ Transcribing audio (this may take a while)…")
try:
segments = transcribe_audio(audio_file)
transcript_text = format_transcript(segments)
except Exception as e:
transcript_text = f"Transcription error: {e}"
# ---- PDF β†’ Images β†’ Text ----
if pdf_file is not None:
        progress(0.4, desc="📄 Converting PDF pages…")
try:
slide_images = pdf_to_images(pdf_file)
except Exception as e:
return transcript_text, f"PDF error: {e}", [], "[]", f"❌ PDF error: {e}"
for i, img in enumerate(slide_images):
            progress(0.5 + 0.4 * (i / max(len(slide_images), 1)),
                     desc=f"🔍 Analysing slide {i+1}/{len(slide_images)}…")
slide_texts.append(extract_slide_text(img, i + 1))
# ---- Sync ----
    progress(0.92, desc="🔗 Syncing transcript to slides…")
sync_report = ""
if segments and slide_texts:
synced = sync_transcript_to_slides(segments, slide_texts)
sync_report = format_sync_report(synced)
elif segments:
sync_report = "No slides uploaded β€” sync skipped.\n\n" + format_transcript(segments)
elif slide_texts:
sync_report = "No audio uploaded β€” showing extracted slide texts.\n\n"
for i, t in enumerate(slide_texts):
sync_report += f"**Slide {i+1}:**\n{t}\n\n"
slide_texts_json = json.dumps(slide_texts)
status = "βœ… Processing complete!"
progress(1.0, desc=status)
return transcript_text, sync_report, slide_images, slide_texts_json, status
# ---------------------------------------------------------------------------
# 7. Gradio UI
# ---------------------------------------------------------------------------
THEME = gr.themes.Soft(
primary_hue="violet",
secondary_hue="indigo",
neutral_hue="slate",
font=[gr.themes.GoogleFont("DM Sans"), "ui-sans-serif", "sans-serif"],
font_mono=[gr.themes.GoogleFont("JetBrains Mono"), "monospace"],
).set(
button_primary_background_fill="*primary_500",
button_primary_background_fill_hover="*primary_600",
)
CSS = """
#header {
text-align: center;
padding: 1.5rem 0 1rem;
background: linear-gradient(135deg, #4f46e5 0%, #7c3aed 50%, #a21caf 100%);
border-radius: 12px;
margin-bottom: 1rem;
color: white;
}
#header h1 { font-size: 2.4rem; margin: 0; font-weight: 800; letter-spacing: -1px; }
#header p { margin: 0.25rem 0 0; opacity: 0.85; font-size: 1rem; }
.status-box { font-weight: 600; border-left: 4px solid #7c3aed; padding-left: 0.75rem; }
.quiz-output { font-family: 'JetBrains Mono', monospace; font-size: 0.85rem; }
"""
def build_ui():
    with gr.Blocks(title="The Lecture Whisperer", theme=THEME, css=CSS) as demo:
# ── Shared state ────────────────────────────────────────────────────
state_transcript = gr.State("")
state_slide_texts = gr.State("[]")
# ── Header ──────────────────────────────────────────────────────────
gr.HTML("""
<div id="header">
            <h1>🎓 The Lecture Whisperer</h1>
            <p>Transcribe · Analyse · Sync · Quiz - your lectures, supercharged by AI</p>
</div>
""")
# ── Tabs ─────────────────────────────────────────────────────────────
with gr.Tabs():
# ================================================================
            # TAB 1 - Upload & Process
# ================================================================
            with gr.TabItem("📤 Upload & Process"):
gr.Markdown(
"Upload your lecture audio and/or slide PDF. "
"Hit **Process** to run the full pipeline."
)
with gr.Row():
with gr.Column(scale=1):
audio_input = gr.Audio(
                            label="🎙️ Lecture Audio (MP3 / WAV)",
type="filepath",
)
pdf_input = gr.File(
                            label="📑 Lecture Slides (PDF)",
file_types=[".pdf"],
type="filepath",
)
process_btn = gr.Button(
"⚑ Process Lecture", variant="primary", size="lg"
)
status_output = gr.Textbox(
label="Status",
interactive=False,
elem_classes=["status-box"],
)
with gr.Column(scale=2):
sync_output = gr.Markdown(
label="Sync Report",
value="*Sync report will appear here after processing…*",
)
                # NOTE: process_btn is wired up below, after the Dashboard tab
                # defines state_images. Registering a click here as well (with a
                # gr.State() placeholder) would attach a second handler and run
                # the whole pipeline twice per click.
# ================================================================
            # TAB 2 - Dashboard (Chatbot + Gallery)
# ================================================================
            with gr.TabItem("📊 Dashboard"):
with gr.Row():
# ── Left: Chatbot ────────────────────────────────────────
with gr.Column(scale=1):
gr.Markdown("### πŸ’¬ Ask About the Lecture")
chatbot = gr.Chatbot(
label="Lecture Assistant",
height=480,
)
with gr.Row():
chat_input = gr.Textbox(
placeholder="e.g. What did the professor say about Newton's laws?",
label="Your question",
scale=4,
)
chat_btn = gr.Button("Send", variant="primary", scale=1)
chat_state = gr.State([])
                        chat_btn.click(
                            fn=answer_question,
                            inputs=[chat_input, chat_state, state_transcript, state_slide_texts],
                            outputs=[chatbot, chat_state],
                        )
                        chat_input.submit(
                            fn=answer_question,
                            inputs=[chat_input, chat_state, state_transcript, state_slide_texts],
                            outputs=[chatbot, chat_state],
                        )
# ── Right: Slide Gallery ─────────────────────────────────
with gr.Column(scale=1):
gr.Markdown("### πŸ–ΌοΈ Slide Gallery")
slide_gallery = gr.Gallery(
label="Lecture Slides",
columns=2,
height=520,
object_fit="contain",
)
refresh_gallery_btn = gr.Button(
"πŸ”„ Refresh Gallery", variant="secondary"
)
# Wire up gallery refresh
# We need to store images in state from the process step
state_images = gr.State([])
def _refresh_gallery(imgs):
return imgs
refresh_gallery_btn.click(
fn=_refresh_gallery,
inputs=[state_images],
outputs=[slide_gallery],
)
                # Wire the Process button now that every output component
                # (including state_images for the gallery) exists; this is the
                # only click handler registered for process_btn.
process_btn.click(
fn=process_inputs,
inputs=[audio_input, pdf_input],
outputs=[
state_transcript,
sync_output,
state_images,
state_slide_texts,
status_output,
],
)
# ================================================================
            # TAB 3 - Mock Quiz
# ================================================================
            with gr.TabItem("📝 Mock Quiz"):
gr.Markdown(
"Generate a multiple-choice quiz based strictly on the "
"lecture transcript. Requires audio to be processed first."
)
quiz_btn = gr.Button(
"🧠 Generate Mock Quiz", variant="primary", size="lg"
)
quiz_output = gr.Markdown(
value="*Quiz will appear here…*",
elem_classes=["quiz-output"],
)
def _gen_quiz(transcript):
if not transcript.strip():
return "⚠️ No transcript found. Please upload and process audio first."
return generate_quiz(transcript)
quiz_btn.click(
fn=_gen_quiz,
inputs=[state_transcript],
outputs=[quiz_output],
)
return demo
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
app = build_ui()
app.launch(share=False, theme=THEME, css=CSS)