"""Navy AI Toolkit — Streamlit front-end.

Three tabs: a RAG-enabled chat playground, a document processor
(ingest / summarize / outline-flatten), and a quiz ("qualification
board") simulator. Model calls are routed through a single helper,
``query_model_universal``, which supports Google Gemini, OpenAI GPT-4o,
and locally hosted models behind an HTTP endpoint.
"""

import streamlit as st
import requests
import os
import re
import io
import time
import contextlib
import zipfile
import tracker
import rag_engine
import doc_loader
import modules.admin_panel as admin_panel
from openai import OpenAI
from google import genai
from google.genai import types
from datetime import datetime
from test_integration import run_tests
from core.QuizEngine import QuizEngine
from core.PineconeManager import PineconeManager
from huggingface_hub import hf_hub_download

# --- CONFIGURATION ---
st.set_page_config(page_title="Navy AI Toolkit", page_icon="⚓", layout="wide")
API_URL_ROOT = os.getenv("API_URL")
OPENAI_KEY = os.getenv("OPENAI_API_KEY")
GOOGLE_KEY = os.getenv("GOOGLE_API_KEY")  # NEW: Google Key

# Single source of truth for the locally hosted models.  Previously this
# mapping was duplicated in query_model_universal() and in the sidebar,
# which let the two copies drift apart.
LOCAL_MODEL_MAP = {
    "Granite 4 (IBM)": "granite4:latest",
    "Llama 3.2 (Meta)": "llama3.2:latest",
    "Gemma 3 (Google)": "gemma3:latest",
}

# --- INITIALIZATION ---
# Seed every session-state key the app reads before any widget can set it.
if "roles" not in st.session_state:
    st.session_state.roles = []
if "quiz_state" not in st.session_state:
    st.session_state.quiz_state = {
        "active": False,
        "question_data": None,
        "user_answer": "",
        "feedback": None,
        "streak": 0,
        "generated_question_text": "",
    }
if "quiz_history" not in st.session_state:
    st.session_state.quiz_history = []
if "active_index" not in st.session_state:
    st.session_state.active_index = None

# Debug State Variables (surfaced by the admin debug overlay)
if "last_prompt_sent" not in st.session_state:
    st.session_state.last_prompt_sent = ""
if "last_context_used" not in st.session_state:
    st.session_state.last_context_used = ""


# --- FLATTENER LOGIC ---
class OutlineProcessor:
    """Parses text outlines for the Flattener tool.

    Turns an indented outline into a flat list of items, each paired with
    the " > "-joined text of its ancestors (``"ROOT"`` for top-level items).
    """

    def __init__(self, file_content):
        self.raw_lines = file_content.split('\n')

    def _is_list_item(self, line):
        """Return True if *line* starts a new outline item (number, letter, dash, or star marker)."""
        pattern = r"^\s*(\d+\.|[a-zA-Z]\.|-|\*)\s+"
        return bool(re.match(pattern, line))

    def _merge_multiline_items(self):
        """Join continuation lines (lines without a list marker) onto the previous item."""
        merged_lines = []
        for line in self.raw_lines:
            stripped = line.strip()
            if not stripped:
                continue  # drop blank lines entirely
            if not merged_lines:
                merged_lines.append(line)
                continue
            if not self._is_list_item(line):
                # Continuation of the previous item: append its text.
                merged_lines[-1] = merged_lines[-1].rstrip() + " " + stripped
            else:
                merged_lines.append(line)
        return merged_lines

    def parse(self):
        """Parse the outline into [{"context": ancestor-path, "target": item-text}, ...].

        Nesting depth is inferred purely from leading-whitespace width; a
        stack of (indent, text) frames tracks the current ancestor chain.
        """
        clean_lines = self._merge_multiline_items()
        stack = []
        results = []
        for line in clean_lines:
            stripped = line.strip()
            indent = len(line) - len(line.lstrip())
            # Pop siblings/deeper frames until the top is a true ancestor.
            while stack and stack[-1]['indent'] >= indent:
                stack.pop()
            stack.append({'indent': indent, 'text': stripped})
            if len(stack) > 1:
                context_str = " > ".join([item['text'] for item in stack[:-1]])
            else:
                context_str = "ROOT"
            results.append({"context": context_str, "target": stripped})
        return results


