Spaces:
Sleeping
Sleeping
Fix: Gemini Live model → gemini-3.1-flash-live-preview, robust JSON parsing for exam generation
793c119 verified | """ | |
| Study Assistant - Comprehensive Gradio-based study tool | |
| Features: OCR, Multi-Agent Chat (via OpenRouter), Exam Generation, Voice Chat, Gemini Live, Visualizations | |
| """ | |
| import os, io, re, json, time, asyncio, base64, random, tempfile, traceback, wave, inspect | |
| from pathlib import Path | |
| from typing import Optional | |
| from dataclasses import dataclass, field | |
| import gradio as gr | |
| import numpy as np | |
| # ββ OpenRouter (OpenAI-compatible) ββ | |
| from openai import OpenAI | |
| # ββ HF for Whisper ASR ββ | |
| from huggingface_hub import InferenceClient | |
| # ββ Google GenAI for Gemini Live ββ | |
| try: | |
| from google import genai as google_genai | |
| from google.genai import types as genai_types | |
| GOOGLE_AVAILABLE = True | |
| except ImportError: | |
| GOOGLE_AVAILABLE = False | |
| # ββ OCR ββ | |
| import fitz | |
| from PIL import Image | |
| try: | |
| import pytesseract | |
| TESSERACT_AVAILABLE = True | |
| except ImportError: | |
| TESSERACT_AVAILABLE = False | |
| try: | |
| from pptx import Presentation | |
| PPTX_AVAILABLE = True | |
| except ImportError: | |
| PPTX_AVAILABLE = False | |
| # ββ Viz ββ | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| # ββ Audio ββ | |
| try: | |
| from pydub import AudioSegment | |
| PYDUB_AVAILABLE = True | |
| except ImportError: | |
| PYDUB_AVAILABLE = False | |
| try: | |
| import librosa | |
| LIBROSA_AVAILABLE = True | |
| except ImportError: | |
| LIBROSA_AVAILABLE = False | |
| try: | |
| from gtts import gTTS | |
| GTTS_AVAILABLE = True | |
| except ImportError: | |
| GTTS_AVAILABLE = False | |
| # βββββββββββββββββββ GLOBAL STATE βββββββββββββββββββ | |
# In-memory, module-level stores shared by the handlers below.
# File name -> text extracted from that upload (filled by process_upload).
extracted_texts: dict[str, str] = {}
# The most recently generated exam; generate_exam replaces it wholesale.
exam_store: dict = {}
# One summary dict per graded exam, appended by grade_exam.
exam_scores: list[dict] = []
| # βββββββββββββββββββ UTILITIES βββββββββββββββββββ | |
def clean_response(text: str) -> str:
    """Remove model "thinking" sections from *text* and trim whitespace.

    Closed ``<think>...</think>`` spans are dropped first; a ``<think>`` that
    is never closed then discards everything from that tag to the end.
    Falsy input yields an empty string.
    """
    if text:
        closed_removed = re.sub(r'<think>[\s\S]*?</think>', '', text, flags=re.DOTALL)
        open_removed = re.sub(r'<think>[\s\S]*$', '', closed_removed, flags=re.DOTALL)
        return open_removed.strip()
    return ""
def _loads_json_object(candidate: str) -> Optional[dict]:
    """Parse *candidate* as JSON and return it only if it is an object (dict)."""
    try:
        value = json.loads(candidate)
    except (json.JSONDecodeError, ValueError):
        return None
    return value if isinstance(value, dict) else None


def _balanced_object(text: str, respect_strings: bool) -> Optional[str]:
    """Return the first balanced ``{...}`` span in *text*, or None.

    With ``respect_strings`` braces inside double-quoted strings (including
    escaped quotes) are ignored; without it every brace is counted.
    """
    start = text.find('{')
    if start == -1:
        return None
    depth = 0
    in_string = False
    escaped = False
    for i in range(start, len(text)):
        ch = text[i]
        if respect_strings and in_string:
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if respect_strings and ch == '"':
            in_string = True
        elif ch == '{':
            depth += 1
        elif ch == '}':
            depth -= 1
            if depth == 0:
                return text[start:i + 1]
    return None


def robust_json_parse(text: str) -> Optional[dict]:
    """Try multiple strategies to parse a JSON object from LLM output.

    Strategies, in order: direct parse, parse after stripping markdown code
    fences, parse of the first balanced ``{...}`` block, and finally repair of
    common mistakes (trailing commas, then single quotes) on that block.

    Returns the parsed dict, or None when no strategy produces a JSON object.
    Arrays and scalars are rejected so callers can rely on the dict type.
    """
    if not text or not text.strip():
        return None

    # Strategy 1: direct parse.
    parsed = _loads_json_object(text)
    if parsed is not None:
        return parsed

    # Strategy 2: strip markdown code fences.
    stripped = re.sub(r'^```(?:json)?\s*\n?', '', text, flags=re.MULTILINE)
    stripped = re.sub(r'\n?```\s*$', '', stripped, flags=re.MULTILINE).strip()
    parsed = _loads_json_object(stripped)
    if parsed is not None:
        return parsed

    # Strategy 3: first balanced { ... } block. The string-aware match is tried
    # first; plain brace counting is kept as a fallback for pseudo-JSON whose
    # quoting would confuse the string tracking.
    candidates = []
    for respect_strings in (True, False):
        span = _balanced_object(text, respect_strings)
        if span and span not in candidates:
            candidates.append(span)
    for candidate in candidates:
        parsed = _loads_json_object(candidate)
        if parsed is not None:
            return parsed

    # Strategy 4: fix common JSON issues. Removing trailing commas is safe, so
    # it is attempted alone before the risky single-quote replacement (which
    # also corrupts apostrophes inside values).
    for candidate in candidates:
        attempts = (
            re.sub(r',\s*([}\]])', r'\1', candidate),
            re.sub(r',\s*([}\]])', r'\1', candidate.replace("'", '"')),
        )
        for attempt in attempts:
            parsed = _loads_json_object(attempt)
            if parsed is not None:
                return parsed
    return None
def robust_json_array_parse(text: str) -> list:
    """Extract a JSON array from LLM output, or return ``[]`` when none parses.

    Candidates are tried in order: the raw text, the text with markdown code
    fences removed, and the span from the first ``[`` to the last ``]``.
    The first candidate that decodes to a list wins.
    """
    if not text or not text.strip():
        return []

    fence_open_removed = re.sub(r'^```(?:json)?\s*\n?', '', text, flags=re.MULTILINE)
    unfenced = re.sub(r'\n?```\s*$', '', fence_open_removed, flags=re.MULTILINE).strip()
    candidates = [text, unfenced]

    bracketed = re.search(r'\[[\s\S]*\]', text)
    if bracketed:
        candidates.append(bracketed.group())

    for candidate in candidates:
        try:
            decoded = json.loads(candidate)
        except (json.JSONDecodeError, ValueError):
            continue
        if isinstance(decoded, list):
            return decoded
    return []
