Spaces:
Sleeping
Sleeping
| """ | |
| The Lecture Whisperer — app.py | |
| A Hugging Face Space that transcribes lectures, extracts slide content, | |
| syncs them together, and generates mock quizzes. | |
| Uses the HF Inference API to keep Space RAM low. | |
| """ | |
| import os | |
| import re | |
| import json | |
| import time | |
| import tempfile | |
| import textwrap | |
| from pathlib import Path | |
| from typing import Optional | |
| import gradio as gr | |
| import requests | |
| from pdf2image import convert_from_path | |
| from PIL import Image | |
| import base64 | |
| from io import BytesIO | |
| # --------------------------------------------------------------------------- | |
| # Configuration | |
| # --------------------------------------------------------------------------- | |
# Access token for the Hugging Face Inference API (set as a Space secret).
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# Only attach an Authorization header when a token is actually configured:
# an empty "Bearer " value is a malformed credential, not an anonymous call.
HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
# Inference API endpoints
WHISPER_API = "https://api-inference.huggingface.co/models/openai/whisper-large-v3"
QWEN_API = "https://api-inference.huggingface.co/models/Qwen/Qwen2-VL-7B-Instruct"
LLAMA_API = "https://api-inference.huggingface.co/models/meta-llama/Meta-Llama-3-8B-Instruct"
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
def pil_to_base64(img: Image.Image, fmt: str = "PNG") -> str:
    """Encode a PIL image as a base64 text string.

    The image is saved into an in-memory buffer using format ``fmt`` and the
    buffer's bytes are base64-encoded and decoded to ``str``.
    """
    with BytesIO() as stream:
        img.save(stream, format=fmt)
        raw = stream.getvalue()
    encoded = base64.b64encode(raw)
    return encoded.decode()
def hf_post(url: str, payload: dict, retries: int = 3, wait: int = 20) -> dict:
    """POST a JSON payload to a HF Inference endpoint, retrying on HTTP 503.

    A 503 response is treated as retryable: the function sleeps and tries
    again, for at most ``retries`` attempts in total. Any other error status
    is raised through ``raise_for_status``.

    Args:
        url: Endpoint to call.
        payload: JSON-serialisable request body.
        retries: Maximum number of attempts.
        wait: Seconds to sleep after a 503 whose body carries no usable
            ``estimated_time`` value.

    Returns:
        The decoded JSON body of the first non-503 response.

    Raises:
        requests.HTTPError: For a non-503 error status.
        RuntimeError: If every attempt returned 503 (or ``retries`` is 0).
    """
    for attempt in range(retries):
        resp = requests.post(url, headers=HEADERS, json=payload, timeout=120)
        if resp.status_code != 503:
            resp.raise_for_status()
            return resp.json()
        if attempt == retries - 1:
            # Out of attempts: another sleep would only delay the error.
            break
        secs = wait
        try:
            data = resp.json()
        except ValueError:
            # The 503 body is not guaranteed to be JSON (e.g. a gateway page).
            data = None
        if isinstance(data, dict) and "estimated_time" in data:
            try:
                # Wait slightly longer than the estimate, capped at one minute.
                secs = min(int(data["estimated_time"]) + 5, 60)
            except (TypeError, ValueError):
                secs = wait
        time.sleep(secs)
    raise RuntimeError(f"HF API unavailable after {retries} retries: {url}")
| # --------------------------------------------------------------------------- | |
| # 1. Audio → Transcription with timestamps | |
| # --------------------------------------------------------------------------- | |
def transcribe_audio(audio_path: str) -> list[dict]:
    """Transcribe an audio file through the Whisper Inference API endpoint.

    Args:
        audio_path: Path of the audio file to upload. A falsy value returns
            ``[]`` without reading a file or making a request.

    Returns:
        A list of segments ``[{"start": float, "end": float, "text": str}, ...]``.
        When the response has ``"text"`` but no ``"chunks"``, a single segment
        with zeroed timestamps is returned. A response that is not a JSON
        object yields ``[]``.

    Raises:
        OSError: If the file cannot be opened or read.
        requests.HTTPError: If the API answers with an error status.
    """
    if not audio_path:
        return []

    # Function-scope import so this block does not depend on file-level edits.
    import mimetypes

    # Declare the type that matches the uploaded file (MP3, WAV, ...) instead
    # of always claiming MP3; unknown or non-audio guesses keep the old default.
    guessed, _ = mimetypes.guess_type(audio_path)
    if guessed and guessed.startswith("audio/"):
        content_type = guessed
    else:
        content_type = "audio/mpeg"

    with open(audio_path, "rb") as f:
        audio_bytes = f.read()

    # Whisper via Inference API (binary upload)
    resp = requests.post(
        WHISPER_API,
        headers={**HEADERS, "Content-Type": content_type},
        data=audio_bytes,
        params={"return_timestamps": "true"},
        timeout=300,
    )
    resp.raise_for_status()
    result = resp.json()

    # Normalise output shape
    if not isinstance(result, dict):
        return []
    segments = []
    if "chunks" in result:
        for chunk in result["chunks"] or []:
            ts = chunk.get("timestamp", [0, 0]) or [0, 0]
            segments.append({
                # A missing boundary is reported as None; map it to 0.
                "start": ts[0] if ts[0] is not None else 0,
                "end": ts[1] if ts[1] is not None else 0,
                "text": (chunk.get("text") or "").strip(),
            })
    elif "text" in result:
        # Fallback: single block, no timestamps
        segments.append({
            "start": 0.0,
            "end": 0.0,
            "text": (result["text"] or "").strip(),
        })
    return segments
