"""
Study Assistant - Comprehensive Gradio-based study tool

Features: OCR, Multi-Agent Chat (via OpenRouter), Exam Generation,
Voice Chat, Gemini Live, Visualizations
"""
import os, io, re, json, time, asyncio, base64, random, tempfile, traceback, wave, inspect
from pathlib import Path
from typing import Optional
from dataclasses import dataclass, field

import gradio as gr
import numpy as np

# ── OpenRouter (OpenAI-compatible) ──
from openai import OpenAI

# ── HF for Whisper ASR ──
from huggingface_hub import InferenceClient

# ── Google GenAI for Gemini Live ──
try:
    from google import genai as google_genai
    from google.genai import types as genai_types
    GOOGLE_AVAILABLE = True
except ImportError:
    GOOGLE_AVAILABLE = False

# ── OCR ──
import fitz
from PIL import Image

try:
    import pytesseract
    TESSERACT_AVAILABLE = True
except ImportError:
    TESSERACT_AVAILABLE = False

try:
    from pptx import Presentation
    PPTX_AVAILABLE = True
except ImportError:
    PPTX_AVAILABLE = False

# ── Viz ──
import plotly.graph_objects as go
import plotly.express as px

# ── Audio ──
try:
    from pydub import AudioSegment
    PYDUB_AVAILABLE = True
except ImportError:
    PYDUB_AVAILABLE = False

try:
    import librosa
    LIBROSA_AVAILABLE = True
except ImportError:
    LIBROSA_AVAILABLE = False

try:
    from gtts import gTTS
    GTTS_AVAILABLE = True
except ImportError:
    GTTS_AVAILABLE = False


# ═══════════════════ GLOBAL STATE ═══════════════════
extracted_texts: dict[str, str] = {}   # file name -> extracted text
exam_store: dict = {}                  # most recently generated exam
exam_scores: list[dict] = []           # one entry per graded exam


# ═══════════════════ UTILITIES ═══════════════════
def clean_response(text: str) -> str:
    """Strip reasoning-model ``<think>`` sections from *text*.

    Closed ``<think>...</think>`` blocks are removed first; a trailing
    unterminated ``<think>`` (as seen mid-stream) is then dropped up to the end
    of the string. Returns the remaining text with outer whitespace stripped,
    or ``""`` for empty/None input.
    """
    if not text:
        return ""
    text = re.sub(r'<think>[\s\S]*?</think>', '', text)
    text = re.sub(r'<think>[\s\S]*$', '', text)
    return text.strip()


def _loads_json_dict(candidate: str) -> Optional[dict]:
    """Return ``json.loads(candidate)`` if it is a dict, otherwise None."""
    try:
        parsed = json.loads(candidate)
    except (json.JSONDecodeError, ValueError):
        return None
    return parsed if isinstance(parsed, dict) else None


def robust_json_parse(text: str) -> Optional[dict]:
    """Try multiple strategies to parse a JSON object from LLM output.

    Returns the first dict that any strategy yields, or None. Non-dict JSON
    (arrays, scalars) is never returned, so callers may use ``.get`` safely.
    """
    if not text or not text.strip():
        return None

    # Strategy 1: Direct parse
    result = _loads_json_dict(text)
    if result is not None:
        return result

    # Strategy 2: Strip markdown code fences
    stripped = re.sub(r'^```(?:json)?\s*\n?', '', text, flags=re.MULTILINE)
    stripped = re.sub(r'\n?```\s*$', '', stripped, flags=re.MULTILINE).strip()
    result = _loads_json_dict(stripped)
    if result is not None:
        return result

    # Strategy 3: Find the first balanced { ... } block.
    # Uses a brace counter instead of a greedy regex (braces inside string
    # values are not special-cased).
    start = text.find('{')
    if start == -1:
        return None
    depth = 0
    end = start
    for i in range(start, len(text)):
        ch = text[i]
        if ch == '{':
            depth += 1
        elif ch == '}':
            depth -= 1
            if depth == 0:
                end = i + 1
                break
    if end <= start:
        # Braces never balanced; nothing to try.
        return None
    candidate = text[start:end]
    result = _loads_json_dict(candidate)
    if result is not None:
        return result

    # Strategy 4: Fix common JSON issues (single quotes, trailing commas).
    # Replacing single quotes is risky but this is the last resort.
    fixed = candidate.replace("'", '"')
    fixed = re.sub(r',\s*([}\]])', r'\1', fixed)
    return _loads_json_dict(fixed)


def robust_json_array_parse(text: str) -> list:
    """Try to parse a JSON array from LLM output; return ``[]`` on failure."""
    if not text or not text.strip():
        return []

    # Strategy 1: Direct parse
    try:
        result = json.loads(text)
        if isinstance(result, list):
            return result
    except (json.JSONDecodeError, ValueError):
        pass

    # Strategy 2: Strip markdown
    stripped = re.sub(r'^```(?:json)?\s*\n?', '', text, flags=re.MULTILINE)
    stripped = re.sub(r'\n?```\s*$', '', stripped, flags=re.MULTILINE).strip()
    try:
        result = json.loads(stripped)
        if isinstance(result, list):
            return result
    except (json.JSONDecodeError, ValueError):
        pass

    # Strategy 3: Find [ ... ] block
    match = re.search(r'\[[\s\S]*\]', text)
    if match:
        try:
            result = json.loads(match.group())
            if isinstance(result, list):
                return result
        except (json.JSONDecodeError, ValueError):
            pass
    return []


