"""
Study Assistant - Comprehensive Gradio-based study tool
Features: OCR, Multi-Agent Chat (via OpenRouter), Exam Generation, Voice Chat, Gemini Live, Visualizations
"""
import os, io, re, json, time, asyncio, base64, random, tempfile, traceback, wave, inspect
from pathlib import Path
from typing import Optional
from dataclasses import dataclass, field
import gradio as gr
import numpy as np
# ── OpenRouter (OpenAI-compatible) ──
from openai import OpenAI
# ── HF for Whisper ASR ──
from huggingface_hub import InferenceClient
# ── Google GenAI for Gemini Live ──
try:
from google import genai as google_genai
from google.genai import types as genai_types
GOOGLE_AVAILABLE = True
except ImportError:
GOOGLE_AVAILABLE = False
# ── OCR ──
import fitz
from PIL import Image
try:
import pytesseract
TESSERACT_AVAILABLE = True
except ImportError:
TESSERACT_AVAILABLE = False
try:
from pptx import Presentation
PPTX_AVAILABLE = True
except ImportError:
PPTX_AVAILABLE = False
# ── Viz ──
import plotly.graph_objects as go
import plotly.express as px
# ── Audio ──
try:
from pydub import AudioSegment
PYDUB_AVAILABLE = True
except ImportError:
PYDUB_AVAILABLE = False
try:
import librosa
LIBROSA_AVAILABLE = True
except ImportError:
LIBROSA_AVAILABLE = False
try:
from gtts import gTTS
GTTS_AVAILABLE = True
except ImportError:
GTTS_AVAILABLE = False
# ═══════════════════ GLOBAL STATE ═══════════════════
# Uploaded file name -> extracted text. Filled by process_upload and read by
# the chat, exam, live-voice and visualization helpers.
extracted_texts: dict[str, str] = {}
# The most recently generated exam; generate_exam replaces it wholesale.
exam_store: dict = {}
# One summary dict per grade_exam call; rendered by get_dashboard_data.
exam_scores: list[dict] = []
# ═══════════════════ UTILITIES ═══════════════════
def clean_response(text: str) -> str:
    """Remove ``<think>`` reasoning blocks from model output and trim whitespace.

    Args:
        text: Raw model output; may be empty or ``None``.

    Returns:
        The text with every ``<think>...</think>`` block removed, plus anything
        following an unterminated ``<think>`` tag (as seen mid-stream), stripped
        of surrounding whitespace. Returns ``""`` for empty input.
    """
    if not text:
        return ""
    # The literal tags must anchor both patterns: a bare `[\s\S]*$` matches the
    # entire string and would erase every response.
    text = re.sub(r'<think>[\s\S]*?</think>', '', text)
    # An opening tag with no closing tag yet (partial stream): drop to the end.
    text = re.sub(r'<think>[\s\S]*$', '', text)
    return text.strip()
def robust_json_parse(text: str) -> Optional[dict]:
    """Extract a JSON object from LLM output using progressively looser strategies.

    Strategies, in order:
      1. Parse the text as-is.
      2. Parse after stripping markdown code fences.
      3. Parse the first balanced ``{...}`` span (braces inside double-quoted
         strings are ignored; a plain brace count is the fallback).
      4. Same span with single quotes swapped for double quotes and trailing
         commas removed (lossy, last resort).

    Args:
        text: Raw model output.

    Returns:
        The parsed ``dict``, or ``None`` when no strategy yields a JSON object.
        Valid JSON that is not an object (list, number, string) is rejected so
        callers can rely on the declared return type.
    """
    if not text or not text.strip():
        return None

    def _load_dict(candidate: str) -> Optional[dict]:
        # Parse *candidate*; only a JSON object counts as success.
        try:
            parsed = json.loads(candidate)
        except (json.JSONDecodeError, ValueError):
            return None
        return parsed if isinstance(parsed, dict) else None

    def _object_end(begin: int, respect_strings: bool) -> int:
        # Index just past the brace closing the object opened at *begin*,
        # or -1 when the braces never balance.
        depth = 0
        in_string = False
        escaped = False
        for i in range(begin, len(text)):
            ch = text[i]
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
                continue
            if ch == '"' and respect_strings:
                in_string = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return i + 1
        return -1

    # Strategy 1: direct parse.
    parsed = _load_dict(text)
    if parsed is not None:
        return parsed

    # Strategy 2: strip markdown code fences.
    stripped = re.sub(r'^```(?:json)?\s*\n?', '', text, flags=re.MULTILINE)
    stripped = re.sub(r'\n?```\s*$', '', stripped, flags=re.MULTILINE).strip()
    parsed = _load_dict(stripped)
    if parsed is not None:
        return parsed

    start = text.find("{")
    if start == -1:
        return None

    # Strategies 3 and 4: string-aware scan first; the plain brace count is kept
    # as a fallback for output whose quoting is itself broken.
    tried = set()
    for respect_strings in (True, False):
        end = _object_end(start, respect_strings)
        if end == -1:
            continue
        candidate = text[start:end]
        if candidate in tried:
            continue
        tried.add(candidate)

        parsed = _load_dict(candidate)
        if parsed is not None:
            return parsed

        # Replace single quotes with double quotes (risky but last resort),
        # then remove trailing commas before } or ].
        fixed = candidate.replace("'", '"')
        fixed = re.sub(r',\s*([}\]])', r'\1', fixed)
        parsed = _load_dict(fixed)
        if parsed is not None:
            return parsed
    return None
def robust_json_array_parse(text: str) -> list:
    """Pull a JSON array out of LLM output, returning ``[]`` when none is found.

    Candidates are tried in order: the raw text, the text with markdown code
    fences removed, and the span from the first ``[`` to the last ``]``. The
    first candidate that parses to a list wins.
    """
    if not text or not text.strip():
        return []

    def _as_list(candidate):
        # Parsed list, or None when parsing fails or yields a non-list.
        try:
            value = json.loads(candidate)
        except (json.JSONDecodeError, ValueError):
            return None
        return value if isinstance(value, list) else None

    unfenced = re.sub(r'^```(?:json)?\s*\n?', '', text, flags=re.MULTILINE)
    unfenced = re.sub(r'\n?```\s*$', '', unfenced, flags=re.MULTILINE).strip()

    candidates = [text, unfenced]
    bracketed = re.search(r'\[[\s\S]*\]', text)
    if bracketed:
        candidates.append(bracketed.group())

    for candidate in candidates:
        found = _as_list(candidate)
        if found is not None:
            return found
    return []