def format_transcript(segments: list[dict]) -> str:
    """Render segments as ``[MM:SS] text`` lines joined by newlines.

    The timestamp comes from each segment's ``"start"`` value truncated to
    whole seconds; minutes are not wrapped at 60.
    """
    def _render(segment: dict) -> str:
        minutes, seconds = divmod(int(segment["start"]), 60)
        return f"[{minutes:02d}:{seconds:02d}] {segment['text']}"

    return "\n".join(_render(segment) for segment in segments)
| # --------------------------------------------------------------------------- | |
| # 2. PDF → slide images → text extraction | |
| # --------------------------------------------------------------------------- | |
def pdf_to_images(pdf_path: str) -> list[Image.Image]:
    """Render the pages of a PDF to PIL images at 150 DPI.

    A falsy ``pdf_path`` returns an empty list without calling the converter.
    """
    return convert_from_path(pdf_path, dpi=150) if pdf_path else []
def extract_slide_text(img: Image.Image, slide_num: int) -> str:
    """Ask the Qwen2-VL endpoint to extract text and concepts from a slide.

    Args:
        img: The slide image; it is sent base64-encoded.
        slide_num: Slide number used only in the error placeholder.

    Returns:
        The stripped ``generated_text`` of the response, or ``""`` when the
        response carries none. If the request or parsing fails, a bracketed
        placeholder naming the slide and the error is returned instead of
        raising, so one failing slide does not abort the caller's loop.
    """
    b64 = pil_to_base64(img)
    payload = {
        "inputs": {
            "image": b64,
            "question": (
                "You are an academic assistant. "
                "Extract ALL key text, equations, definitions, bullet points, "
                "and concepts visible on this lecture slide. "
                "Be thorough and concise. Output plain text only."
            ),
        }
    }
    try:
        result = hf_post(QWEN_API, payload)
        if isinstance(result, list):
            # An empty list means nothing was generated; that is "no text",
            # not an error whose message should be stored as slide content.
            result = result[0] if result else {}
        if isinstance(result, dict):
            return (result.get("generated_text") or "").strip()
        return str(result)
    except Exception as e:  # deliberate best-effort boundary
        return f"[Slide {slide_num}: extraction error — {e}]"
| # --------------------------------------------------------------------------- | |
| # 3. Sync Logic | |
| # --------------------------------------------------------------------------- | |
def build_keyword_index(slide_texts: list[str]) -> list[set]:
    """Return one keyword set per slide text, in input order.

    Keywords are the lowercase runs of four or more ASCII letters found in
    the text, minus a fixed list of common stop words.
    """
    stopwords = frozenset((
        "this", "that", "with", "from", "have", "will", "been",
        "they", "their", "there", "were", "what", "when", "which",
        "also", "into", "over", "more", "some", "such", "than",
        "then", "these", "those", "after", "about", "would",
    ))
    word_pattern = re.compile(r"[a-z]{4,}")
    return [
        set(word_pattern.findall(slide.lower())) - stopwords
        for slide in slide_texts
    ]