| # βββββββββββββββββββ MODEL ROUTING (ALL VIA OPENROUTER) βββββββββββββββββββ | |
# Display name (used as the choices of the model dropdowns) -> routing info.
# "model_id" is the identifier passed as `model=` to the OpenRouter client;
# every entry here uses a ":free" variant.
MODEL_OPTIONS = {
    "DeepSeek R1": {"model_id": "deepseek/deepseek-r1-0528:free"},
    "Nemotron 120B": {"model_id": "nvidia/nemotron-3-super-120b-a12b:free"},
    "Gemma 3 27B": {"model_id": "google/gemma-3-27b-it:free"},
    "Llama 3.3 70B": {"model_id": "meta-llama/llama-3.3-70b-instruct:free"},
    "Qwen3 235B": {"model_id": "qwen/qwen3-235b-a22b:free"},
    "DeepSeek V3": {"model_id": "deepseek/deepseek-chat-v3-0324:free"},
}
# Lazily created OpenRouter client, shared across calls.
_or_client: Optional[OpenAI] = None


def _get_or_client() -> Optional[OpenAI]:
    """Return the shared OpenRouter client, or None without an API key.

    The key is read from ``OPENROUTER_API_KEY`` on every call; the client
    itself is built once and then reused.
    """
    global _or_client
    api_key = os.environ.get("OPENROUTER_API_KEY", "")
    if api_key:
        if _or_client is None:
            attribution_headers = {
                "HTTP-Referer": "https://huggingface.co/spaces/georgtawadrous/study-assistant",
                "X-Title": "Study Assistant",
            }
            _or_client = OpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=api_key,
                default_headers=attribution_headers,
            )
        return _or_client
    return None
# Lazily created Hugging Face inference client, shared across calls.
_hf_client: Optional[InferenceClient] = None


def _get_hf_client() -> Optional[InferenceClient]:
    """Return the shared Hugging Face client, or None when ``HF_TOKEN`` is unset."""
    global _hf_client
    hf_token = os.environ.get("HF_TOKEN", "")
    if hf_token:
        if _hf_client is None:
            _hf_client = InferenceClient(api_key=hf_token)
        return _hf_client
    return None
# Lazily created Google GenAI client, shared across calls.
_google_client = None


def _get_google_client():
    """Return the shared Google GenAI client.

    Returns None when ``GOOGLE_API_KEY`` is unset or the google-genai package
    could not be imported.
    """
    global _google_client
    api_key = os.environ.get("GOOGLE_API_KEY", "")
    if api_key and GOOGLE_AVAILABLE:
        if _google_client is None:
            _google_client = google_genai.Client(api_key=api_key)
        return _google_client
    return None
def chat_with_model(model_name: str, messages: list[dict], max_tokens: int = 4096) -> str:
    """Run one non-streaming chat completion through OpenRouter.

    Returns the cleaned reply text. Failures are never raised: a missing API
    key or any exception from the request comes back as a warning string.
    """
    client = _get_or_client()
    if client:
        try:
            completion = client.chat.completions.create(
                model=MODEL_OPTIONS[model_name]["model_id"],
                messages=messages,
                max_tokens=max_tokens,
            )
            reply = completion.choices[0].message.content
            return clean_response(reply or "")
        except Exception as exc:
            return f"β οΈ {model_name}: {exc}"
    return "β οΈ OPENROUTER_API_KEY not set."
def stream_chat_with_model(model_name: str, messages: list[dict], max_tokens: int = 4096):
    """Stream a chat completion through OpenRouter.

    Yields the cleaned, accumulated reply after each content delta, so every
    yielded value is the full text so far. A missing API key or any exception
    is reported by yielding a single warning string.
    """
    client = _get_or_client()
    if not client:
        yield "β οΈ OPENROUTER_API_KEY not set."
        return
    try:
        stream = client.chat.completions.create(
            model=MODEL_OPTIONS[model_name]["model_id"],
            messages=messages,
            max_tokens=max_tokens,
            stream=True,
        )
        full = ""
        for chunk in stream:
            # Some chunks (e.g. usage-only or keep-alive payloads) carry no
            # choices; indexing them would raise and abort the whole stream.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta.content
            if delta:
                full += delta
                yield clean_response(full)
    except Exception as e:
        yield f"β οΈ {model_name}: {e}"
| # βββββββββββββββββββ OCR βββββββββββββββββββ | |
def extract_text_from_pdf(fp):
    """Extract text from every page of the PDF at *fp*.

    A page whose embedded text is longer than 30 characters is used as-is.
    Otherwise the page is rendered at 300 dpi and OCR'd with Tesseract when it
    is available. Each page is prefixed with a ``--- Page N ---`` header and
    pages are joined with blank lines.
    """
    doc = fitz.open(fp)
    try:
        pages = []
        for i, pg in enumerate(doc):
            t = pg.get_text("text").strip()
            if len(t) > 30:
                pages.append(f"--- Page {i+1} ---\n{t}")
            elif TESSERACT_AVAILABLE:
                px2 = pg.get_pixmap(dpi=300)
                img = Image.frombytes("RGB", [px2.width, px2.height], px2.samples)
                ot = pytesseract.image_to_string(img).strip()
                pages.append(f"--- Page {i+1} (OCR) ---\n{ot}" if ot else f"--- Page {i+1} ---\n[empty]")
            else:
                pages.append(f"--- Page {i+1} ---\n[no tesseract]")
    finally:
        # Close the document even when a page fails to render or OCR.
        doc.close()
    return "\n\n".join(pages)
def extract_text_from_image(fp):
    """OCR the image at *fp* with Tesseract.

    Returns the recognised text, ``"[empty]"`` when nothing was recognised,
    or ``"[no tesseract]"`` when pytesseract is not installed.
    """
    if not TESSERACT_AVAILABLE:
        return "[no tesseract]"
    # The context manager releases the file handle once OCR is done.
    with Image.open(fp) as img:
        return pytesseract.image_to_string(img).strip() or "[empty]"
def extract_text_from_pptx(fp):
    """Collect the non-blank paragraph text of every slide in a PPTX file.

    Each slide is rendered as a ``--- Slide N ---`` header followed by its
    paragraphs (or ``[empty]``); slides are joined with blank lines. Returns
    ``"[no pptx]"`` when python-pptx is not installed.
    """
    if not PPTX_AVAILABLE:
        return "[no pptx]"
    deck = Presentation(fp)
    rendered = []
    for number, slide in enumerate(deck.slides, start=1):
        lines = []
        for shape in slide.shapes:
            if not shape.has_text_frame:
                continue
            for para in shape.text_frame.paragraphs:
                if para.text.strip():
                    lines.append(para.text.strip())
        if lines:
            rendered.append(f"--- Slide {number} ---\n" + "\n".join(lines))
        else:
            rendered.append(f"--- Slide {number} ---\n[empty]")
    return "\n\n".join(rendered)