# ═══════════════════ MODEL ROUTING (ALL VIA OPENROUTER) ═══════════════════
MODEL_OPTIONS = {
    "DeepSeek R1": {"model_id": "deepseek/deepseek-r1-0528:free"},
    "Nemotron 120B": {"model_id": "nvidia/nemotron-3-super-120b-a12b:free"},
    "Gemma 3 27B": {"model_id": "google/gemma-3-27b-it:free"},
    "Llama 3.3 70B": {"model_id": "meta-llama/llama-3.3-70b-instruct:free"},
    "Qwen3 235B": {"model_id": "qwen/qwen3-235b-a22b:free"},
    "DeepSeek V3": {"model_id": "deepseek/deepseek-chat-v3-0324:free"},
}

_or_client: Optional[OpenAI] = None


def _get_or_client() -> Optional[OpenAI]:
    """Return the cached OpenRouter client, or None if OPENROUTER_API_KEY is unset."""
    global _or_client
    key = os.environ.get("OPENROUTER_API_KEY", "")
    if not key:
        return None
    if _or_client is None:
        _or_client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=key,
            default_headers={
                "HTTP-Referer": "https://huggingface.co/spaces/georgtawadrous/study-assistant",
                "X-Title": "Study Assistant",
            },
        )
    return _or_client


_hf_client: Optional[InferenceClient] = None


def _get_hf_client() -> Optional[InferenceClient]:
    """Return the cached HF inference client, or None if HF_TOKEN is unset."""
    global _hf_client
    token = os.environ.get("HF_TOKEN", "")
    if not token:
        return None
    if _hf_client is None:
        _hf_client = InferenceClient(api_key=token)
    return _hf_client


_google_client = None


def _get_google_client():
    """Return the cached Google GenAI client, or None if the key or SDK is missing."""
    global _google_client
    key = os.environ.get("GOOGLE_API_KEY", "")
    if not key or not GOOGLE_AVAILABLE:
        return None
    if _google_client is None:
        _google_client = google_genai.Client(api_key=key)
    return _google_client


def chat_with_model(model_name: str, messages: list[dict], max_tokens: int = 4096) -> str:
    """Run one blocking chat completion and return the cleaned reply.

    Failures are returned as a string starting with "⚠️" rather than raised.
    """
    client = _get_or_client()
    if not client:
        return "⚠️ OPENROUTER_API_KEY not set."
    try:
        r = client.chat.completions.create(
            model=MODEL_OPTIONS[model_name]["model_id"],
            messages=messages,
            max_tokens=max_tokens,
        )
        return clean_response(r.choices[0].message.content or "")
    except Exception as e:
        return f"⚠️ {model_name}: {e}"


def stream_chat_with_model(model_name: str, messages: list[dict], max_tokens: int = 4096):
    """Stream a chat completion, yielding the cleaned accumulated reply so far.

    Failures are yielded as a string starting with "⚠️" rather than raised.
    """
    client = _get_or_client()
    if not client:
        yield "⚠️ OPENROUTER_API_KEY not set."
        return
    try:
        stream = client.chat.completions.create(
            model=MODEL_OPTIONS[model_name]["model_id"],
            messages=messages,
            max_tokens=max_tokens,
            stream=True,
        )
        full = ""
        for chunk in stream:
            # Some chunks (e.g. usage-only ones) carry no choices; indexing
            # them would raise and replace the answer with an error message.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta.content
            if delta:
                full += delta
                yield clean_response(full)
    except Exception as e:
        yield f"⚠️ {model_name}: {e}"


# ═══════════════════ OCR ═══════════════════
def extract_text_from_pdf(fp):
    """Extract text per page from a PDF, falling back to OCR for sparse pages.

    A page with 30 characters or fewer of embedded text is rasterised at
    300 dpi and passed to Tesseract when it is available.
    """
    doc = fitz.open(fp)
    pages = []
    try:
        for i, pg in enumerate(doc):
            t = pg.get_text("text").strip()
            if t and len(t) > 30:
                pages.append(f"--- Page {i+1} ---\n{t}")
            elif TESSERACT_AVAILABLE:
                px2 = pg.get_pixmap(dpi=300)
                img = Image.frombytes("RGB", [px2.width, px2.height], px2.samples)
                ot = pytesseract.image_to_string(img).strip()
                pages.append(f"--- Page {i+1} (OCR) ---\n{ot}" if ot else f"--- Page {i+1} ---\n[empty]")
            else:
                pages.append(f"--- Page {i+1} ---\n[no tesseract]")
    finally:
        # Release the document even if a page fails to render or OCR.
        doc.close()
    return "\n\n".join(pages)


def extract_text_from_image(fp):
    """OCR an image file; returns a bracketed placeholder when nothing is found."""
    if not TESSERACT_AVAILABLE:
        return "[no tesseract]"
    with Image.open(fp) as img:
        return pytesseract.image_to_string(img).strip() or "[empty]"


def extract_text_from_pptx(fp):
    """Collect the non-empty paragraph text of every slide in a presentation."""
    if not PPTX_AVAILABLE:
        return "[no pptx]"
    prs = Presentation(fp)
    slides = []
    for i, sl in enumerate(prs.slides):
        txts = [
            p.text.strip()
            for sh in sl.shapes if sh.has_text_frame
            for p in sh.text_frame.paragraphs if p.text.strip()
        ]
        if txts:
            slides.append(f"--- Slide {i+1} ---\n" + "\n".join(txts))
        else:
            slides.append(f"--- Slide {i+1} ---\n[empty]")
    return "\n\n".join(slides)