# --- HELPER FUNCTIONS ---
def query_model_universal(messages, max_tokens, model_choice, user_key=None):
    """Unified router for Chat, Tools, and Quiz.

    Args:
        messages: OpenAI-style list of {"role", "content"} dicts.
        max_tokens: generation cap passed to whichever backend is chosen.
        model_choice: display label selected in the sidebar; routed by
            substring ("Gemini" / "GPT-4o" / anything else -> local).
        user_key: optional per-user OpenAI key overriding the system key.

    Returns:
        (response_text, usage_dict) on success, or (error_string, None).
        Errors are returned as strings, never raised, so the UI can render
        them inline.
    """
    # 1. DEBUG CAPTURE — remember the latest user prompt for the admin overlay.
    if messages and messages[-1]['role'] == 'user':
        st.session_state.last_prompt_sent = messages[-1]['content']

    # --- ROUTE 1: GOOGLE GEMINI (NEW) ---
    if "Gemini" in model_choice:
        # Use System Key (Env Var) or User Override if you allow it.
        # For now, we strictly use the Hugging Face Secret as requested.
        if not GOOGLE_KEY:
            return "[Error: No GOOGLE_API_KEY found in Secrets]", None
        try:
            client = genai.Client(api_key=GOOGLE_KEY)
            # Convert Chat History to Single String for 'generate_content'.
            # (Gemini supports chat history objects, but a string is more
            # robust for RAG contexts.)
            full_prompt = ""
            for m in messages:
                full_prompt += f"{m['role'].upper()}: {m['content']}\n\n"
            full_prompt += "ASSISTANT: "

            # RETRY LOGIC (User Provided)
            max_retries = 3  # Slightly conservative for UI responsiveness
            model_id = "gemini-2.0-flash"  # or "gemini-1.5-pro" depending on your access
            for _ in range(max_retries):
                try:
                    response = client.models.generate_content(
                        model=model_id,
                        contents=full_prompt,
                        config=types.GenerateContentConfig(
                            max_output_tokens=max_tokens,
                            temperature=0.3,
                        ),
                    )
                    # Usage tracking is different for Gemini; we report zeros
                    # rather than parse response.usage_metadata here.
                    return response.text.strip(), {"input": 0, "output": 0}
                except Exception as e:
                    error_msg = str(e)
                    if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
                        time.sleep(10)  # short back-off, then retry
                    else:
                        return f"[Gemini Error: {error_msg}]", None
            return "[Error: Gemini Rate Limit Exceeded]", None
        except Exception as e:
            return f"[Gemini Client Error: {e}]", None

    # --- ROUTE 2: OPENAI GPT-4o ---
    elif "GPT-4o" in model_choice:
        key = user_key if user_key else OPENAI_KEY
        if not key:
            return "[Error: No OpenAI API Key]", None
        client = OpenAI(api_key=key)
        try:
            resp = client.chat.completions.create(
                model="gpt-4o",
                max_tokens=max_tokens,
                messages=messages,
                temperature=0.3,
            )
            usage = {"input": resp.usage.prompt_tokens, "output": resp.usage.completion_tokens}
            return resp.choices[0].message.content, usage
        except Exception as e:
            return f"[OpenAI Error: {e}]", None

    # --- ROUTE 3: LOCAL/OPEN SOURCE ---
    else:
        tech_name = LOCAL_MODEL_MAP.get(model_choice)
        if not tech_name:
            return "[Error: Model Map Failed]", None
        url = f"{API_URL_ROOT}/generate"
        # Flatten the chat history into a simple "User:/Assistant:" script;
        # the system message travels separately as the "persona".
        hist = ""
        sys_msg = "You are a helpful assistant."
        for m in messages:
            if m['role'] == 'system':
                sys_msg = m['content']
            elif m['role'] == 'user':
                hist += f"User: {m['content']}\n"
            elif m['role'] == 'assistant':
                hist += f"Assistant: {m['content']}\n"
        hist += "Assistant: "
        try:
            r = requests.post(
                url,
                json={"text": hist, "persona": sys_msg, "max_tokens": max_tokens, "model": tech_name},
                timeout=600,
            )
            if r.status_code == 200:
                d = r.json()
                return d.get("response", ""), d.get("usage", {"input": 0, "output": 0})
            return f"[Local Error {r.status_code}]", None
        except Exception as e:
            return f"[Conn Error: {e}]", None