# ═══════════════════ MODEL ROUTING (ALL VIA OPENROUTER) ═══════════════════
# Display name (shown in the model dropdowns) -> options for that model.
# "model_id" is the value passed as `model=` to the OpenRouter client.
MODEL_OPTIONS = {
    "DeepSeek R1": {"model_id": "deepseek/deepseek-r1-0528:free"},
    "Nemotron 120B": {"model_id": "nvidia/nemotron-3-super-120b-a12b:free"},
    "Gemma 3 27B": {"model_id": "google/gemma-3-27b-it:free"},
    "Llama 3.3 70B": {"model_id": "meta-llama/llama-3.3-70b-instruct:free"},
    "Qwen3 235B": {"model_id": "qwen/qwen3-235b-a22b:free"},
    "DeepSeek V3": {"model_id": "deepseek/deepseek-chat-v3-0324:free"},
}
_or_client: Optional[OpenAI] = None


def _get_or_client() -> Optional[OpenAI]:
    """Return the shared OpenRouter client, creating it on first use.

    Returns ``None`` when ``OPENROUTER_API_KEY`` is unset or empty. The client
    is built once and then reused for the life of the process.
    """
    global _or_client
    api_key = os.environ.get("OPENROUTER_API_KEY", "")
    if not api_key:
        return None
    if _or_client is not None:
        return _or_client
    attribution_headers = {
        "HTTP-Referer": "https://huggingface.co/spaces/georgtawadrous/study-assistant",
        "X-Title": "Study Assistant",
    }
    _or_client = OpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=api_key,
        default_headers=attribution_headers,
    )
    return _or_client
_hf_client: Optional[InferenceClient] = None


def _get_hf_client() -> Optional[InferenceClient]:
    """Return the shared Hugging Face inference client, creating it on first use.

    Returns ``None`` when ``HF_TOKEN`` is unset or empty.
    """
    global _hf_client
    hf_token = os.environ.get("HF_TOKEN", "")
    if not hf_token:
        return None
    if _hf_client is not None:
        return _hf_client
    _hf_client = InferenceClient(api_key=hf_token)
    return _hf_client
_google_client = None


def _get_google_client():
    """Return the shared Google GenAI client, creating it on first use.

    Returns ``None`` when ``GOOGLE_API_KEY`` is unset or empty, or when the
    ``google.genai`` package could not be imported.
    """
    global _google_client
    api_key = os.environ.get("GOOGLE_API_KEY", "")
    if not (api_key and GOOGLE_AVAILABLE):
        return None
    if _google_client is not None:
        return _google_client
    _google_client = google_genai.Client(api_key=api_key)
    return _google_client
def chat_with_model(model_name: str, messages: list[dict], max_tokens: int = 4096) -> str:
    """Run one non-streaming chat completion through OpenRouter.

    Args:
        model_name: A key of ``MODEL_OPTIONS``.
        messages: Chat messages as ``{"role", "content"}`` dicts.
        max_tokens: Completion token limit.

    Returns:
        The cleaned reply text. On any failure (missing API key, unknown model
        name, API error) a string starting with "⚠️" is returned instead of
        raising.
    """
    router = _get_or_client()
    if not router:
        return "⚠️ OPENROUTER_API_KEY not set."
    try:
        model_id = MODEL_OPTIONS[model_name]["model_id"]
        completion = router.chat.completions.create(
            model=model_id,
            messages=messages,
            max_tokens=max_tokens,
        )
        reply = completion.choices[0].message.content
        return clean_response(reply or "")
    except Exception as exc:
        return f"⚠️ {model_name}: {exc}"
def stream_chat_with_model(model_name: str, messages: list[dict], max_tokens: int = 4096):
    """Stream a chat completion through OpenRouter.

    Args:
        model_name: A key of ``MODEL_OPTIONS``.
        messages: Chat messages as ``{"role", "content"}`` dicts.
        max_tokens: Completion token limit.

    Yields:
        The cleaned text accumulated so far (not just the new delta), once per
        chunk that carries content. On failure a single string starting with
        "⚠️" is yielded instead of raising.
    """
    client = _get_or_client()
    if not client:
        yield "⚠️ OPENROUTER_API_KEY not set."
        return
    try:
        stream = client.chat.completions.create(
            model=MODEL_OPTIONS[model_name]["model_id"],
            messages=messages,
            max_tokens=max_tokens,
            stream=True,
        )
        full = ""
        for chunk in stream:
            # Some chunks (e.g. usage-only ones) carry an empty `choices`
            # list; indexing them would abort the stream with an IndexError
            # and overwrite the answer with an error message.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            piece = delta.content if delta is not None else None
            if piece:
                full += piece
                # Re-clean the whole buffer so reasoning blocks that are still
                # open mid-stream stay hidden.
                yield clean_response(full)
    except Exception as e:
        yield f"⚠️ {model_name}: {e}"
# ═══════════════════ OCR ═══════════════════
def extract_text_from_pdf(fp):
    """Extract text from every page of a PDF, using OCR for pages with little text.

    A page whose embedded text is 30 characters or fewer is rendered at 300 dpi
    and passed to Tesseract when it is available.

    Args:
        fp: Path to the PDF file.

    Returns:
        One "--- Page N ---" section per page, joined by blank lines.
    """
    doc = fitz.open(fp)
    pages = []
    try:
        for number, page in enumerate(doc, start=1):
            embedded = page.get_text("text").strip()
            if len(embedded) > 30:
                pages.append(f"--- Page {number} ---\n{embedded}")
            elif TESSERACT_AVAILABLE:
                pixmap = page.get_pixmap(dpi=300)
                img = Image.frombytes("RGB", [pixmap.width, pixmap.height], pixmap.samples)
                ocr_text = pytesseract.image_to_string(img).strip()
                pages.append(
                    f"--- Page {number} (OCR) ---\n{ocr_text}" if ocr_text
                    else f"--- Page {number} ---\n[empty]"
                )
            else:
                pages.append(f"--- Page {number} ---\n[no tesseract]")
    finally:
        # Release the document even when rendering or OCR raises.
        doc.close()
    return "\n\n".join(pages)
def extract_text_from_image(fp):
    """OCR a single image file with Tesseract.

    Args:
        fp: Path to the image.

    Returns:
        The recognised text, "[empty]" when nothing was recognised, or
        "[no tesseract]" when pytesseract is not installed.
    """
    if not TESSERACT_AVAILABLE:
        return "[no tesseract]"
    # Context manager so the underlying file handle is closed promptly.
    with Image.open(fp) as img:
        return pytesseract.image_to_string(img).strip() or "[empty]"
def extract_text_from_pptx(fp):
    """Collect the non-blank paragraph text of every slide in a presentation.

    Returns one "--- Slide N ---" section per slide ("[empty]" for slides with
    no text), or "[no pptx]" when python-pptx is not installed.
    """
    if not PPTX_AVAILABLE:
        return "[no pptx]"
    rendered = []
    for number, slide in enumerate(Presentation(fp).slides, start=1):
        lines = []
        for shape in slide.shapes:
            if not shape.has_text_frame:
                continue
            for paragraph in shape.text_frame.paragraphs:
                content = paragraph.text.strip()
                if content:
                    lines.append(content)
        body = "\n".join(lines) if lines else "[empty]"
        rendered.append(f"--- Slide {number} ---\n{body}")
    return "\n\n".join(rendered)