def process_upload(files):
    """Extract text from each uploaded file and store it in ``extracted_texts``.

    Returns ``(markdown_preview, status_message)``. Per-file failures are
    recorded as ``[error: ...]`` text instead of aborting the batch.
    """
    if not files:
        return "", "⚠️ No files."
    results = []
    for fp in files:
        fn, ext = Path(fp).name, Path(fp).suffix.lower()
        try:
            if ext == ".pdf":
                t = extract_text_from_pdf(fp)
            elif ext in (".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".webp"):
                t = extract_text_from_image(fp)
            elif ext in (".pptx", ".ppt"):
                t = extract_text_from_pptx(fp)
            elif ext in (".txt", ".md", ".csv", ".py", ".json"):
                # Explicit UTF-8 so the result does not depend on the host locale.
                t = Path(fp).read_text(encoding="utf-8", errors="replace")
            else:
                t = f"[unsupported: {ext}]"
        except Exception as e:
            t = f"[error: {e}]"
        extracted_texts[fn] = t
        results.append(
            f"## 📄 {fn}\n\n{t[:3000]}"
            + (f"\n\n…*({len(t):,} chars)*" if len(t) > 3000 else "")
        )
    return "\n\n---\n\n".join(results), f"✅ {len(files)} file(s): {', '.join(extracted_texts.keys())}"


def get_material_choices():
    """Return the names of processed materials, or a placeholder entry."""
    return list(extracted_texts.keys()) if extracted_texts else ["No materials yet"]


# ═══════════════════ CHAT ═══════════════════
def build_system_prompt(custom, use_mat):
    """Build the system prompt, optionally appending up to 8000 chars per material."""
    b = custom or "You are a helpful study assistant. Use LaTeX ($...$, $$...$$) for math. Be thorough."
    if use_mat and extracted_texts:
        b += (
            "\n\n--- MATERIALS ---\n"
            + "\n\n".join(f"### {n}\n{t[:8000]}" for n, t in extracted_texts.items())
            + "\n--- END ---"
        )
    return b


def chat_respond(msg, history, model, sysp, use_mat):
    """Gradio handler: stream the assistant reply into the chat history."""
    if not msg.strip():
        yield history
        return
    sp = build_system_prompt(sysp, use_mat)
    msgs = [{"role": "system", "content": sp}]
    msgs += [{"role": m["role"], "content": m["content"]} for m in history]
    msgs += [{"role": "user", "content": msg}]
    history = history + [
        {"role": "user", "content": msg},
        {"role": "assistant", "content": "⏳"},
    ]
    yield history
    for p in stream_chat_with_model(model, msgs):
        history[-1]["content"] = p
        yield history


# ═══════════════════ EXAM ═══════════════════
EXAM_PROMPT = """You are an expert exam generator. Based on the study material below, generate a comprehensive exam.

CRITICAL: Return ONLY valid JSON. No markdown fences. No explanation. No text before or after the JSON.

Generate exactly {num_mcq} multiple-choice questions and {num_written} written questions.

JSON format:
{{"mcq":[{{"id":1,"question":"...","options":["A) ...","B) ...","C) ...","D) ..."],"correct":"A","explanation":"..."}}],"written":[{{"id":1,"question":"...","rubric":"Key points: ..."}}]}}

Study material:
{material}"""


def generate_exam(mat, mdl, nm, nw, progress=gr.Progress()):
    """Generate an exam for material *mat* with model *mdl* and store it.

    Returns ``(mcq_markdown, written_markdown, group_update)``; on failure the
    first element is a "⚠️" message and the answer group stays hidden.
    """
    global exam_store
    if mat not in extracted_texts:
        return "⚠️ Upload materials first in the Upload & OCR tab.", "", gr.update(visible=False)
    progress(0.2, desc="Generating exam questions...")
    raw = chat_with_model(
        mdl,
        [
            {"role": "system", "content": "You are an exam generator. Return ONLY valid JSON. No markdown code fences. No explanation text."},
            {"role": "user", "content": EXAM_PROMPT.format(
                num_mcq=int(nm),
                num_written=int(nw),
                material=extracted_texts[mat][:15000],
            )},
        ],
        16000,
    )
    progress(0.7, desc="Parsing exam...")

    # Check if model returned an error
    if raw.startswith("⚠️"):
        return raw, "", gr.update(visible=False)

    # Robust JSON parsing with multiple fallback strategies
    d = robust_json_parse(raw)
    if not d:
        # Show what we got for debugging
        preview = raw[:1500] if raw else "(empty response)"
        return f"⚠️ Could not parse exam JSON from model response.\n\n**Raw response preview:**\n```\n{preview}\n```\n\n**Tip:** Try a different model (Llama 3.3 70B or DeepSeek V3 tend to produce cleaner JSON).", "", gr.update(visible=False)

    mcqs = d.get("mcq", [])
    wrs = d.get("written", [])
    if not mcqs and not wrs:
        return "⚠️ Exam was empty. Try again or use a different model.", "", gr.update(visible=False)

    exam_store = {"mcq": mcqs, "written": wrs, "material_name": mat, "model": mdl}
    progress(0.9, desc="Formatting...")

    md1 = f"# 📝 Exam: {mat}\n**{len(mcqs)} MCQ + {len(wrs)} Written** (by {mdl})\n\n---\n## Multiple Choice\n\n"
    for q in mcqs:
        md1 += f"### Q{q.get('id', '?')}. {q.get('question', 'N/A')}\n"
        for o in q.get("options", []):
            md1 += f"- {o}\n"
        md1 += "\n"

    md2 = "## ✍️ Written Questions\n\n"
    for q in wrs:
        md2 += f"### Q{q.get('id', '?')}. {q.get('question', 'N/A')}\n\n"

    progress(1.0, desc="Done!")
    return md1, md2, gr.update(visible=True)


def _coerce_score(value):
    """Convert an LLM-provided score to a number clamped to 0..10 (0 if unusable)."""
    try:
        score = float(value)
    except (TypeError, ValueError):
        return 0
    if score != score:  # NaN
        return 0
    score = min(max(score, 0.0), 10.0)
    # Keep whole scores as ints so they render as "8/10", not "8.0/10".
    return int(score) if score.is_integer() else score


def grade_exam(mcq_str, wr_str, mdl, progress=gr.Progress()):
    """Grade the stored exam and return a Markdown report.

    *mcq_str* is parsed as comma-separated ``number:letter`` pairs; *wr_str* as
    lines starting with ``number.`` or ``number:`` (following lines continue
    the previous answer). Written answers are scored 0-10 each by model *mdl*.
    A summary row is appended to ``exam_scores``.
    """
    global exam_scores
    mcqs = exam_store.get("mcq") or []
    wqs = exam_store.get("written") or []
    if not mcqs and not wqs:
        return "⚠️ No exam to grade. Generate one first."
    progress(0.1, desc="Grading MCQ...")

    # Parse MCQ answers
    um = {}
    for p in (mcq_str or "").split(","):
        if ":" in p:
            try:
                q, a = p.strip().split(":", 1)
                um[int(q.strip())] = a.strip().upper()[0]
            except (ValueError, IndexError):
                # Malformed pair (non-numeric id or empty answer): skip it.
                pass

    mc, res = 0, []
    for q in mcqs:
        qid = q.get("id", "?")
        ua = um.get(qid, "—")
        ca = str(q.get("correct", "")).strip().upper()
        if len(ca) > 1:
            ca = ca[0]
        ok = ua == ca
        if ok:
            mc += 1
        res.append({
            "id": qid,
            "q": str(q.get("question", "N/A"))[:60],
            "ua": ua,
            "ca": ca,
            "ok": ok,
            "exp": q.get("explanation", ""),
        })
    tm = len(mcqs)
    mp = mc / tm * 100 if tm else 0

    # Grade written
    ws, wr = 0, []
    if (wr_str or "").strip() and wqs:
        progress(0.4, desc="Grading written answers...")
        uw, c = {}, None
        for l in wr_str.split("\n"):
            m = re.match(r'^(\d+)[.:]\s*(.*)', l)
            if m:
                c = int(m.group(1))
                uw[c] = m.group(2)
            elif c is not None:
                uw[c] = uw.get(c, "") + " " + l

        gp = "Grade each answer 0-10. Return ONLY a JSON array: [{\"id\":1,\"score\":8,\"feedback\":\"...\"}]\n\n"
        for q in wqs:
            qid = q.get("id", "?")
            gp += (
                f"Q{qid}: {q.get('question', 'N/A')}\n"
                f"Rubric: {q.get('rubric', 'N/A')}\n"
                f"Student answer: {uw.get(qid, '[no answer]')}\n\n"
            )
        gr2 = clean_response(chat_with_model(
            mdl,
            [{"role": "system", "content": "Return ONLY a JSON array. No other text."},
             {"role": "user", "content": gp}],
            4000,
        ))
        gs = robust_json_array_parse(gr2)
        gm2 = {g["id"]: g for g in gs if isinstance(g, dict) and "id" in g}
        for q in wqs:
            qid = q.get("id", "?")
            g = gm2.get(qid, {"score": 0, "feedback": "Could not grade"})
            s = _coerce_score(g.get("score", 0))
            ws += s
            wr.append({
                "id": qid,
                "q": str(q.get("question", "N/A"))[:60],
                "s": s,
                "f": g.get("feedback", ""),
            })
    twm = len(wqs) * 10
    wp = ws / twm * 100 if twm else 0
    # Average only over the sections the exam actually has.
    if tm and twm:
        ov = (mp + wp) / 2
    elif twm:
        ov = wp
    else:
        ov = mp

    progress(0.9, desc="Formatting results...")
    o = f"# 📊 Exam Results\n\n## MCQ: {mc}/{tm} ({mp:.0f}%)\n\n"
    o += "| # | Question | You | Correct | Result |\n|---|---|---|---|---|\n"
    for x in res:
        o += f"| {x['id']} | {x['q']}… | {x['ua']} | {x['ca']} | {'✅' if x['ok'] else '❌'} |\n"
    bad = [x for x in res if not x['ok'] and x['exp']]
    if bad:
        o += "\n### Explanations for wrong answers:\n\n"
        for x in bad:
            o += f"- **Q{x['id']}:** {x['exp']}\n"
    o += f"\n---\n\n## Written: {ws}/{twm} ({wp:.0f}%)\n\n"
    for x in wr:
        o += f"**Q{x['id']}.** {x['q']}…\n- Score: **{x['s']}/10** — {x['f']}\n\n"
    o += f"\n---\n\n## 🏆 Overall: {ov:.0f}%\n"

    exam_scores.append({
        "material": exam_store.get("material_name", "?"),
        "mcq_score": mp,
        "written_score": wp,
        "overall": ov,
        "timestamp": time.strftime("%Y-%m-%d %H:%M"),
    })
    return o


# ═══════════════════ VOICE (Whisper+gTTS) ═══════════════════
def transcribe_audio(ap):
    """Transcribe the audio file at *ap* via the HF inference client.

    Returns the transcript, or a bracketed message ("[...]") on failure.
    """
    if not ap:
        return ""
    c = _get_hf_client()
    if not c:
        return "[no HF_TOKEN]"
    try:
        r = c.automatic_speech_recognition(audio=ap, model="openai/whisper-large-v3-turbo")
        if hasattr(r, 'text'):
            return r.text
        if isinstance(r, dict):
            return r.get("text", str(r))
        return str(r)
    except Exception as e:
        return f"[{e}]"


def text_to_speech(text):
    """Synthesize *text* to an MP3 temp file with gTTS; return its path or None."""
    if not text or not GTTS_AVAILABLE:
        return None
    try:
        # Formulas and markdown punctuation read badly aloud.
        c = re.sub(r'\$\$?[^$]+\$\$?', '[formula]', text)
        c = re.sub(r'[#*_`|>]', '', c)[:2000]
        if not c.strip():
            return None
        t = gTTS(text=c, lang='en')
        # Reserve a path and close the handle before gTTS writes to it.
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
            path = f.name
        t.save(path)
        return path
    except Exception:
        # Best effort: the chat still works without spoken output.
        return None


def voice_chat(audio, history, mdl, use_mat):
    """Gradio handler: transcribe, query the model, and speak the reply.

    Returns ``(history, audio_path_or_None, transcript_or_message)``.
    """
    if not audio:
        return history, None, "⚠️ No audio."
    tr = transcribe_audio(audio)
    if not tr or tr.startswith("["):
        return history, None, tr or "No speech"
    sp = build_system_prompt("Voice assistant. Concise, <200 words. No LaTeX.", use_mat)
    ms = [{"role": "system", "content": sp}]
    ms += [{"role": m["role"], "content": m["content"]} for m in history]
    ms += [{"role": "user", "content": tr}]
    r = chat_with_model(mdl, ms, 1000)
    history = history + [
        {"role": "user", "content": f"🎤 {tr}"},
        {"role": "assistant", "content": r},
    ]
    return history, text_to_speech(r), tr


# ═══════════════════ GEMINI LIVE ═══════════════════
GEMINI_LIVE_VOICES = ["Puck", "Aoede", "Charon", "Fenrir", "Kore"]
GEMINI_LIVE_MODEL = "gemini-3.1-flash-live-preview"


@dataclass
class LiveState:
    """Per-session state for the Gemini Live tab."""
    stream: np.ndarray | None = None   # audio accumulated for the current turn
    sampling_rate: int = 0
    pause_detected: bool = False
    started_talking: bool = False
    stopped: bool = False              # set by the Stop button
    conversation: list = field(default_factory=list)


def _detect_pause(audio, sr, state):
    """Return True when the speaker has paused or the recording exceeds 30 s.

    Sets ``state.started_talking`` the first time the RMS of the last second
    exceeds 0.01. A pause is a last-second RMS below 0.005 after speech has
    started and more than 1.5 s has been recorded.
    """
    if audio is None or len(audio) == 0:
        return False
    af = audio.astype(np.float32) / 32768.0
    w = min(sr, len(af))
    e = np.sqrt(np.mean(af[-w:]**2))
    if e > 0.01 and not state.started_talking:
        state.started_talking = True
        return False
    dur = len(af) / sr
    if state.started_talking and dur > 1.5:
        tail = af[-sr:] if len(af) >= sr else af
        if np.sqrt(np.mean(tail**2)) < 0.005:
            return True
    return dur > 30


def live_process_audio(audio, state):
    """Streaming handler: accumulate a mic chunk and stop recording on a pause."""
    if audio is None:
        return None, state
    sr, chunk = audio
    state.stream = chunk if state.stream is None else np.concatenate((state.stream, chunk))
    state.sampling_rate = sr
    state.pause_detected = _detect_pause(state.stream, sr, state)
    if state.pause_detected and state.started_talking:
        return gr.Audio(recording=False), state
    return None, state


def _resample_to_16k(audio_np, orig_sr):
    """Convert int16-scaled audio to mono 16 kHz PCM16 bytes.

    Uses librosa when available, otherwise nearest-sample decimation.
    """
    af = audio_np.astype(np.float32) / 32768.0
    if af.ndim > 1:
        af = af.mean(axis=1)
    if LIBROSA_AVAILABLE and orig_sr != 16000:
        af = librosa.resample(af, orig_sr=orig_sr, target_sr=16000)
    elif orig_sr != 16000:
        idx = np.arange(0, len(af), orig_sr / 16000).astype(int)
        idx = idx[idx < len(af)]
        af = af[idx]
    # Clip before the cast: resampler overshoot would otherwise wrap around.
    return np.clip(af * 32768, -32768, 32767).astype(np.int16).tobytes()


def _pcm24k_to_mp3(pcm):
    """Encode 24 kHz mono PCM16 as MP3 (pydub), or as WAV bytes without pydub."""
    if PYDUB_AVAILABLE:
        seg = AudioSegment(pcm, frame_rate=24000, sample_width=2, channels=1)
        buf = io.BytesIO()
        seg.export(buf, format="mp3", bitrate="192k")
        return buf.getvalue()
    buf = io.BytesIO()
    with wave.open(buf, 'wb') as w:
        w.setnchannels(1)
        w.setsampwidth(2)
        w.setframerate(24000)
        w.writeframes(pcm)
    return buf.getvalue()


def live_response(state, voice, use_mat):
    """Send recorded audio to Gemini Live, receive audio response.

    Generator yielding ``(audio_bytes_or_None, LiveState)``. The final yield
    always carries a fresh ``LiveState`` that keeps only the conversation.
    """
    if not state.pause_detected and not state.started_talking:
        yield None, LiveState()
        return
    client = _get_google_client()
    if not client:
        state.conversation.append({"role": "assistant", "content": "⚠️ GOOGLE_API_KEY not set."})
        yield None, LiveState(conversation=state.conversation)
        return

    sys_t = "You are an expert study tutor. Be concise, educational, and engaging. Ask follow-up questions."
    if use_mat and extracted_texts:
        sys_t += "\n\nStudy materials:\n" + "\n".join(f"[{k}]: {v[:4000]}" for k, v in extracted_texts.items())

    config = genai_types.LiveConnectConfig(
        response_modalities=[genai_types.Modality.AUDIO],
        speech_config=genai_types.SpeechConfig(
            voice_config=genai_types.VoiceConfig(
                prebuilt_voice_config=genai_types.PrebuiltVoiceConfig(voice_name=voice),
            ),
        ),
        system_instruction=genai_types.Content(
            parts=[genai_types.Part(text=sys_t)],
        ),
        input_audio_transcription=genai_types.AudioTranscriptionConfig(),
        output_audio_transcription=genai_types.AudioTranscriptionConfig(),
        realtime_input_config=genai_types.RealtimeInputConfig(
            turn_coverage="TURN_INCLUDES_ONLY_ACTIVITY",
        ),
    )

    pcm16 = _resample_to_16k(state.stream, state.sampling_rate)
    # Reserve the path and close the handle before reopening it by name.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        wav_path = f.name
    with wave.open(wav_path, 'wb') as w:
        w.setnchannels(1)
        w.setsampwidth(2)
        w.setframerate(16000)
        w.writeframes(pcm16)
    state.conversation.append({"role": "user", "content": {"path": wav_path, "mime_type": "audio/wav"}})

    output_pcm = b""
    user_transcript = ""
    ai_transcript = ""

    async def _call():
        """Run one request/response turn, collecting audio and transcripts."""
        nonlocal output_pcm, user_transcript, ai_transcript
        try:
            async with client.aio.live.connect(model=GEMINI_LIVE_MODEL, config=config) as session:
                await session.send_realtime_input(
                    audio=genai_types.Blob(data=pcm16, mime_type="audio/pcm;rate=16000"),
                )
                async for response in session.receive():
                    server_content = response.server_content
                    if server_content:
                        if server_content.model_turn:
                            for part in server_content.model_turn.parts or []:
                                if part.inline_data:
                                    output_pcm += part.inline_data.data
                        if server_content.input_transcription and server_content.input_transcription.text:
                            user_transcript += server_content.input_transcription.text
                        if server_content.output_transcription and server_content.output_transcription.text:
                            ai_transcript += server_content.output_transcription.text
                        if server_content.turn_complete:
                            break
                        if server_content.interrupted:
                            break
        except Exception as e:
            print(f"Gemini Live error: {e}")
            traceback.print_exc()

    try:
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(_call())
        finally:
            loop.close()
    except Exception as e:
        print(f"Loop error: {e}")

    if user_transcript:
        state.conversation.append({"role": "user", "content": f"🎤 {user_transcript}"})
    if ai_transcript:
        state.conversation.append({"role": "assistant", "content": ai_transcript})

    if output_pcm:
        mp3 = _pcm24k_to_mp3(output_pcm)
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
            f.write(mp3)
        if not ai_transcript:
            # No transcript to show, so put the audio itself in the chat.
            state.conversation.append({"role": "assistant", "content": {"path": f.name, "mime_type": "audio/mp3"}})
        yield mp3, state
    else:
        state.conversation.append({"role": "assistant", "content": "⚠️ No audio response from Gemini Live."})
    yield None, LiveState(conversation=state.conversation)


def live_start_recording(state):
    """Restart the microphone unless the session was stopped (then returns None)."""
    if not state.stopped:
        return gr.Audio(recording=True)


# ═══════════════════ VISUALIZATIONS ═══════════════════
VIZ_PROMPT = "Data viz expert. Plotly code. Use go/px/np. Assign `fig`. No fig.show(). No fences. Only code.\n{mc}"
LATEX_PROMPT = "Math explainer. $...$ inline, $$...$$ display. Step-by-step.\n{mc}"


def generate_visualization(prompt, mdl, use_mat):
    """Ask the model for Plotly code, execute it, and return ``(fig, message)``.

    On any failure the figure is None and the message contains the error and
    the generated code.
    """
    if not prompt.strip():
        return None, "⚠️ Describe a chart."
    mc = ("Material:\n" + "\n".join(f"[{k}]:{v[:2000]}" for k, v in extracted_texts.items())) if use_mat and extracted_texts else ""
    r = clean_response(chat_with_model(
        mdl,
        [{"role": "system", "content": VIZ_PROMPT.format(mc=mc)},
         {"role": "user", "content": prompt}],
        4000,
    ))
    code = re.sub(r'^```(?:python)?\s*\n?', '', r, flags=re.MULTILINE)
    code = re.sub(r'^```\s*$', '', code, flags=re.MULTILINE).strip()
    try:
        g = {"go": go, "px": px, "np": np}
        try:
            import pandas as pd
            g["pd"] = pd
        except ImportError:
            pass
        # SECURITY: this executes model-generated code with full interpreter
        # privileges; the prompt and uploaded materials can influence it.
        # Only run this app where that is acceptable, or sandbox this call.
        exec(code, g)
        fig = g.get("fig")
        if fig is None:
            return None, f"⚠️ No `fig`.\n```python\n{code}\n```"
        return fig, f"✅\n```python\n{code}\n```"
    except Exception as e:
        return None, f"⚠️ {e}\n```python\n{code}\n```"


def generate_latex_explanation(topic, mdl, use_mat):
    """Return a model-written, LaTeX-formatted explanation of *topic*."""
    if not topic.strip():
        return "⚠️ Enter a topic."
    mc = ("Material:\n" + "\n".join(f"[{k}]:{v[:2000]}" for k, v in extracted_texts.items())) if use_mat and extracted_texts else ""
    return clean_response(chat_with_model(
        mdl,
        [{"role": "system", "content": LATEX_PROMPT.format(mc=mc)},
         {"role": "user", "content": f"Explain: {topic}"}],
        4000,
    ))


# ═══════════════════ DASHBOARD ═══════════════════
def get_dashboard_data():
    """Return ``(markdown_summary, score_figure_or_None)`` for the dashboard."""
    md = "# 📚 Dashboard\n\n## Materials\n\n"
    if extracted_texts:
        for n, t in extracted_texts.items():
            md += f"- **{n}** — {len(t.split()):,} words\n"
    else:
        md += "_None._\n"

    md += "\n## Exams\n\n"
    if exam_scores:
        md += "| Date | Material | MCQ | Written | Overall |\n|---|---|---|---|---|\n"
        for s in exam_scores:
            md += f"| {s['timestamp']} | {s['material'][:25]} | {s['mcq_score']:.0f}% | {s['written_score']:.0f}% | {s['overall']:.0f}% |\n"
    else:
        md += "_None._\n"

    or_ok = '✅' if os.environ.get('OPENROUTER_API_KEY') else '❌'
    hf_ok = '✅' if os.environ.get('HF_TOKEN') else '❌'
    gl_ok = '✅' if os.environ.get('GOOGLE_API_KEY') and GOOGLE_AVAILABLE else '❌'
    md += f"\n## Status\n- **OpenRouter** (chat): {or_ok}\n- **HF** (Whisper): {hf_ok}\n- **Google** (Gemini Live): {gl_ok}\n"

    fig = None
    if exam_scores:
        fig = go.Figure()
        # Plotly renders "<br>" as a line break inside axis labels.
        lb = [f"{s['material'][:12]}…<br>{s['timestamp']}" for s in exam_scores]
        fig.add_trace(go.Bar(name="MCQ", x=lb, y=[s["mcq_score"] for s in exam_scores], marker_color="#4CAF50"))
        fig.add_trace(go.Bar(name="Written", x=lb, y=[s["written_score"] for s in exam_scores], marker_color="#2196F3"))
        fig.add_trace(go.Scatter(
            name="Overall",
            x=lb,
            y=[s["overall"] for s in exam_scores],
            mode="lines+markers",
            line=dict(color="#FF9800", width=3),
        ))
        fig.update_layout(
            barmode="group",
            yaxis_title="%",
            yaxis=dict(range=[0, 105]),
            template="plotly_white",
            height=400,
        )
    return md, fig


# ═══════════════════ UI ═══════════════════
def build_app():
    """Assemble the Gradio Blocks UI (one tab per feature) and return it."""
    with gr.Blocks(
        title="📚 Study Assistant",
        css=".main-title{text-align:center}",
        theme=gr.themes.Soft(primary_hue="blue", secondary_hue="purple"),
    ) as demo:
        gr.Markdown(
            "# 📚 AI Study Assistant\n**Upload → Chat → Exam → Voice → Visualize**\n\n"
            "> **Required:** `OPENROUTER_API_KEY` · **Optional:** `HF_TOKEN` (Whisper) · `GOOGLE_API_KEY` (Gemini Live)",
            elem_classes=["main-title"],
        )

        with gr.Tab("💬 Chat"):
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    m_sel = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="🤖 Model")
                    sysp_box = gr.Textbox(label="System Prompt", lines=3)
                    um_chat = gr.Checkbox(label="📎 Include Materials", value=True)
                    gr.Markdown("### Models (free via OpenRouter)\n| Model | Size |\n|---|---|\n| DeepSeek R1 | 671B MoE |\n| Nemotron 120B | 120B MoE |\n| Gemma 3 27B | 27B |\n| Llama 3.3 70B | 70B |\n| Qwen3 235B | 235B MoE |\n| DeepSeek V3 | 685B MoE |")
                with gr.Column(scale=3):
                    cb = gr.Chatbot(
                        height=550,
                        type="messages",
                        show_copy_button=True,
                        latex_delimiters=[
                            {"left": "$$", "right": "$$", "display": True},
                            {"left": "$", "right": "$", "display": False},
                            {"left": "\\(", "right": "\\)", "display": False},
                            {"left": "\\[", "right": "\\]", "display": True},
                        ],
                    )
                    with gr.Row():
                        mi = gr.Textbox(placeholder="Ask about your study materials...", lines=2, scale=5, show_label=False)
                        sb = gr.Button("Send 📤", variant="primary", scale=1)
                    gr.Button("🗑️ Clear", size="sm").click(lambda: [], None, cb)
            sb.click(chat_respond, [mi, cb, m_sel, sysp_box, um_chat], [cb]).then(lambda: "", None, mi)
            mi.submit(chat_respond, [mi, cb, m_sel, sysp_box, um_chat], [cb]).then(lambda: "", None, mi)

        with gr.Tab("📤 Upload & OCR"):
            gr.Markdown("## Upload Materials\nPDF, PPTX, PNG, JPG, TXT, MD")
            with gr.Row():
                with gr.Column(scale=1):
                    fu = gr.File(
                        label="📁 Upload",
                        file_count="multiple",
                        file_types=[".pdf", ".pptx", ".ppt", ".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".txt", ".md", ".csv", ".py"],
                        type="filepath",
                    )
                    ub = gr.Button("🔍 Process", variant="primary", size="lg")
                    us = gr.Markdown("_Upload and Process_")
                with gr.Column(scale=2):
                    eo = gr.Markdown("_Text here..._")
            ub.click(process_upload, [fu], [eo, us])

        with gr.Tab("📝 Exam"):
            gr.Markdown("## Generate & Take Exams")
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    md2 = gr.Dropdown(get_material_choices(), label="📄 Material", interactive=True)
                    gr.Button("🔄 Refresh", size="sm").click(lambda: gr.update(choices=get_material_choices()), None, md2)
                    em = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="🤖 Model")
                    nm = gr.Slider(5, 150, 100, step=5, label="MCQ")
                    nw = gr.Slider(2, 50, 20, step=1, label="Written")
                    gen_btn = gr.Button("🎯 Generate", variant="primary", size="lg")
                with gr.Column(scale=2):
                    emd = gr.Markdown("_Generate exam..._")
                    ewd = gr.Markdown("")
            with gr.Group(visible=False) as eg:
                gr.Markdown("---\n## ✏️ Take Exam")
                with gr.Row():
                    ma = gr.Textbox(label="MCQ (1:A, 2:B...)", lines=4)
                    wa = gr.Textbox(label="Written (1: answer...)", lines=8)
                grade_mdl = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="Grading Model")
                grd_btn = gr.Button("📊 Grade", variant="primary", size="lg")
                er = gr.Markdown("")
            gen_btn.click(generate_exam, [md2, em, nm, nw], [emd, ewd, eg])
            grd_btn.click(grade_exam, [ma, wa, grade_mdl], [er])

        with gr.Tab("🎤 Voice Chat"):
            gr.Markdown("## Voice Assistant\nRecord → Whisper → LLM → gTTS\n> Needs `HF_TOKEN`")
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    vm = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="🤖 Model")
                    umv = gr.Checkbox(label="📎 Materials", value=True)
                    ai = gr.Audio(sources=["microphone"], type="filepath", label="🎙️ Record")
                    vb = gr.Button("🔊 Send", variant="primary", size="lg")
                    vt = gr.Textbox(label="Transcript", interactive=False)
                with gr.Column(scale=2):
                    vc = gr.Chatbot(
                        height=400,
                        type="messages",
                        latex_delimiters=[
                            {"left": "$$", "right": "$$", "display": True},
                            {"left": "$", "right": "$", "display": False},
                        ],
                    )
                    va_out = gr.Audio(label="🔊 Response", type="filepath", autoplay=True)
            vb.click(voice_chat, [ai, vc, vm, umv], [vc, va_out, vt])
            gr.Button("🗑️ Clear", size="sm").click(lambda: ([], None, ""), None, [vc, va_out, vt])

        with gr.Tab("🔴 Gemini Live"):
            gr.Markdown(
                "## 🔴 Gemini Live — Real-time Voice\n"
                "Native bidirectional voice conversation via Gemini.\n"
                "> Requires `GOOGLE_API_KEY` with Gemini API access"
            )
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    lv = gr.Dropdown(GEMINI_LIVE_VOICES, value="Puck", label="🗣️ Voice")
                    lum = gr.Checkbox(label="📎 Materials", value=True)
                    gr.Markdown("### How to use\n1. Click the microphone\n2. Speak your question\n3. Pause — auto-detected\n4. Listen to the AI response\n5. Mic restarts automatically!")
                with gr.Column(scale=3):
                    lcb = gr.Chatbot(label="Conversation", height=400, type="messages")
                    with gr.Row():
                        lmic = gr.Audio(label="🎙️ Speak", sources="microphone", type="numpy")
                        lout = gr.Audio(label="🔊 Response", streaming=True, autoplay=True)
            ls = gr.State(value=LiveState())
            lstr = lmic.stream(live_process_audio, [lmic, ls], [lmic, ls], stream_every=0.5, time_limit=30)
            lresp = lmic.stop_recording(live_response, [ls, lv, lum], [lout, ls])
            lresp.then(lambda s: s.conversation, [ls], [lcb])
            lrst = lout.stop(live_start_recording, [ls], [lmic])
            gr.Button("⛔ Stop", variant="stop").click(
                lambda: (LiveState(stopped=True), gr.Audio(recording=False)),
                None,
                [ls, lmic],
                cancels=[lresp, lrst],
            )

        with gr.Tab("📊 Visualizations"):
            gr.Markdown("## Charts & LaTeX")
            with gr.Row():
                vmdl = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="🤖 Model")
                umvz = gr.Checkbox(label="📎 Materials", value=True)
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 📈 Charts")
                    vp = gr.Textbox(label="Describe", placeholder="Bar chart comparing sorting algorithms...", lines=2)
                    vbtn = gr.Button("📊 Generate", variant="primary")
                    vplt = gr.Plot()
                    vcd = gr.Markdown()
                with gr.Column():
                    gr.Markdown("### 📐 LaTeX")
                    lt = gr.Textbox(label="Topic", placeholder="Fourier Transform, Bayes' Theorem...", lines=2)
                    lbtn = gr.Button("📐 Explain", variant="primary")
                    lo = gr.Markdown(
                        latex_delimiters=[
                            {"left": "$$", "right": "$$", "display": True},
                            {"left": "$", "right": "$", "display": False},
                            {"left": "\\(", "right": "\\)", "display": False},
                            {"left": "\\[", "right": "\\]", "display": True},
                        ],
                    )
            vbtn.click(generate_visualization, [vp, vmdl, umvz], [vplt, vcd])
            lbtn.click(generate_latex_explanation, [lt, vmdl, umvz], [lo])

        with gr.Tab("📊 Dashboard"):
            dmd = gr.Markdown("_Refresh_")
            dfg = gr.Plot()
            gr.Button("🔄 Refresh", variant="primary").click(get_dashboard_data, None, [dmd, dfg])

        demo.load(get_dashboard_data, None, [dmd, dfg])
    return demo


if __name__ == "__main__":
    demo = build_app()
    demo.launch()