def update_sidebar_metrics():
    """Refresh the 'My Tokens Today' metric in the sidebar placeholder."""
    if metric_placeholder:
        stats = tracker.get_daily_stats()
        u_stats = stats["users"].get(st.session_state.username, {"input": 0, "output": 0})
        metric_placeholder.metric("My Tokens Today", u_stats["input"] + u_stats["output"])


def generate_study_guide_md(history):
    """Render the quiz history as a downloadable Markdown study guide."""
    md = "# ⚓ Study Guide\n\nGenerated: " + datetime.now().strftime('%Y-%m-%d %H:%M') + "\n\n"
    for item in history:
        md += (
            f"## Q: {item['question']}\n**Your Answer:** {item['user_answer']}\n\n"
            f"**Grade:** {item['grade']}\n\n**Context/Correct Info:**\n> {item['context']}\n\n---\n\n"
        )
    return md


# --- LOGIN ---
if "authentication_status" not in st.session_state or st.session_state["authentication_status"] is None:
    login_tab, register_tab = st.tabs(["🔑 Login", "📝 Register"])
    with login_tab:
        if tracker.check_login():
            # A different user logged in on this session: wipe per-user state.
            if "last_user" in st.session_state and st.session_state.last_user != st.session_state.username:
                st.session_state.messages = []
                st.session_state.user_openai_key = None
            st.session_state.last_user = st.session_state.username
            tracker.download_user_db(st.session_state.username)
            st.rerun()
    with register_tab:
        st.header("Create Account")
        with st.form("reg_form"):
            new_user = st.text_input("Username")
            new_name = st.text_input("Display Name")
            new_email = st.text_input("Email")
            new_pwd = st.text_input("Password", type="password")
            invite = st.text_input("Invitation Passcode")
            if st.form_submit_button("Register"):
                success, msg = tracker.register_user(new_email, new_user, new_name, new_pwd, invite)
                if success:
                    st.success(msg)
                else:
                    st.error(msg)

# Everything below requires an authenticated session.
if not st.session_state.get("authentication_status"):
    st.stop()