def process_upload(files):
    """Extract text from each uploaded file and register it as study material.

    *files* is a list of file paths. The extractor is chosen by extension;
    unsupported types and extraction errors are stored as bracketed
    placeholder text instead of raising. Results are saved in
    ``extracted_texts`` under the file name, so a later upload with the same
    name replaces the earlier one.

    Returns ``(preview_markdown, status_message)``; previews are cut at 3000
    characters per file.
    """
    if not files:
        return "", "β οΈ No files."
    results = []
    for fp in files:
        path = Path(fp)
        fn, ext = path.name, path.suffix.lower()
        try:
            if ext == ".pdf":
                t = extract_text_from_pdf(fp)
            elif ext in (".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".webp"):
                t = extract_text_from_image(fp)
            elif ext in (".pptx", ".ppt"):
                t = extract_text_from_pptx(fp)
            elif ext in (".txt", ".md", ".csv", ".py", ".json"):
                # Pin the encoding: the platform default is not UTF-8 everywhere.
                t = path.read_text(encoding="utf-8", errors="replace")
            else:
                t = f"[unsupported: {ext}]"
        except Exception as e:
            t = f"[error: {e}]"
        extracted_texts[fn] = t
        results.append(f"## π {fn}\n\n{t[:3000]}" + (f"\n\nβ¦*({len(t):,} chars)*" if len(t) > 3000 else ""))
    return "\n\n---\n\n".join(results), f"β {len(files)} file(s): {', '.join(extracted_texts.keys())}"
def get_material_choices():
    """Return the uploaded material names, or a one-item placeholder list."""
    if extracted_texts:
        return list(extracted_texts.keys())
    return ["No materials yet"]
| # βββββββββββββββββββ CHAT βββββββββββββββββββ | |
def build_system_prompt(custom, use_mat):
    """Build the system prompt for a chat request.

    Starts from *custom* (or a default study-assistant instruction when it is
    empty). When *use_mat* is set and materials exist, the first 8000
    characters of each material are appended in a delimited block.
    """
    prompt = custom if custom else "You are a helpful study assistant. Use LaTeX ($...$, $$...$$) for math. Be thorough."
    if not (use_mat and extracted_texts):
        return prompt
    sections = [f"### {n}\n{t[:8000]}" for n, t in extracted_texts.items()]
    return prompt + "\n\n--- MATERIALS ---\n" + "\n\n".join(sections) + "\n--- END ---"
def chat_respond(msg, history, model, sysp, use_mat):
    """Generator handler for the chat tab.

    Yields the message history repeatedly: first with the user's message and
    a placeholder assistant entry, then with that entry updated as the model
    streams its reply. A blank message yields the history unchanged.
    """
    if not msg.strip():
        yield history
        return

    user_turn = {"role": "user", "content": msg}
    request = [{"role": "system", "content": build_system_prompt(sysp, use_mat)}]
    request.extend({"role": m["role"], "content": m["content"]} for m in history)
    request.append(user_turn)

    history = history + [{"role": "user", "content": msg}, {"role": "assistant", "content": "β³"}]
    yield history
    for partial in stream_chat_with_model(model, request):
        history[-1]["content"] = partial
        yield history
| # βββββββββββββββββββ EXAM βββββββββββββββββββ | |
# User-message template for exam generation. It is filled with str.format
# using num_mcq, num_written and material, so the literal braces of the JSON
# example are doubled ({{ }}) to survive formatting.
EXAM_PROMPT = """You are an expert exam generator. Based on the study material below, generate a comprehensive exam.
CRITICAL: Return ONLY valid JSON. No markdown fences. No explanation. No text before or after the JSON.
Generate exactly {num_mcq} multiple-choice questions and {num_written} written questions.
JSON format:
{{"mcq":[{{"id":1,"question":"...","options":["A) ...","B) ...","C) ...","D) ..."],"correct":"A","explanation":"..."}}],"written":[{{"id":1,"question":"...","rubric":"Key points: ..."}}]}}
Study material:
{material}"""
def generate_exam(mat, mdl, nm, nw, progress=gr.Progress()):
    """Generate an exam from one uploaded material and store it for grading.

    Args:
        mat: Name of the material (a key of ``extracted_texts``).
        mdl: Display name of the model to use.
        nm: Number of multiple-choice questions to request.
        nw: Number of written questions to request.
        progress: Gradio progress tracker.

    Returns:
        ``(mcq_markdown, written_markdown, group_update)``. On failure the
        first element is a warning message and the answer group stays hidden.
    """
    global exam_store
    if mat not in extracted_texts:
        return "β οΈ Upload materials first in the Upload & OCR tab.", "", gr.update(visible=False)
    progress(0.2, desc="Generating exam questions...")
    raw = chat_with_model(
        mdl,
        [
            {"role": "system", "content": "You are an exam generator. Return ONLY valid JSON. No markdown code fences. No explanation text."},
            {"role": "user", "content": EXAM_PROMPT.format(
                num_mcq=int(nm),
                num_written=int(nw),
                material=extracted_texts[mat][:15000],
            )},
        ],
        16000,
    )
    progress(0.7, desc="Parsing exam...")
    # chat_with_model reports failures as a warning string rather than raising.
    if raw.startswith("β οΈ"):
        return raw, "", gr.update(visible=False)
    d = robust_json_parse(raw)
    # A JSON array or scalar is not a usable exam; treat it like a parse
    # failure instead of crashing on ``.get`` below.
    if not d or not isinstance(d, dict):
        # Show what we got for debugging
        preview = raw[:1500] if raw else "(empty response)"
        return f"β οΈ Could not parse exam JSON from model response.\n\n**Raw response preview:**\n```\n{preview}\n```\n\n**Tip:** Try a different model (Llama 3.3 70B or DeepSeek V3 tend to produce cleaner JSON).", "", gr.update(visible=False)

    def _question_dicts(value):
        # Keep only well-formed question objects; models sometimes emit null,
        # a bare string, or stray non-object entries.
        return [q for q in value if isinstance(q, dict)] if isinstance(value, list) else []

    mcqs = _question_dicts(d.get("mcq"))
    wrs = _question_dicts(d.get("written"))
    if not mcqs and not wrs:
        return "β οΈ Exam was empty. Try again or use a different model.", "", gr.update(visible=False)
    exam_store = {"mcq": mcqs, "written": wrs, "material_name": mat, "model": mdl}
    progress(0.9, desc="Formatting...")
    md1 = f"# π Exam: {mat}\n**{len(mcqs)} MCQ + {len(wrs)} Written** (by {mdl})\n\n---\n## Multiple Choice\n\n"
    for q in mcqs:
        md1 += f"### Q{q.get('id', '?')}. {q.get('question', 'N/A')}\n"
        options = q.get("options")
        for o in options if isinstance(options, list) else []:
            md1 += f"- {o}\n"
        md1 += "\n"
    md2 = "## βοΈ Written Questions\n\n"
    for q in wrs:
        md2 += f"### Q{q.get('id', '?')}. {q.get('question', 'N/A')}\n\n"
    progress(1.0, desc="Done!")
    return md1, md2, gr.update(visible=True)
def _coerce_question_id(value, fallback):
    """Return *value* as an int when possible (models may emit "1"), else *fallback*."""
    try:
        return int(str(value).strip())
    except (TypeError, ValueError):
        return fallback


def _coerce_score(value):
    """Convert a grader-supplied score to a number clamped to 0-10 (0 if unusable)."""
    try:
        score = float(value)
    except (TypeError, ValueError):
        return 0
    if score != score:  # NaN never compares equal to itself
        return 0
    score = min(10.0, max(0.0, score))
    return int(score) if score.is_integer() else round(score, 1)


def _table_cell(text, limit=60):
    """Truncate *text* and neutralise characters that would break a Markdown table row."""
    return str(text)[:limit].replace("|", "\\|").replace("\n", " ")


def grade_exam(mcq_str, wr_str, mdl, progress=gr.Progress()):
    """Grade the stored exam against the student's answers.

    Args:
        mcq_str: MCQ answers such as ``"1:A, 2:B"`` (commas or newlines).
        wr_str: Written answers, each starting a line with ``"<id>:"`` or
            ``"<id>."``; following lines continue the same answer.
        mdl: Display name of the model that grades written answers.
        progress: Gradio progress tracker.

    Returns:
        A Markdown report. A score summary is also appended to
        ``exam_scores``. If no exam is stored, a warning string is returned.
    """
    global exam_scores
    mcq_qs = [q for q in (exam_store.get("mcq") or []) if isinstance(q, dict)]
    wqs = [q for q in (exam_store.get("written") or []) if isinstance(q, dict)]
    # An exam may legitimately contain only written questions.
    if not mcq_qs and not wqs:
        return "β οΈ No exam to grade. Generate one first."
    mcq_str = mcq_str or ""
    wr_str = wr_str or ""
    progress(0.1, desc="Grading MCQ...")

    # Parse MCQ answers ("<number>:<letter>").
    um = {}
    for part in re.split(r'[,\n]', mcq_str):
        num, sep, answer = part.partition(":")
        answer = answer.strip().upper()
        if not (sep and answer):
            continue
        try:
            um[int(num.strip())] = answer[0]
        except ValueError:
            continue

    mc, res = 0, []
    for pos, q in enumerate(mcq_qs, start=1):
        # Fall back to the question's position when the id is missing/invalid.
        qid = _coerce_question_id(q.get("id"), pos)
        ua = um.get(qid, "β")
        ca = str(q.get("correct") or "").strip().upper()[:1]
        ok = ua == ca
        if ok:
            mc += 1
        res.append({
            "id": qid,
            "q": _table_cell(q.get("question", "")),
            "ua": ua,
            "ca": ca,
            "ok": ok,
            "exp": q.get("explanation", ""),
        })
    tm = len(mcq_qs)
    mp = mc / tm * 100 if tm else 0

    # Grade written answers with the model.
    ws, wr = 0, []
    if wr_str.strip() and wqs:
        progress(0.4, desc="Grading written answers...")
        uw, c = {}, None
        for l in wr_str.split("\n"):
            m = re.match(r'^(\d+)[.:]\s*(.*)', l)
            if m:
                c = int(m.group(1))
                uw[c] = m.group(2)
            elif c is not None:  # "is not None" so answer id 0 keeps its continuation lines
                uw[c] = uw.get(c, "") + " " + l
        written = [(_coerce_question_id(q.get("id"), pos), q) for pos, q in enumerate(wqs, start=1)]
        gp = "Grade each answer 0-10. Return ONLY a JSON array: [{\"id\":1,\"score\":8,\"feedback\":\"...\"}]\n\n"
        for qid, q in written:
            gp += f"Q{qid}: {q.get('question', '')}\nRubric: {q.get('rubric', 'N/A')}\nStudent answer: {uw.get(qid, '[no answer]')}\n\n"
        gr2 = clean_response(chat_with_model(
            mdl,
            [{"role": "system", "content": "Return ONLY a JSON array. No other text."},
             {"role": "user", "content": gp}],
            4000,
        ))
        gs = robust_json_array_parse(gr2)
        gm2 = {_coerce_question_id(g["id"], g["id"]): g for g in gs if isinstance(g, dict) and "id" in g}
        for qid, q in written:
            g = gm2.get(qid, {"score": 0, "feedback": "Could not grade"})
            score = _coerce_score(g.get("score", 0))
            ws += score
            wr.append({
                "id": qid,
                "q": str(q.get("question", ""))[:60],
                "s": score,
                "f": g.get("feedback", ""),
            })
        ws = round(ws, 1) if isinstance(ws, float) else ws
    twm = len(wqs) * 10
    wp = ws / twm * 100 if twm else 0
    # Average only the sections the exam actually has.
    if tm and twm:
        ov = (mp + wp) / 2
    elif twm:
        ov = wp
    else:
        ov = mp

    progress(0.9, desc="Formatting results...")
    o = f"# π Exam Results\n\n## MCQ: {mc}/{tm} ({mp:.0f}%)\n\n"
    o += "| # | Question | You | Correct | Result |\n|---|---|---|---|---|\n"
    for x in res:
        o += f"| {x['id']} | {x['q']}β¦ | {x['ua']} | {x['ca']} | {'β ' if x['ok'] else 'β'} |\n"
    bad = [x for x in res if not x['ok'] and x['exp']]
    if bad:
        o += "\n### Explanations for wrong answers:\n\n"
        for x in bad:
            o += f"- **Q{x['id']}:** {x['exp']}\n"
    o += f"\n---\n\n## Written: {ws}/{twm} ({wp:.0f}%)\n\n"
    for x in wr:
        o += f"**Q{x['id']}.** {x['q']}β¦\n- Score: **{x['s']}/10** β {x['f']}\n\n"
    o += f"\n---\n\n## π Overall: {ov:.0f}%\n"
    exam_scores.append({
        "material": exam_store.get("material_name", "?"),
        "mcq_score": mp,
        "written_score": wp,
        "overall": ov,
        "timestamp": time.strftime("%Y-%m-%d %H:%M"),
    })
    return o
| # βββββββββββββββββββ VOICE (Whisper+gTTS) βββββββββββββββββββ | |
def transcribe_audio(ap):
    """Transcribe the audio file at path *ap* with Whisper via Hugging Face.

    Returns the transcript, ``""`` for a falsy path, or a bracketed message
    (``"[no HF_TOKEN]"`` / ``"[<error>]"``) when transcription is impossible.
    """
    if not ap:
        return ""
    hf = _get_hf_client()
    if not hf:
        return "[no HF_TOKEN]"
    try:
        result = hf.automatic_speech_recognition(audio=ap, model="openai/whisper-large-v3-turbo")
        # The result may be an object with .text, a dict, or something else.
        if hasattr(result, 'text'):
            return result.text
        if isinstance(result, dict):
            return result.get("text", str(result))
        return str(result)
    except Exception as exc:
        return f"[{exc}]"
def text_to_speech(text):
    """Synthesize *text* to an MP3 file with gTTS and return the file path.

    Math spans (``$...$`` / ``$$...$$``) are replaced by ``[formula]``,
    Markdown punctuation is removed, and the input is capped at 2000
    characters. Returns None when there is nothing to say, gTTS is
    unavailable, or synthesis fails (best effort by design).
    """
    if not text or not GTTS_AVAILABLE:
        return None
    out_path = None
    try:
        c = re.sub(r'\$\$?[^$]+\$\$?', '[formula]', text)
        c = re.sub(r'[#*_`|>]', '', c)[:2000]
        if not c.strip():
            return None
        t = gTTS(text=c, lang='en')
        # Reserve a path, then close the handle before gTTS writes to it so
        # no file descriptor is left open.
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
            out_path = f.name
        t.save(out_path)
        return out_path
    except Exception:
        # Do not leave an empty or partial temp file behind on failure.
        if out_path is not None:
            try:
                os.remove(out_path)
            except OSError:
                pass
        return None
def voice_chat(audio, history, mdl, use_mat):
    """Handle one voice turn: transcribe, ask the model, speak the reply.

    Returns ``(history, audio_path_or_None, transcript_or_message)``. When
    there is no audio, or transcription yields nothing or a bracketed error,
    the history is returned unchanged.
    """
    if not audio:
        return history, None, "β οΈ No audio."

    transcript = transcribe_audio(audio)
    if not transcript or transcript.startswith("["):
        return history, None, transcript or "No speech"

    system_prompt = build_system_prompt("Voice assistant. Concise, <200 words. No LaTeX.", use_mat)
    request = [{"role": "system", "content": system_prompt}]
    request.extend({"role": m["role"], "content": m["content"]} for m in history)
    request.append({"role": "user", "content": transcript})

    reply = chat_with_model(mdl, request, 1000)
    updated = history + [
        {"role": "user", "content": f"π€ {transcript}"},
        {"role": "assistant", "content": reply},
    ]
    return updated, text_to_speech(reply), transcript
| # βββββββββββββββββββ GEMINI LIVE βββββββββββββββββββ | |
# Voice names offered in the Gemini Live tab's voice dropdown; the selected
# one is passed as the prebuilt voice name in live_response.
GEMINI_LIVE_VOICES = ["Puck", "Aoede", "Charon", "Fenrir", "Kore"]
# Model identifier used when opening the Gemini Live session.
GEMINI_LIVE_MODEL = "gemini-3.1-flash-live-preview"
| class LiveState: | |
| stream: np.ndarray | None = None | |
| sampling_rate: int = 0 | |
| pause_detected: bool = False | |
| started_talking: bool = False | |
| stopped: bool = False | |
| conversation: list = field(default_factory=list) | |
| def _detect_pause(audio, sr, state): | |
| if audio is None or len(audio) == 0: | |
| return False | |
| af = audio.astype(np.float32) / 32768.0 | |
| w = min(sr, len(af)) | |
| e = np.sqrt(np.mean(af[-w:]**2)) | |
| if e > 0.01 and not state.started_talking: | |
| state.started_talking = True | |
| return False | |
| dur = len(af) / sr | |
| if state.started_talking and dur > 1.5: | |
| tail = af[-sr:] if len(af) >= sr else af | |
| if np.sqrt(np.mean(tail**2)) < 0.005: | |
| return True | |
| return dur > 30 | |
def live_process_audio(audio, state):
    """Streaming microphone handler for the Gemini Live tab.

    Appends the new chunk to ``state.stream``, records the sample rate and
    re-runs pause detection. Returns ``(audio_update, state)``; the update
    stops the recording once a pause follows detected speech, and is None
    otherwise.
    """
    if audio is None:
        return None, state

    sr, chunk = audio
    if state.stream is None:
        state.stream = chunk
    else:
        state.stream = np.concatenate((state.stream, chunk))
    state.sampling_rate = sr
    state.pause_detected = _detect_pause(state.stream, sr, state)

    if not (state.pause_detected and state.started_talking):
        return None, state
    return gr.Audio(recording=False), state
def _resample_to_16k(audio_np, orig_sr):
    """Convert captured audio to mono 16 kHz 16-bit PCM bytes.

    Samples are scaled by 1/32768 and multi-channel input is averaged across
    axis 1. Resampling uses librosa when available, otherwise nearest-index
    decimation.
    """
    mono = audio_np.astype(np.float32) / 32768.0
    if mono.ndim > 1:
        mono = mono.mean(axis=1)

    if LIBROSA_AVAILABLE and orig_sr != 16000:
        mono = librosa.resample(mono, orig_sr=orig_sr, target_sr=16000)
    elif orig_sr != 16000:
        # Fallback without librosa: pick every (orig_sr / 16000)-th sample.
        picks = np.arange(0, len(mono), orig_sr / 16000).astype(int)
        mono = mono[picks[picks < len(mono)]]

    pcm = (mono * 32768).astype(np.int16)
    return pcm.tobytes()
def _pcm24k_to_mp3(pcm):
    """Encode mono 24 kHz 16-bit PCM bytes for playback.

    With pydub available the result is MP3 at 192k. Without it the bytes are
    wrapped in a WAV container instead, so despite the function name the
    fallback output is WAV, not MP3.
    """
    out = io.BytesIO()
    if not PYDUB_AVAILABLE:
        with wave.open(out, 'wb') as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)
            wav_file.setframerate(24000)
            wav_file.writeframes(pcm)
        return out.getvalue()

    segment = AudioSegment(pcm, frame_rate=24000, sample_width=2, channels=1)
    segment.export(out, format="mp3", bitrate="192k")
    return out.getvalue()