def process_upload(files):
    """Extract text from uploaded files and store it in ``extracted_texts``.

    Args:
        files: Iterable of file paths (as provided by the upload widget), or
            an empty value.

    Returns:
        ``(preview_markdown, status_message)``. Each file's preview is capped
        at 3000 characters. A per-file failure is recorded as "[error: ...]"
        text rather than aborting the batch.
    """
    if not files:
        return "", "⚠️ No files."
    results = []
    for fp in files:
        path = Path(fp)
        fn, ext = path.name, path.suffix.lower()
        try:
            if ext == ".pdf":
                t = extract_text_from_pdf(fp)
            elif ext in (".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".webp"):
                t = extract_text_from_image(fp)
            elif ext in (".pptx", ".ppt"):
                t = extract_text_from_pptx(fp)
            elif ext in (".txt", ".md", ".csv", ".py", ".json"):
                # Explicit UTF-8: the platform default encoding would garble
                # non-ASCII notes on systems whose locale is not UTF-8.
                t = path.read_text(encoding="utf-8", errors="replace")
            else:
                t = f"[unsupported: {ext}]"
        except Exception as e:
            t = f"[error: {e}]"
        extracted_texts[fn] = t
        results.append(f"## 📄 {fn}\n\n{t[:3000]}" + (f"\n\n…*({len(t):,} chars)*" if len(t) > 3000 else ""))
    # The status line lists every stored material, not only this batch.
    return "\n\n---\n\n".join(results), f"✅ {len(files)} file(s): {', '.join(extracted_texts.keys())}"
def get_material_choices():
    """Return the stored material names, or a one-item placeholder list when none exist."""
    if extracted_texts:
        return list(extracted_texts)
    return ["No materials yet"]
# ═══════════════════ CHAT ═══════════════════
def build_system_prompt(custom, use_mat):
    """Compose the system prompt, optionally followed by the uploaded materials.

    Args:
        custom: User-supplied system prompt; a default is used when falsy.
        use_mat: When truthy and materials exist, append up to 8000 characters
            of each material between MATERIALS/END markers.
    """
    prompt = custom or "You are a helpful study assistant. Use LaTeX ($...$, $$...$$) for math. Be thorough."
    if not (use_mat and extracted_texts):
        return prompt
    sections = [f"### {name}\n{body[:8000]}" for name, body in extracted_texts.items()]
    return prompt + "\n\n--- MATERIALS ---\n" + "\n\n".join(sections) + "\n--- END ---"
def chat_respond(msg, history, model, sysp, use_mat):
    """Stream the assistant's reply to ``msg`` into a messages-style chat history.

    Yields the updated history repeatedly: first with a "⏳" placeholder, then
    once for each partial reply produced by ``stream_chat_with_model``. A blank
    message yields the history unchanged.
    """
    if not msg.strip():
        yield history
        return

    system_text = build_system_prompt(sysp, use_mat)
    outbound = [{"role": "system", "content": system_text}]
    # Forward only role/content so any extra per-message keys are not sent.
    outbound.extend({"role": turn["role"], "content": turn["content"]} for turn in history)
    outbound.append({"role": "user", "content": msg})

    placeholder = {"role": "assistant", "content": "⏳"}
    history = history + [{"role": "user", "content": msg}, placeholder]
    yield history

    for partial in stream_chat_with_model(model, outbound):
        history[-1]["content"] = partial
        yield history
# ═══════════════════ EXAM ═══════════════════
# Template for the exam-generation request. It is filled with str.format, so
# the literal JSON braces are doubled; the placeholders are {num_mcq},
# {num_written} and {material}.
EXAM_PROMPT = """You are an expert exam generator. Based on the study material below, generate a comprehensive exam.
CRITICAL: Return ONLY valid JSON. No markdown fences. No explanation. No text before or after the JSON.
Generate exactly {num_mcq} multiple-choice questions and {num_written} written questions.
JSON format:
{{"mcq":[{{"id":1,"question":"...","options":["A) ...","B) ...","C) ...","D) ..."],"correct":"A","explanation":"..."}}],"written":[{{"id":1,"question":"...","rubric":"Key points: ..."}}]}}
Study material:
{material}"""
def _exam_items(value):
    """Return the dict entries of *value* when it is a list, else an empty list."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def generate_exam(mat, mdl, nm, nw, progress=gr.Progress()):
    """Generate an exam for one uploaded material and store it in ``exam_store``.

    Args:
        mat: Name of the material (a key of ``extracted_texts``).
        mdl: Model display name (a key of ``MODEL_OPTIONS``).
        nm: Number of multiple-choice questions to request.
        nw: Number of written questions to request.
        progress: Gradio progress tracker.

    Returns:
        ``(mcq_markdown, written_markdown, group_update)``. On any failure the
        first element is a "⚠️" message, the second is "" and the update keeps
        the answer group hidden.
    """
    global exam_store
    if mat not in extracted_texts:
        return "⚠️ Upload materials first in the Upload & OCR tab.", "", gr.update(visible=False)
    progress(0.2, desc="Generating exam questions...")
    raw = chat_with_model(
        mdl,
        [
            {"role": "system", "content": "You are an exam generator. Return ONLY valid JSON. No markdown code fences. No explanation text."},
            {"role": "user", "content": EXAM_PROMPT.format(
                num_mcq=int(nm),
                num_written=int(nw),
                material=extracted_texts[mat][:15000],
            )},
        ],
        16000,
    )
    progress(0.7, desc="Parsing exam...")
    # chat_with_model reports failures as a "⚠️"-prefixed string.
    if raw.startswith("⚠️"):
        return raw, "", gr.update(visible=False)
    d = robust_json_parse(raw)
    # Valid JSON that is not an object (e.g. a bare list) cannot be an exam.
    if not d or not isinstance(d, dict):
        # Show what we got for debugging
        preview = raw[:1500] if raw else "(empty response)"
        return f"⚠️ Could not parse exam JSON from model response.\n\n**Raw response preview:**\n```\n{preview}\n```\n\n**Tip:** Try a different model (Llama 3.3 70B or DeepSeek V3 tend to produce cleaner JSON).", "", gr.update(visible=False)
    # Models sometimes emit null, a non-list, or stray non-object entries for
    # these keys; keep only well-formed question objects.
    mcqs = _exam_items(d.get("mcq"))
    wrs = _exam_items(d.get("written"))
    if not mcqs and not wrs:
        return "⚠️ Exam was empty. Try again or use a different model.", "", gr.update(visible=False)
    exam_store = {"mcq": mcqs, "written": wrs, "material_name": mat, "model": mdl}
    progress(0.9, desc="Formatting...")
    mcq_parts = [f"# 📝 Exam: {mat}\n**{len(mcqs)} MCQ + {len(wrs)} Written** (by {mdl})\n\n---\n## Multiple Choice\n\n"]
    for q in mcqs:
        mcq_parts.append(f"### Q{q.get('id', '?')}. {q.get('question', 'N/A')}\n")
        options = q.get("options")
        if isinstance(options, (list, tuple)):
            mcq_parts.extend(f"- {o}\n" for o in options)
        mcq_parts.append("\n")
    md1 = "".join(mcq_parts)
    md2 = "## ✍️ Written Questions\n\n" + "".join(
        f"### Q{q.get('id', '?')}. {q.get('question', 'N/A')}\n\n" for q in wrs
    )
    progress(1.0, desc="Done!")
    return md1, md2, gr.update(visible=True)
def _question_key(raw_id):
    """Normalise a question id so 1, "1" and " 1 " all map to the same key."""
    try:
        return int(str(raw_id).strip())
    except (TypeError, ValueError):
        return str(raw_id)


def _coerce_score(raw_score):
    """Convert a model-supplied score to a number clamped to the range 0..10."""
    try:
        score = float(raw_score)
    except (TypeError, ValueError):
        return 0
    if score != score:  # NaN
        return 0
    score = min(10.0, max(0.0, score))
    return int(score) if score.is_integer() else score


def grade_exam(mcq_str, wr_str, mdl, progress=gr.Progress()):
    """Grade the exam held in ``exam_store`` and append the result to ``exam_scores``.

    Args:
        mcq_str: MCQ answers such as "1:A, 2:C".
        wr_str: Written answers, each starting on a line like "1: ..." or
            "1. ..."; following lines continue the current answer.
        mdl: Model display name used to grade the written answers.
        progress: Gradio progress tracker.

    Returns:
        A markdown report, or a "⚠️" message when no exam has been generated.
        MCQs are graded locally; written answers are scored 0-10 by the model.
        The overall score averages the two sections when both exist.
    """
    mcq_questions = [q for q in (exam_store.get("mcq") or []) if isinstance(q, dict)]
    wqs = [q for q in (exam_store.get("written") or []) if isinstance(q, dict)]
    # Written-only exams are valid output of generate_exam, so only refuse
    # when there is nothing to grade at all.
    if not mcq_questions and not wqs:
        return "⚠️ No exam to grade. Generate one first."
    mcq_str = mcq_str or ""
    wr_str = wr_str or ""
    progress(0.1, desc="Grading MCQ...")

    # Parse "number:letter" pairs; malformed pairs are ignored.
    um = {}
    for part in mcq_str.split(","):
        number, sep, choice = part.strip().partition(":")
        choice = choice.strip()
        if not sep or not choice:
            continue
        try:
            um[int(number.strip())] = choice.upper()[0]
        except ValueError:
            continue

    mc, res = 0, []
    for q in mcq_questions:
        qid = q.get("id", "?")
        ua = um.get(_question_key(qid), "—")
        # Only the leading letter counts, so "A) ..." compares equal to "A".
        ca = str(q.get("correct", "")).strip().upper()[:1]
        ok = ua == ca
        if ok:
            mc += 1
        res.append({
            "id": qid,
            "q": str(q.get("question", ""))[:60],
            "ua": ua,
            "ca": ca,
            "ok": ok,
            "exp": q.get("explanation", ""),
        })
    tm = len(mcq_questions)
    mp = mc / tm * 100 if tm else 0

    # Grade written answers with the model.
    ws, wr = 0, []
    if wr_str.strip() and wqs:
        progress(0.4, desc="Grading written answers...")
        uw, current = {}, None
        for line in wr_str.split("\n"):
            m = re.match(r'^(\d+)[.:]\s*(.*)', line)
            if m:
                current = int(m.group(1))
                uw[current] = m.group(2)
            elif current is not None:
                uw[current] = uw.get(current, "") + " " + line
        gp = "Grade each answer 0-10. Return ONLY a JSON array: [{\"id\":1,\"score\":8,\"feedback\":\"...\"}]\n\n"
        for q in wqs:
            qid = q.get("id", "?")
            answer = uw.get(_question_key(qid), "[no answer]")
            gp += f"Q{qid}: {q.get('question', '')}\nRubric: {q.get('rubric', 'N/A')}\nStudent answer: {answer}\n\n"
        gr2 = clean_response(chat_with_model(
            mdl,
            [{"role": "system", "content": "Return ONLY a JSON array. No other text."},
             {"role": "user", "content": gp}],
            4000,
        ))
        gs = robust_json_array_parse(gr2)
        gm2 = {_question_key(g["id"]): g for g in gs if isinstance(g, dict) and "id" in g}
        for q in wqs:
            qid = q.get("id", "?")
            g = gm2.get(_question_key(qid), {"score": 0, "feedback": "Could not grade"})
            # The model may return strings, nulls or out-of-range values.
            score = _coerce_score(g.get("score", 0))
            ws += score
            wr.append({
                "id": qid,
                "q": str(q.get("question", ""))[:60],
                "s": score,
                "f": g.get("feedback", ""),
            })
    twm = len(wqs) * 10
    wp = ws / twm * 100 if twm else 0
    if tm and twm:
        ov = (mp + wp) / 2
    elif twm:
        ov = wp
    else:
        ov = mp

    progress(0.9, desc="Formatting results...")
    o = f"# 📊 Exam Results\n\n## MCQ: {mc}/{tm} ({mp:.0f}%)\n\n"
    o += "| # | Question | You | Correct | Result |\n|---|---|---|---|---|\n"
    for x in res:
        o += f"| {x['id']} | {x['q']}… | {x['ua']} | {x['ca']} | {'✅' if x['ok'] else '❌'} |\n"
    bad = [x for x in res if not x['ok'] and x['exp']]
    if bad:
        o += "\n### Explanations for wrong answers:\n\n"
        for x in bad:
            o += f"- **Q{x['id']}:** {x['exp']}\n"
    o += f"\n---\n\n## Written: {ws}/{twm} ({wp:.0f}%)\n\n"
    for x in wr:
        o += f"**Q{x['id']}.** {x['q']}…\n- Score: **{x['s']}/10** — {x['f']}\n\n"
    o += f"\n---\n\n## 🏆 Overall: {ov:.0f}%\n"
    exam_scores.append({
        "material": exam_store.get("material_name", "?"),
        "mcq_score": mp,
        "written_score": wp,
        "overall": ov,
        "timestamp": time.strftime("%Y-%m-%d %H:%M"),
    })
    return o
# ═══════════════════ VOICE (Whisper+gTTS) ═══════════════════
def transcribe_audio(ap):
    """Transcribe an audio file with Whisper through the Hugging Face client.

    Returns "" for a missing path, "[no HF_TOKEN]" when no client is available,
    and "[<error>]" when the request fails; otherwise the transcript text.
    """
    if not ap:
        return ""
    asr = _get_hf_client()
    if not asr:
        return "[no HF_TOKEN]"
    try:
        result = asr.automatic_speech_recognition(audio=ap, model="openai/whisper-large-v3-turbo")
        # The result may be an object with `.text`, a dict, or something else.
        if hasattr(result, 'text'):
            return result.text
        if isinstance(result, dict):
            return result.get("text", str(result))
        return str(result)
    except Exception as exc:
        return f"[{exc}]"
def text_to_speech(text):
    """Synthesise *text* to an MP3 file with gTTS.

    LaTeX spans are replaced by "[formula]", markdown punctuation is removed
    and the input is capped at 2000 characters before synthesis.

    Args:
        text: Text to speak.

    Returns:
        Path of the generated MP3 file, or ``None`` when there is nothing to
        say, gTTS is unavailable, or synthesis fails. The caller owns the file.
    """
    if not text or not GTTS_AVAILABLE:
        return None
    try:
        c = re.sub(r'\$\$?[^$]+\$\$?', '[formula]', text)
        c = re.sub(r'[#*_`|>]', '', c)[:2000]
        if not c.strip():
            return None
        speech = gTTS(text=c, lang='en')
        # Reserve a path and close the handle before gTTS writes to it: an
        # open handle leaks a descriptor and blocks the write on Windows.
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as handle:
            out_path = handle.name
        try:
            speech.save(out_path)
        except Exception:
            # Do not leave an empty temp file behind when synthesis fails.
            try:
                os.unlink(out_path)
            except OSError:
                pass
            raise
        return out_path
    except Exception:
        # Best effort: speech is optional, the text reply is still shown.
        return None
def voice_chat(audio, history, mdl, use_mat):
    """Handle one voice turn: transcribe, ask the model, synthesise the reply.

    Returns ``(history, reply_audio_path, status_or_transcript)``. When there
    is no audio, or transcription yields nothing or a bracketed error marker,
    the history is returned unchanged with no audio.
    """
    if not audio:
        return history, None, "⚠️ No audio."

    transcript = transcribe_audio(audio)
    if not transcript:
        return history, None, "No speech"
    if transcript.startswith("["):
        # transcribe_audio reports problems as "[...]" strings.
        return history, None, transcript

    system_text = build_system_prompt("Voice assistant. Concise, <200 words. No LaTeX.", use_mat)
    outbound = [
        {"role": "system", "content": system_text},
        *({"role": turn["role"], "content": turn["content"]} for turn in history),
        {"role": "user", "content": transcript},
    ]
    reply = chat_with_model(mdl, outbound, 1000)
    updated = history + [
        {"role": "user", "content": f"🎤 {transcript}"},
        {"role": "assistant", "content": reply},
    ]
    return updated, text_to_speech(reply), transcript
# ═══════════════════ GEMINI LIVE ═══════════════════
# Voice names offered in the Gemini Live voice dropdown; the selection is
# passed as the prebuilt voice name in live_response.
GEMINI_LIVE_VOICES = ["Puck", "Aoede", "Charon", "Fenrir", "Kore"]
# Model name used when opening the live session.
# NOTE(review): confirm this id against the currently published Live API models.
GEMINI_LIVE_MODEL = "gemini-3.1-flash-live-preview"
@dataclass
class LiveState:
    """Per-session state for the Gemini Live tab."""

    # Audio recorded so far in the current turn (chunks are concatenated by
    # live_process_audio); None until the first chunk arrives.
    stream: np.ndarray | None = None
    # Sample rate reported with the most recent chunk.
    sampling_rate: int = 0
    # Result of the last _detect_pause call.
    pause_detected: bool = False
    # Set by _detect_pause once the recent audio exceeds its energy threshold.
    started_talking: bool = False
    # Set by the Stop button; live_start_recording will not restart the mic.
    stopped: bool = False
    # Chat messages (role/content dicts) shown in the live chatbot.
    conversation: list = field(default_factory=list)
def _detect_pause(audio, sr, state):
if audio is None or len(audio) == 0:
return False
af = audio.astype(np.float32) / 32768.0
w = min(sr, len(af))
e = np.sqrt(np.mean(af[-w:]**2))
if e > 0.01 and not state.started_talking:
state.started_talking = True
return False
dur = len(af) / sr
if state.started_talking and dur > 1.5:
tail = af[-sr:] if len(af) >= sr else af
if np.sqrt(np.mean(tail**2)) < 0.005:
return True
return dur > 30
def live_process_audio(audio, state):
    """Accumulate one streamed microphone chunk and stop recording on a pause.

    Args:
        audio: ``(sample_rate, samples)`` from the streaming mic, or ``None``.
        state: The session's ``LiveState``; updated in place.

    Returns:
        ``(mic_update, state)``. The update is ``gr.Audio(recording=False)``
        once a pause is detected after speech started, otherwise ``None``.
    """
    if audio is None:
        return None, state

    rate, samples = audio
    if state.stream is None:
        state.stream = samples
    else:
        state.stream = np.concatenate((state.stream, samples))
    state.sampling_rate = rate

    paused = _detect_pause(state.stream, rate, state)
    state.pause_detected = paused
    if paused and state.started_talking:
        return gr.Audio(recording=False), state
    return None, state
def _resample_to_16k(audio_np, orig_sr):
af = audio_np.astype(np.float32) / 32768.0
if af.ndim > 1:
af = af.mean(axis=1)
if LIBROSA_AVAILABLE and orig_sr != 16000:
af = librosa.resample(af, orig_sr=orig_sr, target_sr=16000)
elif orig_sr != 16000:
idx = np.arange(0, len(af), orig_sr / 16000).astype(int)
idx = idx[idx < len(af)]
af = af[idx]
return (af * 32768).astype(np.int16).tobytes()
def _pcm24k_to_mp3(pcm):
    """Encode mono 24 kHz 16-bit PCM bytes for playback.

    With pydub available the result is MP3 at 192k. Without it, the bytes are
    wrapped in a WAV container instead.
    NOTE(review): the fallback output is WAV even though callers label it MP3.
    """
    out = io.BytesIO()
    if not PYDUB_AVAILABLE:
        with wave.open(out, 'wb') as wav:
            wav.setnchannels(1)
            wav.setsampwidth(2)
            wav.setframerate(24000)
            wav.writeframes(pcm)
        return out.getvalue()
    segment = AudioSegment(pcm, frame_rate=24000, sample_width=2, channels=1)
    segment.export(out, format="mp3", bitrate="192k")
    return out.getvalue()
def live_response(state, voice, use_mat):
    """Send the recorded turn to Gemini Live and yield the spoken reply.

    Args:
        state: The session's ``LiveState`` holding the recorded audio.
        voice: Prebuilt voice name for the reply.
        use_mat: When truthy, include up to 4000 characters of each uploaded
            material in the system instruction.

    Yields:
        ``(audio_bytes_or_None, next_state)`` exactly once. Except for the
        "nothing recorded" case, ``next_state`` is a fresh ``LiveState`` that
        carries only the conversation forward, so the next turn starts with an
        empty audio buffer.
    """
    if not state.pause_detected and not state.started_talking:
        yield None, LiveState()
        return
    client = _get_google_client()
    if not client:
        state.conversation.append({"role": "assistant", "content": "⚠️ GOOGLE_API_KEY not set."})
        yield None, LiveState(conversation=state.conversation)
        return
    sys_t = "You are an expert study tutor. Be concise, educational, and engaging. Ask follow-up questions."
    if use_mat and extracted_texts:
        sys_t += "\n\nStudy materials:\n" + "\n".join(f"[{k}]: {v[:4000]}" for k, v in extracted_texts.items())
    config = genai_types.LiveConnectConfig(
        response_modalities=[genai_types.Modality.AUDIO],
        speech_config=genai_types.SpeechConfig(
            voice_config=genai_types.VoiceConfig(
                prebuilt_voice_config=genai_types.PrebuiltVoiceConfig(voice_name=voice),
            ),
        ),
        system_instruction=genai_types.Content(
            parts=[genai_types.Part(text=sys_t)],
        ),
        input_audio_transcription=genai_types.AudioTranscriptionConfig(),
        output_audio_transcription=genai_types.AudioTranscriptionConfig(),
        realtime_input_config=genai_types.RealtimeInputConfig(
            turn_coverage="TURN_INCLUDES_ONLY_ACTIVITY",
        ),
    )
    pcm16 = _resample_to_16k(state.stream, state.sampling_rate)

    # Keep a WAV copy of the user's turn for the chat display. The temp handle
    # is closed before wave reopens the path (needed on Windows).
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        wav_path = tmp.name
    with wave.open(wav_path, 'wb') as w:
        w.setnchannels(1)
        w.setsampwidth(2)
        w.setframerate(16000)
        w.writeframes(pcm16)
    state.conversation.append({"role": "user", "content": {"path": wav_path, "mime_type": "audio/wav"}})

    audio_chunks = []
    user_parts = []
    ai_parts = []

    async def _call():
        # Errors are logged, not raised: whatever was received before the
        # failure is still returned to the user.
        try:
            async with client.aio.live.connect(model=GEMINI_LIVE_MODEL, config=config) as session:
                await session.send_realtime_input(
                    audio=genai_types.Blob(data=pcm16, mime_type="audio/pcm;rate=16000"),
                )
                async for response in session.receive():
                    server_content = response.server_content
                    if not server_content:
                        continue
                    if server_content.model_turn:
                        for part in server_content.model_turn.parts:
                            if part.inline_data:
                                audio_chunks.append(part.inline_data.data)
                    if server_content.input_transcription and server_content.input_transcription.text:
                        user_parts.append(server_content.input_transcription.text)
                    if server_content.output_transcription and server_content.output_transcription.text:
                        ai_parts.append(server_content.output_transcription.text)
                    if server_content.turn_complete or server_content.interrupted:
                        break
        except Exception as e:
            print(f"Gemini Live error: {e}")
            traceback.print_exc()

    loop = None
    try:
        loop = asyncio.new_event_loop()
        loop.run_until_complete(_call())
    except Exception as e:
        print(f"Loop error: {e}")
    finally:
        # Always release the loop, including when run_until_complete raises.
        if loop is not None:
            loop.close()

    output_pcm = b"".join(audio_chunks)
    user_transcript = "".join(user_parts)
    ai_transcript = "".join(ai_parts)

    if user_transcript:
        state.conversation.append({"role": "user", "content": f"🎤 {user_transcript}"})
    if ai_transcript:
        state.conversation.append({"role": "assistant", "content": ai_transcript})
    if output_pcm:
        mp3 = _pcm24k_to_mp3(output_pcm)
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
            f.write(mp3)
        if not ai_transcript:
            state.conversation.append({"role": "assistant", "content": {"path": f.name, "mime_type": "audio/mp3"}})
        # Hand back a fresh state: reusing `state` would prepend this turn's
        # audio to the next recording and leave started_talking set, so the
        # trailing silence would trigger an immediate "pause".
        yield mp3, LiveState(conversation=state.conversation)
    else:
        state.conversation.append({"role": "assistant", "content": "⚠️ No audio response from Gemini Live."})
        yield None, LiveState(conversation=state.conversation)
def live_start_recording(state):
    """Restart the microphone after a reply, unless the session was stopped.

    Returns ``gr.Audio(recording=True)``, or ``None`` when ``state.stopped`` is set.
    """
    if state.stopped:
        return None
    return gr.Audio(recording=True)
# ═══════════════════ VISUALIZATIONS ═══════════════════
# System-prompt templates for the Visualizations tab. Both are filled with
# str.format; {mc} is the optional material context ("" when unused).
# VIZ_PROMPT asks for bare Plotly code that assigns a figure to `fig`.
VIZ_PROMPT = "Data viz expert. Plotly code. Use go/px/np. Assign `fig`. No fig.show(). No fences. Only code.\n{mc}"
LATEX_PROMPT = "Math explainer. $...$ inline, $$...$$ display. Step-by-step.\n{mc}"
def generate_visualization(prompt, mdl, use_mat):
    """Ask the model for Plotly code, run it, and return the resulting figure.

    Args:
        prompt: Description of the desired chart.
        mdl: Model display name.
        use_mat: When truthy, include up to 2000 characters of each uploaded
            material as context.

    Returns:
        ``(figure_or_None, markdown)`` where the markdown shows the generated
        code, prefixed by "✅" on success or a "⚠️" message on failure.
    """
    if not prompt.strip():
        return None, "⚠️ Describe a chart."
    mc = ("Material:\n" + "\n".join(f"[{k}]:{v[:2000]}" for k, v in extracted_texts.items())) if use_mat and extracted_texts else ""
    r = clean_response(chat_with_model(
        mdl,
        [{"role": "system", "content": VIZ_PROMPT.format(mc=mc)},
         {"role": "user", "content": prompt}],
        4000,
    ))
    # Models often wrap code in fences despite the instruction; strip them.
    code = re.sub(r'^```(?:python)?\s*\n?', '', r, flags=re.MULTILINE)
    code = re.sub(r'^```\s*$', '', code, flags=re.MULTILINE).strip()
    try:
        g = {"go": go, "px": px, "np": np}
        try:
            import pandas as pd
            g["pd"] = pd
        except ImportError:
            pass
        # SECURITY: this executes model-generated code with full interpreter
        # privileges. The user prompt and uploaded materials both influence
        # that code, so it must be treated as untrusted.
        # TODO(review): run it in a sandboxed subprocess before exposing this
        # app to untrusted users.
        exec(code, g)
        fig = g.get("fig")
        # Test against None explicitly rather than relying on the figure
        # object's truthiness.
        if fig is None:
            return None, f"⚠️ No `fig`.\n```python\n{code}\n```"
        return fig, f"✅\n```python\n{code}\n```"
    except Exception as e:
        return None, f"⚠️ {e}\n```python\n{code}\n```"
def generate_latex_explanation(topic, mdl, use_mat):
    """Ask the model for a step-by-step, LaTeX-formatted explanation of *topic*.

    Returns the cleaned model reply, or a "⚠️" prompt when the topic is blank.
    """
    if not topic.strip():
        return "⚠️ Enter a topic."

    if use_mat and extracted_texts:
        excerpts = "\n".join(f"[{name}]:{body[:2000]}" for name, body in extracted_texts.items())
        mc = "Material:\n" + excerpts
    else:
        mc = ""

    conversation = [
        {"role": "system", "content": LATEX_PROMPT.format(mc=mc)},
        {"role": "user", "content": f"Explain: {topic}"},
    ]
    reply = chat_with_model(mdl, conversation, 4000)
    return clean_response(reply)
# ═══════════════════ DASHBOARD ═══════════════════
def get_dashboard_data():
    """Build the dashboard markdown and, when exams were graded, a score chart.

    Returns:
        ``(markdown, figure_or_None)``. The markdown lists uploaded materials
        with word counts, the graded-exam history and API-key status. The
        figure shows MCQ and written scores as grouped bars with the overall
        score as a line; it is ``None`` when no exam has been graded.
    """
    md = "# 📚 Dashboard\n\n## Materials\n\n"
    if extracted_texts:
        for n, t in extracted_texts.items():
            md += f"- **{n}** — {len(t.split()):,} words\n"
    else:
        md += "_None._\n"
    md += "\n## Exams\n\n"
    if exam_scores:
        md += "| Date | Material | MCQ | Written | Overall |\n|---|---|---|---|---|\n"
        for s in exam_scores:
            md += f"| {s['timestamp']} | {s['material'][:25]} | {s['mcq_score']:.0f}% | {s['written_score']:.0f}% | {s['overall']:.0f}% |\n"
    else:
        md += "_None._\n"
    or_ok = '✅' if os.environ.get('OPENROUTER_API_KEY') else '❌'
    hf_ok = '✅' if os.environ.get('HF_TOKEN') else '❌'
    gl_ok = '✅' if os.environ.get('GOOGLE_API_KEY') and GOOGLE_AVAILABLE else '❌'
    md += f"\n## Status\n- **OpenRouter** (chat): {or_ok}\n- **HF** (Whisper): {hf_ok}\n- **Google** (Gemini Live): {gl_ok}\n"
    fig = None
    if exam_scores:
        fig = go.Figure()
        # The label separator must be an escaped "\n": a raw line break inside
        # a single-line f-string is a SyntaxError.
        lb = [f"{s['material'][:12]}…\n{s['timestamp']}" for s in exam_scores]
        fig.add_trace(go.Bar(name="MCQ", x=lb, y=[s["mcq_score"] for s in exam_scores], marker_color="#4CAF50"))
        fig.add_trace(go.Bar(name="Written", x=lb, y=[s["written_score"] for s in exam_scores], marker_color="#2196F3"))
        fig.add_trace(go.Scatter(name="Overall", x=lb, y=[s["overall"] for s in exam_scores], mode="lines+markers", line=dict(color="#FF9800", width=3)))
        fig.update_layout(barmode="group", yaxis_title="%", yaxis=dict(range=[0, 105]), template="plotly_white", height=400)
    return md, fig
# ═══════════════════ UI ═══════════════════
def build_app():
    """Assemble the Gradio Blocks UI and wire every tab to its handler.

    Tabs: Chat, Upload & OCR, Exam, Voice Chat, Gemini Live, Visualizations
    and Dashboard.

    Returns:
        The ``gr.Blocks`` app (not yet launched).

    NOTE(review): the nesting of the layout containers below is reconstructed
    from the order of the statements; confirm it against the intended layout.
    """
    with gr.Blocks(
        title="📚 Study Assistant",
        css=".main-title{text-align:center}",
        theme=gr.themes.Soft(primary_hue="blue", secondary_hue="purple"),
    ) as demo:
        gr.Markdown(
            "# 📚 AI Study Assistant\n**Upload → Chat → Exam → Voice → Visualize**\n\n"
            "> **Required:** `OPENROUTER_API_KEY` · **Optional:** `HF_TOKEN` (Whisper) · `GOOGLE_API_KEY` (Gemini Live)",
            elem_classes=["main-title"],
        )
        # ── Chat: streaming text chat with optional material context ──
        with gr.Tab("💬 Chat"):
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    m_sel = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="🤖 Model")
                    sysp_box = gr.Textbox(label="System Prompt", lines=3)
                    um_chat = gr.Checkbox(label="📎 Include Materials", value=True)
                    gr.Markdown("### Models (free via OpenRouter)\n| Model | Size |\n|---|---|\n| DeepSeek R1 | 671B MoE |\n| Nemotron 120B | 120B MoE |\n| Gemma 3 27B | 27B |\n| Llama 3.3 70B | 70B |\n| Qwen3 235B | 235B MoE |\n| DeepSeek V3 | 685B MoE |")
                with gr.Column(scale=3):
                    cb = gr.Chatbot(
                        height=550, type="messages", show_copy_button=True,
                        latex_delimiters=[
                            {"left": "$$", "right": "$$", "display": True},
                            {"left": "$", "right": "$", "display": False},
                            {"left": "\\(", "right": "\\)", "display": False},
                            {"left": "\\[", "right": "\\]", "display": True},
                        ],
                    )
                    with gr.Row():
                        mi = gr.Textbox(placeholder="Ask about your study materials...", lines=2, scale=5, show_label=False)
                        sb = gr.Button("Send 📤", variant="primary", scale=1)
                    gr.Button("🗑️ Clear", size="sm").click(lambda: [], None, cb)
            # Both the button and Enter run chat_respond, then clear the input box.
            sb.click(chat_respond, [mi, cb, m_sel, sysp_box, um_chat], [cb]).then(lambda: "", None, mi)
            mi.submit(chat_respond, [mi, cb, m_sel, sysp_box, um_chat], [cb]).then(lambda: "", None, mi)
        # ── Upload & OCR: extract text from files into extracted_texts ──
        with gr.Tab("📤 Upload & OCR"):
            gr.Markdown("## Upload Materials\nPDF, PPTX, PNG, JPG, TXT, MD")
            with gr.Row():
                with gr.Column(scale=1):
                    fu = gr.File(
                        label="📁 Upload", file_count="multiple",
                        file_types=[".pdf", ".pptx", ".ppt", ".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".txt", ".md", ".csv", ".py"],
                        type="filepath",
                    )
                    ub = gr.Button("🔍 Process", variant="primary", size="lg")
                    us = gr.Markdown("_Upload and Process_")
                with gr.Column(scale=2):
                    eo = gr.Markdown("_Text here..._")
            ub.click(process_upload, [fu], [eo, us])
        # ── Exam: generate questions, then grade the answers ──
        with gr.Tab("📝 Exam"):
            gr.Markdown("## Generate & Take Exams")
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    # Choices are captured when the UI is built; Refresh
                    # re-reads the uploaded materials.
                    md2 = gr.Dropdown(get_material_choices(), label="📄 Material", interactive=True)
                    gr.Button("🔄 Refresh", size="sm").click(lambda: gr.update(choices=get_material_choices()), None, md2)
                    em = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="🤖 Model")
                    nm = gr.Slider(5, 150, 100, step=5, label="MCQ")
                    nw = gr.Slider(2, 50, 20, step=1, label="Written")
                    gen_btn = gr.Button("🎯 Generate", variant="primary", size="lg")
                with gr.Column(scale=2):
                    emd = gr.Markdown("_Generate exam..._")
                    ewd = gr.Markdown("")
            # Hidden until generate_exam returns an update making it visible.
            with gr.Group(visible=False) as eg:
                gr.Markdown("---\n## ✏️ Take Exam")
                with gr.Row():
                    ma = gr.Textbox(label="MCQ (1:A, 2:B...)", lines=4)
                    wa = gr.Textbox(label="Written (1: answer...)", lines=8)
                grade_mdl = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="Grading Model")
                grd_btn = gr.Button("📊 Grade", variant="primary", size="lg")
                er = gr.Markdown("")
            gen_btn.click(generate_exam, [md2, em, nm, nw], [emd, ewd, eg])
            grd_btn.click(grade_exam, [ma, wa, grade_mdl], [er])
        # ── Voice Chat: record → transcribe → LLM → synthesized reply ──
        with gr.Tab("🎤 Voice Chat"):
            gr.Markdown("## Voice Assistant\nRecord → Whisper → LLM → gTTS\n> Needs `HF_TOKEN`")
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    vm = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="🤖 Model")
                    umv = gr.Checkbox(label="📎 Materials", value=True)
                    ai = gr.Audio(sources=["microphone"], type="filepath", label="🎙️ Record")
                    vb = gr.Button("🔊 Send", variant="primary", size="lg")
                    vt = gr.Textbox(label="Transcript", interactive=False)
                with gr.Column(scale=2):
                    vc = gr.Chatbot(
                        height=400, type="messages",
                        latex_delimiters=[{"left": "$$", "right": "$$", "display": True}, {"left": "$", "right": "$", "display": False}],
                    )
                    va_out = gr.Audio(label="🔊 Response", type="filepath", autoplay=True)
            vb.click(voice_chat, [ai, vc, vm, umv], [vc, va_out, vt])
            gr.Button("🗑️ Clear", size="sm").click(lambda: ([], None, ""), None, [vc, va_out, vt])
        # ── Gemini Live: streamed mic input with automatic turn-taking ──
        with gr.Tab("🔴 Gemini Live"):
            gr.Markdown(
                "## 🔴 Gemini Live — Real-time Voice\n"
                "Native bidirectional voice conversation via Gemini.\n"
                "> Requires `GOOGLE_API_KEY` with Gemini API access"
            )
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    lv = gr.Dropdown(GEMINI_LIVE_VOICES, value="Puck", label="🗣️ Voice")
                    lum = gr.Checkbox(label="📎 Materials", value=True)
                    gr.Markdown("### How to use\n1. Click the microphone\n2. Speak your question\n3. Pause — auto-detected\n4. Listen to the AI response\n5. Mic restarts automatically!")
                with gr.Column(scale=3):
                    lcb = gr.Chatbot(label="Conversation", height=400, type="messages")
                    with gr.Row():
                        lmic = gr.Audio(label="🎙️ Speak", sources="microphone", type="numpy")
                        lout = gr.Audio(label="🔊 Response", streaming=True, autoplay=True)
            ls = gr.State(value=LiveState())
            # Turn loop: mic chunks go to live_process_audio, which stops the
            # recording on a pause; stop_recording runs live_response; when
            # reply playback stops, live_start_recording re-arms the mic.
            lstr = lmic.stream(live_process_audio, [lmic, ls], [lmic, ls], stream_every=0.5, time_limit=30)
            lresp = lmic.stop_recording(live_response, [ls, lv, lum], [lout, ls])
            lresp.then(lambda s: s.conversation, [ls], [lcb])
            lrst = lout.stop(live_start_recording, [ls], [lmic])
            # Stop replaces the state with a stopped one and cancels the
            # pending response and restart events.
            gr.Button("⛔ Stop", variant="stop").click(
                lambda: (LiveState(stopped=True), gr.Audio(recording=False)),
                None, [ls, lmic], cancels=[lresp, lrst],
            )
        # ── Visualizations: model-generated Plotly charts and LaTeX notes ──
        with gr.Tab("📊 Visualizations"):
            gr.Markdown("## Charts & LaTeX")
            with gr.Row():
                vmdl = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="🤖 Model")
                umvz = gr.Checkbox(label="📎 Materials", value=True)
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 📈 Charts")
                    vp = gr.Textbox(label="Describe", placeholder="Bar chart comparing sorting algorithms...", lines=2)
                    vbtn = gr.Button("📊 Generate", variant="primary")
                    vplt = gr.Plot()
                    vcd = gr.Markdown()
                with gr.Column():
                    gr.Markdown("### 📐 LaTeX")
                    lt = gr.Textbox(label="Topic", placeholder="Fourier Transform, Bayes' Theorem...", lines=2)
                    lbtn = gr.Button("📐 Explain", variant="primary")
                    lo = gr.Markdown(
                        latex_delimiters=[
                            {"left": "$$", "right": "$$", "display": True},
                            {"left": "$", "right": "$", "display": False},
                            {"left": "\\(", "right": "\\)", "display": False},
                            {"left": "\\[", "right": "\\]", "display": True},
                        ],
                    )
            vbtn.click(generate_visualization, [vp, vmdl, umvz], [vplt, vcd])
            lbtn.click(generate_latex_explanation, [lt, vmdl, umvz], [lo])
        # ── Dashboard: materials, exam history and API-key status ──
        with gr.Tab("📊 Dashboard"):
            dmd = gr.Markdown("_Refresh_")
            dfg = gr.Plot()
            gr.Button("🔄 Refresh", variant="primary").click(get_dashboard_data, None, [dmd, dfg])
        # Populate the dashboard once when the page loads.
        demo.load(get_dashboard_data, None, [dmd, dfg])
    return demo
# Script entry point: build the UI and start the Gradio server.
if __name__ == "__main__":
    demo = build_app()
    demo.launch()