# --- SIDEBAR ---
metric_placeholder = None
with st.sidebar:
    st.header("👤 User Profile")
    st.write(f"Welcome, **{st.session_state.name}**")
    st.header("📊 Usage Tracker")
    metric_placeholder = st.empty()
    if "admin" in st.session_state.roles:
        admin_panel.render_admin_sidebar()
    st.divider()

    st.header("🌲 Pinecone Settings")
    pc_key = os.getenv("PINECONE_API_KEY")
    if pc_key:
        pm = PineconeManager(pc_key)
        indexes = pm.list_indexes()
        selected_index = st.selectbox("Active Index", indexes)
        st.session_state.active_index = selected_index
        if selected_index:
            # Probe the active embedding model's output dimension and compare
            # it against the index so mismatches surface before ingestion.
            current_model = st.session_state.get("active_embed_model", "sentence-transformers/all-MiniLM-L6-v2")
            try:
                emb_fn = rag_engine.get_embedding_func(current_model)
                test_vec = emb_fn.embed_query("test")
                active_model_dim = len(test_vec)
                if pm.check_dimension_compatibility(selected_index, active_model_dim):
                    st.caption(f"✅ Compatible ({active_model_dim}d)")
                else:
                    st.error(f"❌ Mismatch! Model: {active_model_dim}d")
            except Exception as e:
                st.caption(f"⚠️ Check failed: {e}")
        with st.expander("Create New Index"):
            new_idx_name = st.text_input("Index Name")
            new_idx_dim = st.selectbox("Dimension", [384, 768, 1024, 1536, 3072], index=0)
            if st.button("Create"):
                with st.spinner("Provisioning..."):
                    ok, msg = pm.create_index(new_idx_name, dimension=new_idx_dim)
                    if ok:
                        st.success(msg)
                        time.sleep(2)
                        st.rerun()
                    else:
                        st.error(msg)
    else:
        st.warning("No Pinecone Key")

    st.header("🧠 Intelligence")
    st.subheader("1. Embeddings")
    embed_options = {
        "Standard (All-MiniLM, 384d)": "sentence-transformers/all-MiniLM-L6-v2",
        "High-Perf (MPNet, 768d)": "sentence-transformers/all-mpnet-base-v2",
        "OpenAI Small (1536d)": "text-embedding-3-small",
        "Custom Navy (BGE, 768d)": "NavyDevilDoc/navy-custom-models/bge-finetuned",
    }
    embed_choice_label = st.selectbox("Select Embedding Model", list(embed_options.keys()))
    st.session_state.active_embed_model = embed_options[embed_choice_label]

    st.subheader("2. Chat Model")
    # Base local models always appear; premium models are gated below.
    opts = list(LOCAL_MODEL_MAP.keys())
    is_admin = "admin" in st.session_state.roles
    user_key = None
    # Logic for Premium Models: non-admins must supply their own OpenAI key.
    if not is_admin:
        user_key = st.text_input("Unlock GPT-4o", type="password")
        st.session_state.user_openai_key = user_key if user_key else None
    else:
        st.session_state.user_openai_key = None
    # Add Premium Options if Admin or Key provided
    if is_admin or st.session_state.get("user_openai_key"):
        opts.append("GPT-4o (Omni)")
    # Add Gemini if Key exists (System wide)
    if GOOGLE_KEY:
        opts.append("Gemini 2.5 (Google)")
    model_choice = st.radio("Select Model:", opts, key="model_selector_radio")
    st.info(f"Connected to: **{model_choice}**")
    st.divider()
    if st.session_state.authenticator:
        st.session_state.authenticator.logout(location='sidebar')

update_sidebar_metrics()

# --- MAIN APP ---
st.title("⚓ Navy AI Toolkit")
tab1, tab2, tab3 = st.tabs(["💬 Chat Playground", "📂 Knowledge & Tools", "⚡ Quiz Mode"])