def live_response(state, voice, use_mat):
    """Send recorded audio to Gemini Live, receive audio response.

    Generator used as the stop-recording handler. It yields exactly one
    ``(audio_bytes_or_None, new_state)`` pair. The returned state is always a
    fresh ``LiveState`` that keeps only the conversation, so audio buffered
    for this turn never carries over into the next one.

    Args:
        state: The session's ``LiveState`` holding the buffered audio.
        voice: Prebuilt Gemini voice name.
        use_mat: Whether to add uploaded materials to the system instruction.
    """
    # Nothing usable was recorded: reset the buffers but keep the chat history.
    if (not state.pause_detected and not state.started_talking) or state.stream is None:
        yield None, LiveState(conversation=state.conversation)
        return
    client = _get_google_client()
    if not client:
        state.conversation.append({"role": "assistant", "content": "β οΈ GOOGLE_API_KEY not set."})
        yield None, LiveState(conversation=state.conversation)
        return
    sys_t = "You are an expert study tutor. Be concise, educational, and engaging. Ask follow-up questions."
    if use_mat and extracted_texts:
        sys_t += "\n\nStudy materials:\n" + "\n".join(f"[{k}]: {v[:4000]}" for k, v in extracted_texts.items())
    config = genai_types.LiveConnectConfig(
        response_modalities=[genai_types.Modality.AUDIO],
        speech_config=genai_types.SpeechConfig(
            voice_config=genai_types.VoiceConfig(
                prebuilt_voice_config=genai_types.PrebuiltVoiceConfig(voice_name=voice),
            ),
        ),
        system_instruction=genai_types.Content(
            parts=[genai_types.Part(text=sys_t)],
        ),
        input_audio_transcription=genai_types.AudioTranscriptionConfig(),
        output_audio_transcription=genai_types.AudioTranscriptionConfig(),
        realtime_input_config=genai_types.RealtimeInputConfig(
            turn_coverage="TURN_INCLUDES_ONLY_ACTIVITY",
        ),
    )
    pcm16 = _resample_to_16k(state.stream, state.sampling_rate)
    # Keep a WAV copy of the user's turn so the chat panel can play it back.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        with wave.open(f.name, 'wb') as w:
            w.setnchannels(1)
            w.setsampwidth(2)
            w.setframerate(16000)
            w.writeframes(pcm16)
    state.conversation.append({"role": "user", "content": {"path": f.name, "mime_type": "audio/wav"}})

    # Collected as lists and joined once, instead of repeated += on bytes/str.
    pcm_chunks = []
    user_parts = []
    ai_parts = []

    async def _call():
        try:
            async with client.aio.live.connect(model=GEMINI_LIVE_MODEL, config=config) as session:
                await session.send_realtime_input(
                    audio=genai_types.Blob(data=pcm16, mime_type="audio/pcm;rate=16000"),
                )
                async for response in session.receive():
                    server_content = response.server_content
                    if server_content:
                        if server_content.model_turn:
                            for part in server_content.model_turn.parts or []:
                                if part.inline_data and part.inline_data.data:
                                    pcm_chunks.append(part.inline_data.data)
                        if server_content.input_transcription and server_content.input_transcription.text:
                            user_parts.append(server_content.input_transcription.text)
                        if server_content.output_transcription and server_content.output_transcription.text:
                            ai_parts.append(server_content.output_transcription.text)
                        if server_content.turn_complete:
                            break
                        if server_content.interrupted:
                            break
        except Exception as e:
            print(f"Gemini Live error: {e}")
            traceback.print_exc()

    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(_call())
    except Exception as e:
        print(f"Loop error: {e}")
    finally:
        # Always release the loop, even if running the coroutine failed.
        loop.close()

    output_pcm = b"".join(pcm_chunks)
    user_transcript = "".join(user_parts)
    ai_transcript = "".join(ai_parts)
    if user_transcript:
        state.conversation.append({"role": "user", "content": f"π€ {user_transcript}"})
    if ai_transcript:
        state.conversation.append({"role": "assistant", "content": ai_transcript})
    if output_pcm:
        mp3 = _pcm24k_to_mp3(output_pcm)
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
            f.write(mp3)
        if not ai_transcript:
            state.conversation.append({"role": "assistant", "content": {"path": f.name, "mime_type": "audio/mp3"}})
        # Start the next turn from a clean buffer; yielding the old state
        # would prepend this turn's audio to the next recording.
        yield mp3, LiveState(conversation=state.conversation)
    else:
        state.conversation.append({"role": "assistant", "content": "β οΈ No audio response from Gemini Live."})
        yield None, LiveState(conversation=state.conversation)
