"""PDF paper tutor backend: PDF -> lecture text -> MCQs -> TTS audio.

Supports a mock mode (no API keys needed), an OpenAI-compatible
chat-completions backend for Qwen-VL-style models, and TTS via a Hugging
Face Space, an OpenAI-compatible /audio/speech endpoint, or a DashScope
multimodal-generation fallback.
"""

import base64
import html
import json
import math
import os
import random
import re
import uuid
import wave
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional

import gradio as gr
import requests

try:
    from gradio_client import Client as HFSpaceClient
except Exception:  # pragma: no cover
    HFSpaceClient = None  # type: ignore

try:
    import spaces  # type: ignore
except Exception:
    # Outside HF Spaces the `spaces` package is absent; provide a no-op
    # GPU decorator so `@spaces.GPU` methods still work locally.
    class _SpacesFallback:
        @staticmethod
        def GPU(fn):
            return fn

    spaces = _SpacesFallback()  # type: ignore

try:
    from pypdf import PdfReader
except Exception:  # pragma: no cover
    PdfReader = None  # type: ignore

try:
    import pypdfium2 as pdfium
except Exception:  # pragma: no cover
    pdfium = None  # type: ignore

APP_DIR = Path(__file__).parent.resolve()
TMP_DIR = APP_DIR / "tmp_outputs"
TMP_DIR.mkdir(exist_ok=True)


def _load_dotenv_file(dotenv_path: Path) -> None:
    """Load KEY=VALUE pairs from a .env file into os.environ.

    Existing environment variables always win; lines that are blank,
    comments, or missing '=' are skipped. Surrounding quotes are stripped.
    """
    if not dotenv_path.exists():
        return
    for raw_line in dotenv_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip().strip('"').strip("'")
        if key and key not in os.environ:
            os.environ[key] = value


_load_dotenv_file(APP_DIR / ".env")

# "API_UR" is accepted as a legacy/typo alias for API_URL (see the error
# message in _require_api_url, which mentions both).
API_URL = os.getenv("API_URL") or os.getenv("API_UR", "")
API_KEY = os.getenv("API_KEY", "")
# Mock mode defaults ON unless both API_URL and API_KEY are configured.
USE_MOCK_MODELS = os.getenv("USE_MOCK_MODELS", "0" if (API_URL and API_KEY) else "1") == "1"
USE_MOCK_TTS = os.getenv("USE_MOCK_TTS", "0") == "1"
CHAT_MODEL_ID = os.getenv("QWEN_VL_MODEL_ID", "gpt-4.1")
TTS_MODEL_ID = os.getenv("QWEN_TTS_MODEL_ID", "qwen-tts")
TTS_SPEAKER = os.getenv("QWEN_TTS_SPEAKER", "longxiaochun_v2")
TTS_FORMAT = os.getenv("QWEN_TTS_FORMAT", "wav")
HF_TTS_SPACE_ID = os.getenv("HF_TTS_SPACE_ID", "").strip()
HF_TTS_SPACE_URL = os.getenv("HF_TTS_SPACE_URL", "").strip()

# Normalize the HF Space endpoint name: keep a deliberate "//" prefix
# as-is, otherwise ensure exactly one leading "/".
_hf_tts_api_name_raw = (os.getenv("HF_TTS_API_NAME", "//tts_chunk") or "").strip()
if not _hf_tts_api_name_raw:
    HF_TTS_API_NAME = "//tts_chunk"
elif _hf_tts_api_name_raw.startswith("//"):
    HF_TTS_API_NAME = _hf_tts_api_name_raw
else:
    HF_TTS_API_NAME = f"/{_hf_tts_api_name_raw.lstrip('/')}"

HF_TTS_VOICE = os.getenv("HF_TTS_VOICE", "male")
HF_TTS_LANGUAGE = os.getenv("HF_TTS_LANGUAGE", "Chinese")
HF_TTS_ALLOW_FALLBACK = os.getenv("HF_TTS_ALLOW_FALLBACK", "1") == "1"
TEXT_SPLIT_TO_CHUNK = (os.getenv("TEXT_SPLIT_TO_CHUNK", "0") or "").strip().lower() in {"1", "true", "yes", "y"}
HF_TOKEN = (
    os.getenv("HF_TOKEN")
    or os.getenv("HUGGINGFACEHUB_API_TOKEN")
    or os.getenv("HF_API_TOKEN", "")
)
API_TIMEOUT_SEC = int(os.getenv("API_TIMEOUT_SEC", "180"))
QWEN_VL_MAX_PAGES = int(os.getenv("QWEN_VL_MAX_PAGES", "4"))
QWEN_VL_RENDER_SCALE = float(os.getenv("QWEN_VL_RENDER_SCALE", "1.5"))
QWEN_VL_MAX_NEW_TOKENS = int(os.getenv("QWEN_VL_MAX_NEW_TOKENS", "800"))
QWEN_VL_MCQ_MAX_NEW_TOKENS = int(os.getenv("QWEN_VL_MCQ_MAX_NEW_TOKENS", "1800"))

DEFAULT_LECTURE_PROMPT_TEMPLATE = """
You are a teaching assistant. Read the uploaded paper content and produce a clear lecture-style explanation in English:
1. Explain the problem and background first;
2. Explain the core method step by step / module by module;
3. Summarize key experiments and highlights;
4. End with limitations and suitable use cases;
5. Keep it classroom-friendly (about 400-700 words).

Paper content (may be excerpted):
{document}
""".strip()

DEFAULT_MCQ_PROMPT_TEMPLATE = """
Based on the paper content below, generate 5 English single-choice MCQs for a classroom quiz.
Output strict JSON only (no markdown code block), in this format:
{{
  "questions": [
    {{
      "question": "...",
      "options": ["Option A", "Option B", "Option C", "Option D"],
      "answer": "A",
      "explanation": "..."
    }}
  ]
}}

Requirements:
1. Exactly 5 questions;
2. 4 options per question;
3. `answer` must be one of A/B/C/D;
4. Explanation should tell why it is correct and common mistakes;
5. Cover background, method, experiments/results, and limitations.

Paper content (may be excerpted):
{document}
""".strip()

DEFAULT_MCQ_RETRY_PROMPT_TEMPLATE = """
Generate 5 English single-choice MCQs from the following paper content.
Output valid JSON only. No explanation outside JSON, no markdown.

Constraints:
1. Compact JSON (single line is fine);
2. Exactly 5 questions;
3. Each question includes `question`, `options` (4 items), `answer` (A/B/C/D), `explanation`;
4. Keep explanations short (1-2 sentences);
5. If uncertain, still generate based on the paper content only.

Output format:
{{"questions":[{{"question":"...","options":["...","...","...","..."],"answer":"A","explanation":"..."}}]}}

Paper content:
{document}
""".strip()

CHARACTERS_DIR = APP_DIR / "characters"


def _read_text_if_exists(path: Path, fallback: str) -> str:
    """Return the stripped text contents of `path`, or `fallback` on any error."""
    try:
        return path.read_text(encoding="utf-8").strip()
    except Exception:
        return fallback


def render_prompt_template(template: str, document: str, replacements: Optional[Dict[str, str]] = None) -> str:
    """Substitute `{document}` / `{paper_text}` and extra placeholders into a template.

    Uses plain `str.replace` instead of `str.format(...)` because character
    prompt files may contain JSON braces that would break format parsing.
    """
    s = str(template)
    s = s.replace("{document}", document).replace("{paper_text}", document)
    if replacements:
        for k, v in replacements.items():
            s = s.replace("{" + str(k) + "}", str(v))
    return s


def load_character_configs() -> Dict[str, Dict[str, Any]]:
    """Scan CHARACTERS_DIR for character folders and build their configs.

    Each folder may contain a `meta.json` plus per-character prompt files;
    missing pieces fall back to built-in defaults. If no folder config
    exists at all, a single built-in "default" character is returned.
    """
    configs: Dict[str, Dict[str, Any]] = {}
    if CHARACTERS_DIR.exists():
        for d in sorted(CHARACTERS_DIR.iterdir()):
            if not d.is_dir():
                continue
            meta_path = d / "meta.json"
            meta: Dict[str, Any] = {}
            if meta_path.exists():
                try:
                    parsed = json.loads(meta_path.read_text(encoding="utf-8"))
                    if isinstance(parsed, dict):
                        meta = parsed
                except Exception:
                    meta = {}
            cid = str(meta.get("id") or d.name)
            # Duplicate ids across folders: fall back to the folder name.
            if cid in configs:
                cid = d.name
            avatar_rel = str(meta.get("avatar", "avatar.jpg"))
            lecture_prompt_path = d / "lecture_prompt.txt"
            mcq_prompt_path = d / "mcq_prompt.txt"
            mcq_retry_prompt_path = d / "mcq_retry_prompt.txt"
            feedback_prompt_path = d / "feedback.txt"
            config: Dict[str, Any] = {
                "id": cid,
                "display_name": str(meta.get("display_name", d.name)),
                "tagline": str(meta.get("tagline", "Research paper explainer · MCQ coach")),
                "byline": str(meta.get("byline", "By @local-demo")),
                "chat_label": str(meta.get("chat_label", meta.get("display_name", d.name))),
                "chat_mode": str(meta.get("chat_mode", "paper mode")),
                "avatar_path": str((d / avatar_rel).resolve()),
                "lecture_prompt_template": _read_text_if_exists(
                    lecture_prompt_path,
                    DEFAULT_LECTURE_PROMPT_TEMPLATE,
                ),
                "mcq_prompt_template": _read_text_if_exists(
                    mcq_prompt_path,
                    DEFAULT_MCQ_PROMPT_TEMPLATE,
                ),
                "mcq_retry_prompt_template": _read_text_if_exists(
                    mcq_retry_prompt_path,
                    DEFAULT_MCQ_RETRY_PROMPT_TEMPLATE,
                ),
                "feedback_prompt_template": _read_text_if_exists(
                    feedback_prompt_path,
                    "",
                ),
                "lecture_prompt_path": str(lecture_prompt_path.resolve()),
                "mcq_prompt_path": str(mcq_prompt_path.resolve()),
                "mcq_retry_prompt_path": str(mcq_retry_prompt_path.resolve()),
                "feedback_prompt_path": str(feedback_prompt_path.resolve()),
            }
            configs[cid] = config
    if not configs:
        # Fallback to a built-in default character if no folder config exists.
        configs["default"] = {
            "id": "default",
            "display_name": "PDF Paper Tutor",
            "tagline": "Research paper explainer · MCQ coach",
            "byline": "By @local-demo",
            "chat_label": "PDF Paper Tutor",
            "chat_mode": "paper mode",
            "avatar_path": str((APP_DIR / "avatar.jpg").resolve()) if (APP_DIR / "avatar.jpg").exists() else "",
            "lecture_prompt_template": DEFAULT_LECTURE_PROMPT_TEMPLATE,
            "mcq_prompt_template": DEFAULT_MCQ_PROMPT_TEMPLATE,
            "mcq_retry_prompt_template": DEFAULT_MCQ_RETRY_PROMPT_TEMPLATE,
            "feedback_prompt_template": "",
        }
    return configs


CHARACTER_CONFIGS = load_character_configs()
DEFAULT_CHARACTER_ID = next(iter(CHARACTER_CONFIGS.keys()))


def get_character_config(character_id: Optional[str]) -> Dict[str, Any]:
    """Return the config for `character_id`, or the default character.

    Reloads from disk on every call so prompt/template edits (e.g.
    mcq_prompt.txt) take effect immediately.
    """
    global CHARACTER_CONFIGS, DEFAULT_CHARACTER_ID
    CHARACTER_CONFIGS = load_character_configs()
    if DEFAULT_CHARACTER_ID not in CHARACTER_CONFIGS:
        DEFAULT_CHARACTER_ID = next(iter(CHARACTER_CONFIGS.keys()))
    if character_id and character_id in CHARACTER_CONFIGS:
        return CHARACTER_CONFIGS[character_id]
    return CHARACTER_CONFIGS[DEFAULT_CHARACTER_ID]


@dataclass
class MCQItem:
    """One single-choice question with exactly four options."""

    question: str
    options: List[str]
    answer: str  # A/B/C/D
    explanation: str

    def to_display_choices(self) -> List[str]:
        """Return options prefixed with their letter labels ("A. ...")."""
        labels = ["A", "B", "C", "D"]
        return [f"{labels[i]}. {opt}" for i, opt in enumerate(self.options)]

    def correct_choice_display(self) -> str:
        """Return the labelled display string of the correct option."""
        idx = ["A", "B", "C", "D"].index(self.answer)
        return self.to_display_choices()[idx]


def new_session_state() -> Dict[str, Any]:
    """Return a fresh per-session state dict for the Gradio UI."""
    return {
        "lecture_text": "",
        "lecture_audio_path": None,
        "selected_paragraph_idx": "",
        "explanation_audio_path": None,
        "last_explanation_tts_text": "",
        "pdf_path": None,
        "pdf_excerpt": "",
        "character_id": DEFAULT_CHARACTER_ID,
        "exam_character_id": None,
        "mcq_generating": False,
        "current_page": "explain",
        "mcqs": [],
        "current_index": 0,
        "score": 0,
        "awaiting_next_after_wrong": False,
        "completed": False,
        "exam_chat": [],
        "status": "Idle",
    }


def strip_code_fence(text: str) -> str:
    """Strip a surrounding markdown code fence (```lang ... ```) if present."""
    s = text.strip()
    if s.startswith("```"):
        s = re.sub(r"^```[a-zA-Z0-9_-]*\n?", "", s)
        s = re.sub(r"\n?```$", "", s)
    return s.strip()


def extract_pdf_text(pdf_path: str, max_chars: int = 16000) -> str:
    """Extract up to `max_chars` of text from a PDF, tagged per page.

    Returns a human-readable message (not raising) when pypdf is missing
    or the PDF has no extractable text, so the UI can display it directly.
    """
    if PdfReader is None:
        return (
            "PDF text extraction library (pypdf) is unavailable. "
            "Please install pypdf or switch to a Vision-based PDF reader implementation."
        )
    reader = PdfReader(pdf_path)
    chunks: List[str] = []
    total = 0
    for page_idx, page in enumerate(reader.pages, start=1):
        try:
            text = page.extract_text() or ""
        except Exception:
            text = ""
        if text.strip():
            chunk = f"[Page {page_idx}]\n{text.strip()}\n"
            chunks.append(chunk)
            total += len(chunk)
            if total >= max_chars:
                break
    if not chunks:
        return (
            "No extractable text was found in the PDF. "
            "For scanned PDFs, implement page-image rendering and pass images to Qwen-VL."
        )
    return "\n".join(chunks)[:max_chars]


