Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import requests | |
| import os | |
| import re | |
| import io | |
| import time | |
| import contextlib | |
| import zipfile | |
| import tracker | |
| import rag_engine | |
| import doc_loader | |
| import modules.admin_panel as admin_panel | |
| from openai import OpenAI | |
| from google import genai | |
| from google.genai import types | |
| from datetime import datetime | |
| from test_integration import run_tests | |
| from core.QuizEngine import QuizEngine | |
| from core.PineconeManager import PineconeManager | |
| from huggingface_hub import hf_hub_download | |
| # --- CONFIGURATION --- | |
| st.set_page_config(page_title="Navy AI Toolkit", page_icon="⚓", layout="wide") | |
| API_URL_ROOT = os.getenv("API_URL") | |
| OPENAI_KEY = os.getenv("OPENAI_API_KEY") | |
| GOOGLE_KEY = os.getenv("GOOGLE_API_KEY") # NEW: Google Key | |
| # --- INITIALIZATION --- | |
| if "roles" not in st.session_state: | |
| st.session_state.roles = [] | |
| if "quiz_state" not in st.session_state: | |
| st.session_state.quiz_state = { | |
| "active": False, "question_data": None, "user_answer": "", | |
| "feedback": None, "streak": 0, "generated_question_text": "" | |
| } | |
| if "quiz_history" not in st.session_state: st.session_state.quiz_history = [] | |
| if "active_index" not in st.session_state: st.session_state.active_index = None | |
| # Debug State Variables | |
| if "last_prompt_sent" not in st.session_state: st.session_state.last_prompt_sent = "" | |
| if "last_context_used" not in st.session_state: st.session_state.last_context_used = "" | |
| # --- FLATTENER LOGIC --- | |
| class OutlineProcessor: | |
| """Parses text outlines for the Flattener tool.""" | |
| def __init__(self, file_content): | |
| self.raw_lines = file_content.split('\n') | |
| def _is_list_item(self, line): | |
| pattern = r"^\s*(\d+\.|[a-zA-Z]\.|-|\*)\s+" | |
| return bool(re.match(pattern, line)) | |
| def _merge_multiline_items(self): | |
| merged_lines = [] | |
| for line in self.raw_lines: | |
| stripped = line.strip() | |
| if not stripped: continue | |
| if not merged_lines: | |
| merged_lines.append(line) | |
| continue | |
| if not self._is_list_item(line): | |
| merged_lines[-1] = merged_lines[-1].rstrip() + " " + stripped | |
| else: | |
| merged_lines.append(line) | |
| return merged_lines | |
| def parse(self): | |
| clean_lines = self._merge_multiline_items() | |
| stack = [] | |
| results = [] | |
| for line in clean_lines: | |
| stripped = line.strip() | |
| indent = len(line) - len(line.lstrip()) | |
| while stack and stack[-1]['indent'] >= indent: | |
| stack.pop() | |
| stack.append({'indent': indent, 'text': stripped}) | |
| if len(stack) > 1: | |
| context_str = " > ".join([item['text'] for item in stack[:-1]]) | |
| else: | |
| context_str = "ROOT" | |
| results.append({"context": context_str, "target": stripped}) | |
| return results | |
| # --- HELPER FUNCTIONS --- | |
| def query_model_universal(messages, max_tokens, model_choice, user_key=None): | |
| """Unified router for Chat, Tools, and Quiz.""" | |
| # 1. DEBUG CAPTURE | |
| if messages and messages[-1]['role'] == 'user': | |
| st.session_state.last_prompt_sent = messages[-1]['content'] | |
| # --- ROUTE 1: GOOGLE GEMINI (NEW) --- | |
| if "Gemini" in model_choice: | |
| # Use System Key (Env Var) or User Override if you allow it | |
| # For now, we strictly use the Hugging Face Secret as requested | |
| if not GOOGLE_KEY: return "[Error: No GOOGLE_API_KEY found in Secrets]", None | |
| try: | |
| client = genai.Client(api_key=GOOGLE_KEY) | |
| # Convert Chat History to Single String for 'generate_content' | |
| # (Gemini supports chat history objects, but string is more robust for RAG contexts) | |
| full_prompt = "" | |
| for m in messages: | |
| role = m["role"].upper() | |
| content = m["content"] | |
| full_prompt += f"{role}: {content}\n\n" | |
| full_prompt += "ASSISTANT: " | |
| # RETRY LOGIC (User Provided) | |
| max_retries = 3 # Slightly conservative for UI responsiveness | |
| model_id = "gemini-2.0-flash" # or "gemini-1.5-pro" depending on your access | |
| for attempt in range(max_retries): | |
| try: | |
| response = client.models.generate_content( | |
| model=model_id, | |
| contents=full_prompt, | |
| config=types.GenerateContentConfig( | |
| max_output_tokens=max_tokens, | |
| temperature=0.3 | |
| ) | |
| ) | |
| # Usage tracking is different for Gemini, we estimate or grab from response if available | |
| # usage_meta = response.usage_metadata (if available) | |
| return response.text.strip(), {"input": 0, "output": 0} | |
| except Exception as e: | |
| error_msg = str(e) | |
| if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg: | |
| wait_time = 10 # Short wait | |
| time.sleep(wait_time) | |
| else: | |
| return f"[Gemini Error: {error_msg}]", None | |
| return "[Error: Gemini Rate Limit Exceeded]", None | |
| except Exception as e: | |
| return f"[Gemini Client Error: {e}]", None | |
| # --- ROUTE 2: OPENAI GPT-4o --- | |
| elif "GPT-4o" in model_choice: | |
| key = user_key if user_key else OPENAI_KEY | |
| if not key: return "[Error: No OpenAI API Key]", None | |
| client = OpenAI(api_key=key) | |
| try: | |
| resp = client.chat.completions.create( | |
| model="gpt-4o", max_tokens=max_tokens, messages=messages, temperature=0.3 | |
| ) | |
| usage = {"input": resp.usage.prompt_tokens, "output": resp.usage.completion_tokens} | |
| return resp.choices[0].message.content, usage | |
| except Exception as e: | |
| return f"[OpenAI Error: {e}]", None | |
| # --- ROUTE 3: LOCAL/OPEN SOURCE --- | |
| else: | |
| model_map = { | |
| "Granite 4 (IBM)": "granite4:latest", | |
| "Llama 3.2 (Meta)": "llama3.2:latest", | |
| "Gemma 3 (Google)": "gemma3:latest" | |
| } | |
| tech_name = model_map.get(model_choice) | |
| if not tech_name: return "[Error: Model Map Failed]", None | |
| url = f"{API_URL_ROOT}/generate" | |
| hist = "" | |
| sys_msg = "You are a helpful assistant." | |
| for m in messages: | |
| if m['role']=='system': sys_msg = m['content'] | |
| elif m['role']=='user': hist += f"User: {m['content']}\n" | |
| elif m['role']=='assistant': hist += f"Assistant: {m['content']}\n" | |
| hist += "Assistant: " | |
| try: | |
| r = requests.post(url, json={"text": hist, "persona": sys_msg, "max_tokens": max_tokens, "model": tech_name}, timeout=600) | |
| if r.status_code == 200: | |
| d = r.json() | |
| return d.get("response", ""), d.get("usage", {"input":0,"output":0}) | |
| return f"[Local Error {r.status_code}]", None | |
| except Exception as e: | |
| return f"[Conn Error: {e}]", None | |
| def update_sidebar_metrics(): | |
| if metric_placeholder: | |
| stats = tracker.get_daily_stats() | |
| u_stats = stats["users"].get(st.session_state.username, {"input":0, "output":0}) | |
| metric_placeholder.metric("My Tokens Today", u_stats["input"] + u_stats["output"]) | |
| def generate_study_guide_md(history): | |
| md = "# ⚓ Study Guide\n\nGenerated: " + datetime.now().strftime('%Y-%m-%d %H:%M') + "\n\n" | |
| for item in history: | |
| md += f"## Q: {item['question']}\n**Your Answer:** {item['user_answer']}\n\n**Grade:** {item['grade']}\n\n**Context/Correct Info:**\n> {item['context']}\n\n---\n\n" | |
| return md | |
| # --- LOGIN --- | |
| if "authentication_status" not in st.session_state or st.session_state["authentication_status"] is None: | |
| login_tab, register_tab = st.tabs(["🔑 Login", "📝 Register"]) | |
| with login_tab: | |
| if tracker.check_login(): | |
| if "last_user" in st.session_state and st.session_state.last_user != st.session_state.username: | |
| st.session_state.messages = [] | |
| st.session_state.user_openai_key = None | |
| st.session_state.last_user = st.session_state.username | |
| tracker.download_user_db(st.session_state.username) | |
| st.rerun() | |
| with register_tab: | |
| st.header("Create Account") | |
| with st.form("reg_form"): | |
| new_user = st.text_input("Username"); new_name = st.text_input("Display Name") | |
| new_email = st.text_input("Email"); new_pwd = st.text_input("Password", type="password") | |
| invite = st.text_input("Invitation Passcode") | |
| if st.form_submit_button("Register"): | |
| success, msg = tracker.register_user(new_email, new_user, new_name, new_pwd, invite) | |
| if success: st.success(msg) | |
| else: st.error(msg) | |
| if not st.session_state.get("authentication_status"): st.stop() | |
| # --- SIDEBAR --- | |
| metric_placeholder = None | |
| with st.sidebar: | |
| st.header("👤 User Profile") | |
| st.write(f"Welcome, **{st.session_state.name}**") | |
| st.header("📊 Usage Tracker") | |
| metric_placeholder = st.empty() | |
| if "admin" in st.session_state.roles: | |
| admin_panel.render_admin_sidebar() | |
| st.divider() | |
| st.header("🌲 Pinecone Settings") | |
| pc_key = os.getenv("PINECONE_API_KEY") | |
| if pc_key: | |
| pm = PineconeManager(pc_key) | |
| indexes = pm.list_indexes() | |
| selected_index = st.selectbox("Active Index", indexes) | |
| st.session_state.active_index = selected_index | |
| if selected_index: | |
| current_model = st.session_state.get("active_embed_model", "sentence-transformers/all-MiniLM-L6-v2") | |
| try: | |
| emb_fn = rag_engine.get_embedding_func(current_model) | |
| test_vec = emb_fn.embed_query("test") | |
| active_model_dim = len(test_vec) | |
| if pm.check_dimension_compatibility(selected_index, active_model_dim): st.caption(f"✅ Compatible ({active_model_dim}d)") | |
| else: st.error(f"❌ Mismatch! Model: {active_model_dim}d") | |
| except Exception as e: st.caption(f"⚠️ Check failed: {e}") | |
| with st.expander("Create New Index"): | |
| new_idx_name = st.text_input("Index Name") | |
| new_idx_dim = st.selectbox("Dimension", [384, 768, 1024, 1536, 3072], index=0) | |
| if st.button("Create"): | |
| with st.spinner("Provisioning..."): | |
| ok, msg = pm.create_index(new_idx_name, dimension=new_idx_dim) | |
| if ok: st.success(msg); time.sleep(2); st.rerun() | |
| else: st.error(msg) | |
| else: st.warning("No Pinecone Key") | |
| st.header("🧠 Intelligence") | |
| st.subheader("1. Embeddings") | |
| embed_options = { | |
| "Standard (All-MiniLM, 384d)": "sentence-transformers/all-MiniLM-L6-v2", | |
| "High-Perf (MPNet, 768d)": "sentence-transformers/all-mpnet-base-v2", | |
| "OpenAI Small (1536d)": "text-embedding-3-small", | |
| "Custom Navy (BGE, 768d)": "NavyDevilDoc/navy-custom-models/bge-finetuned" | |
| } | |
| embed_choice_label = st.selectbox("Select Embedding Model", list(embed_options.keys())) | |
| st.session_state.active_embed_model = embed_options[embed_choice_label] | |
| st.subheader("2. Chat Model") | |
| # Base local models | |
| model_map = {"Granite 4 (IBM)": "granite4:latest", | |
| "Llama 3.2 (Meta)": "llama3.2:latest", | |
| "Gemma 3 (Google)": "gemma3:latest"} | |
| opts = list(model_map.keys()) | |
| is_admin = "admin" in st.session_state.roles | |
| user_key = None | |
| # Logic for Premium Models | |
| if not is_admin: | |
| user_key = st.text_input("Unlock GPT-4o", type="password") | |
| st.session_state.user_openai_key = user_key if user_key else None | |
| else: st.session_state.user_openai_key = None | |
| # Add Premium Options if Admin or Key provided | |
| if is_admin or st.session_state.get("user_openai_key"): | |
| opts.append("GPT-4o (Omni)") | |
| # Add Gemini if Key exists (System wide) | |
| if GOOGLE_KEY: | |
| opts.append("Gemini 2.5 (Google)") | |
| model_choice = st.radio("Select Model:", opts, key="model_selector_radio") | |
| st.info(f"Connected to: **{model_choice}**") | |
| st.divider() | |
| if st.session_state.authenticator: st.session_state.authenticator.logout(location='sidebar') | |
| update_sidebar_metrics() | |
| # --- MAIN APP --- | |
| st.title("⚓ Navy AI Toolkit") | |
| tab1, tab2, tab3 = st.tabs(["💬 Chat Playground", "📂 Knowledge & Tools", "⚡ Quiz Mode"]) | |
| # === TAB 1: CHAT === | |
| with tab1: | |
| # 1. LAYOUT: Header + Placeholder for Download Button | |
| col_header, col_btn = st.columns([6, 1]) | |
| with col_header: | |
| st.header("Discussion & Analysis") | |
| download_placeholder = col_btn.empty() | |
| if "messages" not in st.session_state: st.session_state.messages = [] | |
| # RENDER DEBUG OVERLAY (If enabled in Admin) | |
| admin_panel.render_debug_overlay("Chat Tab") | |
| c1, c2 = st.columns([3, 1]) | |
| with c1: st.caption(f"Active Model: **{st.session_state.get('model_selector_radio', 'Granite')}**") | |
| with c2: use_rag = st.toggle("Enable Knowledge Base", value=False) | |
| for msg in st.session_state.messages: | |
| with st.chat_message(msg["role"]): st.markdown(msg["content"]) | |
| if prompt := st.chat_input("Input command..."): | |
| st.session_state.messages.append({"role": "user", "content": prompt}) | |
| with st.chat_message("user"): st.markdown(prompt) | |
| context_txt = "" | |
| sys_p = "You are a helpful AI assistant." | |
| st.session_state.last_context_used = "" # Reset context debug | |
| if use_rag: | |
| if not st.session_state.active_index: st.error("⚠️ Please select an Active Index in the sidebar first.") | |
| else: | |
| with st.spinner("Searching Knowledge Base..."): | |
| docs = rag_engine.search_knowledge_base( | |
| query=prompt, | |
| username=st.session_state.username, | |
| index_name=st.session_state.active_index, | |
| embed_model_name=st.session_state.active_embed_model | |
| ) | |
| if docs: | |
| sys_p = "You are a Navy Document Analyst. Answer based PRIMARILY on the Context." | |
| for i, d in enumerate(docs): | |
| src = d.metadata.get('source', 'Unknown') | |
| context_txt += f"<document index='{i+1}' source='{src}'>\n{d.page_content}\n</document>\n" | |
| st.session_state.last_context_used = context_txt | |
| if context_txt: | |
| final_prompt = f"User Question: {prompt}\n\n<context>\n{context_txt}\n</context>\n\nInstruction: Answer using the context above." | |
| else: final_prompt = prompt | |
| with st.chat_message("assistant"): | |
| with st.spinner("Thinking..."): | |
| hist = [{"role":"system", "content":sys_p}] + st.session_state.messages[-6:-1] + [{"role":"user", "content":final_prompt}] | |
| resp, usage = query_model_universal(hist, 2000, model_choice, st.session_state.get("user_openai_key")) | |
| st.markdown(resp) | |
| if usage: | |
| m_name = "GPT-4o" if "GPT-4o" in model_choice else model_choice.split()[0] | |
| tracker.log_usage(m_name, usage["input"], usage["output"]) | |
| update_sidebar_metrics() | |
| st.session_state.messages.append({"role": "assistant", "content": resp}) | |
| if use_rag and context_txt: | |
| with st.expander("📚 View Context Used"): st.text(context_txt) | |
| # 3. LATE RENDER: Fill Download Button | |
| if st.session_state.messages: | |
| chat_log = f"# ⚓ Navy AI Toolkit - Chat Log\nDate: {datetime.now().strftime('%Y-%m-%d %H:%M')}\nModel: {st.session_state.get('model_selector_radio', 'Unknown')}\n\n---\n\n" | |
| for msg in st.session_state.messages: | |
| chat_log += f"**{msg['role'].upper()}**: {msg['content']}\n\n" | |
| with download_placeholder: | |
| st.download_button("💾 Save", chat_log, f"chat_{datetime.now().strftime('%Y%m%d_%H%M')}.md", "text/markdown") | |
| # === TAB 2: KNOWLEDGE & TOOLS === | |
| with tab2: | |
| st.header("Document Processor") | |
| c1, c2 = st.columns([1, 1]) | |
| with c1: uploaded_file = st.file_uploader("Upload File", type=["pdf", "docx", "pptx", "txt", "md"]) | |
| with c2: | |
| use_vision = st.toggle("👁️ Enable Vision Mode") | |
| if use_vision and "GPT-4o" not in opts: st.warning("Vision requires OpenAI.") | |
| if uploaded_file: | |
| temp_path = rag_engine.save_uploaded_file(uploaded_file, st.session_state.username) | |
| col_a, col_b, col_c = st.columns(3) | |
| # COLUMN A: Ingest | |
| with col_a: | |
| chunk_strategy = st.selectbox("Chunking Strategy", ["paragraph", "token"]) | |
| if st.button("📥 Add to KB", type="primary"): | |
| if not st.session_state.active_index: st.error("Select Index first.") | |
| else: | |
| with st.spinner("Ingesting..."): | |
| ok, msg = rag_engine.ingest_file(temp_path, st.session_state.username, st.session_state.active_index, st.session_state.active_embed_model, chunk_strategy) | |
| if ok: tracker.upload_user_db(st.session_state.username); st.success(msg) | |
| else: st.error(msg) | |
| # COLUMN B: Summarize | |
| with col_b: | |
| st.write(""); st.write("") | |
| if st.button("📝 Summarize"): | |
| with st.spinner("Summarizing..."): | |
| key = st.session_state.get("user_openai_key") or OPENAI_KEY | |
| class FileObj: | |
| def __init__(self, p, n): self.path=p; self.name=n | |
| def read(self): | |
| with open(self.path, "rb") as f: return f.read() | |
| raw = doc_loader.extract_text_from_file(FileObj(temp_path, uploaded_file.name), use_vision=use_vision, api_key=key) | |
| prompt = f"Summarize:\n\n{raw[:20000]}" | |
| msgs = [{"role":"user", "content": prompt}] | |
| summ, usage = query_model_universal(msgs, 1000, model_choice, st.session_state.get("user_openai_key")) | |
| st.subheader("Summary"); st.markdown(summ) | |
| # COLUMN C: Flatten | |
| with col_c: | |
| st.write(""); st.write("") | |
| if "flattened_result" not in st.session_state: st.session_state.flattened_result = None | |
| if st.button("📄 Flatten"): | |
| with st.spinner("Flattening..."): | |
| key = st.session_state.get("user_openai_key") or OPENAI_KEY | |
| # 1. Read File | |
| with open(temp_path, "rb") as f: | |
| class Wrapper: | |
| def __init__(self, data, n): self.data=data; self.name=n | |
| def read(self): return self.data | |
| raw = doc_loader.extract_text_from_file(Wrapper(f.read(), uploaded_file.name), use_vision=use_vision, api_key=key) | |
| # 2. Parse Outline (This was missing logic previously) | |
| proc = OutlineProcessor(raw) | |
| items = proc.parse() | |
| # 3. Process Items | |
| out_txt = [] | |
| bar = st.progress(0) | |
| for i, item in enumerate(items): | |
| p = f"Context: {item['context']}\nTarget: {item['target']}\nRewrite as one sentence." | |
| m = [{"role":"user", "content": p}] | |
| res, _ = query_model_universal(m, 300, model_choice, st.session_state.get("user_openai_key")) | |
| out_txt.append(res) | |
| bar.progress((i+1)/len(items)) | |
| final_flattened_text = "\n".join(out_txt) | |
| st.session_state.flattened_result = {"text": final_flattened_text, "source": f"{uploaded_file.name}_flat"} | |
| st.rerun() | |
| if st.session_state.flattened_result: | |
| res = st.session_state.flattened_result | |
| st.success("Complete!") | |
| st.text_area("Result", res["text"], height=200) | |
| if st.button("📥 Index Flat"): | |
| if not st.session_state.active_index: | |
| st.error("Please select an Active Index.") | |
| else: | |
| with st.spinner("Indexing..."): | |
| # FIX: Pass the active_embed_model here! | |
| ok, msg = rag_engine.process_and_add_text( | |
| text=res["text"], | |
| source_name=res["source"], | |
| username=st.session_state.username, | |
| index_name=st.session_state.active_index, | |
| embed_model_name=st.session_state.active_embed_model | |
| ) | |
| if ok: | |
| tracker.upload_user_db(st.session_state.username) | |
| st.success(msg) | |
| else: | |
| st.error(msg) | |
| st.divider() | |
| st.subheader("Database Management") | |
| c1, c2 = st.columns([2, 1]) | |
| with c1: st.info("Missing local files? Resync below.") | |
| with c2: | |
| if st.button("🔄 Resync from Pinecone"): | |
| if not st.session_state.active_index: st.error("Select Index.") | |
| else: | |
| with st.spinner("Resyncing..."): | |
| ok, msg = rag_engine.rebuild_cache_from_pinecone(st.session_state.username, st.session_state.active_index) | |
| if ok: st.success(msg); time.sleep(1); st.rerun() | |
| else: st.error(msg) | |
| docs = rag_engine.list_documents(st.session_state.username) | |
| if docs: | |
| for d in docs: | |
| c1, c2 = st.columns([4,1]) | |
| c1.text(f"📄 {d['filename']}") | |
| if c2.button("🗑️", key=d['source']): | |
| if not st.session_state.active_index: st.error("Select Index.") | |
| else: | |
| rag_engine.delete_document(st.session_state.username, d['source'], st.session_state.active_index) | |
| tracker.upload_user_db(st.session_state.username); st.rerun() | |
| else: st.warning("Cache Empty.") | |
| # === TAB 3: QUIZ MODE === | |
| with tab3: | |
| st.header("⚓ Qualification Board Simulator") | |
| admin_panel.render_debug_overlay("Quiz Tab") | |
| col_mode, col_streak = st.columns([3, 1]) | |
| with col_mode: | |
| quiz_mode = st.radio("Mode:", ["⚡ Acronym Lightning Round", "📖 Document Deep Dive"], horizontal=True) | |
| if "Document" in quiz_mode: | |
| focus_topic = st.text_input("🎯 Focus Topic", placeholder="e.g., PPBE...", help="Leave empty for random.") | |
| else: | |
| focus_topic = None | |
| if "last_quiz_mode" not in st.session_state: st.session_state.last_quiz_mode = quiz_mode | |
| if "quiz_trigger" not in st.session_state: st.session_state.quiz_trigger = False | |
| if st.session_state.last_quiz_mode != quiz_mode: | |
| st.session_state.quiz_state["active"] = False | |
| st.session_state.quiz_state["question_data"] = None | |
| st.session_state.quiz_state["feedback"] = None | |
| st.session_state.quiz_state["generated_question_text"] = "" | |
| st.session_state.last_quiz_mode = quiz_mode | |
| st.rerun() | |
| quiz = QuizEngine() | |
| qs = st.session_state.quiz_state | |
| with col_streak: | |
| st.metric("Streak", qs["streak"]) | |
| if st.button("Reset"): qs["streak"] = 0 | |
| if st.session_state.quiz_history: | |
| with st.expander(f"📚 Review Study Guide ({len(st.session_state.quiz_history)})"): | |
| st.download_button( | |
| "📥 Download Markdown", | |
| generate_study_guide_md(st.session_state.quiz_history), | |
| f"StudyGuide_{datetime.now().strftime('%Y%m%d')}.md" | |
| ) | |
| st.divider() | |
| def generate_question(): | |
| with st.spinner("Consulting Board..."): | |
| st.session_state.last_context_used = "" | |
| if "Acronym" in quiz_mode: | |
| q_data = quiz.get_random_acronym() | |
| if q_data: | |
| qs["active"]=True | |
| qs["question_data"]=q_data | |
| qs["feedback"]=None | |
| qs["generated_question_text"]=q_data["question"] | |
| else: | |
| st.error("No acronyms.") | |
| else: | |
| valid_question_found = False | |
| attempts = 0 | |
| last_error = None | |
| while not valid_question_found and attempts < 5: | |
| attempts += 1 | |
| q_ctx = quiz.get_document_context(st.session_state.username, topic_filter=focus_topic) | |
| if q_ctx and "error" in q_ctx: | |
| last_error = q_ctx["error"] | |
| break | |
| if q_ctx: | |
| # NEW: Use the Scenario Prompt | |
| prompt = quiz.construct_scenario_prompt(q_ctx["context_text"]) | |
| st.session_state.last_context_used = q_ctx["context_text"] | |
| # Generate | |
| response_text, usage = query_model_universal([{"role": "user", "content": prompt}], 600, model_choice, st.session_state.get("user_openai_key")) | |
| # PARSE OUTPUT (Scenario vs Solution) | |
| if "SCENARIO:" in response_text and "SOLUTION:" in response_text: | |
| parts = response_text.split("SOLUTION:") | |
| scenario_text = parts[0].replace("SCENARIO:", "").strip() | |
| solution_text = parts[1].strip() | |
| valid_question_found = True | |
| qs["active"] = True | |
| qs["question_data"] = q_ctx | |
| qs["generated_question_text"] = scenario_text | |
| qs["hidden_solution"] = solution_text | |
| qs["feedback"] = None | |
| else: | |
| # Fallback if model ignores format | |
| valid_question_found = True | |
| qs["active"] = True | |
| qs["question_data"] = q_ctx | |
| qs["generated_question_text"] = response_text | |
| qs["hidden_solution"] = "Refer to Source Text." | |
| qs["feedback"] = None | |
| if not valid_question_found: | |
| if last_error == "topic_not_found": | |
| st.warning(f"Topic '{focus_topic}' not found.") | |
| elif focus_topic: | |
| st.warning(f"Found '{focus_topic}' but could not generate question.") | |
| else: | |
| st.warning("Could not generate question. Try Resync.") | |
| if st.session_state.quiz_trigger: | |
| st.session_state.quiz_trigger = False | |
| generate_question() | |
| st.rerun() | |
| if not qs["active"]: | |
| if st.button("🚀 New Question", type="primary"): | |
| generate_question() | |
| st.rerun() | |
| if qs["active"]: | |
| st.markdown(f"### {qs['generated_question_text']}") | |
| if "document" in qs.get("question_data", {}).get("type", ""): | |
| st.caption(f"Source: *{qs['question_data']['source_file']}*") | |
| with st.form(key="quiz_response"): | |
| user_ans = st.text_area("Answer:") | |
| sub = st.form_submit_button("Submit") | |
| if sub and user_ans: | |
| with st.spinner("Board is deliberating..."): | |
| data = qs["question_data"] | |
| if data["type"] == "acronym": | |
| prompt = quiz.construct_acronym_grading_prompt(data["term"], data["correct_definition"], user_ans) | |
| final_context_for_history = data["correct_definition"] | |
| msgs = [{"role": "user", "content": prompt}] | |
| grade, _ = query_model_universal(msgs, 1000, model_choice, st.session_state.get("user_openai_key")) | |
| qs["feedback"] = grade | |
| else: | |
| # NEW: Scenario Grading Logic | |
| scenario = qs["generated_question_text"] | |
| solution = qs.get("hidden_solution", "") | |
| context_ref = data["context_text"] | |
| prompt = quiz.construct_scenario_grading_prompt(scenario, user_ans, solution, context_ref) | |
| st.session_state.last_context_used = f"SCENARIO: {scenario}\n\nSOLUTION: {solution}\n\nREF: {context_ref}" | |
| msgs = [{"role": "user", "content": prompt}] | |
| grade, _ = query_model_universal(msgs, 1000, model_choice, st.session_state.get("user_openai_key")) | |
| qs["feedback"] = grade | |
| # Logic to determine PASS/FAIL | |
| is_pass = False | |
| if "10/10" in grade or "9/10" in grade or "8/10" in grade or "7/10" in grade or "PASS" in grade: | |
| is_pass = True | |
| qs["streak"] += 1 | |
| elif "FAIL" in grade or " 6/" in grade or " 5/" in grade: | |
| qs["streak"] = 0 | |
| else: | |
| is_pass = True | |
| qs["streak"] += 1 | |
| # Save history | |
| st.session_state.quiz_history.append({ | |
| "question": qs["generated_question_text"], | |
| "user_answer": user_ans, | |
| "grade": "PASS" if is_pass else "FAIL", # Simplified for history list | |
| "context": f"**Official Solution:** {qs.get('hidden_solution', '')}\n\n**Source Text:** {data.get('context_text', '')[:500]}..." | |
| }) | |
| st.rerun() | |
| if qs["feedback"]: | |
| st.divider() | |
| if "PASS" in qs["feedback"] or "7/10" in qs["feedback"] or "8/10" in qs["feedback"] or "9/10" in qs["feedback"] or "10/10" in qs["feedback"]: | |
| st.success("✅ CORRECT / PASSING") | |
| else: | |
| if "FAIL" in qs["feedback"]: st.error("❌ INCORRECT") | |
| else: st.warning("⚠️ PARTIAL / CRITIQUE") | |
| st.markdown(qs["feedback"]) | |
| data = qs["question_data"] | |
| if data["type"] == "acronym": | |
| st.info(f"**Definition:** {data['correct_definition']}") | |
| elif data["type"] == "document": | |
| with st.expander("Show Official Solution"): | |
| st.info(qs.get("hidden_solution", "No solution generated.")) | |
| if st.button("Next Question ➡️"): | |
| st.session_state.quiz_trigger = True | |
| qs["active"] = False | |
| qs["question_data"] = None | |
| qs["feedback"] = None | |
| st.rerun() |