def live_start_recording(state):
    """Restart the microphone after playback unless the session was stopped.

    Returns an audio update that starts recording, or None when
    ``state.stopped`` is set.
    """
    if state.stopped:
        return None
    return gr.Audio(recording=True)
| # βββββββββββββββββββ VISUALIZATIONS βββββββββββββββββββ | |
# System-prompt templates; {mc} is filled via str.format with an optional
# "Material:" context block (empty string when materials are not used).
# VIZ_PROMPT asks for bare Plotly code that assigns the figure to `fig`.
VIZ_PROMPT = "Data viz expert. Plotly code. Use go/px/np. Assign `fig`. No fig.show(). No fences. Only code.\n{mc}"
# LATEX_PROMPT asks for a step-by-step explanation using $ / $$ delimiters.
LATEX_PROMPT = "Math explainer. $...$ inline, $$...$$ display. Step-by-step.\n{mc}"
def generate_visualization(prompt, mdl, use_mat):
    """Ask the model for Plotly code, run it, and return the figure.

    Args:
        prompt: Free-text description of the desired chart.
        mdl: Display name of the model to use.
        use_mat: Whether to include excerpts of uploaded materials.

    Returns:
        ``(figure_or_None, markdown)`` where the Markdown shows the generated
        code, prefixed by a status marker or the error that occurred.
    """
    if not prompt.strip():
        return None, "β οΈ Describe a chart."
    mc = ("Material:\n" + "\n".join(f"[{k}]:{v[:2000]}" for k, v in extracted_texts.items())) if use_mat and extracted_texts else ""
    r = clean_response(chat_with_model(
        mdl,
        [{"role": "system", "content": VIZ_PROMPT.format(mc=mc)},
         {"role": "user", "content": prompt}],
        4000,
    ))
    # Drop markdown fences in case the model added them anyway.
    code = re.sub(r'^```(?:python)?\s*\n?', '', r, flags=re.MULTILINE)
    code = re.sub(r'^```\s*$', '', code, flags=re.MULTILINE).strip()
    try:
        g = {"go": go, "px": px, "np": np}
        try:
            import pandas as pd
            g["pd"] = pd
        except ImportError:
            pass
        # SECURITY: this executes model-generated code (steered by the user's
        # prompt and uploaded materials) with full interpreter privileges.
        # It is not sandboxed; only run this app in an isolated environment.
        exec(code, g)
        fig = g.get("fig")
        # Explicit None check: do not rely on the truthiness of a figure object.
        if fig is None:
            return None, f"β οΈ No `fig`.\n```python\n{code}\n```"
        return fig, f"β \n```python\n{code}\n```"
    except Exception as e:
        return None, f"β οΈ {e}\n```python\n{code}\n```"