# === TAB 1: CHAT ===
with tab1:
    # 1. LAYOUT: Header + Placeholder for Download Button (filled after the
    #    chat log exists — see "LATE RENDER" below).
    col_header, col_btn = st.columns([6, 1])
    with col_header:
        st.header("Discussion & Analysis")
    download_placeholder = col_btn.empty()

    if "messages" not in st.session_state:
        st.session_state.messages = []

    # RENDER DEBUG OVERLAY (If enabled in Admin)
    admin_panel.render_debug_overlay("Chat Tab")

    c1, c2 = st.columns([3, 1])
    with c1:
        st.caption(f"Active Model: **{st.session_state.get('model_selector_radio', 'Granite')}**")
    with c2:
        use_rag = st.toggle("Enable Knowledge Base", value=False)

    # Replay the conversation so far.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])

    if prompt := st.chat_input("Input command..."):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        context_txt = ""
        sys_p = "You are a helpful AI assistant."
        st.session_state.last_context_used = ""  # Reset context debug
        if use_rag:
            if not st.session_state.active_index:
                st.error("⚠️ Please select an Active Index in the sidebar first.")
            else:
                with st.spinner("Searching Knowledge Base..."):
                    docs = rag_engine.search_knowledge_base(
                        query=prompt,
                        username=st.session_state.username,
                        index_name=st.session_state.active_index,
                        embed_model_name=st.session_state.active_embed_model,
                    )
                    if docs:
                        sys_p = "You are a Navy Document Analyst. Answer based PRIMARILY on the Context."
                        for d in docs:
                            context_txt += f"\n{d.page_content}\n\n"
                        st.session_state.last_context_used = context_txt

        if context_txt:
            final_prompt = f"User Question: {prompt}\n\n\n{context_txt}\n\n\nInstruction: Answer using the context above."
        else:
            final_prompt = prompt

        with st.chat_message("assistant"):
            with st.spinner("Thinking..."):
                # Window the history: up to 5 prior turns, excluding the
                # just-appended user message (final_prompt replaces it).
                hist = (
                    [{"role": "system", "content": sys_p}]
                    + st.session_state.messages[-6:-1]
                    + [{"role": "user", "content": final_prompt}]
                )
                resp, usage = query_model_universal(hist, 2000, model_choice, st.session_state.get("user_openai_key"))
                st.markdown(resp)
                if usage:
                    m_name = "GPT-4o" if "GPT-4o" in model_choice else model_choice.split()[0]
                    tracker.log_usage(m_name, usage["input"], usage["output"])
                    update_sidebar_metrics()
                st.session_state.messages.append({"role": "assistant", "content": resp})
                if use_rag and context_txt:
                    with st.expander("📚 View Context Used"):
                        st.text(context_txt)

    # 3. LATE RENDER: Fill Download Button
    if st.session_state.messages:
        chat_log = (
            f"# ⚓ Navy AI Toolkit - Chat Log\nDate: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
            f"Model: {st.session_state.get('model_selector_radio', 'Unknown')}\n\n---\n\n"
        )
        for msg in st.session_state.messages:
            chat_log += f"**{msg['role'].upper()}**: {msg['content']}\n\n"
        with download_placeholder:
            st.download_button(
                "💾 Save",
                chat_log,
                f"chat_{datetime.now().strftime('%Y%m%d_%H%M')}.md",
                "text/markdown",
            )