def sync_transcript_to_slides(
    segments: list[dict],
    slide_texts: list[str],
) -> list[dict]:
    """Attach the best-matching slide to every transcript segment.

    The match score is the number of keywords (lowercase words of four or
    more letters) shared by the segment text and a slide's keyword set. The
    highest score wins; ties go to the earliest slide.

    Args:
        segments: Transcript segments, each with a ``"text"`` key.
        slide_texts: Extracted text of each slide, in slide order.

    Returns:
        New segment dicts (inputs are not modified) that each carry
        ``slide_idx`` (0-based, or -1 if no match) and ``match_score``.
        With no slides, every segment gets ``slide_idx == -1`` and
        ``match_score == 0`` so the result always has the same shape.
        A falsy ``segments`` is returned unchanged.
    """
    if not segments:
        return segments
    # Without slides the inner loop never runs, so every segment falls
    # through to the "no match" values below.
    slide_kw_index = build_keyword_index(slide_texts) if slide_texts else []
    enriched = []
    for seg in segments:
        seg_words = set(re.findall(r"[a-z]{4,}", seg["text"].lower()))
        best_slide, best_score = -1, 0
        for idx, kw_set in enumerate(slide_kw_index):
            score = len(seg_words & kw_set)
            if score > best_score:  # strict: earliest slide wins ties
                best_score, best_slide = score, idx
        enriched.append({**seg, "slide_idx": best_slide, "match_score": best_score})
    return enriched
def format_sync_report(synced: list[dict]) -> str:
    """Render synced segments as a markdown report.

    Args:
        synced: Segments with ``"start"`` and ``"text"``; ``"slide_idx"``
            and ``"match_score"`` are read when present. A segment without
            ``"slide_idx"`` is reported as having no slide match.

    Returns:
        Markdown text: a heading followed by one entry per segment showing
        its ``[MM:SS]`` timestamp, its text and the matched slide (1-based).
    """
    lines = ["## Transcript ↔ Slide Sync Report\n"]
    for seg in synced:
        minutes, seconds = divmod(int(seg["start"]), 60)
        ts = f"[{minutes:02d}:{seconds:02d}]"
        slide_idx = seg.get("slide_idx", -1)
        if slide_idx >= 0:
            slide_ref = (
                f"→ **Slide {slide_idx + 1}** "
                f"(score: {seg.get('match_score', 0)})"
            )
        else:
            slide_ref = "— no slide match"
        lines.append(f"`{ts}` {seg['text']}\n {slide_ref}\n")
    return "\n".join(lines)
| # --------------------------------------------------------------------------- | |
| # 4. Mock Quiz Generation | |
| # --------------------------------------------------------------------------- | |
def generate_quiz(transcript_text: str) -> str:
    """Ask the Llama-3-8B endpoint for a 7-question multiple-choice quiz.

    Args:
        transcript_text: Lecture transcript. Only the first 6000 characters
            are sent.

    Returns:
        The text generated after the assistant header, or a fixed notice
        when the transcript is blank. If the response is not a non-empty
        list of objects, its ``str()`` form is returned.

    Raises:
        requests.HTTPError, RuntimeError: Propagated from ``hf_post``.
    """
    if not transcript_text.strip():
        return "No transcript available. Please process audio first."
    # Truncate to avoid token overflow
    clipped = transcript_text[:6000]
    assistant_header = "<|start_header_id|>assistant<|end_header_id|>"
    # Build the prompt line by line. Dedenting a template after the
    # multi-line transcript has been interpolated makes the outcome depend
    # on the transcript's own leading whitespace.
    prompt = "\n".join([
        "<|begin_of_text|>",
        "<|start_header_id|>system<|end_header_id|>",
        "You are an expert educator. Generate a multiple-choice quiz based ONLY on",
        "the lecture transcript provided. Do not add external knowledge.",
        "Format each question as:",
        "Q1. <question>",
        "A) <option> B) <option> C) <option> D) <option>",
        "Answer: <letter>",
        "<|eot_id|>",
        "<|start_header_id|>user<|end_header_id|>",
        "Lecture transcript:",
        clipped,
        "Create 7 multiple-choice questions with 4 options each. Include the correct answer.",
        "<|eot_id|>",
        assistant_header,
    ]).strip()
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": 1200,
            "temperature": 0.4,
            "do_sample": True,
        },
    }
    result = hf_post(LLAMA_API, payload)
    if isinstance(result, list) and result and isinstance(result[0], dict):
        generated = result[0].get("generated_text") or ""
        # Keep only what follows the last assistant header, in case the
        # prompt is echoed back in the generated text.
        return generated.split(assistant_header)[-1].strip()
    return str(result)