def generate_latex_explanation(topic, mdl, use_mat):
    """Return a model-written, LaTeX-formatted explanation of *topic*.

    A blank topic returns a warning string. When *use_mat* is set and
    materials exist, the first 2000 characters of each are added as context.
    """
    if not topic.strip():
        return "β οΈ Enter a topic."

    if use_mat and extracted_texts:
        excerpts = "\n".join(f"[{k}]:{v[:2000]}" for k, v in extracted_texts.items())
        mc = "Material:\n" + excerpts
    else:
        mc = ""

    request = [
        {"role": "system", "content": LATEX_PROMPT.format(mc=mc)},
        {"role": "user", "content": f"Explain: {topic}"},
    ]
    return clean_response(chat_with_model(mdl, request, 4000))
| # βββββββββββββββββββ DASHBOARD βββββββββββββββββββ | |
def get_dashboard_data():
    """Build the dashboard: a Markdown summary and an exam-score chart.

    Returns ``(markdown, figure_or_None)``. The Markdown lists uploaded
    materials with word counts, past exam scores, and which API keys are
    configured. The figure is None until at least one exam has been graded.
    """
    parts = ["# π Dashboard\n\n## Materials\n\n"]
    if extracted_texts:
        parts.extend(f"- **{n}** β {len(t.split()):,} words\n" for n, t in extracted_texts.items())
    else:
        parts.append("_None._\n")

    parts.append("\n## Exams\n\n")
    if exam_scores:
        parts.append("| Date | Material | MCQ | Written | Overall |\n|---|---|---|---|---|\n")
        parts.extend(
            f"| {s['timestamp']} | {s['material'][:25]} | {s['mcq_score']:.0f}% | {s['written_score']:.0f}% | {s['overall']:.0f}% |\n"
            for s in exam_scores
        )
    else:
        parts.append("_None._\n")

    or_ok = 'β ' if os.environ.get('OPENROUTER_API_KEY') else 'β'
    hf_ok = 'β ' if os.environ.get('HF_TOKEN') else 'β'
    gl_ok = 'β ' if os.environ.get('GOOGLE_API_KEY') and GOOGLE_AVAILABLE else 'β'
    parts.append(f"\n## Status\n- **OpenRouter** (chat): {or_ok}\n- **HF** (Whisper): {hf_ok}\n- **Google** (Gemini Live): {gl_ok}\n")
    md = "".join(parts)

    if not exam_scores:
        return md, None

    labels = [f"{s['material'][:12]}β¦<br>{s['timestamp']}" for s in exam_scores]
    mcq_series = [s["mcq_score"] for s in exam_scores]
    written_series = [s["written_score"] for s in exam_scores]
    overall_series = [s["overall"] for s in exam_scores]

    fig = go.Figure()
    fig.add_trace(go.Bar(name="MCQ", x=labels, y=mcq_series, marker_color="#4CAF50"))
    fig.add_trace(go.Bar(name="Written", x=labels, y=written_series, marker_color="#2196F3"))
    fig.add_trace(go.Scatter(name="Overall", x=labels, y=overall_series, mode="lines+markers", line=dict(color="#FF9800", width=3)))
    fig.update_layout(barmode="group", yaxis_title="%", yaxis=dict(range=[0, 105]), template="plotly_white", height=400)
    return md, fig