# === TAB 2: KNOWLEDGE & TOOLS ===
with tab2:
    st.header("Document Processor")
    c1, c2 = st.columns([1, 1])
    with c1:
        uploaded_file = st.file_uploader("Upload File", type=["pdf", "docx", "pptx", "txt", "md"])
    with c2:
        use_vision = st.toggle("👁️ Enable Vision Mode")
        if use_vision and "GPT-4o" not in opts:
            st.warning("Vision requires OpenAI.")

    if uploaded_file:
        temp_path = rag_engine.save_uploaded_file(uploaded_file, st.session_state.username)
        col_a, col_b, col_c = st.columns(3)

        # COLUMN A: Ingest
        with col_a:
            chunk_strategy = st.selectbox("Chunking Strategy", ["paragraph", "token"])
            if st.button("📥 Add to KB", type="primary"):
                if not st.session_state.active_index:
                    st.error("Select Index first.")
                else:
                    with st.spinner("Ingesting..."):
                        ok, msg = rag_engine.ingest_file(
                            temp_path,
                            st.session_state.username,
                            st.session_state.active_index,
                            st.session_state.active_embed_model,
                            chunk_strategy,
                        )
                        if ok:
                            tracker.upload_user_db(st.session_state.username)
                            st.success(msg)
                        else:
                            st.error(msg)

        # COLUMN B: Summarize
        with col_b:
            st.write("")
            st.write("")
            if st.button("📝 Summarize"):
                with st.spinner("Summarizing..."):
                    key = st.session_state.get("user_openai_key") or OPENAI_KEY

                    class FileObj:
                        """Minimal file-like shim so doc_loader can read from a path."""

                        def __init__(self, p, n):
                            self.path = p
                            self.name = n

                        def read(self):
                            with open(self.path, "rb") as f:
                                return f.read()

                    raw = doc_loader.extract_text_from_file(
                        FileObj(temp_path, uploaded_file.name),
                        use_vision=use_vision,
                        api_key=key,
                    )
                    # Cap the prompt at 20k chars to stay within context limits.
                    prompt = f"Summarize:\n\n{raw[:20000]}"
                    msgs = [{"role": "user", "content": prompt}]
                    summ, _ = query_model_universal(msgs, 1000, model_choice, st.session_state.get("user_openai_key"))
                    st.subheader("Summary")
                    st.markdown(summ)

        # COLUMN C: Flatten
        with col_c:
            st.write("")
            st.write("")
            if "flattened_result" not in st.session_state:
                st.session_state.flattened_result = None
            if st.button("📄 Flatten"):
                with st.spinner("Flattening..."):
                    key = st.session_state.get("user_openai_key") or OPENAI_KEY
                    # 1. Read File
                    with open(temp_path, "rb") as f:

                        class Wrapper:
                            """Minimal file-like shim around already-read bytes."""

                            def __init__(self, data, n):
                                self.data = data
                                self.name = n

                            def read(self):
                                return self.data

                        raw = doc_loader.extract_text_from_file(
                            Wrapper(f.read(), uploaded_file.name),
                            use_vision=use_vision,
                            api_key=key,
                        )
                    # 2. Parse Outline (This was missing logic previously)
                    proc = OutlineProcessor(raw)
                    items = proc.parse()
                    # 3. Process Items — one model call per outline item.
                    out_txt = []
                    bar = st.progress(0)
                    for i, item in enumerate(items):
                        p = f"Context: {item['context']}\nTarget: {item['target']}\nRewrite as one sentence."
                        m = [{"role": "user", "content": p}]
                        res, _ = query_model_universal(m, 300, model_choice, st.session_state.get("user_openai_key"))
                        out_txt.append(res)
                        bar.progress((i + 1) / len(items))
                    final_flattened_text = "\n".join(out_txt)
                    st.session_state.flattened_result = {
                        "text": final_flattened_text,
                        "source": f"{uploaded_file.name}_flat",
                    }
                    st.rerun()

            if st.session_state.flattened_result:
                res = st.session_state.flattened_result
                st.success("Complete!")
                st.text_area("Result", res["text"], height=200)
                if st.button("📥 Index Flat"):
                    if not st.session_state.active_index:
                        st.error("Please select an Active Index.")
                    else:
                        with st.spinner("Indexing..."):
                            # FIX: Pass the active_embed_model here!
                            ok, msg = rag_engine.process_and_add_text(
                                text=res["text"],
                                source_name=res["source"],
                                username=st.session_state.username,
                                index_name=st.session_state.active_index,
                                embed_model_name=st.session_state.active_embed_model,
                            )
                            if ok:
                                tracker.upload_user_db(st.session_state.username)
                                st.success(msg)
                            else:
                                st.error(msg)

    # NOTE(review): Database Management is rendered at tab level (visible with
    # or without an upload) — confirm this matches the intended layout.
    st.divider()
    st.subheader("Database Management")
    c1, c2 = st.columns([2, 1])
    with c1:
        st.info("Missing local files? Resync below.")
    with c2:
        if st.button("🔄 Resync from Pinecone"):
            if not st.session_state.active_index:
                st.error("Select Index.")
            else:
                with st.spinner("Resyncing..."):
                    ok, msg = rag_engine.rebuild_cache_from_pinecone(
                        st.session_state.username, st.session_state.active_index
                    )
                    if ok:
                        st.success(msg)
                        time.sleep(1)
                        st.rerun()
                    else:
                        st.error(msg)

    docs = rag_engine.list_documents(st.session_state.username)
    if docs:
        for d in docs:
            c1, c2 = st.columns([4, 1])
            c1.text(f"📄 {d['filename']}")
            if c2.button("🗑️", key=d['source']):
                if not st.session_state.active_index:
                    st.error("Select Index.")
                else:
                    rag_engine.delete_document(st.session_state.username, d['source'], st.session_state.active_index)
                    tracker.upload_user_db(st.session_state.username)
                    st.rerun()
    else:
        st.warning("Cache Empty.")