| # --------------------------------------------------------------------------- | |
| # 5. Chatbot Q&A | |
| # --------------------------------------------------------------------------- | |
def answer_question(
    message: str,
    history: list,
    transcript_text: str,
    slide_texts_json: str,
) -> tuple[list, list]:
    """Answer a question about the lecture with keyword-matched context.

    Transcript lines and slides sharing a keyword (lowercase word of four or
    more letters) with the question are placed into the prompt, together
    with the last four history turns, and sent to the Llama endpoint.

    Args:
        message: The user's question. A blank message is ignored.
        history: Previous ``(user, assistant)`` pairs. Not modified.
        transcript_text: Newline-separated transcript, may be empty.
        slide_texts_json: JSON array of slide texts; empty or malformed
            input is treated as "no slides".

    Returns:
        The updated history twice (the caller wires it to two outputs).

    Raises:
        requests.HTTPError, RuntimeError: Propagated from ``hf_post``.
    """
    # Work on a copy so the caller's list is never mutated in place.
    history = list(history or [])
    if not message or not message.strip():
        # Nothing to ask: skip the API round-trip and leave the chat as is.
        return history, history

    try:
        slide_texts = json.loads(slide_texts_json) if slide_texts_json else []
    except ValueError:
        slide_texts = []

    word_re = re.compile(r"[a-z]{4,}")
    question_words = set(word_re.findall(message.lower()))
    context_parts = []
    if transcript_text:
        # Grab lines containing keywords from the question
        relevant_lines = [
            line for line in transcript_text.split("\n")
            if question_words & set(word_re.findall(line.lower()))
        ]
        context_parts.append("**Transcript excerpts:**\n" + "\n".join(relevant_lines[:15]))
    if slide_texts:
        kw_index = build_keyword_index(slide_texts)
        for i, (text, kw_set) in enumerate(zip(slide_texts, kw_index)):
            if question_words & kw_set:
                context_parts.append(f"**Slide {i+1}:**\n{text[:400]}")
            if len(context_parts) >= 5:
                break
    context = "\n\n".join(context_parts) if context_parts else "No specific context found."
    history_str = "".join(
        f"User: {turn[0]}\nAssistant: {turn[1]}\n" for turn in history[-4:]
    )

    assistant_header = "<|start_header_id|>assistant<|end_header_id|>"
    # Joined line by line rather than dedented after interpolation, so the
    # prompt layout does not depend on whitespace inside the context.
    prompt = "\n".join([
        "<|begin_of_text|>",
        "<|start_header_id|>system<|end_header_id|>",
        "You are a helpful teaching assistant. Answer questions about the lecture",
        "using ONLY the provided context. If unsure, say so.",
        "<|eot_id|>",
        "<|start_header_id|>user<|end_header_id|>",
        "Context:",
        context,
        "Conversation so far:",
        history_str,
        f"Question: {message}",
        "<|eot_id|>",
        assistant_header,
    ]).strip()
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": 512, "temperature": 0.3, "do_sample": True},
    }
    result = hf_post(LLAMA_API, payload)
    if isinstance(result, list) and result and isinstance(result[0], dict):
        generated = result[0].get("generated_text") or ""
        answer = generated.split(assistant_header)[-1].strip()
    else:
        answer = str(result)
    history.append((message, answer))
    return history, history