def write_tone_wav(text: str, out_path: str, seconds: float = 2.0, sample_rate: int = 16000) -> str:
    """Mock TTS fallback: write a short sine tone so the UI flow is testable without a TTS model.

    The tone frequency is derived from the text length so different inputs
    sound (slightly) different. Duration is clamped to [1, 8] seconds.
    """
    freq = 440 + (len(text) % 220)
    amplitude = 9000
    frames = int(sample_rate * max(1.0, min(seconds, 8.0)))
    with wave.open(out_path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(sample_rate)
        for i in range(frames):
            sample = int(amplitude * math.sin(2 * math.pi * freq * (i / sample_rate)))
            wf.writeframesraw(sample.to_bytes(2, byteorder="little", signed=True))
    return out_path


def normalize_option_text(text: Any) -> str:
    """Strip any leading letter labels ("A.", "b)", "C:", ...) from an option."""
    s = str(text or "").strip()
    s = re.sub(r"^\s*(?:[A-Da-d]\s*[\.\)\:\-]\s*)+", "", s).strip()
    return s


def normalize_explanation_text(text: Any) -> str:
    """Strip a leading "Explanation:"/"Reason:" prefix from an explanation."""
    s = str(text or "").strip()
    s = re.sub(r"^\s*(?:Explanation|Reason)\s*:\s*", "", s, flags=re.IGNORECASE).strip()
    return s


def render_pdf_pages_for_vl(pdf_path: str, max_pages: int, scale: float) -> List[str]:
    """Render the first `max_pages` PDF pages to PNG files for the VL model.

    Returns the list of image paths; raises RuntimeError if pypdfium2 is
    missing, the PDF is empty, or rendering produced nothing.
    """
    if pdfium is None:
        raise RuntimeError("pypdfium2 is required to render PDF pages for Qwen3-VL.")
    doc = pdfium.PdfDocument(pdf_path)
    page_count = len(doc)
    if page_count == 0:
        raise RuntimeError("Uploaded PDF has no pages.")
    render_dir = TMP_DIR / f"pdf_pages_{uuid.uuid4().hex}"
    render_dir.mkdir(exist_ok=True)
    paths: List[str] = []
    try:
        for i in range(min(page_count, max_pages)):
            page = doc[i]
            pil = page.render(scale=scale).to_pil()
            pil = pil.convert("RGB")
            out_path = render_dir / f"page_{i+1:02d}.png"
            pil.save(out_path, format="PNG")
            paths.append(str(out_path))
            # Close each page handle promptly; `close` may not exist on
            # all pypdfium2 versions, hence the getattr guard.
            close_fn = getattr(page, "close", None)
            if callable(close_fn):
                close_fn()
    finally:
        close_fn = getattr(doc, "close", None)
        if callable(close_fn):
            close_fn()
    if not paths:
        raise RuntimeError("Failed to render PDF pages for Qwen3-VL.")
    return paths


def image_file_to_data_url(image_path: str) -> str:
    """Encode a PNG file as a base64 data URL for OpenAI-style image inputs."""
    image_bytes = Path(image_path).read_bytes()
    b64 = base64.b64encode(image_bytes).decode("ascii")
    return f"data:image/png;base64,{b64}"


def _api_headers() -> Dict[str, str]:
    """Build Bearer-auth JSON headers; raise if API_KEY is not configured."""
    if not API_KEY:
        raise RuntimeError("Missing API_KEY. Put it in .env or environment variables.")
    return {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }


def _require_api_url() -> str:
    """Return API_URL without a trailing slash; raise if unset."""
    if not API_URL:
        raise RuntimeError("Missing API_URL/API_UR. Put it in .env or environment variables.")
    return API_URL.rstrip("/")


def _dashscope_tts_url() -> str:
    """Derive the DashScope multimodal-generation URL from the configured base URL."""
    base = _require_api_url()
    if "/compatible-mode/" in base:
        root = base.split("/compatible-mode/", 1)[0]
    elif base.endswith("/v1"):
        root = base[:-3]
    else:
        root = base
    return f"{root}/api/v1/services/aigc/multimodal-generation/generation"


def _save_binary_audio(audio_bytes: bytes, out_path: str) -> str:
    """Write raw audio bytes to `out_path` and return the path."""
    Path(out_path).write_bytes(audio_bytes)
    return out_path


def _is_hf_tts_enabled() -> bool:
    """True when a Hugging Face TTS Space is configured."""
    return bool(HF_TTS_SPACE_ID or HF_TTS_SPACE_URL)


def _tts_backend_name() -> str:
    """Human-readable name of the active TTS backend (for status display)."""
    if USE_MOCK_TTS:
        return "mock_tts"
    if _is_hf_tts_enabled():
        return f"hf_space:{HF_TTS_SPACE_ID or HF_TTS_SPACE_URL}"
    return "api_tts"


def _extract_audio_source(result: Any) -> str:
    """Pull an audio path/URL out of a gradio_client predict() result.

    Handles plain strings, dicts with path/name/url (or nested "audio"),
    and lists/tuples of candidates. Raises RuntimeError if nothing usable
    is found.
    """
    if isinstance(result, str):
        return result
    if isinstance(result, dict):
        for key in ("path", "name", "url"):
            value = result.get(key)
            if isinstance(value, str) and value.strip():
                return value
        nested = result.get("audio")
        if nested is not None:
            return _extract_audio_source(nested)
    if isinstance(result, (list, tuple)):
        for item in result:
            try:
                return _extract_audio_source(item)
            except RuntimeError:
                continue
    raise RuntimeError(f"Unsupported HF Space audio output: {result!r}")


def _read_audio_bytes_from_source(source: str) -> bytes:
    """Read audio bytes from an HTTP(S) URL or a local file path."""
    source = (source or "").strip()
    if not source:
        raise RuntimeError("HF Space returned an empty audio source.")
    if source.startswith("http://") or source.startswith("https://"):
        resp = requests.get(source, timeout=API_TIMEOUT_SEC)
        if resp.status_code >= 400:
            raise RuntimeError(f"Failed to fetch HF Space audio URL {resp.status_code}: {resp.text[:500]}")
        return resp.content
    path = Path(source)
    if path.exists():
        return path.read_bytes()
    raise RuntimeError(f"HF Space audio path does not exist: {source}")


def split_text_for_tts(text: str, max_len: int = 480) -> List[str]:
    """Split text into chunks of at most `max_len` characters for TTS.

    Prefers sentence-ish boundaries (CJK and Latin punctuation), then
    hard-splits any single over-long sentence as a fallback.
    """
    cleaned = re.sub(r"\s+", " ", (text or "")).strip()
    if not cleaned:
        return []
    if len(cleaned) <= max_len:
        return [cleaned]
    pieces = re.split(r"(?<=[。!?!?;;::\.])\s*", cleaned)
    chunks: List[str] = []
    buf = ""
    for piece in pieces:
        piece = piece.strip()
        if not piece:
            continue
        if len(piece) > max_len:
            # Flush the buffer, then hard-split the over-long sentence.
            if buf:
                chunks.append(buf)
                buf = ""
            for i in range(0, len(piece), max_len):
                chunks.append(piece[i:i + max_len])
            continue
        candidate = f"{buf} {piece}".strip() if buf else piece
        if len(candidate) <= max_len:
            buf = candidate
        else:
            chunks.append(buf)
            buf = piece
    if buf:
        chunks.append(buf)
    return chunks


def split_text_every_two_sentences(text: str, max_len: int = 480) -> List[str]:
    """Split text into two-sentence groups, each capped at `max_len` chars.

    Over-long groups are further split via split_text_for_tts.
    """
    cleaned = re.sub(r"\s+", " ", (text or "")).strip()
    if not cleaned:
        return []
    if len(cleaned) <= max_len:
        return [cleaned]
    sentences = [s.strip() for s in re.split(r"(?<=[。!?!?;;::\.])\s*", cleaned) if s and s.strip()]
    if not sentences:
        return split_text_for_tts(cleaned, max_len=max_len)
    groups: List[str] = []
    i = 0
    while i < len(sentences):
        pair = " ".join(sentences[i:i + 2]).strip()
        if pair:
            groups.append(pair)
        i += 2
    chunks: List[str] = []
    for g in groups:
        if len(g) <= max_len:
            chunks.append(g)
        else:
            chunks.extend(split_text_for_tts(g, max_len=max_len))
    return [c for c in chunks if c and c.strip()]


def concat_wav_files(wav_paths: List[str], out_path: str) -> str:
    """Concatenate WAV files with identical formats into a single WAV.

    Raises RuntimeError on an empty input list or mismatched channel
    count / sample width / frame rate between chunks.
    """
    if not wav_paths:
        raise RuntimeError("No WAV chunks to concatenate.")
    if len(wav_paths) == 1:
        return _save_binary_audio(Path(wav_paths[0]).read_bytes(), out_path)
    params = None
    frames: List[bytes] = []
    for p in wav_paths:
        with wave.open(str(p), "rb") as wf:
            cur_params = (wf.getnchannels(), wf.getsampwidth(), wf.getframerate())
            if params is None:
                params = cur_params
            elif cur_params != params:
                raise RuntimeError("TTS WAV chunks have mismatched formats and cannot be concatenated.")
            frames.append(wf.readframes(wf.getnframes()))
    assert params is not None
    with wave.open(out_path, "wb") as out:
        out.setnchannels(params[0])
        out.setsampwidth(params[1])
        out.setframerate(params[2])
        for f in frames:
            out.writeframes(f)
    return out_path


class QwenPipelineEngine:
    """
    Gradio-facing backend for: PDF -> lecture text -> MCQs -> TTS audio

    This ships with a mock mode by default so the workflow is runnable
    immediately. When USE_MOCK_MODELS=0, it calls remote APIs for text
    generation. TTS mock is controlled separately by USE_MOCK_TTS.
    - VL: OpenAI-compatible /chat/completions (works with DashScope
      compatible-mode and vLLM-style APIs)
    - TTS: HF Space /tts_chunk (optional) or DashScope/OpenAI-compatible
      endpoints
    """

    def __init__(self) -> None:
        self.mock_mode = USE_MOCK_MODELS
        self.vl_loaded = False
        self.tts_loaded = False
        # Rendered page images per resolved PDF path, so repeated calls
        # (lecture + MCQs) don't re-render the same document.
        self._pdf_page_cache: Dict[str, List[str]] = {}
        self._hf_tts_client: Any = None

    def ensure_vl_loaded(self) -> None:
        """Validate VL configuration once; no-op in mock mode."""
        if self.vl_loaded:
            return
        if self.mock_mode:
            self.vl_loaded = True
            return
        _require_api_url()
        if not API_KEY:
            raise RuntimeError("Missing API_KEY for VL API calls.")
        self.vl_loaded = True

    def ensure_tts_loaded(self) -> None:
        """Validate TTS configuration once; no-op in mock mode."""
        if self.tts_loaded:
            return
        if USE_MOCK_TTS:
            self.tts_loaded = True
            return
        if _is_hf_tts_enabled():
            self._ensure_hf_tts_client()
            self.tts_loaded = True
            return
        _require_api_url()
        if not API_KEY:
            raise RuntimeError("Missing API_KEY for TTS API calls.")
        self.tts_loaded = True

    def _ensure_hf_tts_client(self) -> Any:
        """Lazily create (and cache) the gradio_client for the HF TTS Space."""
        if HFSpaceClient is None:
            raise RuntimeError("Missing gradio_client. Please install with: pip install gradio_client")
        if self._hf_tts_client is not None:
            return self._hf_tts_client
        src = HF_TTS_SPACE_URL or HF_TTS_SPACE_ID
        if not src:
            raise RuntimeError("Missing HF_TTS_SPACE_ID or HF_TTS_SPACE_URL.")
        token = (HF_TOKEN or "").strip()
        if not token:
            self._hf_tts_client = HFSpaceClient(src)
            return self._hf_tts_client
        # gradio_client constructor args differ across versions; handle
        # both old/new signatures before falling back to raw headers.
        try:
            self._hf_tts_client = HFSpaceClient(src, hf_token=token)
        except TypeError:
            try:
                self._hf_tts_client = HFSpaceClient(src, token=token)
            except TypeError:
                self._hf_tts_client = HFSpaceClient(src, headers={"Authorization": f"Bearer {token}"})
        return self._hf_tts_client

    def _hf_space_tts_single(self, text: str, out_path: str, *, voice: str, language: str) -> str:
        """Synthesize one text chunk via the HF Space, trying several endpoint names.

        Endpoint discovery is version-tolerant: it tries the configured
        name, prefix variants, and common defaults. On total failure the
        cached client is refreshed once (the Space may have reloaded) and
        the candidates are retried before raising.
        """
        configured = (HF_TTS_API_NAME or "").strip()
        normalized = configured.lstrip("/")
        result: Any = None
        last_exc: Optional[Exception] = None
        api_candidates: List[str] = []
        for attempt in range(2):
            client = self._ensure_hf_tts_client()
            api_prefix = ""
            cfg = getattr(client, "config", None)
            if isinstance(cfg, dict):
                api_prefix = str(cfg.get("api_prefix") or "").strip()
            api_candidates = []
            prefixed = f"{api_prefix.rstrip('/')}/{normalized}" if api_prefix and normalized else ""
            for cand in [
                configured,
                f"/{normalized}" if normalized else "",
                normalized,
                prefixed,
                "/gradio_api/tts_chunk",
                "gradio_api/tts_chunk",
                "/tts_chunk",
                "tts_chunk",
                "/predict",
                "predict",
            ]:
                cand = cand.strip()
                if cand and cand not in api_candidates:
                    api_candidates.append(cand)
            result = None
            last_exc = None
            for api_name in api_candidates:
                try:
                    result = client.predict(
                        text=text,
                        voice=voice,
                        language=language,
                        api_name=api_name,
                    )
                    last_exc = None
                    break
                except Exception as exc:
                    msg = str(exc)
                    lower_msg = msg.lower()
                    # Only "no such endpoint" errors advance to the next
                    # candidate; real synthesis failures propagate.
                    if ("cannot find a function" in lower_msg) and ("api_name" in lower_msg):
                        last_exc = exc
                        continue
                    raise
            if last_exc is None:
                break
            # Refresh cached client once in case the upstream app reloaded
            # and endpoints changed.
            if attempt == 0:
                self._hf_tts_client = None
        if last_exc is not None:
            available_hint = ""
            view_api = getattr(client, "view_api", None)
            if callable(view_api):
                try:
                    api_info = view_api(return_format="dict")
                    available_hint = f" Available endpoints: {api_info}"
                except Exception:
                    available_hint = ""
            tried = ", ".join(api_candidates)
            raise RuntimeError(f"No matching HF API endpoint. Tried: [{tried}].{available_hint}") from last_exc
        source = _extract_audio_source(result)
        audio_bytes = _read_audio_bytes_from_source(source)
        return _save_binary_audio(audio_bytes, out_path)

    def _mock_generate_lecture(self, pdf_excerpt: str) -> str:
        """Mock lecture: return a whitespace-normalized excerpt of the PDF."""
        excerpt = re.sub(r"\s+", " ", pdf_excerpt).strip()
        excerpt = excerpt[:1000]
        return (
            f" {excerpt}"
        )

    def _mock_generate_mcqs(self, lecture_text: str) -> List[MCQItem]:
        """Mock quiz: five canned questions (all keyed "A" pre-rebalance)."""
        base_questions = [
            MCQItem(
                question="What type of core problem does this paper most likely address?",
                options=["Performance or efficiency bottlenecks in existing methods", "How to design database indexes", "How to build a frontend page", "How to compress video files"],
                answer="A",
                explanation="Paper-reading tasks usually focus on limitations of prior methods, then propose improvements in performance, efficiency, or robustness.",
            ),
            MCQItem(
                question="What is the best way to explain a paper's method?",
                options=["Explain the pipeline from input to output by modules or steps", "Only list references", "Only show experiment tables without method details", "Only present conclusions without background"],
                answer="A",
                explanation="A structured, step-by-step explanation helps learners understand how the paper moves from problem to solution.",
            ),
            MCQItem(
                question="Why provide both answers and explanations in MCQs?",
                options=["To enable feedback and error correction", "Only to make JSON longer", "Because Gradio requires explanations", "To reduce the number of questions"],
                answer="A",
                explanation="Answer + explanation completes the teaching loop and helps users learn from mistakes.",
            ),
            MCQItem(
                question="What is the risk of feeding a very long paper in one shot?",
                options=["Context overflow can increase cost and cause information loss or failure", "The model automatically becomes more accurate", "TTS audio becomes shorter", "The PDF file gets corrupted"],
                answer="A",
                explanation="Long documents usually need chunking and summarization to avoid context-window issues and quality degradation.",
            ),
            MCQItem(
                question="In this demo pipeline, what is Qwen TTS used for?",
                options=["Convert lecture text and explanations into audio", "Convert PDF to images", "Train Qwen3-VL-8B", "Generate new MCQ answers"],
                answer="A",
                explanation="TTS turns text explanations into speech, improving interactivity and accessibility.",
            ),
        ]
        return base_questions

    def _get_pdf_page_images(self, pdf_path: str) -> List[str]:
        """Return cached rendered page images for a PDF, rendering on miss."""
        cache_key = str(Path(pdf_path).resolve())
        cached = self._pdf_page_cache.get(cache_key)
        if cached and all(Path(p).exists() for p in cached):
            return cached
        page_paths = render_pdf_pages_for_vl(
            pdf_path,
            max_pages=QWEN_VL_MAX_PAGES,
            scale=QWEN_VL_RENDER_SCALE,
        )
        self._pdf_page_cache[cache_key] = page_paths
        return page_paths

    def _chat_completions(
        self,
        messages: List[Dict[str, Any]],
        max_tokens: int,
        *,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
    ) -> str:
        """POST to the OpenAI-compatible /chat/completions endpoint.

        Returns the first choice's content as plain text, flattening
        content-part lists (text / output_text items) when present.
        """
        url = f"{_require_api_url()}/chat/completions"
        payload: Dict[str, Any] = {
            "model": CHAT_MODEL_ID,
            "messages": messages,
            "max_tokens": max_tokens,
            "stream": False,
        }
        if temperature is not None:
            payload["temperature"] = float(temperature)
        if top_p is not None:
            payload["top_p"] = float(top_p)
        resp = requests.post(url, headers=_api_headers(), json=payload, timeout=API_TIMEOUT_SEC)
        if resp.status_code >= 400:
            raise RuntimeError(f"VL API error {resp.status_code}: {resp.text[:1000]}")
        data = resp.json()
        choices = data.get("choices") or []
        if not choices:
            raise RuntimeError(f"VL API returned no choices: {data}")
        content = choices[0].get("message", {}).get("content", "")
        if isinstance(content, str):
            return content.strip()
        if isinstance(content, list):
            parts: List[str] = []
            for item in content:
                if isinstance(item, dict) and item.get("type") in {"text", "output_text"}:
                    parts.append(str(item.get("text") or item.get("content") or ""))
            return "\n".join([p for p in parts if p]).strip()
        return str(content).strip()

    def _real_generate_text_from_pdf(
        self,
        pdf_path: str,
        prompt: str,
        max_tokens: Optional[int] = None,
        *,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
    ) -> str:
        """Send rendered PDF page images plus a text prompt to the VL model."""
        page_image_paths = self._get_pdf_page_images(pdf_path)
        content: List[Dict[str, Any]] = []
        for p in page_image_paths:
            content.append({"type": "image_url", "image_url": {"url": image_file_to_data_url(p)}})
        content.append({"type": "text", "text": prompt})
        messages = [{"role": "user", "content": content}]
        return self._chat_completions(
            messages,
            max_tokens=max_tokens or QWEN_VL_MAX_NEW_TOKENS,
            temperature=temperature,
            top_p=top_p,
        )

    def _real_tts_single(self, text: str, out_path: str, *, voice: Optional[str] = None) -> str:
        """Synthesize one chunk of text, trying backends in priority order.

        Order: HF Space (optionally falling through on failure), mock tone,
        OpenAI-compatible /audio/speech, then DashScope multimodal
        generation (which returns an audio URL in JSON).
        """
        if not text.strip():
            return write_tone_wav("empty", out_path)
        if _is_hf_tts_enabled():
            try:
                return self._hf_space_tts_single(
                    text,
                    out_path,
                    voice=str(voice or HF_TTS_VOICE),
                    language=HF_TTS_LANGUAGE,
                )
            except Exception as exc:
                if not HF_TTS_ALLOW_FALLBACK:
                    # Chain the original error so the HF failure is visible.
                    raise RuntimeError(f"HF Space TTS failed and fallback is disabled: {type(exc).__name__}: {exc}") from exc
        if USE_MOCK_TTS:
            return write_tone_wav(text, out_path)
        openai_url = f"{_require_api_url()}/audio/speech"
        openai_payload = {
            "model": TTS_MODEL_ID,
            "input": text,
            "voice": TTS_SPEAKER,
            "format": TTS_FORMAT,
        }
        openai_resp = requests.post(
            openai_url,
            headers=_api_headers(),
            json=openai_payload,
            timeout=API_TIMEOUT_SEC,
        )
        content_type = openai_resp.headers.get("content-type", "")
        # A non-JSON success response is raw audio bytes.
        if openai_resp.status_code < 400 and "application/json" not in content_type.lower():
            return _save_binary_audio(openai_resp.content, out_path)
        # DashScope fallback: multimodal generation returns an audio URL in JSON.
        payload = {
            "model": TTS_MODEL_ID,
            "input": {"text": text},
            "parameters": {"voice": TTS_SPEAKER, "format": TTS_FORMAT},
        }
        resp = requests.post(
            _dashscope_tts_url(),
            headers=_api_headers(),
            json=payload,
            timeout=API_TIMEOUT_SEC,
        )
        if resp.status_code >= 400:
            err1 = openai_resp.text[:500] if openai_resp.text else ""
            err2 = resp.text[:500] if resp.text else ""
            raise RuntimeError(
                f"TTS API failed. openai-compatible: {openai_resp.status_code} {err1}; "
                f"dashscope: {resp.status_code} {err2}"
            )
        data = resp.json()
        audio_url = (
            (((data.get("output") or {}).get("audio") or {}).get("url"))
            or (((data.get("output") or {}).get("audio_url")))
        )
        if not audio_url:
            raise RuntimeError(f"TTS API returned no audio URL: {data}")
        audio_resp = requests.get(audio_url, timeout=API_TIMEOUT_SEC)
        if audio_resp.status_code >= 400:
            raise RuntimeError(f"Failed to download TTS audio {audio_resp.status_code}: {audio_resp.text[:500]}")
        return _save_binary_audio(audio_resp.content, out_path)

    def _synthesize_tts_chunks(self, chunks: List[str], out_path: str, *, voice: Optional[str] = None) -> str:
        """Synthesize each chunk to its own WAV, then concatenate into `out_path`."""
        chunks = [str(c or "").strip() for c in chunks if str(c or "").strip()]
        if not chunks:
            return write_tone_wav("empty", out_path)
        if len(chunks) == 1:
            return self._real_tts_single(chunks[0], out_path, voice=voice)
        chunk_paths: List[str] = []
        for idx, chunk in enumerate(chunks, start=1):
            chunk_path = str(TMP_DIR / f"tts_chunk_{idx}_{uuid.uuid4().hex}.wav")
            chunk_paths.append(self._real_tts_single(chunk, chunk_path, voice=voice))
        return concat_wav_files(chunk_paths, out_path)

    def _real_tts(self, text: str, out_path: str, *, voice: Optional[str] = None) -> str:
        """Top-level TTS: single-shot first, chunked retry on "too long" errors."""
        cleaned = str(text or "").strip()
        if not cleaned:
            return write_tone_wav("empty", out_path)
        if TEXT_SPLIT_TO_CHUNK:
            return self._synthesize_tts_chunks(split_text_for_tts(cleaned, max_len=480), out_path, voice=voice)
        try:
            return self._real_tts_single(cleaned, out_path, voice=voice)
        except Exception as exc:
            err = str(exc).lower()
            too_long = (
                "text too long" in err
                or "too long for chunk-level api" in err
                or "chunk-level api" in err
            )
            if not too_long:
                raise
            return self._synthesize_tts_chunks(
                split_text_every_two_sentences(cleaned, max_len=480),
                out_path,
                voice=voice,
            )

    @spaces.GPU
    def build_lesson_and_quiz(self, pdf_path: str, character_cfg: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Generate the lecture text and MCQ list for a PDF in one call.

        Returns {"lecture_text", "mcqs" (list of dicts), "pdf_excerpt"}.
        """
        self.ensure_vl_loaded()
        pdf_excerpt = extract_pdf_text(pdf_path)
        cfg = character_cfg or get_character_config(None)
        lecture_template = cfg.get("lecture_prompt_template", DEFAULT_LECTURE_PROMPT_TEMPLATE)
        mcq_template = cfg.get("mcq_prompt_template", DEFAULT_MCQ_PROMPT_TEMPLATE)
        mcq_retry_template = cfg.get("mcq_retry_prompt_template", DEFAULT_MCQ_RETRY_PROMPT_TEMPLATE)
        if self.mock_mode:
            lecture_text = self._mock_generate_lecture(pdf_excerpt)
            mcqs = self._mock_generate_mcqs(lecture_text)
        else:
            lecture_prompt = render_prompt_template(
                str(lecture_template),
                pdf_excerpt,
                replacements={"style_seed": uuid.uuid4().hex},
            )
            lecture_text = self._real_generate_text_from_pdf(
                pdf_path,
                lecture_prompt,
                max_tokens=QWEN_VL_MAX_NEW_TOKENS,
                temperature=0.9,
                top_p=0.95,
            )
            quiz_prompt = render_prompt_template(str(mcq_template), pdf_excerpt)
            raw_mcq_json = self._real_generate_text_from_pdf(
                pdf_path,
                quiz_prompt,
                max_tokens=QWEN_VL_MCQ_MAX_NEW_TOKENS,
                temperature=0.2,
                top_p=0.9,
            )
            try:
                mcqs = parse_mcq_json(raw_mcq_json)
            except (json.JSONDecodeError, ValueError):
                # Bug fix: parse_mcq_json raises ValueError when fewer than
                # 5 valid questions parse; catch it too (matching
                # build_mcqs) so the stricter retry prompt actually runs.
                retry_prompt = render_prompt_template(str(mcq_retry_template), pdf_excerpt)
                retry_raw = self._real_generate_text_from_pdf(
                    pdf_path,
                    retry_prompt,
                    max_tokens=QWEN_VL_MCQ_MAX_NEW_TOKENS,
                    temperature=0.2,
                    top_p=0.9,
                )
                mcqs = parse_mcq_json(retry_raw)
        return {
            "lecture_text": lecture_text,
            "mcqs": [asdict(q) for q in mcqs],
            "pdf_excerpt": pdf_excerpt,
        }

    @spaces.GPU
    def build_lecture(self, pdf_path: str, character_cfg: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Generate only the lecture text for a PDF.

        Returns {"lecture_text", "pdf_excerpt"}.
        """
        self.ensure_vl_loaded()
        pdf_excerpt = extract_pdf_text(pdf_path)
        cfg = character_cfg or get_character_config(None)
        lecture_template = cfg.get("lecture_prompt_template", DEFAULT_LECTURE_PROMPT_TEMPLATE)
        if self.mock_mode:
            lecture_text = self._mock_generate_lecture(pdf_excerpt)
        else:
            lecture_prompt = render_prompt_template(
                str(lecture_template),
                pdf_excerpt,
                replacements={"style_seed": uuid.uuid4().hex},
            )
            lecture_text = self._real_generate_text_from_pdf(
                pdf_path,
                lecture_prompt,
                max_tokens=QWEN_VL_MAX_NEW_TOKENS,
                temperature=0.9,
                top_p=0.95,
            )
        return {
            "lecture_text": lecture_text,
            "pdf_excerpt": pdf_excerpt,
        }

    @spaces.GPU
    def build_mcqs(self, pdf_path: str, pdf_excerpt: str, character_cfg: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Generate 5 MCQs (as dicts) for a PDF, with a one-shot retry.

        Answer positions are shuffled via rebalance_mcq_answers before
        returning.
        """
        self.ensure_vl_loaded()
        cfg = character_cfg or get_character_config(None)
        mcq_template = cfg.get("mcq_prompt_template", DEFAULT_MCQ_PROMPT_TEMPLATE)
        mcq_retry_template = cfg.get("mcq_retry_prompt_template", DEFAULT_MCQ_RETRY_PROMPT_TEMPLATE)
        if self.mock_mode:
            mcqs = self._mock_generate_mcqs(pdf_excerpt)
            return rebalance_mcq_answers([asdict(q) for q in mcqs])
        quiz_prompt = render_prompt_template(str(mcq_template), pdf_excerpt)
        raw_mcq_json = self._real_generate_text_from_pdf(
            pdf_path,
            quiz_prompt,
            max_tokens=QWEN_VL_MCQ_MAX_NEW_TOKENS,
            temperature=0.2,
            top_p=0.9,
        )
        try:
            mcqs = parse_mcq_json(raw_mcq_json)
        except (json.JSONDecodeError, ValueError):
            retry_prompt = render_prompt_template(str(mcq_retry_template), pdf_excerpt)
            retry_raw = self._real_generate_text_from_pdf(
                pdf_path,
                retry_prompt,
                max_tokens=QWEN_VL_MCQ_MAX_NEW_TOKENS,
                temperature=0.2,
                top_p=0.9,
            )
            mcqs = parse_mcq_json(retry_raw)
        return rebalance_mcq_answers([asdict(q) for q in mcqs])

    @spaces.GPU
    def synthesize_tts(self, text: str, name_prefix: str = "audio", *, voice: Optional[str] = None) -> str:
        """Synthesize `text` to a uniquely-named WAV in TMP_DIR and return its path."""
        self.ensure_tts_loaded()
        out_path = str(TMP_DIR / f"{name_prefix}_{uuid.uuid4().hex}.wav")
        if USE_MOCK_TTS:
            return write_tone_wav(text, out_path)
        return self._real_tts(text, out_path, voice=voice)


def parse_mcq_json(raw: str) -> List[MCQItem]:
    """Parse model output into exactly 5 MCQItems, tolerating messy JSON.

    Strips code fences, salvages the outermost {...} on decode failure,
    accepts several key aliases, and normalizes answer labels. Raises
    json.JSONDecodeError on unparseable input and ValueError when fewer
    than 5 valid questions survive validation.
    """

    def _normalize_answer_label(answer_raw: Any, options: List[str]) -> str:
        # Map "A"/"Option B"/"the full option text"/etc. to a bare letter.
        s = str(answer_raw or "").strip()
        if not s:
            return ""
        up = s.upper()
        if up in {"A", "B", "C", "D"}:
            return up
        m = re.search(r"\b([ABCD])\b", up)
        if m:
            return m.group(1)
        if up.startswith("OPTION "):
            tail = up.replace("OPTION ", "", 1).strip()
            if tail in {"A", "B", "C", "D"}:
                return tail
        normalized_answer_text = normalize_option_text(s).strip().lower()
        if normalized_answer_text:
            for i, opt in enumerate(options[:4]):
                if normalized_answer_text == normalize_option_text(opt).strip().lower():
                    return ["A", "B", "C", "D"][i]
        return ""

    cleaned = strip_code_fence(raw)
    try:
        payload = json.loads(cleaned)
    except json.JSONDecodeError:
        # Salvage: the model may have wrapped the JSON in extra prose.
        start = cleaned.find("{")
        end = cleaned.rfind("}")
        if start != -1 and end != -1 and end > start:
            payload = json.loads(cleaned[start:end + 1])
        else:
            raise
    if isinstance(payload, list):
        questions = payload
    else:
        questions = payload.get("questions", []) or payload.get("items", []) or payload.get("data", [])
    parsed: List[MCQItem] = []
    for item in questions[:5]:
        if not isinstance(item, dict):
            continue
        q = str(item.get("question", "")).strip()
        options_raw = item.get("options", [])
        if not isinstance(options_raw, list):
            options_raw = item.get("choices", []) if isinstance(item.get("choices", []), list) else []
        options = [normalize_option_text(x) for x in options_raw][:4]
        explanation = str(
            item.get("explanation", "")
            or item.get("rationale", "")
            or item.get("reason", "")
        ).strip()
        answer = _normalize_answer_label(
            item.get("answer", "")
            or item.get("correct_answer", "")
            or item.get("correctOption", "")
            or item.get("correct", ""),
            options,
        )
        if not answer:
            # Last resort: a numeric answer index.
            idx_value = item.get("answer_index", item.get("correct_index", None))
            try:
                idx = int(idx_value)
                if 0 <= idx < 4:
                    answer = ["A", "B", "C", "D"][idx]
            except Exception:
                pass
        if len(options) != 4:
            continue
        if answer not in {"A", "B", "C", "D"}:
            continue
        if not q or not explanation:
            continue
        parsed.append(MCQItem(question=q, options=options, answer=answer, explanation=explanation))
    if len(parsed) != 5:
        raise ValueError(f"Expected 5 MCQs, got {len(parsed)}")
    return parsed


def rebalance_mcq_answers(mcqs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Shuffle correct-answer positions so they are not all the same letter.

    The first four questions get distinct target letters (a shuffled
    A-D), any fifth gets a random letter. Malformed entries (not 4
    options, or no valid answer) pass through unchanged.
    """
    labels = ["A", "B", "C", "D"]
    n = min(5, len(mcqs))
    rng = random.Random(uuid.uuid4().int)
    targets = labels[:]
    rng.shuffle(targets)
    while len(targets) < n:
        targets.append(rng.choice(labels))
    out: List[Dict[str, Any]] = []
    for i, q in enumerate(mcqs[:n]):
        opts = list(q.get("options", []) or [])
        ans = str(q.get("answer", "")).strip().upper()
        if len(opts) != 4 or ans not in {"A", "B", "C", "D"}:
            out.append(q)
            continue
        correct_idx = labels.index(ans)
        correct_opt = opts[correct_idx]
        distractors = [opts[j] for j in range(4) if j != correct_idx]
        target_idx = labels.index(targets[i])
        new_opts: List[str] = []
        d_i = 0
        for j in range(4):
            if j == target_idx:
                new_opts.append(correct_opt)
            else:
                new_opts.append(distractors[d_i])
                d_i += 1
        q2 = dict(q)
        q2["options"] = new_opts
        q2["answer"] = labels[target_idx]
        out.append(q2)
    return out


engine = QwenPipelineEngine()


def get_current_mcq(state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the MCQ dict at the session's current index, or None."""
    idx = state.get("current_index", 0)
    mcqs = state.get("mcqs", [])
    if not mcqs or idx < 0 or idx >= len(mcqs):
        return None
    return mcqs[idx]


def format_question_block(state: Dict[str, Any]) -> str:
    """Render the current question (or completion summary) as markdown."""
    mcq = get_current_mcq(state)
    if mcq is None:
        if state.get("completed"):
            total = len(state.get("mcqs", []))
            return f"### Quiz Completed\nScore: {state.get('score', 0)} / {total}"
        return "### No question loaded"
    qn = state["current_index"] + 1
    total = len(state["mcqs"])
    return f"### Question {qn}/{total}\n\n{mcq['question']}"


def current_choices(state: Dict[str, Any]) -> List[str]:
    """Return the current question's options as labelled display strings."""
    mcq = get_current_mcq(state)
    if mcq is None:
        return []
    labels = ["A", "B", "C", "D"]
    return [f"{labels[i]}. {normalize_option_text(opt)}" for i, opt in enumerate(mcq["options"])]
{normalize_option_text(opt)}" for i, opt in enumerate(mcq["options"])] def score_text(state: Dict[str, Any]) -> str: total = len(state.get("mcqs", [])) return f"Score: {state.get('score', 0)} / {total}" def _exam_chat_text_for_question(state: Dict[str, Any], mcq: Dict[str, Any]) -> str: qn = state.get("current_index", 0) + 1 total = len(state.get("mcqs", [])) labels = ["A", "B", "C", "D"] options = mcq.get("options", []) lines = [f"Question {qn}/{total}", str(mcq.get("question", "")).strip(), ""] for i in range(min(4, len(options))): lines.append(f"{labels[i]}. {normalize_option_text(options[i])}") return "\n".join([x for x in lines if x is not None]).strip() def _ensure_current_question_in_exam_chat(state: Dict[str, Any]) -> None: if not state.get("mcqs") or state.get("completed"): return chat: List[Dict[str, Any]] = state.setdefault("exam_chat", []) q_index = int(state.get("current_index", 0)) for msg in reversed(chat): if msg.get("kind") == "mcq": if int(msg.get("q_index", -1)) == q_index: return break mcq = get_current_mcq(state) if mcq is None: return chat.append({"role": "assistant", "kind": "mcq", "q_index": q_index, "text": _exam_chat_text_for_question(state, mcq)}) def _append_exam_user_answer(state: Dict[str, Any], choice: str) -> None: chat: List[Dict[str, Any]] = state.setdefault("exam_chat", []) q_index = int(state.get("current_index", 0)) display = choice if "." 
in choice: _, rest = choice.split(".", 1) if rest.strip(): display = rest.strip() chat.append({"role": "user", "kind": "answer", "q_index": q_index, "text": display}) def _append_exam_assistant_text(state: Dict[str, Any], text: str, *, kind: str = "note") -> None: chat: List[Dict[str, Any]] = state.setdefault("exam_chat", []) q_index = int(state.get("current_index", 0)) chat.append({"role": "assistant", "kind": kind, "q_index": q_index, "text": text}) def _score_band(score: int, total: int) -> str: if total <= 0: return "none" ratio = score / total if ratio >= 0.9: return "excellent" if ratio >= 0.7: return "good" if ratio >= 0.5: return "fair" return "poor" def _pick_variant(items: List[str], seed: int) -> str: if not items: return "" return items[seed % len(items)] def _character_feedback_style_from_mcq_prompt(character_id: str) -> str: cfg = get_character_config(character_id) prompt_text = str(cfg.get("mcq_prompt_template", "") or "") if not prompt_text.strip(): return "" role_line = "" tone_line = "" in_tone_block = False for raw in prompt_text.splitlines(): line = raw.strip() if not line: continue lower = line.lower() if not role_line and lower.startswith("you are "): role_line = line continue if lower.startswith("tone:"): in_tone_block = True continue if in_tone_block: # Stop tone parsing when another section starts. if line.endswith(":"): in_tone_block = False continue tone_line = line in_tone_block = False style_parts: List[str] = [] if role_line: style_parts.append(role_line.rstrip(".")) if tone_line: style_parts.append(f"Tone: {tone_line}") return " ".join(style_parts).strip() def _examiner_style_prompt(character_id: str) -> str: cfg = get_character_config(character_id) feedback_prompt = str(cfg.get("feedback_prompt_template", "") or "").strip() if feedback_prompt: return feedback_prompt character_style = _character_feedback_style_from_mcq_prompt(character_id) if character_style: return ( f"{character_style}. 
" "You are giving live exam feedback after each answer. " "Respond in concise English, in-character, practical, and pointed. " "No markdown, no emojis, no stage directions." ) return ( "You are an examiner giving live feedback after each answer. " "Respond in concise English and focus on the student's performance. " "No markdown, no emojis." ) def _llm_exam_feedback(messages: List[Dict[str, Any]], *, max_tokens: int = 120) -> str: engine.ensure_vl_loaded() return engine._chat_completions(messages, max_tokens=max_tokens, temperature=0.9, top_p=0.95) def _llm_short_exam_remark(character_id: str, *, kind: str, context: str = "") -> str: if engine.mock_mode: return "" ctx = " ".join(str(context or "").strip().split()) if kind == "correct": instruction = f"Write ONE short English sentence for a correct answer. Context: {ctx}. Max 16 words. No markdown. No emojis." elif kind == "incorrect": instruction = f"Write ONE short English sentence for an incorrect answer without giving the option letter. Context: {ctx}. Max 20 words. No markdown. No emojis." else: instruction = f"Write 1-2 short English final remarks with one concrete revision suggestion. Context: {ctx}. Max 28 words total. No markdown. No emojis." text = _llm_exam_feedback( [ {"role": "system", "content": _examiner_style_prompt(character_id)}, {"role": "user", "content": instruction}, ], max_tokens=80 if kind in {"correct", "incorrect"} else 120, ) return " ".join(str(text or "").strip().split()) def exam_feedback_correct(character_id: str, *, q_index: int) -> str: if engine.mock_mode: cid = (character_id or "").lower() if "snape" in cid: return _pick_variant( [ "Correct. Keep going.", "Right answer. Stay focused.", "Good. Next question.", "Exactly. Keep your pace.", ], q_index, ) if "mcgonagall" in cid or "mcg" in cid: return _pick_variant( [ "That's correct. Keep it up.", "Good work. Move on.", "Well done. Stay consistent.", "Precisely. Continue.", ], q_index, ) return "That's right." 
try: remark = _llm_short_exam_remark( character_id, kind="correct", context=f"Question {q_index + 1} answered correctly.", ) if remark: return remark except Exception: pass return "That's right." def exam_feedback_incorrect( character_id: str, *, q_index: int, correct_choice_display: str, explanation: str, ) -> str: explanation = normalize_explanation_text(explanation) if engine.mock_mode: cid = (character_id or "").lower() if "snape" in cid: opener = _pick_variant( [ "Wrong. Read more carefully.", "Incorrect. Check the prompt details.", "Not correct. Your reading is too loose.", "Incorrect. Be more rigorous.", ], q_index, ) return f"{opener}\nThe correct answer is {correct_choice_display}\n\n{explanation}" if "mcgonagall" in cid or "mcg" in cid: opener = _pick_variant( [ "Incorrect. Think first, then answer.", "Not quite. Slow down and read precisely.", "Wrong. Stop guessing.", "Incorrect. Focus on the method itself.", ], q_index, ) return f"{opener}\nThe correct answer is {correct_choice_display}\n\n{explanation}" return f"Incorrect.\nThe correct answer is {correct_choice_display}\n\n{explanation}" try: remark = _llm_short_exam_remark( character_id, kind="incorrect", context=f"Question {q_index + 1} answered incorrectly.", ) if remark: return f"{remark}\nThe correct answer is {correct_choice_display}\n\n{explanation}" except Exception: pass return f"Incorrect.\nThe correct answer is {correct_choice_display}\n\n{explanation}" def exam_feedback_final(character_id: str, *, score: int, total: int) -> str: if engine.mock_mode: cid = (character_id or "").lower() band = _score_band(score, total) if "snape" in cid: mapping = { "excellent": "Excellent performance this time.", "good": "Good. Keep polishing details.", "fair": "Fair. More practice is needed.", "poor": "Poor. Review the lecture and retry.", "none": "No score available yet.", } return mapping.get(band, "Quiz finished.") if "mcgonagall" in cid or "mcg" in cid: mapping = { "excellent": "Excellent. 
Keep this standard.", "good": "Good understanding. Improve the details.", "fair": "Passable, but not stable yet.", "poor": "Not acceptable. Review and try again.", "none": "No score available yet.", } return mapping.get(band, "Quiz finished.") return f"Final score: {score} / {total}." try: remark = _llm_short_exam_remark( character_id, kind="final", context=f"Final score: {score} / {total}.", ) if remark: return remark except Exception: pass return f"Final score: {score} / {total}." def _roleplay_explain_feedback(character_id: str) -> str: cid = (character_id or "").lower() if "snape" in cid: return "Lecture is ready. Select a chunk to play, then go to the exam." if "mcgonagall" in cid or "mcg" in cid: return "Lecture is ready. Review it carefully, then enter the exam." return "Lecture is ready. Review it, then enter the exam." def _roleplay_loading_text(character_id: str, *, phase: str) -> str: cfg = get_character_config(character_id) name = str(cfg.get("display_name", "Professor")) cid = (character_id or "").lower() if phase == "lecture": if "snape" in cid: return f"Professor {name} is scrutinizing your paper…" if "mcgonagall" in cid or "mcg" in cid: return f"Professor {name} is reviewing your paper with strict precision…" return f"Professor {name} is reviewing your paper…" if "snape" in cid: return f"Professor {name} is preparing a rigorous exam…" if "mcgonagall" in cid or "mcg" in cid: return f"Professor {name} is preparing challenging questions…" return f"Professor {name} is preparing your exam materials…" def build_loading_html(text: str) -> str: safe = html.escape(str(text or ""), quote=False) if not safe: return "" return f"""
{safe}
""".strip() def _build_exam_chat_avatar_html(character_id: Optional[str]) -> str: cfg = get_character_config(character_id) avatar_url = _image_data_url(Path(cfg.get("avatar_path", ""))) if cfg.get("avatar_path") else "" return f'avatar' if avatar_url else "" def build_exam_chat_html(state: Dict[str, Any]) -> str: chat: List[Dict[str, Any]] = state.get("exam_chat", []) or [] if not chat and state.get("mcqs") and not state.get("completed"): mcq = get_current_mcq(state) if mcq is not None: chat = [{"role": "assistant", "kind": "mcq", "q_index": int(state.get("current_index", 0)), "text": _exam_chat_text_for_question(state, mcq)}] character_id = state.get("exam_character_id") or DEFAULT_CHARACTER_ID avatar_html = _build_exam_chat_avatar_html(character_id) parts: List[str] = ['
'] for msg in chat: role = msg.get("role", "assistant") safe = html.escape(str(msg.get("text", "")), quote=False).replace("\n", "
") if role == "user": parts.append(f'
{safe}
') else: parts.append(f'
{avatar_html}
{safe}
') parts.append("
") return "".join(parts) def reset_ui_from_state( state: Dict[str, Any], feedback: str = "", *, results_visible: bool = True, loading_visible: bool = False, loading_text: str = "", exam_picker_visible: bool = False, ): quiz_ready = bool(state.get("mcqs")) current_page = state.get("current_page", "explain") explain_character_id = state.get("character_id") or DEFAULT_CHARACTER_ID exam_character_id = state.get("exam_character_id") or explain_character_id top_character_id = exam_character_id if current_page == "exam" else explain_character_id top_picker_value = top_character_id show_explain_page = results_visible and current_page != "exam" show_exam_page = results_visible and current_page == "exam" submit_interactive = quiz_ready and not state.get("completed", False) radio_interactive = submit_interactive lecture_tts_ready = bool(state.get("lecture_text")) picker_choices = paragraph_picker_choices(state.get("lecture_text", "")) selected_paragraph_value = paragraph_picker_value_for_idx( state.get("lecture_text", ""), str(state.get("selected_paragraph_idx", "")).strip(), ) if selected_paragraph_value is None and picker_choices: selected_paragraph_value = picker_choices[0][1] if state.get("completed"): radio_interactive = False return ( state, build_character_header_html(top_character_id), gr.update(value=top_picker_value), build_chat_avatar_html(top_character_id), build_chat_meta_html(top_character_id), gr.update(value=build_loading_html(loading_text), visible=loading_visible), gr.update(visible=show_explain_page), gr.update(visible=show_exam_page), state.get("status", "Idle"), build_clickable_lecture_html(state.get("lecture_text", ""), str(state.get("selected_paragraph_idx", ""))), gr.update( choices=picker_choices, value=selected_paragraph_value, interactive=lecture_tts_ready, visible=lecture_tts_ready, ), state.get("lecture_audio_path", None), gr.update(interactive=lecture_tts_ready), gr.update(visible=lecture_tts_ready, interactive=lecture_tts_ready), 
gr.update(visible=lecture_tts_ready, interactive=lecture_tts_ready), gr.update(visible=exam_picker_visible), gr.update(value=build_exam_chat_html(state), visible=show_exam_page and (quiz_ready or bool(state.get("exam_chat")))), gr.update(choices=current_choices(state), value=None, interactive=radio_interactive), score_text(state), feedback, gr.update(interactive=submit_interactive), gr.update(interactive=quiz_ready), ) def process_pdf(pdf_file: Optional[str], character_id: str, state: Dict[str, Any]): state = new_session_state() state["character_id"] = character_id or DEFAULT_CHARACTER_ID if not pdf_file: state["status"] = "Please upload a PDF first." yield reset_ui_from_state(state, feedback="Upload a PDF to start.", results_visible=False, loading_visible=False) return state["status"] = "Generating..." yield reset_ui_from_state( state, feedback="Reading the paper and generating lecture/quiz content...", results_visible=False, loading_visible=True, loading_text=_roleplay_loading_text(state.get("character_id") or DEFAULT_CHARACTER_ID, phase="lecture"), ) try: result = engine.build_lecture(pdf_file, get_character_config(state["character_id"])) lecture_text = result["lecture_text"] pdf_excerpt = result["pdf_excerpt"] state["lecture_text"] = lecture_text state["lecture_audio_path"] = None state["selected_paragraph_idx"] = "" state["explanation_audio_path"] = None state["last_explanation_tts_text"] = "" state["pdf_path"] = pdf_file state["pdf_excerpt"] = pdf_excerpt state["current_page"] = "explain" state["mcqs"] = [] state["current_index"] = 0 state["score"] = 0 state["awaiting_next_after_wrong"] = False state["completed"] = False state["status"] = "Lecture generated." yield reset_ui_from_state( state, feedback=_roleplay_explain_feedback(state.get("character_id") or DEFAULT_CHARACTER_ID), results_visible=True, loading_visible=False, ) except Exception as exc: state["status"] = "Generation failed." 
state["lecture_text"] = f"Error: {type(exc).__name__}: {exc}" state["current_page"] = "explain" yield reset_ui_from_state( state, feedback=f"Error: {type(exc).__name__}: {exc}", results_visible=True, loading_visible=False, ) def submit_answer(choice: Optional[str], state: Dict[str, Any]): if not state.get("mcqs"): state["status"] = "No quiz loaded." return reset_ui_from_state(state, feedback="Upload a PDF and generate lecture first.") if state.get("completed"): return reset_ui_from_state(state, feedback="Quiz already completed.") if not choice: return reset_ui_from_state(state, feedback="Please select an option.") mcq = get_current_mcq(state) if mcq is None: state["status"] = "No current question." return reset_ui_from_state(state, feedback="No current question.") _ensure_current_question_in_exam_chat(state) _append_exam_user_answer(state, choice) selected_label = choice.split(".", 1)[0].strip().upper() correct_label = str(mcq["answer"]).upper() exam_character_id = state.get("exam_character_id") or state.get("character_id") or DEFAULT_CHARACTER_ID q_index = int(state.get("current_index", 0)) if selected_label == correct_label: state["score"] += 1 state["last_explanation_tts_text"] = "" state["explanation_audio_path"] = None state["awaiting_next_after_wrong"] = False correct_text = exam_feedback_correct(str(exam_character_id), q_index=q_index) state["status"] = correct_text if state["current_index"] >= len(state["mcqs"]) - 1: state["completed"] = True state["status"] = "Quiz completed." 
total = len(state.get("mcqs") or []) final_comment = exam_feedback_final(str(exam_character_id), score=int(state.get("score", 0)), total=total) _append_exam_assistant_text( state, f"Quiz finished.\nFinal score: {state['score']} / {len(state['mcqs'])}.\n{final_comment}", kind="summary", ) return reset_ui_from_state( state, feedback="", ) _append_exam_assistant_text(state, correct_text, kind="result") state["current_index"] += 1 _ensure_current_question_in_exam_chat(state) return reset_ui_from_state(state, feedback="") correct_idx = ["A", "B", "C", "D"].index(correct_label) correct_choice_display = f"{correct_label}. {mcq['options'][correct_idx]}" explanation = normalize_explanation_text(mcq.get("explanation", "")) state["last_explanation_tts_text"] = "" state["explanation_audio_path"] = None state["awaiting_next_after_wrong"] = False incorrect_text = exam_feedback_incorrect( str(exam_character_id), q_index=q_index, correct_choice_display=str(correct_choice_display), explanation=str(explanation or "").strip(), ) state["status"] = incorrect_text.splitlines()[0] if incorrect_text else "Incorrect." _append_exam_assistant_text(state, incorrect_text or "Incorrect.", kind="explanation" if explanation else "result") if state["current_index"] >= len(state["mcqs"]) - 1: state["completed"] = True state["status"] = "Quiz completed." 
total = len(state.get("mcqs") or []) final_comment = exam_feedback_final(str(exam_character_id), score=int(state.get("score", 0)), total=total) _append_exam_assistant_text( state, f"Quiz finished.\nFinal score: {state['score']} / {len(state['mcqs'])}.\n{final_comment}", kind="summary", ) return reset_ui_from_state(state, feedback="") state["current_index"] += 1 _ensure_current_question_in_exam_chat(state) return reset_ui_from_state(state, feedback="") def restart_quiz(state: Dict[str, Any]): if not state.get("mcqs"): return reset_ui_from_state(new_session_state(), feedback="Upload a PDF and generate lecture first.") state["current_index"] = 0 state["score"] = 0 state["awaiting_next_after_wrong"] = False state["completed"] = False state["last_explanation_tts_text"] = "" state["explanation_audio_path"] = None state["exam_chat"] = [] _ensure_current_question_in_exam_chat(state) state["status"] = "Quiz restarted." return reset_ui_from_state(state, feedback="Quiz restarted.") def open_exam_picker(state: Dict[str, Any]): if not state.get("lecture_text"): state["status"] = "No lecture loaded." return reset_ui_from_state(state, feedback="Generate lecture first.", results_visible=False, loading_visible=False) state["status"] = "Choose an examiner." state["current_page"] = "explain" return reset_ui_from_state(state, feedback="", results_visible=True, loading_visible=False, exam_picker_visible=True) def close_exam_picker(state: Dict[str, Any]): return reset_ui_from_state(state, feedback="") def start_exam_mcgonagall(state: Dict[str, Any]): yield from generate_exam_mcq("Mcgonagall", state) def start_exam_snape(state: Dict[str, Any]): yield from generate_exam_mcq("snape", state) def start_exam(state: Dict[str, Any]): if not state.get("lecture_text"): state["status"] = "No lecture loaded." 
yield reset_ui_from_state(state, feedback="Generate lecture first.", results_visible=False, loading_visible=False) return character_id = state.get("character_id") or DEFAULT_CHARACTER_ID yield from generate_exam_mcq(character_id, state) def generate_exam_mcq(selected_character_id: Optional[str], state: Dict[str, Any]): if not state.get("lecture_text"): state["status"] = "No lecture loaded." yield reset_ui_from_state(state, feedback="Generate lecture first.", results_visible=False, loading_visible=False) return if not selected_character_id: state["status"] = "Please choose an examiner." yield reset_ui_from_state(state, feedback="", results_visible=True, loading_visible=False) return state["current_page"] = "exam" state["exam_character_id"] = selected_character_id cfg = get_character_config(selected_character_id) display_name = str(cfg.get("display_name", "Professor")) state["status"] = f"{display_name} is preparing your exam..." state["mcq_generating"] = True state["last_explanation_tts_text"] = "" state["explanation_audio_path"] = None state["mcqs"] = [] state["exam_chat"] = [] yield reset_ui_from_state( state, feedback="", results_visible=True, loading_visible=True, loading_text=_roleplay_loading_text(selected_character_id, phase="exam"), ) try: pdf_path = state.get("pdf_path") pdf_excerpt = state.get("pdf_excerpt", "") if not pdf_path: raise RuntimeError("PDF path missing in session state.") mcqs = engine.build_mcqs(pdf_path, pdf_excerpt, get_character_config(selected_character_id)) state["mcqs"] = mcqs state["current_index"] = 0 state["score"] = 0 state["awaiting_next_after_wrong"] = False state["completed"] = False state["current_page"] = "exam" state["mcq_generating"] = False _ensure_current_question_in_exam_chat(state) state["status"] = "Exam prepared." 
yield reset_ui_from_state( state, feedback="", results_visible=True, loading_visible=False, ) except Exception as exc: state["current_page"] = "exam" state["mcq_generating"] = False state["status"] = "Exam generation failed." _append_exam_assistant_text( state, f"Failed to generate exam.\nError: {type(exc).__name__}: {exc}", kind="note", ) yield reset_ui_from_state( state, feedback="", results_visible=True, loading_visible=False, ) def on_generate_click(pdf_file: Optional[str], explain_character_id: str, state: Dict[str, Any]): yield from process_pdf(pdf_file, explain_character_id, state) def go_to_explain_page(state: Dict[str, Any]): state["current_page"] = "explain" return reset_ui_from_state(state, feedback=state.get("status", "Explain page")) def on_character_change(character_id: str, state: Dict[str, Any]): cfg = get_character_config(character_id) if state.get("current_page") == "exam": state["exam_character_id"] = cfg["id"] loading_on = bool(state.get("mcq_generating")) loading_text = _roleplay_loading_text(cfg["id"], phase="exam") if loading_on else "" return ( state, build_character_header_html(cfg["id"]), build_chat_avatar_html(cfg["id"]), build_chat_meta_html(cfg["id"]), gr.update(visible=False), gr.update(visible=True), gr.update(value=build_loading_html(loading_text), visible=loading_on), state.get("status", "Exam"), ) state["character_id"] = cfg["id"] state["current_page"] = "explain" state["lecture_audio_path"] = None state["selected_paragraph_idx"] = "" state["explanation_audio_path"] = None state["last_explanation_tts_text"] = "" # Keep generated content if user wants to compare, but hide result pages until next generate. return ( state, build_character_header_html(cfg["id"]), build_chat_avatar_html(cfg["id"]), build_chat_meta_html(cfg["id"]), gr.update(visible=False), gr.update(visible=False), gr.update(value="", visible=False), "Character switched. 
Upload PDF and click Generate.", ) def tts_voice_for_character(character_id: Optional[str]) -> str: cid = (character_id or "").lower() if "mcgonagall" in cid or cid == "mcg": return "female" if "snape" in cid: return "male" return HF_TTS_VOICE def play_lecture_audio(state: Dict[str, Any]): if not state.get("lecture_text"): state["status"] = "No lecture text available." return ( state, state["status"], state.get("lecture_audio_path"), "Generate lecture first.", build_clickable_lecture_html(state.get("lecture_text", ""), str(state.get("selected_paragraph_idx", ""))), ) backend = _tts_backend_name() voice = tts_voice_for_character(state.get("character_id")) try: state["status"] = f"Generating full lecture audio ({backend})..." state["lecture_audio_path"] = engine.synthesize_tts(state["lecture_text"], name_prefix="lecture", voice=voice) state["status"] = "Full lecture audio ready." return ( state, state["status"], state["lecture_audio_path"], f"Full lecture audio generated via `{backend}`.", build_clickable_lecture_html(state.get("lecture_text", ""), str(state.get("selected_paragraph_idx", ""))), ) except Exception as exc: state["status"] = "Full lecture audio generation failed." return ( state, state["status"], state.get("lecture_audio_path"), f"TTS error via `{backend}`: {type(exc).__name__}: {exc}", build_clickable_lecture_html(state.get("lecture_text", ""), str(state.get("selected_paragraph_idx", ""))), ) def split_lecture_paragraphs(text: str) -> List[str]: s = str(text or "").replace("\r\n", "\n").strip() if not s: return [] pieces = re.split(r"\n\s*\n+", s) paragraphs = [p.strip() for p in pieces if p and p.strip()] # If the model outputs a single giant paragraph, fall back to sentence-pair chunks # so the selector always has usable granularity for TTS. 
if len(paragraphs) <= 1: fallback_chunks = split_text_every_two_sentences(s, max_len=420) if len(fallback_chunks) > 1: return [c.strip() for c in fallback_chunks if c and c.strip()] return paragraphs def paragraph_picker_choices(lecture_text: str) -> List[tuple[str, str]]: paragraphs = split_lecture_paragraphs(lecture_text) choices: List[tuple[str, str]] = [] for i, p in enumerate(paragraphs): preview = re.sub(r"\s+", " ", str(p or "")).strip() if len(preview) > 110: preview = preview[:107].rstrip() + "..." choices.append((f"Chunk {i + 1}: {preview}", str(i))) return choices def paragraph_picker_idx_from_value(value: Any) -> str: s = str(value or "").strip() if not s: return "" if s.isdigit(): return s m = re.match(r"^\s*(\d+)\s*[\.、::-]", s) if not m: return "" return str(max(0, int(m.group(1)) - 1)) def paragraph_picker_value_for_idx(lecture_text: str, idx: str) -> Optional[str]: try: i = int(str(idx or "").strip()) except Exception: return None paragraphs = split_lecture_paragraphs(lecture_text) if i < 0 or i >= len(paragraphs): return None return str(i) def build_clickable_lecture_html(lecture_text: str, selected_idx: str = "") -> str: paragraphs = split_lecture_paragraphs(lecture_text) if not paragraphs: return '
Generated lecture explanation will appear here...
' selected = str(selected_idx or "").strip() parts: List[str] = ['
'] for i, p in enumerate(paragraphs): safe = html.escape(p, quote=False).replace("\n", "
") selected_cls = " is-selected" if selected and selected == str(i) else "" selected_style = ( "background: #f97316 !important; " "border-color: #f97316 !important; " "box-shadow: 0 0 0 1px rgba(255,255,255,0.16) inset !important; " "color: #ffffff !important;" if selected_cls else "" ) parts.append( f'
' f'
{safe}
' f'
' ) parts.append("
") return "".join(parts) def play_lecture_paragraph_audio(paragraph_idx: str, state: Dict[str, Any]): lecture_text = state.get("lecture_text", "") paragraphs = split_lecture_paragraphs(str(lecture_text or "")) if not paragraphs: state["status"] = "No lecture content available." return ( state, state.get("status", "Idle"), state.get("lecture_audio_path"), "Generate lecture first.", build_clickable_lecture_html(state.get("lecture_text", ""), str(state.get("selected_paragraph_idx", ""))), ) try: idx = int(str(paragraph_idx or "").strip()) except Exception: idx = -1 if idx < 0 or idx >= len(paragraphs): state["status"] = "Invalid chunk selection." return ( state, state.get("status", "Idle"), state.get("lecture_audio_path"), "Please select a valid chunk.", build_clickable_lecture_html(state.get("lecture_text", ""), str(state.get("selected_paragraph_idx", ""))), ) backend = _tts_backend_name() voice = tts_voice_for_character(state.get("character_id")) try: state["selected_paragraph_idx"] = str(idx) state["status"] = f"Generating chunk audio ({backend})..." audio_path = engine.synthesize_tts( paragraphs[idx], name_prefix=f"lecture_p{idx+1}", voice=voice, ) state["lecture_audio_path"] = audio_path state["status"] = "Chunk audio ready." char_len = len(paragraphs[idx]) return ( state, state["status"], audio_path, f"Generated chunk {idx+1}/{len(paragraphs)} ({char_len} chars). You can play it below.", build_clickable_lecture_html(state.get("lecture_text", ""), str(state.get("selected_paragraph_idx", ""))), ) except Exception as exc: state["status"] = "Chunk audio generation failed." 
return ( state, state["status"], state.get("lecture_audio_path"), f"TTS error via `{backend}`: {type(exc).__name__}: {exc}", build_clickable_lecture_html(state.get("lecture_text", ""), str(state.get("selected_paragraph_idx", ""))), ) def play_explanation_audio(state: Dict[str, Any]): text = state.get("last_explanation_tts_text", "") if not text: state["status"] = "No explanation available for audio." return state, state["status"], state.get("explanation_audio_path"), "Answer a question first." voice = tts_voice_for_character(state.get("exam_character_id") or state.get("character_id")) try: state["status"] = "Generating explanation audio..." state["explanation_audio_path"] = engine.synthesize_tts(text, name_prefix="explanation", voice=voice) state["status"] = "Explanation audio ready." return state, state["status"], state["explanation_audio_path"], "Explanation audio generated." except Exception as exc: state["status"] = "Explanation audio generation failed." return state, state["status"], state.get("explanation_audio_path"), f"TTS error: {type(exc).__name__}: {exc}" def on_play_lecture_audio_click(state: Dict[str, Any]): state, status, audio_path, feedback, lecture_html = play_lecture_audio(state) lecture_text = state.get("lecture_text", "") picker_choices = paragraph_picker_choices(lecture_text) selected_paragraph_value = paragraph_picker_value_for_idx( lecture_text, str(state.get("selected_paragraph_idx", "")).strip(), ) if selected_paragraph_value is None and picker_choices: selected_paragraph_value = picker_choices[0][1] lecture_tts_ready = bool(lecture_text) return ( state, status, audio_path, feedback, lecture_html, gr.update( choices=picker_choices, value=selected_paragraph_value, interactive=lecture_tts_ready, visible=lecture_tts_ready, ), ) def on_play_paragraph_click(paragraph_idx: str, state: Dict[str, Any]): idx_value = paragraph_picker_idx_from_value(paragraph_idx) state, status, audio_path, feedback, lecture_html = 
play_lecture_paragraph_audio(idx_value, state) lecture_text = state.get("lecture_text", "") picker_choices = paragraph_picker_choices(lecture_text) selected_paragraph_value = paragraph_picker_value_for_idx( lecture_text, str(state.get("selected_paragraph_idx", "")).strip(), ) if selected_paragraph_value is None and picker_choices: selected_paragraph_value = picker_choices[0][1] lecture_tts_ready = bool(lecture_text) return ( state, status, audio_path, feedback, lecture_html, gr.update( choices=picker_choices, value=selected_paragraph_value, interactive=lecture_tts_ready, visible=lecture_tts_ready, ), ) def build_css() -> str: bg_css = "" return f""" @import url('https://fonts.googleapis.com/css2?family=Instrument+Serif:ital@0;1&family=Inter:wght@400;500;600;700&display=swap'); html, body {{ height: 100%; min-height: 100%; }} body {{ background-color: #ffffff !important; color: #0f172a !important; font-family: "Inter", sans-serif !important; }} .app, #root, .gradio-container, .gradio-container > .main {{ min-height: 100%; background: transparent !important; }} .gradio-container {{ position: relative; z-index: 1; }} .gradio-container .block, .gradio-container .panel, .gradio-container .gr-box, .gradio-container .gr-form, .gradio-container .gr-group {{ background: rgba(14, 16, 24, 0.62) !important; backdrop-filter: blur(2px); border-color: rgba(255, 255, 255, 0.08) !important; }} .gradio-container textarea, .gradio-container input, .gradio-container .wrap, .gradio-container .svelte-1ipelgc {{ background-color: transparent !important; }} .gradio-container textarea, .gradio-container input {{ box-shadow: none !important; color: #eef1f6 !important; }} .gradio-container label, .gradio-container .prose, .gradio-container .prose p, .gradio-container .prose code, .gradio-container .prose strong {{ color: #eef1f6 !important; }} #page-shell {{ min-height: 100%; padding: 2rem 1.2rem 9rem 1.2rem; max-width: 980px; margin: 0 auto; }} #page-shell .hero {{ text-align: center; 
margin: 1.2rem 0 1.8rem 0; }} #page-shell .hero-title {{ margin: 0; color: #f4f6fb; letter-spacing: 0.01em; font-family: "Instrument Serif", Georgia, serif; font-weight: 400; font-size: clamp(2.05rem, 3vw, 2.75rem); text-shadow: 0 1px 8px rgba(0,0,0,0.35); }} #page-shell .hero-sub {{ margin: 0.65rem 0 0 0; color: rgba(241, 244, 251, 0.88); font-size: 0.98rem; }} #page-shell .hero-note {{ margin-top: 0.5rem; color: rgba(241, 244, 251, 0.72); font-size: 0.92rem; }} #character-card {{ background: transparent !important; border: none !important; box-shadow: none !important; }} .char-wrap {{ display: flex; flex-direction: column; align-items: center; gap: 0.45rem; margin-bottom: 0.8rem; }} .char-avatar {{ width: 84px; height: 84px; border-radius: 999px; object-fit: cover; border: 1px solid rgba(255,255,255,0.18); box-shadow: 0 8px 26px rgba(0,0,0,0.28); }} .char-name {{ color: #f6f7fb; font-weight: 600; font-size: 1.05rem; }} .char-tag {{ color: rgba(240,243,250,0.78); font-size: 0.95rem; }} .char-byline {{ color: rgba(240,243,250,0.58); font-size: 0.85rem; }} #character-select-wrap {{ background: transparent !important; border: none !important; box-shadow: none !important; margin: -0.1rem auto 0.8rem auto !important; max-width: 220px !important; min-width: 0 !important; padding: 0 !important; }} #page-shell .flat-select, #page-shell .flat-select > div, #page-shell .flat-select .block, #page-shell .flat-select .gradio-dropdown {{ background: transparent !important; border: none !important; box-shadow: none !important; padding: 0 !important; }} #character-select-wrap, #character-select-wrap > div, #character-select-wrap > div > div, #character-select-wrap .wrap, #character-select-wrap input, #character-select-wrap button {{ background: transparent !important; border: none !important; box-shadow: none !important; }} #character-select-wrap .wrap {{ justify-content: center; padding: 0 !important; min-height: 20px !important; }} #character-select-wrap input, 
#character-select-wrap [role="combobox"], #character-select-wrap [role="combobox"] {{ font-family: "Inter", sans-serif !important; font-size: 0.88rem !important; font-weight: 400 !important; color: rgba(240,243,250,0.78) !important; text-align: center !important; }} #character-select-wrap [role="combobox"] {{ min-height: 20px !important; padding: 0 !important; }} #character-select-wrap [role="listbox"], [data-testid="dropdown-menu"] {{ background: rgba(20, 22, 30, 0.96) !important; border: 1px solid rgba(255,255,255,0.12) !important; box-shadow: 0 12px 30px rgba(0,0,0,0.35) !important; z-index: 9999 !important; }} [data-testid="dropdown-menu"] * {{ color: #eef1f6 !important; }} #character-select-wrap svg, #character-select-wrap [data-icon] {{ opacity: 0.65 !important; color: rgba(240,243,250,0.78) !important; }} #character-select-wrap {{ display: flex !important; justify-content: center !important; }} #character-select-wrap .wrap {{ display: flex !important; gap: 0.35rem !important; flex-wrap: wrap !important; justify-content: center !important; align-items: center !important; }} #character-select-wrap label {{ background: transparent !important; border: 1px solid rgba(255,255,255,0.14) !important; border-radius: 999px !important; padding: 0 !important; min-height: 42px !important; height: 42px !important; display: inline-flex !important; align-items: center !important; justify-content: center !important; line-height: 1 !important; }} #character-select-wrap label span {{ color: rgba(240,243,250,0.78) !important; font-size: 0.88rem !important; display: inline-flex !important; align-items: center !important; justify-content: center !important; height: 100% !important; padding: 0 0.8rem !important; line-height: 1 !important; text-align: center !important; }} #character-select-wrap input[type="radio"] {{ display: none !important; }} #character-select-wrap label:has(input[type="radio"]:checked) {{ background: rgba(255,255,255,0.10) !important; border-color: 
rgba(255,255,255,0.22) !important; }} #character-select-wrap label:has(input[type="radio"]:checked) span {{ color: #ffffff !important; }} #gen-loading {{ text-align: center; padding: 14px 18px; margin: 0 0 12px 0; color: #f2f3f8; background: rgba(255,255,255,0.08); border: 1px solid rgba(255,255,255,0.12); border-radius: 12px; backdrop-filter: blur(3px); }} .gen-loading-inner {{ display: flex; flex-direction: column; align-items: center; gap: 10px; }} .loader {{ width: 120px; height: 20px; border-radius: 20px; background: linear-gradient(#f97316 0 0) 0/0% no-repeat #93c5fd; animation: l2 2s infinite steps(10); }} @keyframes l2 {{ 100% {{ background-size: 110%; }} }} .gradio-container [data-testid="progress-bar"], .gradio-container [data-testid="progress-bar"] *, .gradio-container .progress-bar, .gradio-container .progress-bar-container, .gradio-container .progress-bar-wrap, .gradio-container .top-progress, .gradio-container .progress {{ display: none !important; }} #results-panel {{ background: transparent !important; border: none !important; box-shadow: none !important; padding: 0 !important; gap: 0.75rem; }} #chat-row {{ background: transparent !important; border: none !important; box-shadow: none !important; align-items: flex-start !important; }} #chat-avatar-col {{ max-width: 54px !important; min-width: 54px !important; }} .mini-avatar {{ width: 34px; height: 34px; border-radius: 999px; object-fit: cover; border: 1px solid rgba(255,255,255,0.16); }} #chat-main {{ flex: 1; }} #chat-meta {{ margin: 0 0 0.45rem 0; color: rgba(245,247,252,0.95); font-size: 0.95rem; font-weight: 600; }} #chat-meta .pill {{ margin-left: 0.5rem; padding: 0.08rem 0.45rem; border-radius: 999px; background: rgba(255,255,255,0.1); color: rgba(255,255,255,0.78); font-size: 0.78rem; }} #lecture-wrap {{ background: rgba(33, 36, 46, 0.82) !important; border: 1px solid rgba(255,255,255,0.06) !important; border-radius: 20px !important; padding: 0.35rem 0.45rem !important; }} #lecture-wrap 
textarea, #lecture-wrap .prose {{ font-style: italic; line-height: 1.45 !important; color: rgba(244,246,251,0.95) !important; }} #lecture-clickable, #lecture-clickable .html-container, #lecture-clickable .html-container *, #lecture-clickable .lecture-clickable, #lecture-clickable .lecture-clickable * {{ pointer-events: auto !important; opacity: 1 !important; filter: none !important; }} #lecture-clickable .lecture-paragraph {{ cursor: default; pointer-events: auto !important; padding: 10px 12px; border-radius: 14px; margin: 0 0 10px 0; border: 1px solid rgba(255,255,255,0.08); background: rgba(255,255,255,0.04); font-style: italic; line-height: 1.45 !important; color: rgba(244,246,251,0.95) !important; }} #lecture-clickable .chunk-text {{ flex: 1 1 auto; min-width: 0; }} #lecture-clickable .lecture-paragraph:hover {{ background: rgba(255,255,255,0.08); border-color: rgba(255,255,255,0.14); }} #lecture-clickable .lecture-paragraph.is-selected {{ background: #f97316 !important; border-color: #f97316 !important; box-shadow: 0 0 0 1px rgba(255,255,255,0.16) inset !important; color: #ffffff !important; }} #lecture-clickable .lecture-paragraph[data-selected="1"] {{ background: #f97316 !important; border-color: #f97316 !important; box-shadow: 0 0 0 1px rgba(255,255,255,0.16) inset !important; color: #ffffff !important; }} #lecture-wrap [disabled], #lecture-wrap [aria-disabled="true"], #lecture-wrap .disabled, #lecture-wrap .pending, #lecture-wrap .loading, #lecture-wrap .generating {{ opacity: 1 !important; filter: none !important; }} .lecture-empty {{ padding: 10px 12px; color: rgba(244,246,251,0.72); font-style: italic; }} #tts-loading {{ margin: 8px 0 0 0; padding: 10px 12px; border-radius: 14px; border: 1px solid rgba(255,255,255,0.10); background: rgba(255,255,255,0.05); }} .tts-loading-row {{ display: flex; align-items: center; gap: 10px; }} .tts-loading-bar {{ width: 120px; height: 10px; border-radius: 999px; background: linear-gradient(#f97316 0 0) 0/0% no-repeat 
rgba(147, 197, 253, 0.55); animation: tts_loading 1.6s infinite steps(10); flex: 0 0 auto; }} .tts-loading-text {{ color: rgba(244,246,251,0.85); font-size: 0.92rem; }} #selected-paragraph, #play-paragraph-btn {{ display: none !important; }} #chunk-controls {{ margin-top: 8px !important; align-items: start !important; gap: 8px !important; overflow: visible !important; position: relative !important; z-index: 60 !important; }} #tts-wrap, #paragraph-picker, #paragraph-picker .wrap {{ overflow: visible !important; }} #paragraph-picker .wrap {{ max-height: 320px !important; overflow: auto !important; border: 1px solid rgba(255,255,255,0.10) !important; border-radius: 12px !important; padding: 8px !important; }} #paragraph-picker label {{ border: 1px solid rgba(255,255,255,0.08) !important; border-radius: 10px !important; padding: 8px 10px !important; margin-bottom: 6px !important; background: rgba(255,255,255,0.03) !important; }} #paragraph-picker label:hover {{ background: rgba(255,255,255,0.06) !important; border-color: rgba(255,255,255,0.14) !important; }} #paragraph-picker input[type="radio"]:checked + span {{ color: #f97316 !important; font-weight: 700 !important; }} #play-selected-chunk-btn button {{ min-height: 42px !important; height: 42px !important; border-radius: 999px !important; background: #f97316 !important; border-color: #f97316 !important; color: #ffffff !important; font-weight: 700 !important; font-size: 18px !important; line-height: 1 !important; padding: 0 14px !important; }} #play-selected-chunk-btn button:hover {{ background: #ea580c !important; border-color: #ea580c !important; }} #play-selected-chunk-btn button[disabled] {{ background: #f97316 !important; border-color: #f97316 !important; color: #ffffff !important; opacity: 0.75 !important; }} @keyframes tts_loading {{ 100% {{ background-size: 110%; }} }} #lecture-actions {{ background: transparent !important; border: none !important; box-shadow: none !important; margin-top: 0.35rem !important; 
}} #exam-entry-wrap {{ background: transparent !important; border: none !important; box-shadow: none !important; margin-top: 0.25rem !important; }} #bottom-composer {{ position: fixed; left: 50%; transform: translateX(-50%); bottom: 18px; width: min(860px, calc(100vw - 28px)); z-index: 40; background: rgba(24, 26, 34, 0.88); border: 1px solid rgba(255,255,255,0.08); border-radius: 999px; box-shadow: 0 16px 40px rgba(0,0,0,0.22); backdrop-filter: blur(10px); padding: 8px 10px; align-items: center !important; gap: 10px !important; }} #bottom-composer .wrap {{ border: none !important; }} #bottom-composer .block {{ background: transparent !important; border: none !important; box-shadow: none !important; }} #bottom-composer button {{ border-radius: 999px !important; }} #generate-btn button {{ min-height: 42px !important; height: 42px !important; padding: 0 18px !important; font-size: 0.9rem !important; line-height: 42px !important; min-width: 132px !important; display: inline-flex !important; align-items: center !important; justify-content: center !important; }} #generate-btn .wrap {{ min-height: 42px !important; display: flex !important; align-items: center !important; }} #pdf-uploader {{ min-height: 42px; }} #pdf-uploader .wrap {{ min-height: 42px !important; padding: 4px 10px !important; }} #pdf-uploader [data-testid="file-upload-dropzone"] {{ min-height: 42px !important; height: 42px !important; padding: 2px 8px !important; display: flex !important; align-items: center !important; justify-content: center !important; }} #pdf-uploader [data-testid="file-upload-dropzone"] * {{ font-size: 0.88rem !important; }} #status-wrap, #quiz-wrap, #tts-wrap, #explain-wrap {{ background: rgba(18, 20, 28, 0.58) !important; border-radius: 16px !important; }} #exam-page {{ background: transparent !important; border: none !important; box-shadow: none !important; padding: 0 !important; }} #exam-nav {{ background: transparent !important; border: none !important; box-shadow: none 
!important; justify-content: space-between; align-items: center; }} #exam-chat .exam-chat-wrap {{ width: 100%; display: flex; flex-direction: column; gap: 10px; padding: 0; border-radius: 0; background: transparent; border: none; max-height: 420px; overflow-y: auto; }} #exam-chat .exam-msg {{ display: flex; gap: 10px; align-items: flex-end; }} #exam-chat .exam-msg.user {{ justify-content: flex-end; }} #exam-chat .exam-msg.assistant {{ justify-content: flex-start; }} #exam-chat .exam-chat-avatar {{ width: 34px; height: 34px; border-radius: 999px; object-fit: cover; }} #exam-chat .bubble {{ max-width: 82%; padding: 10px 12px; border-radius: 14px; font-size: 0.95rem; line-height: 1.35; white-space: normal; }} #exam-chat .bubble.assistant {{ background: rgba(255, 255, 255, 0.10); border: 1px solid rgba(255, 255, 255, 0.14); color: rgba(255, 255, 255, 0.95); }} #exam-chat .bubble.user {{ background: rgba(59, 130, 246, 0.22); border: 1px solid rgba(59, 130, 246, 0.28); color: rgba(255, 255, 255, 0.95); }} @media (prefers-color-scheme: light) {{ body {{ background: linear-gradient(180deg, #f5f7fb 0%, #eef2f8 100%) !important; }} .gradio-container .block, .gradio-container .panel, .gradio-container .gr-box, .gradio-container .gr-form, .gradio-container .gr-group {{ background: rgba(255, 255, 255, 0.96) !important; border-color: rgba(15, 23, 42, 0.10) !important; }} .gradio-container textarea, .gradio-container input, .gradio-container label, .gradio-container .prose, .gradio-container .prose p, .gradio-container .prose code, .gradio-container .prose strong {{ color: #0f172a !important; }} .gradio-container .prose span, .gradio-container .prose em, .gradio-container .prose li, .gradio-container .prose a, .gradio-container .prose blockquote, .gradio-container .prose h1, .gradio-container .prose h2, .gradio-container .prose h3, .gradio-container .prose h4, .gradio-container .prose h5, .gradio-container .prose h6 {{ color: #0f172a !important; opacity: 1 !important; }} 
#lecture-wrap .prose, #lecture-wrap .prose * {{ color: #0f172a !important; opacity: 1 !important; }} #lecture-clickable .lecture-paragraph {{ background: rgba(15, 23, 42, 0.04); border-color: rgba(15, 23, 42, 0.10); color: #0f172a !important; }} #lecture-clickable .lecture-row {{ display: block; }} #lecture-clickable .lecture-paragraph:hover {{ background: rgba(15, 23, 42, 0.06); border-color: rgba(15, 23, 42, 0.16); }} #lecture-clickable .lecture-paragraph.is-selected {{ background: #f97316 !important; border-color: #f97316 !important; box-shadow: 0 0 0 1px rgba(255,255,255,0.18) inset !important; color: #ffffff !important; }} #lecture-clickable .lecture-paragraph[data-selected="1"] {{ background: #f97316 !important; border-color: #f97316 !important; box-shadow: 0 0 0 1px rgba(255,255,255,0.18) inset !important; color: #ffffff !important; }} .lecture-empty {{ color: rgba(15, 23, 42, 0.72); }} #tts-loading {{ border-color: rgba(15, 23, 42, 0.12); background: rgba(15, 23, 42, 0.03); }} .tts-loading-bar {{ background: linear-gradient(#f97316 0 0) 0/0% no-repeat rgba(59, 130, 246, 0.25); }} .tts-loading-text {{ color: rgba(15, 23, 42, 0.78); }} #lecture-wrap .prose code, #lecture-wrap .prose pre {{ color: #0f172a !important; opacity: 1 !important; }} .char-name {{ color: #0f172a !important; }} .char-tag {{ color: rgba(15, 23, 42, 0.78) !important; }} .char-byline {{ color: rgba(15, 23, 42, 0.58) !important; }} #character-select-wrap label {{ border-color: rgba(15, 23, 42, 0.22) !important; background: rgba(255, 255, 255, 0.85) !important; min-height: 42px !important; height: 42px !important; display: inline-flex !important; align-items: center !important; justify-content: center !important; }} #character-select-wrap label span {{ color: rgba(15, 23, 42, 0.82) !important; height: 100% !important; display: inline-flex !important; align-items: center !important; justify-content: center !important; text-align: center !important; }} #character-select-wrap 
label:has(input[type="radio"]:checked) {{ background: rgba(15, 23, 42, 0.10) !important; border-color: rgba(15, 23, 42, 0.32) !important; }} #character-select-wrap label:has(input[type="radio"]:checked) span {{ color: #0f172a !important; }} #character-select-wrap svg, #character-select-wrap [data-icon] {{ color: rgba(15, 23, 42, 0.70) !important; }} #chat-meta {{ color: #0f172a !important; background: rgba(255, 255, 255, 0.92) !important; border: 1px solid rgba(15, 23, 42, 0.10) !important; border-radius: 12px !important; padding: 0.45rem 0.7rem !important; }} #chat-meta .pill {{ background: rgba(15, 23, 42, 0.10) !important; color: rgba(15, 23, 42, 0.75) !important; }} #lecture-wrap {{ background: rgba(255, 255, 255, 0.95) !important; border-color: rgba(15, 23, 42, 0.10) !important; }} #lecture-wrap .wrap, #lecture-wrap .block, #lecture-wrap [data-testid="textbox"] {{ background: transparent !important; border: none !important; box-shadow: none !important; }} #lecture-wrap textarea {{ background: #ffffff !important; color: #0f172a !important; border: 1px solid rgba(15, 23, 42, 0.16) !important; border-radius: 10px !important; }} #gen-loading {{ color: #0f172a !important; background: rgba(255, 255, 255, 0.90) !important; border-color: rgba(15, 23, 42, 0.14) !important; }} #gen-loading, #gen-loading *, #gen-loading p, #gen-loading span {{ color: #111827 !important; opacity: 1 !important; }} #bottom-composer {{ background: rgba(255, 255, 255, 0.94) !important; border-color: rgba(15, 23, 42, 0.14) !important; box-shadow: 0 16px 40px rgba(15, 23, 42, 0.16) !important; }} #pdf-uploader [data-testid="file-upload-dropzone"] {{ border-color: rgba(15, 23, 42, 0.20) !important; }} #pdf-uploader [data-testid="file-upload-dropzone"] * {{ color: #0f172a !important; }} #status-wrap, #quiz-wrap, #tts-wrap, #explain-wrap {{ background: #ffffff !important; border: 1px solid rgba(15, 23, 42, 0.10) !important; box-shadow: 0 6px 18px rgba(15, 23, 42, 0.06) !important; }} #status-wrap 
.block, #quiz-wrap .block, #tts-wrap .block, #explain-wrap .block, #status-wrap .wrap, #quiz-wrap .wrap, #tts-wrap .wrap, #explain-wrap .wrap {{ background: #ffffff !important; border-color: rgba(15, 23, 42, 0.10) !important; box-shadow: none !important; }} #status-wrap textarea, #quiz-wrap textarea, #explain-wrap textarea, #quiz-wrap input, #status-wrap input, #explain-wrap input {{ background: #ffffff !important; color: #0f172a !important; border: 1px solid rgba(15, 23, 42, 0.16) !important; }} #quiz-wrap input[type="radio"] {{ appearance: auto !important; accent-color: #f97316 !important; }} #quiz-wrap input[type="radio"]:checked {{ background-color: #f97316 !important; border-color: #f97316 !important; }} #quiz-wrap label, #quiz-wrap legend, #status-wrap label, #explain-wrap label {{ color: #0f172a !important; }} #quiz-wrap label span, #quiz-wrap [role="radiogroup"] label span {{ color: #0f172a !important; }} #quiz-wrap .prose, #quiz-wrap .prose p, #quiz-wrap .prose span, #quiz-wrap .prose strong, #quiz-wrap .prose em, #quiz-wrap .prose li {{ color: #0f172a !important; opacity: 1 !important; }} #quiz-wrap .prose p {{ color: #1f2937 !important; font-weight: 500 !important; }} #quiz-wrap [role="radiogroup"] label {{ background: #f8fafc !important; border: 1px solid rgba(15, 23, 42, 0.14) !important; }} #exam-chat .exam-chat-wrap {{ background: transparent !important; border: none !important; }} #exam-chat .bubble.assistant {{ background: #f8fafc !important; border: 1px solid rgba(15, 23, 42, 0.12) !important; color: #0f172a !important; }} #exam-chat .bubble.user {{ background: rgba(59, 130, 246, 0.12) !important; border: 1px solid rgba(59, 130, 246, 0.22) !important; color: #0f172a !important; }} #results-panel, #chat-row, #chat-main, #chat-avatar-col {{ background: transparent !important; border: none !important; box-shadow: none !important; }} #chat-row > div, #chat-row .block, #chat-row .wrap, #chat-main .block, #chat-main .wrap, #chat-avatar-col .block, 
#chat-avatar-col .wrap {{ background: transparent !important; border: none !important; box-shadow: none !important; }} #chat-avatar-col .html-container, #chat-avatar-col .prose {{ background: transparent !important; border: none !important; box-shadow: none !important; }} #exam-nav button {{ border-color: rgba(15, 23, 42, 0.16) !important; }} #exam-picker-overlay {{ position: fixed; inset: 0; z-index: 200; display: none; align-items: center; justify-content: center; background: rgba(2, 6, 23, 0.55); backdrop-filter: blur(6px); padding: 16px; }} #exam-picker-overlay:not(.hide) {{ display: flex; }} #exam-picker-overlay.hide {{ display: none !important; pointer-events: none !important; }} #exam-picker-modal {{ width: min(720px, 94vw); border-radius: 16px; background: #ffffff; border: 1px solid rgba(15, 23, 42, 0.12); box-shadow: 0 18px 50px rgba(15, 23, 42, 0.35); padding: 16px; height: auto !important; max-height: 320px; overflow: hidden; }} #exam-picker-modal .block, #exam-picker-modal .wrap, #exam-picker-modal .panel {{ background: transparent !important; border: none !important; box-shadow: none !important; }} #exam-picker-title {{ font-weight: 700; color: #0f172a; margin-bottom: 10px; }} .exam-picker-grid {{ display: flex !important; flex-wrap: nowrap; gap: 12px; }} .exam-picker-card {{ flex: 1 1 0; min-width: 0 !important; border-radius: 14px; border: 1px solid rgba(15, 23, 42, 0.12); background: #f8fafc; padding: 12px; overflow: hidden; transition: transform 120ms ease, border-color 120ms ease, box-shadow 120ms ease; }} .exam-picker-card:hover {{ transform: translateY(-2px); border-color: rgba(59, 130, 246, 0.35); box-shadow: 0 10px 24px rgba(15, 23, 42, 0.18); }} .exam-picker-avatar {{ width: 56px; height: 56px; border-radius: 999px; object-fit: cover; display: block; margin: 0 auto 10px auto; }} .exam-picker-card button {{ width: 100%; }} [data-testid="dropdown-menu"], #character-select-wrap [role="listbox"] {{ background: rgba(255, 255, 255, 0.98) 
!important; border-color: rgba(15, 23, 42, 0.14) !important; box-shadow: 0 12px 30px rgba(15, 23, 42, 0.18) !important; }} [data-testid="dropdown-menu"] * {{ color: #0f172a !important; }} }} .container {{max-width: 980px; margin: 0 auto;}} .mono {{font-family: ui-monospace, Menlo, Consolas, monospace;}} {bg_css} """


# Module-level stylesheet, built once at import time and handed to
# gr.Blocks(css=CSS) below.
CSS = build_css()


def _image_data_url(path: Path) -> str:
    """Return *path*'s bytes as a base64 ``data:`` URL, or "" if the file is missing.

    The MIME type is inferred from the file suffix only: ``.jpg``/``.jpeg``
    map to ``image/jpeg``; everything else is reported as ``image/png``.
    Inlining avatars as data URLs avoids needing a static-file route.
    """
    if not path.exists():
        return ""
    mime = "image/jpeg" if path.suffix.lower() in {".jpg", ".jpeg"} else "image/png"
    return f"data:{mime};base64," + base64.b64encode(path.read_bytes()).decode("ascii")


def build_character_header_html(character_id: Optional[str] = None) -> str:
    """Build the hero/header HTML snippet for the selected character.

    Character resolution (including the fallback when *character_id* is None
    or unknown) is delegated to ``get_character_config``, defined elsewhere
    in this file.
    """
    cfg = get_character_config(character_id)
    avatar_url = _image_data_url(Path(cfg.get("avatar_path", ""))) if cfg.get("avatar_path") else ""
    # NOTE(review): the original <img ...> markup appears to have been stripped
    # by text extraction; the tokens below are preserved byte-for-byte.
    avatar_img = f'avatar' if avatar_url else ""
    return f"""
{avatar_img}
{cfg.get("display_name", "PDF Paper Tutor")}
{cfg.get("tagline", "")}
""" def build_chat_avatar_html(character_id: Optional[str] = None) -> str: cfg = get_character_config(character_id) avatar_url = _image_data_url(Path(cfg.get("avatar_path", ""))) if cfg.get("avatar_path") else "" return f'avatar' if avatar_url else "" def build_chat_meta_html(character_id: Optional[str] = None) -> str: cfg = get_character_config(character_id) return f"""
{cfg.get("chat_label", "PDF Paper Tutor")} {cfg.get("chat_mode", "paper mode")}
""" def build_exam_picker_avatar_html(character_id: str) -> str: cfg = get_character_config(character_id) avatar_url = _image_data_url(Path(cfg.get("avatar_path", ""))) if cfg.get("avatar_path") else "" avatar_img = f'avatar' if avatar_url else "" return f"""
{avatar_img}
""" with gr.Blocks(css=CSS) as demo: with gr.Column(elem_id="page-shell"): character_header_html = gr.HTML(build_character_header_html(DEFAULT_CHARACTER_ID), elem_id="character-card") character_dropdown = gr.Radio( choices=[(cfg["display_name"], cid) for cid, cfg in CHARACTER_CONFIGS.items()], value=DEFAULT_CHARACTER_ID, label="", show_label=False, interactive=True, elem_id="character-select-wrap", container=False, ) state = gr.State(new_session_state()) loading_md = gr.HTML("", elem_id="gen-loading", visible=False) lecture_click_bridge = gr.HTML( "", elem_id="lecture-click-bridge", js_on_load=""" () => { const state = window.__lectureClickTtsGlobal || (window.__lectureClickTtsGlobal = {}); if (state.bound) return; try { const getRoots = () => { const grRoot = (typeof window.gradioApp === "function") ? window.gradioApp() : null; return [ document, grRoot && grRoot.shadowRoot ? grRoot.shadowRoot : null, grRoot, ].filter(Boolean); }; const q = (sel) => { for (const r of getRoots()) { const el = r.querySelector ? r.querySelector(sel) : null; if (el) return el; } return null; }; const showLoading = (text) => { const box = q("#tts-loading"); const t = q("#tts-loading-text"); if (t) t.textContent = text || ""; if (box) { box.style.display = "block"; box.setAttribute("aria-hidden", "false"); } }; const hideLoading = () => { const box = q("#tts-loading"); if (box) { box.style.display = "none"; box.setAttribute("aria-hidden", "true"); } }; const bindAudioLoading = () => { const root = q("#lecture-audio"); const audio = root ? 
root.querySelector("audio") : q("audio"); if (!audio) return; if (audio.__ttsBound) return; audio.__ttsBound = true; audio.addEventListener("loadstart", () => showLoading("Loading audio..."), true); audio.addEventListener("waiting", () => showLoading("Loading audio..."), true); audio.addEventListener("canplay", () => hideLoading(), true); audio.addEventListener("playing", () => hideLoading(), true); audio.addEventListener("error", () => hideLoading(), true); }; bindAudioLoading(); if (!state.observer) { state.observer = new MutationObserver(() => bindAudioLoading()); state.observer.observe(document.body, { childList: true, subtree: true, attributes: true }); } const selectParagraph = (idx, para, autoPlay) => { const indexText = String(idx ?? "").trim(); const selectedInlineStyle = { background: "#f97316", borderColor: "#f97316", boxShadow: "0 0 0 1px rgba(255,255,255,0.16) inset", color: "#ffffff", }; for (const r of getRoots()) { const rowNodes = r.querySelectorAll ? r.querySelectorAll("#lecture-clickable .lecture-row.is-selected, #lecture-clickable .lecture-row[data-selected='1']") : []; for (const row of rowNodes) { row.classList.remove("is-selected"); row.removeAttribute("data-selected"); } const nodes = r.querySelectorAll ? r.querySelectorAll("#lecture-clickable .lecture-paragraph.is-selected") : []; for (const node of nodes) { node.classList.remove("is-selected"); node.removeAttribute("data-selected"); if (node.style) { node.style.removeProperty("background"); node.style.removeProperty("border-color"); node.style.removeProperty("box-shadow"); node.style.removeProperty("color"); } } } if (para && para.classList) { para.classList.add("is-selected"); para.setAttribute("data-selected", "1"); const row = para.closest ? 
para.closest(".lecture-row") : null; if (row && row.classList) { row.classList.add("is-selected"); row.setAttribute("data-selected", "1"); } if (para.style) { para.style.setProperty("background", selectedInlineStyle.background, "important"); para.style.setProperty("border-color", selectedInlineStyle.borderColor, "important"); para.style.setProperty("box-shadow", selectedInlineStyle.boxShadow, "important"); para.style.setProperty("color", selectedInlineStyle.color, "important"); } } let input = q("#selected-paragraph textarea, #selected-paragraph input"); if (!input) { const inputWrap = q("#selected-paragraph"); input = inputWrap && inputWrap.querySelector ? inputWrap.querySelector("textarea, input") : null; } if (!input) { showLoading("Chunk selector not found. Please refresh the page."); return; } input.value = indexText; input.dispatchEvent(new Event("input", { bubbles: true })); input.dispatchEvent(new Event("change", { bubbles: true })); if (!autoPlay) return; let btn = q("#play-paragraph-btn button, #play-paragraph-btn"); if (btn && btn.querySelector && btn.tagName !== "BUTTON") { const innerBtn = btn.querySelector("button"); if (innerBtn) btn = innerBtn; } if (!btn) { showLoading("Chunk play control not found. Please refresh the page."); return; } showLoading("Generating audio..."); btn.click(); }; window.__lectureSelectParagraph = (idx, el, autoPlay = true) => { selectParagraph(idx, el, autoPlay); }; const paragraphFromEvent = (e) => { const target = e ? e.target : null; if (target && target.nodeType === 1 && target.closest) { const btn = target.closest(".chunk-play-btn"); if (btn) { const row = btn.closest(".lecture-row"); if (row && row.querySelector) { const p = row.querySelector(".lecture-paragraph"); if (p) return p; } } const p = target.closest(".lecture-paragraph"); if (p) return p; } const path = (e && typeof e.composedPath === "function") ? 
e.composedPath() : []; for (const n of path) { if (n && n.classList && n.classList.contains("lecture-paragraph")) return n; if (n && n.classList && n.classList.contains("lecture-row") && n.querySelector) { const p = n.querySelector(".lecture-paragraph"); if (p) return p; } } return null; }; const onParagraphClick = (e) => { const para = paragraphFromEvent(e); if (!para) return; if (e && e.target && e.target.closest && e.target.closest(".chunk-play-btn")) { try { e.preventDefault(); } catch (_) {} try { e.stopPropagation(); } catch (_) {} } const idx = para.getAttribute("data-idx"); if (typeof idx !== "string" || idx.trim() === "") return; selectParagraph(idx, para, true); }; const onChunkButtonClick = (e) => { const btn = e && e.target && e.target.closest ? e.target.closest(".chunk-play-btn") : null; if (!btn) return; try { e.preventDefault(); } catch (_) {} try { e.stopPropagation(); } catch (_) {} const row = btn.closest ? btn.closest(".lecture-row") : null; const para = row && row.querySelector ? row.querySelector(".lecture-paragraph") : null; const idx = (btn.getAttribute && btn.getAttribute("data-idx")) || (para && para.getAttribute ? 
para.getAttribute("data-idx") : ""); if (!para || typeof idx !== "string" || idx.trim() === "") return; selectParagraph(idx, para, true); }; const bindClickRoot = (root) => { if (!root || !root.addEventListener) return; if (root.__lectureClickBound) return; root.__lectureClickBound = true; root.addEventListener("click", onParagraphClick, true); }; const bindParagraphDomHandlers = () => { for (const r of getRoots()) { if (!r || !r.querySelectorAll) continue; const btns = r.querySelectorAll("#lecture-clickable .chunk-play-btn"); for (const btn of btns) { if (btn.__chunkPlayBound) continue; btn.__chunkPlayBound = true; btn.addEventListener("click", onChunkButtonClick, true); } } }; for (const r of getRoots()) bindClickRoot(r); bindClickRoot(window); bindParagraphDomHandlers(); if (!state.rebindObserver) { state.rebindObserver = new MutationObserver(() => { for (const r of getRoots()) bindClickRoot(r); bindParagraphDomHandlers(); }); state.rebindObserver.observe(document.body, { childList: true, subtree: true }); } state.bound = true; } catch (err) { state.bound = false; try { console.error("lecture click bridge failed:", err); } catch (_) {} } } """, ) with gr.Column(visible=False, elem_id="results-panel") as explain_page: with gr.Row(elem_id="chat-row"): with gr.Column(scale=0, elem_id="chat-avatar-col"): chat_avatar_html = gr.HTML(build_chat_avatar_html(DEFAULT_CHARACTER_ID)) with gr.Column(elem_id="chat-main"): chat_meta_html = gr.HTML(build_chat_meta_html(DEFAULT_CHARACTER_ID)) with gr.Column(elem_id="lecture-wrap"): lecture_box = gr.HTML( build_clickable_lecture_html(""), elem_id="lecture-clickable", ) play_lecture_btn = gr.Button("Play Lecture Audio", interactive=False, visible=False) gr.Markdown("Tip: Select a chunk from the list below (left dot), then click the play button on the right.", elem_id="paragraph-tts-tip") lecture_feedback = gr.Markdown("") with gr.Column(elem_id="tts-wrap"): lecture_audio = gr.Audio(label="Lecture TTS", type="filepath", 
elem_id="lecture-audio") gr.HTML( '', ) with gr.Row(elem_id="chunk-controls"): paragraph_picker = gr.Radio( choices=[], value=None, interactive=False, visible=False, label="Select Chunk", elem_id="paragraph-picker", scale=8, ) play_selected_chunk_btn = gr.Button("▶", elem_id="play-selected-chunk-btn", visible=False, interactive=False, scale=1, min_width=52) paragraph_idx = gr.Textbox(value="", label="", show_label=False, elem_id="selected-paragraph") play_paragraph_btn = gr.Button("Play Chunk", elem_id="play-paragraph-btn") with gr.Row(elem_id="exam-entry-wrap"): exam_btn = gr.Button("Go to Exam", interactive=False, variant="secondary", scale=0) with gr.Column(visible=False, elem_id="exam-picker-overlay") as exam_picker_overlay: with gr.Column(elem_id="exam-picker-modal"): gr.HTML('
Choose your examiner
') with gr.Row(elem_classes="exam-picker-grid"): with gr.Column(elem_classes="exam-picker-card"): gr.HTML(build_exam_picker_avatar_html("Mcgonagall")) pick_mcg_btn = gr.Button("Mcgonagall", variant="primary") with gr.Column(elem_classes="exam-picker-card"): gr.HTML(build_exam_picker_avatar_html("snape")) pick_snape_btn = gr.Button("Snape", variant="primary") cancel_exam_picker_btn = gr.Button("Cancel", variant="secondary") with gr.Column(visible=False, elem_id="exam-page") as exam_page: with gr.Row(elem_id="exam-nav"): back_btn = gr.Button("Back", variant="secondary", scale=0) with gr.Column(elem_id="status-wrap", visible=False): status_box = gr.Textbox(label="Status", value="Idle", interactive=False, visible=False) with gr.Column(elem_id="quiz-wrap"): exam_chat = gr.HTML( "", visible=False, elem_id="exam-chat", autoscroll=True, js_on_load=""" () => { const state = window.__examChatAutoScroll || (window.__examChatAutoScroll = {}); const scrollToBottom = (wrap) => { if (!wrap) return; const doScroll = () => { wrap.scrollTop = wrap.scrollHeight; }; doScroll(); requestAnimationFrame(doScroll); setTimeout(doScroll, 50); }; const ensure = () => { const root = document.querySelector('#exam-chat'); const wrap = root ? 
root.querySelector('.exam-chat-wrap') : null; if (!root || !wrap) return; if (state.wrap === wrap) return; state.wrap = wrap; if (state.wrapObserver) state.wrapObserver.disconnect(); state.wrapObserver = new MutationObserver(() => scrollToBottom(wrap)); state.wrapObserver.observe(wrap, { childList: true, subtree: true, characterData: true }); if (state.rootObserver) state.rootObserver.disconnect(); state.rootObserver = new MutationObserver(() => scrollToBottom(wrap)); state.rootObserver.observe(root, { childList: true, subtree: true, attributes: true }); scrollToBottom(wrap); }; ensure(); if (!state.bodyObserver) { state.bodyObserver = new MutationObserver(() => ensure()); state.bodyObserver.observe(document.body, { childList: true, subtree: true }); } } """, ) choice_radio = gr.Radio(choices=[], label="Select one answer", interactive=False) with gr.Row(): submit_btn = gr.Button("Submit Answer", interactive=False) restart_btn = gr.Button("Restart Quiz", interactive=False) score_box = gr.Textbox(label="Score", value="Score: 0 / 0", interactive=False, visible=False) feedback_box = gr.Textbox(label="Feedback / Explanation", lines=8, interactive=False, visible=False) with gr.Row(elem_id="bottom-composer"): pdf_input = gr.File( label="", show_label=False, file_types=[".pdf"], type="filepath", elem_id="pdf-uploader", scale=7, min_width=0, ) run_btn = gr.Button("Generate", variant="primary", elem_id="generate-btn", scale=3, min_width=120) outputs = [ state, character_header_html, character_dropdown, chat_avatar_html, chat_meta_html, loading_md, explain_page, exam_page, status_box, lecture_box, paragraph_picker, lecture_audio, play_lecture_btn, play_selected_chunk_btn, exam_btn, exam_picker_overlay, exam_chat, choice_radio, score_box, feedback_box, submit_btn, restart_btn, ] run_btn.click(fn=on_generate_click, inputs=[pdf_input, character_dropdown, state], outputs=outputs, show_progress="hidden") character_dropdown.change( fn=on_character_change, 
inputs=[character_dropdown, state], outputs=[state, character_header_html, chat_avatar_html, chat_meta_html, explain_page, exam_page, loading_md, status_box], ) exam_btn.click(fn=open_exam_picker, inputs=[state], outputs=outputs, show_progress="hidden") pick_mcg_btn.click(fn=start_exam_mcgonagall, inputs=[state], outputs=outputs, show_progress="hidden") pick_snape_btn.click(fn=start_exam_snape, inputs=[state], outputs=outputs, show_progress="hidden") cancel_exam_picker_btn.click(fn=close_exam_picker, inputs=[state], outputs=outputs, show_progress="hidden") back_btn.click(fn=go_to_explain_page, inputs=[state], outputs=outputs, show_progress="hidden") submit_btn.click(fn=submit_answer, inputs=[choice_radio, state], outputs=outputs, show_progress="hidden") restart_btn.click(fn=restart_quiz, inputs=[state], outputs=outputs, show_progress="hidden") play_lecture_btn.click( fn=on_play_lecture_audio_click, inputs=[state], outputs=[state, status_box, lecture_audio, lecture_feedback, lecture_box, paragraph_picker], show_progress="minimal", ) play_paragraph_btn.click( fn=on_play_paragraph_click, inputs=[paragraph_idx, state], outputs=[state, status_box, lecture_audio, lecture_feedback, lecture_box, paragraph_picker], show_progress="minimal", ) play_selected_chunk_btn.click( fn=on_play_paragraph_click, inputs=[paragraph_picker, state], outputs=[state, status_box, lecture_audio, lecture_feedback, lecture_box, paragraph_picker], show_progress="minimal", ) demo.queue() if __name__ == "__main__": demo.launch( server_name="0.0.0.0", server_port=7860, css=CSS, ssr_mode=False, )