| # βββββββββββββββββββ UI βββββββββββββββββββ | |
def build_app():
    """Assemble the Gradio Blocks UI and wire every tab to its handler.

    Tabs: Chat, Upload & OCR, Exam, Voice Chat, Gemini Live, Visualizations
    and Dashboard. Returns the un-launched ``gr.Blocks`` app.
    """
    with gr.Blocks(
        title="π Study Assistant",
        css=".main-title{text-align:center}",
        theme=gr.themes.Soft(primary_hue="blue", secondary_hue="purple"),
    ) as demo:
        gr.Markdown(
            "# π AI Study Assistant\n**Upload β Chat β Exam β Voice β Visualize**\n\n"
            "> **Required:** `OPENROUTER_API_KEY` Β· **Optional:** `HF_TOKEN` (Whisper) Β· `GOOGLE_API_KEY` (Gemini Live)",
            elem_classes=["main-title"],
        )

        # ---- Chat: streaming text chat against the selected model ----
        with gr.Tab("π¬ Chat"):
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    m_sel = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="π€ Model")
                    sysp_box = gr.Textbox(label="System Prompt", lines=3)
                    um_chat = gr.Checkbox(label="π Include Materials", value=True)
                    gr.Markdown("### Models (free via OpenRouter)\n| Model | Size |\n|---|---|\n| DeepSeek R1 | 671B MoE |\n| Nemotron 120B | 120B MoE |\n| Gemma 3 27B | 27B |\n| Llama 3.3 70B | 70B |\n| Qwen3 235B | 235B MoE |\n| DeepSeek V3 | 685B MoE |")
                with gr.Column(scale=3):
                    cb = gr.Chatbot(
                        height=550, type="messages", show_copy_button=True,
                        latex_delimiters=[
                            {"left": "$$", "right": "$$", "display": True},
                            {"left": "$", "right": "$", "display": False},
                            {"left": "\\(", "right": "\\)", "display": False},
                            {"left": "\\[", "right": "\\]", "display": True},
                        ],
                    )
                    with gr.Row():
                        mi = gr.Textbox(placeholder="Ask about your study materials...", lines=2, scale=5, show_label=False)
                        sb = gr.Button("Send π€", variant="primary", scale=1)
                    gr.Button("ποΈ Clear", size="sm").click(lambda: [], None, cb)
            # Both the button and Enter submit, then clear the input box.
            sb.click(chat_respond, [mi, cb, m_sel, sysp_box, um_chat], [cb]).then(lambda: "", None, mi)
            mi.submit(chat_respond, [mi, cb, m_sel, sysp_box, um_chat], [cb]).then(lambda: "", None, mi)

        # ---- Upload & OCR: extract text from uploaded files ----
        with gr.Tab("π€ Upload & OCR"):
            gr.Markdown("## Upload Materials\nPDF, PPTX, PNG, JPG, TXT, MD")
            with gr.Row():
                with gr.Column(scale=1):
                    fu = gr.File(
                        label="π Upload", file_count="multiple",
                        # Kept in sync with the extensions process_upload handles
                        # (".webp" and ".json" were previously unreachable).
                        file_types=[".pdf", ".pptx", ".ppt", ".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".webp", ".txt", ".md", ".csv", ".py", ".json"],
                        type="filepath",
                    )
                    ub = gr.Button("π Process", variant="primary", size="lg")
                    us = gr.Markdown("_Upload and Process_")
                with gr.Column(scale=2):
                    eo = gr.Markdown("_Text here..._")
            ub.click(process_upload, [fu], [eo, us])

        # ---- Exam: generate, take and grade an exam ----
        with gr.Tab("π Exam"):
            gr.Markdown("## Generate & Take Exams")
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    md2 = gr.Dropdown(get_material_choices(), label="π Material", interactive=True)
                    gr.Button("π Refresh", size="sm").click(lambda: gr.update(choices=get_material_choices()), None, md2)
                    em = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="π€ Model")
                    nm = gr.Slider(5, 150, 100, step=5, label="MCQ")
                    nw = gr.Slider(2, 50, 20, step=1, label="Written")
                    gen_btn = gr.Button("π― Generate", variant="primary", size="lg")
                with gr.Column(scale=2):
                    emd = gr.Markdown("_Generate exam..._")
                    ewd = gr.Markdown("")
            # Hidden until generate_exam succeeds and makes it visible.
            with gr.Group(visible=False) as eg:
                gr.Markdown("---\n## βοΈ Take Exam")
                with gr.Row():
                    ma = gr.Textbox(label="MCQ (1:A, 2:B...)", lines=4)
                    wa = gr.Textbox(label="Written (1: answer...)", lines=8)
                grade_mdl = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="Grading Model")
                grd_btn = gr.Button("π Grade", variant="primary", size="lg")
                er = gr.Markdown("")
            gen_btn.click(generate_exam, [md2, em, nm, nw], [emd, ewd, eg])
            grd_btn.click(grade_exam, [ma, wa, grade_mdl], [er])

        # ---- Voice Chat: record -> transcribe -> model -> speech ----
        with gr.Tab("π€ Voice Chat"):
            gr.Markdown("## Voice Assistant\nRecord β Whisper β LLM β gTTS\n> Needs `HF_TOKEN`")
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    vm = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="π€ Model")
                    umv = gr.Checkbox(label="π Materials", value=True)
                    ai = gr.Audio(sources=["microphone"], type="filepath", label="ποΈ Record")
                    vb = gr.Button("π Send", variant="primary", size="lg")
                    vt = gr.Textbox(label="Transcript", interactive=False)
                with gr.Column(scale=2):
                    vc = gr.Chatbot(
                        height=400, type="messages",
                        latex_delimiters=[{"left": "$$", "right": "$$", "display": True}, {"left": "$", "right": "$", "display": False}],
                    )
                    va_out = gr.Audio(label="π Response", type="filepath", autoplay=True)
            vb.click(voice_chat, [ai, vc, vm, umv], [vc, va_out, vt])
            gr.Button("ποΈ Clear", size="sm").click(lambda: ([], None, ""), None, [vc, va_out, vt])

        # ---- Gemini Live: turn-based voice conversation ----
        with gr.Tab("π΄ Gemini Live"):
            gr.Markdown(
                "## π΄ Gemini Live β Real-time Voice\n"
                "Native bidirectional voice conversation via Gemini.\n"
                "> Requires `GOOGLE_API_KEY` with Gemini API access"
            )
            with gr.Row():
                with gr.Column(scale=1, min_width=280):
                    lv = gr.Dropdown(GEMINI_LIVE_VOICES, value="Puck", label="π£οΈ Voice")
                    lum = gr.Checkbox(label="π Materials", value=True)
                    gr.Markdown("### How to use\n1. Click the microphone\n2. Speak your question\n3. Pause β auto-detected\n4. Listen to the AI response\n5. Mic restarts automatically!")
                with gr.Column(scale=3):
                    lcb = gr.Chatbot(label="Conversation", height=400, type="messages")
                    with gr.Row():
                        lmic = gr.Audio(label="ποΈ Speak", sources="microphone", type="numpy")
                        lout = gr.Audio(label="π Response", streaming=True, autoplay=True)
            ls = gr.State(value=LiveState())
            # Event chain: stream mic chunks -> on stop, query Gemini ->
            # refresh the chat panel -> when playback stops, restart the mic.
            lstr = lmic.stream(live_process_audio, [lmic, ls], [lmic, ls], stream_every=0.5, time_limit=30)
            lresp = lmic.stop_recording(live_response, [ls, lv, lum], [lout, ls])
            lresp.then(lambda s: s.conversation, [ls], [lcb])
            lrst = lout.stop(live_start_recording, [ls], [lmic])
            gr.Button("β Stop", variant="stop").click(
                lambda: (LiveState(stopped=True), gr.Audio(recording=False)),
                None, [ls, lmic], cancels=[lresp, lrst],
            )

        # ---- Visualizations: generated Plotly charts and LaTeX explanations ----
        with gr.Tab("π Visualizations"):
            gr.Markdown("## Charts & LaTeX")
            with gr.Row():
                vmdl = gr.Dropdown(list(MODEL_OPTIONS.keys()), value="DeepSeek R1", label="π€ Model")
                umvz = gr.Checkbox(label="π Materials", value=True)
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### π Charts")
                    vp = gr.Textbox(label="Describe", placeholder="Bar chart comparing sorting algorithms...", lines=2)
                    vbtn = gr.Button("π Generate", variant="primary")
                    vplt = gr.Plot()
                    vcd = gr.Markdown()
                with gr.Column():
                    gr.Markdown("### π LaTeX")
                    lt = gr.Textbox(label="Topic", placeholder="Fourier Transform, Bayes' Theorem...", lines=2)
                    lbtn = gr.Button("π Explain", variant="primary")
                    lo = gr.Markdown(
                        latex_delimiters=[
                            {"left": "$$", "right": "$$", "display": True},
                            {"left": "$", "right": "$", "display": False},
                            {"left": "\\(", "right": "\\)", "display": False},
                            {"left": "\\[", "right": "\\]", "display": True},
                        ],
                    )
            vbtn.click(generate_visualization, [vp, vmdl, umvz], [vplt, vcd])
            lbtn.click(generate_latex_explanation, [lt, vmdl, umvz], [lo])

        # ---- Dashboard: materials, exam history and API-key status ----
        with gr.Tab("π Dashboard"):
            dmd = gr.Markdown("_Refresh_")
            dfg = gr.Plot()
            gr.Button("π Refresh", variant="primary").click(get_dashboard_data, None, [dmd, dfg])
        # Populate the dashboard once when the page loads.
        demo.load(get_dashboard_data, None, [dmd, dfg])
    return demo
# Script entry point: build the UI and start the Gradio server with its
# default launch settings.
if __name__ == "__main__":
    demo = build_app()
    demo.launch()