| # --------------------------------------------------------------------------- | |
| # 6. Main Processing Pipeline | |
| # --------------------------------------------------------------------------- | |
def process_inputs(audio_file, pdf_file, progress=gr.Progress()):
    """Run the full pipeline for the 'Process' button.

    Transcribes the audio (if given), converts the PDF (if given) to images
    and extracts text from each slide, then syncs transcript to slides.

    Args:
        audio_file: Path of the uploaded audio, or ``None``.
        pdf_file: Path of the uploaded PDF, or ``None``.
        progress: Gradio progress tracker used for status updates.

    Returns:
        A 5-tuple:
            transcript_text (str)    — for display / state; empty when
                                       transcription failed
            sync_report (str)        — markdown
            slide_images (list[PIL]) — for gallery
            slide_texts_json (str)   — JSON string for state
            status (str)             — status message
    """
    if audio_file is None and pdf_file is None:
        return "", "Please upload at least one file.", [], "[]", "⚠️ No files uploaded."

    transcript_text = ""
    transcription_error = ""
    segments: list[dict] = []
    slide_images: list[Image.Image] = []
    slide_texts: list[str] = []

    # ---- Transcription ----
    if audio_file is not None:
        progress(0.1, desc="🎙️ Transcribing audio (this may take a while)…")
        try:
            segments = transcribe_audio(audio_file)
        except Exception as e:
            # Keep the error out of the transcript: that value is stored as
            # state and later used as quiz/chat source material.
            transcription_error = f"Transcription error: {e}"
        else:
            transcript_text = format_transcript(segments)

    # ---- PDF → Images → Text ----
    if pdf_file is not None:
        progress(0.4, desc="📄 Converting PDF pages…")
        try:
            slide_images = pdf_to_images(pdf_file)
        except Exception as e:
            return transcript_text, f"PDF error: {e}", [], "[]", f"❌ PDF error: {e}"
        total = len(slide_images)
        for i, img in enumerate(slide_images):
            progress(0.5 + 0.4 * (i / max(total, 1)),
                     desc=f"🔍 Analysing slide {i+1}/{total}…")
            slide_texts.append(extract_slide_text(img, i + 1))

    # ---- Sync ----
    progress(0.92, desc="🔗 Syncing transcript to slides…")
    sync_report = ""
    if segments and slide_texts:
        synced = sync_transcript_to_slides(segments, slide_texts)
        sync_report = format_sync_report(synced)
    elif segments:
        sync_report = "No slides uploaded — sync skipped.\n\n" + format_transcript(segments)
    elif slide_texts:
        sync_report = "No audio uploaded — showing extracted slide texts.\n\n" + "".join(
            f"**Slide {i+1}:**\n{t}\n\n" for i, t in enumerate(slide_texts)
        )

    if transcription_error:
        # Surface the failure where the user can see it instead of
        # reporting an unconditional success.
        sync_report = f"{transcription_error}\n\n{sync_report}"
        status = f"⚠️ Finished with errors — {transcription_error}"
    else:
        status = "✅ Processing complete!"

    slide_texts_json = json.dumps(slide_texts)
    progress(1.0, desc=status)
    return transcript_text, sync_report, slide_images, slide_texts_json, status