# === TAB 3: QUIZ MODE ===
with tab3:
    st.header("⚓ Qualification Board Simulator")
    admin_panel.render_debug_overlay("Quiz Tab")

    col_mode, col_streak = st.columns([3, 1])
    with col_mode:
        quiz_mode = st.radio("Mode:", ["⚡ Acronym Lightning Round", "📖 Document Deep Dive"], horizontal=True)
        if "Document" in quiz_mode:
            focus_topic = st.text_input("🎯 Focus Topic", placeholder="e.g., PPBE...", help="Leave empty for random.")
        else:
            focus_topic = None

    if "last_quiz_mode" not in st.session_state:
        st.session_state.last_quiz_mode = quiz_mode
    if "quiz_trigger" not in st.session_state:
        st.session_state.quiz_trigger = False
    # Switching modes invalidates the in-flight question.
    if st.session_state.last_quiz_mode != quiz_mode:
        st.session_state.quiz_state["active"] = False
        st.session_state.quiz_state["question_data"] = None
        st.session_state.quiz_state["feedback"] = None
        st.session_state.quiz_state["generated_question_text"] = ""
        st.session_state.last_quiz_mode = quiz_mode
        st.rerun()

    quiz = QuizEngine()
    qs = st.session_state.quiz_state

    with col_streak:
        st.metric("Streak", qs["streak"])
        if st.button("Reset"):
            qs["streak"] = 0

    if st.session_state.quiz_history:
        with st.expander(f"📚 Review Study Guide ({len(st.session_state.quiz_history)})"):
            st.download_button(
                "📥 Download Markdown",
                generate_study_guide_md(st.session_state.quiz_history),
                f"StudyGuide_{datetime.now().strftime('%Y%m%d')}.md",
            )
    st.divider()

    def generate_question():
        """Populate quiz_state with a fresh question for the current mode.

        Acronym mode draws from the QuizEngine bank; document mode retries
        up to 5 times to build a SCENARIO/SOLUTION pair from retrieved
        document context, falling back to the raw model output if the
        model ignores the requested format.
        """
        with st.spinner("Consulting Board..."):
            st.session_state.last_context_used = ""
            if "Acronym" in quiz_mode:
                q_data = quiz.get_random_acronym()
                if q_data:
                    qs["active"] = True
                    qs["question_data"] = q_data
                    qs["feedback"] = None
                    qs["generated_question_text"] = q_data["question"]
                else:
                    st.error("No acronyms.")
            else:
                valid_question_found = False
                attempts = 0
                last_error = None
                while not valid_question_found and attempts < 5:
                    attempts += 1
                    q_ctx = quiz.get_document_context(st.session_state.username, topic_filter=focus_topic)
                    if q_ctx and "error" in q_ctx:
                        last_error = q_ctx["error"]
                        break
                    if q_ctx:
                        # NEW: Use the Scenario Prompt
                        prompt = quiz.construct_scenario_prompt(q_ctx["context_text"])
                        st.session_state.last_context_used = q_ctx["context_text"]
                        # Generate
                        response_text, _ = query_model_universal(
                            [{"role": "user", "content": prompt}],
                            600,
                            model_choice,
                            st.session_state.get("user_openai_key"),
                        )
                        # PARSE OUTPUT (Scenario vs Solution)
                        if "SCENARIO:" in response_text and "SOLUTION:" in response_text:
                            parts = response_text.split("SOLUTION:")
                            scenario_text = parts[0].replace("SCENARIO:", "").strip()
                            solution_text = parts[1].strip()
                            valid_question_found = True
                            qs["active"] = True
                            qs["question_data"] = q_ctx
                            qs["generated_question_text"] = scenario_text
                            qs["hidden_solution"] = solution_text
                            qs["feedback"] = None
                        else:
                            # Fallback if model ignores format
                            valid_question_found = True
                            qs["active"] = True
                            qs["question_data"] = q_ctx
                            qs["generated_question_text"] = response_text
                            qs["hidden_solution"] = "Refer to Source Text."
                            qs["feedback"] = None
                if not valid_question_found:
                    if last_error == "topic_not_found":
                        st.warning(f"Topic '{focus_topic}' not found.")
                    elif focus_topic:
                        st.warning(f"Found '{focus_topic}' but could not generate question.")
                    else:
                        st.warning("Could not generate question. Try Resync.")

    # "Next Question" sets the trigger; consume it on this rerun.
    if st.session_state.quiz_trigger:
        st.session_state.quiz_trigger = False
        generate_question()
        st.rerun()

    if not qs["active"]:
        if st.button("🚀 New Question", type="primary"):
            generate_question()
            st.rerun()

    if qs["active"]:
        st.markdown(f"### {qs['generated_question_text']}")
        if "document" in qs.get("question_data", {}).get("type", ""):
            st.caption(f"Source: *{qs['question_data']['source_file']}*")

        with st.form(key="quiz_response"):
            user_ans = st.text_area("Answer:")
            sub = st.form_submit_button("Submit")
            if sub and user_ans:
                with st.spinner("Board is deliberating..."):
                    data = qs["question_data"]
                    if data["type"] == "acronym":
                        prompt = quiz.construct_acronym_grading_prompt(
                            data["term"], data["correct_definition"], user_ans
                        )
                        msgs = [{"role": "user", "content": prompt}]
                        grade, _ = query_model_universal(msgs, 1000, model_choice, st.session_state.get("user_openai_key"))
                        qs["feedback"] = grade
                    else:
                        # NEW: Scenario Grading Logic
                        scenario = qs["generated_question_text"]
                        solution = qs.get("hidden_solution", "")
                        context_ref = data["context_text"]
                        prompt = quiz.construct_scenario_grading_prompt(scenario, user_ans, solution, context_ref)
                        st.session_state.last_context_used = (
                            f"SCENARIO: {scenario}\n\nSOLUTION: {solution}\n\nREF: {context_ref}"
                        )
                        msgs = [{"role": "user", "content": prompt}]
                        grade, _ = query_model_universal(msgs, 1000, model_choice, st.session_state.get("user_openai_key"))
                        qs["feedback"] = grade

                    # Logic to determine PASS/FAIL from the grade text.
                    # Unrecognized grades default to PASS (benefit of the doubt).
                    is_pass = False
                    if "10/10" in grade or "9/10" in grade or "8/10" in grade or "7/10" in grade or "PASS" in grade:
                        is_pass = True
                        qs["streak"] += 1
                    elif "FAIL" in grade or " 6/" in grade or " 5/" in grade:
                        qs["streak"] = 0
                    else:
                        is_pass = True
                        qs["streak"] += 1

                    # Save history
                    st.session_state.quiz_history.append({
                        "question": qs["generated_question_text"],
                        "user_answer": user_ans,
                        "grade": "PASS" if is_pass else "FAIL",  # Simplified for history list
                        "context": (
                            f"**Official Solution:** {qs.get('hidden_solution', '')}\n\n"
                            f"**Source Text:** {data.get('context_text', '')[:500]}..."
                        ),
                    })
                    st.rerun()

    if qs["feedback"]:
        st.divider()
        if (
            "PASS" in qs["feedback"]
            or "7/10" in qs["feedback"]
            or "8/10" in qs["feedback"]
            or "9/10" in qs["feedback"]
            or "10/10" in qs["feedback"]
        ):
            st.success("✅ CORRECT / PASSING")
        else:
            if "FAIL" in qs["feedback"]:
                st.error("❌ INCORRECT")
            else:
                st.warning("⚠️ PARTIAL / CRITIQUE")
        st.markdown(qs["feedback"])

        data = qs["question_data"]
        if data["type"] == "acronym":
            st.info(f"**Definition:** {data['correct_definition']}")
        elif data["type"] == "document":
            with st.expander("Show Official Solution"):
                st.info(qs.get("hidden_solution", "No solution generated."))

        if st.button("Next Question ➡️"):
            st.session_state.quiz_trigger = True
            qs["active"] = False
            qs["question_data"] = None
            qs["feedback"] = None
            st.rerun()