| # --------------------------------------------------------------------------- | |
| # 7. Gradio UI | |
| # --------------------------------------------------------------------------- | |
# Application theme: the Soft base theme with a violet / indigo / slate
# palette, DM Sans for body text and JetBrains Mono for monospace, each with
# generic fallbacks listed after the Google font.
THEME = gr.themes.Soft(
    primary_hue="violet",
    secondary_hue="indigo",
    neutral_hue="slate",
    font=[gr.themes.GoogleFont("DM Sans"), "ui-sans-serif", "sans-serif"],
    font_mono=[gr.themes.GoogleFont("JetBrains Mono"), "monospace"],
).set(
    # Primary buttons: the 500 shade normally, the 600 shade on hover.
    button_primary_background_fill="*primary_500",
    button_primary_background_fill_hover="*primary_600",
)
# Custom stylesheet passed to `launch()` in the entry point. It styles the
# `#header` banner emitted by the UI's HTML block and the `status-box` /
# `quiz-output` classes that the UI assigns through `elem_classes`.
CSS = """
#header {
text-align: center;
padding: 1.5rem 0 1rem;
background: linear-gradient(135deg, #4f46e5 0%, #7c3aed 50%, #a21caf 100%);
border-radius: 12px;
margin-bottom: 1rem;
color: white;
}
#header h1 { font-size: 2.4rem; margin: 0; font-weight: 800; letter-spacing: -1px; }
#header p { margin: 0.25rem 0 0; opacity: 0.85; font-size: 1rem; }
.status-box { font-weight: 600; border-left: 4px solid #7c3aed; padding-left: 0.75rem; }
.quiz-output { font-family: 'JetBrains Mono', monospace; font-size: 0.85rem; }
"""
def build_ui():
    """Build and return the Gradio Blocks app.

    Layout: an upload/process tab, a dashboard tab (chatbot plus slide
    gallery) and a mock-quiz tab. The transcript, slide texts, slide images
    and chat history are held in ``gr.State`` components shared by the tabs.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(title="The Lecture Whisperer") as demo:
        # ── Shared state ────────────────────────────────────────────────
        state_transcript = gr.State("")
        state_slide_texts = gr.State("[]")
        # Slide images produced by the process step, read by the gallery.
        state_images = gr.State([])

        # ── Header ──────────────────────────────────────────────────────
        gr.HTML("""
        <div id="header">
        <h1>🎓 The Lecture Whisperer</h1>
        <p>Transcribe · Analyse · Sync · Quiz — your lectures, supercharged by AI</p>
        </div>
        """)

        # ── Tabs ────────────────────────────────────────────────────────
        with gr.Tabs():
            # ==========================================================
            # TAB 1 — Upload & Process
            # ==========================================================
            with gr.TabItem("📤 Upload & Process"):
                gr.Markdown(
                    "Upload your lecture audio and/or slide PDF. "
                    "Hit **Process** to run the full pipeline."
                )
                with gr.Row():
                    with gr.Column(scale=1):
                        audio_input = gr.Audio(
                            label="🎙️ Lecture Audio (MP3 / WAV)",
                            type="filepath",
                        )
                        pdf_input = gr.File(
                            label="📄 Lecture Slides (PDF)",
                            file_types=[".pdf"],
                            type="filepath",
                        )
                        process_btn = gr.Button(
                            "⚡ Process Lecture", variant="primary", size="lg"
                        )
                        status_output = gr.Textbox(
                            label="Status",
                            interactive=False,
                            elem_classes=["status-box"],
                        )
                    with gr.Column(scale=2):
                        sync_output = gr.Markdown(
                            label="Sync Report",
                            value="*Sync report will appear here after processing…*",
                        )

            # ==========================================================
            # TAB 2 — Dashboard (Chatbot + Gallery)
            # ==========================================================
            with gr.TabItem("📊 Dashboard"):
                with gr.Row():
                    # ── Left: Chatbot ─────────────────────────────────
                    with gr.Column(scale=1):
                        gr.Markdown("### 💬 Ask About the Lecture")
                        chatbot = gr.Chatbot(
                            label="Lecture Assistant",
                            height=480,
                        )
                        with gr.Row():
                            chat_input = gr.Textbox(
                                placeholder="e.g. What did the professor say about Newton's laws?",
                                label="Your question",
                                scale=4,
                            )
                            chat_btn = gr.Button("Send", variant="primary", scale=1)
                        chat_state = gr.State([])

                        def _chat(msg, history, transcript, slide_texts_json):
                            return answer_question(msg, history, transcript, slide_texts_json)

                        # The Send button and pressing Enter share one wiring.
                        chat_wiring = dict(
                            fn=_chat,
                            inputs=[chat_input, chat_state, state_transcript, state_slide_texts],
                            outputs=[chatbot, chat_state],
                        )
                        chat_btn.click(**chat_wiring)
                        chat_input.submit(**chat_wiring)

                    # ── Right: Slide Gallery ──────────────────────────
                    with gr.Column(scale=1):
                        gr.Markdown("### 🖼️ Slide Gallery")
                        slide_gallery = gr.Gallery(
                            label="Lecture Slides",
                            columns=2,
                            height=520,
                            object_fit="contain",
                        )
                        refresh_gallery_btn = gr.Button(
                            "🔄 Refresh Gallery", variant="secondary"
                        )

                        def _refresh_gallery(imgs):
                            return imgs

                        # Copy the images stored by the process step into
                        # the gallery on demand.
                        refresh_gallery_btn.click(
                            fn=_refresh_gallery,
                            inputs=[state_images],
                            outputs=[slide_gallery],
                        )

            # ==========================================================
            # TAB 3 — Mock Quiz
            # ==========================================================
            with gr.TabItem("📝 Mock Quiz"):
                gr.Markdown(
                    "Generate a multiple-choice quiz based strictly on the "
                    "lecture transcript. Requires audio to be processed first."
                )
                quiz_btn = gr.Button(
                    "🧠 Generate Mock Quiz", variant="primary", size="lg"
                )
                quiz_output = gr.Markdown(
                    value="*Quiz will appear here…*",
                    elem_classes=["quiz-output"],
                )

                def _gen_quiz(transcript):
                    if not (transcript or "").strip():
                        return "⚠️ No transcript found. Please upload and process audio first."
                    return generate_quiz(transcript)

                quiz_btn.click(
                    fn=_gen_quiz,
                    inputs=[state_transcript],
                    outputs=[quiz_output],
                )

        # Register the pipeline exactly once. Two listeners on the same
        # button would each run process_inputs on every click, doubling
        # every API call, with one of them storing the slide images in a
        # State that nothing reads.
        process_btn.click(
            fn=process_inputs,
            inputs=[audio_input, pdf_input],
            outputs=[
                state_transcript,
                sync_output,
                state_images,
                state_slide_texts,
                status_output,
            ],
        )
    return demo
| # --------------------------------------------------------------------------- | |
| # Entry point | |
| # --------------------------------------------------------------------------- | |
if __name__ == "__main__":
    # Build the Blocks app and launch it without a public share link,
    # applying the module-level theme and stylesheet.
    build_ui().launch(share=False, theme=THEME, css=CSS)