import streamlit as st
import requests
import os
import re
import io
import contextlib
import zipfile
import tracker
import rag_engine
import doc_loader
from openai import OpenAI
from datetime import datetime
from test_integration import run_tests

# --- CONFIGURATION ---
st.set_page_config(page_title="Navy AI Toolkit", page_icon="⚓", layout="wide")
API_URL_ROOT = os.getenv("API_URL")
OPENAI_KEY = os.getenv("OPENAI_API_KEY")

# Single source of truth for the "display name -> Ollama tag" mapping.
# Previously this dict was duplicated in query_model_universal() and in the
# sidebar model selector, which risked the two copies drifting apart.
MODEL_MAP = {
    "Granite 4 (IBM)": "granite4:latest",
    "Llama 3.2 (Meta)": "llama3.2:latest",
    "Gemma 3 (Google)": "gemma3:latest"
}

# --- INITIALIZATION ---
if "roles" not in st.session_state:
    st.session_state.roles = []


# --- FLATTENER LOGIC (Integrated) ---
class OutlineProcessor:
    """Parses text outlines for the Flattener tool.

    Splits the raw text into lines, merges wrapped continuation lines back
    onto their parent list item, then walks indentation depth to produce a
    breadcrumb context for each item.
    """

    # Matches numbered ("1."), lettered ("a."), dash and star bullets.
    # Compiled once instead of per _is_list_item call.
    _LIST_ITEM_RE = re.compile(r"^\s*(\d+\.|[a-zA-Z]\.|-|\*)\s+")

    def __init__(self, file_content):
        self.raw_lines = file_content.split('\n')

    def _is_list_item(self, line):
        """Return True if *line* starts with a recognized bullet/number marker."""
        return bool(self._LIST_ITEM_RE.match(line))

    def _merge_multiline_items(self):
        """Join wrapped (non-bullet) continuation lines onto the previous item."""
        merged_lines = []
        for line in self.raw_lines:
            stripped = line.strip()
            if not stripped:
                continue  # drop blank lines entirely
            if not merged_lines:
                merged_lines.append(line)
                continue
            if not self._is_list_item(line):
                # Continuation of the previous item: append its text.
                merged_lines[-1] = merged_lines[-1].rstrip() + " " + stripped
            else:
                merged_lines.append(line)
        return merged_lines

    def parse(self):
        """Return a list of {"context": "A > B" | "ROOT", "target": text} dicts.

        Indentation depth (leading whitespace count) determines nesting; the
        stack holds the chain of ancestors for the current line.
        """
        clean_lines = self._merge_multiline_items()
        stack = []
        results = []
        for line in clean_lines:
            stripped = line.strip()
            indent = len(line) - len(line.lstrip())
            # Pop siblings and deeper levels so the stack holds only ancestors.
            while stack and stack[-1]['indent'] >= indent:
                stack.pop()
            stack.append({'indent': indent, 'text': stripped})
            if len(stack) > 1:
                context_str = " > ".join([item['text'] for item in stack[:-1]])
            else:
                context_str = "ROOT"
            results.append({"context": context_str, "target": stripped})
        return results


# --- HELPER FUNCTIONS ---
def query_model_universal(messages, max_tokens, model_choice, user_key=None):
    """Unified router for both Chat and Tools.

    Args:
        messages: OpenAI-style list of {"role", "content"} dicts.
        max_tokens: generation cap passed to the backend.
        model_choice: display name; anything containing "GPT-4o" goes to
            OpenAI, otherwise MODEL_MAP routes to the local Ollama service.
        user_key: optional per-user OpenAI key overriding the system key.

    Returns:
        (response_text, usage) where usage is {"input": n, "output": n}
        or None on error; errors are returned as bracketed strings, never raised.
    """
    # 1. OpenAI Path
    if "GPT-4o" in model_choice:
        key = user_key if user_key else OPENAI_KEY
        if not key:
            return "[Error: No OpenAI API Key]", None
        client = OpenAI(api_key=key)
        try:
            resp = client.chat.completions.create(
                model="gpt-4o",
                max_tokens=max_tokens,
                messages=messages,
                temperature=0.3
            )
            usage = {"input": resp.usage.prompt_tokens, "output": resp.usage.completion_tokens}
            return resp.choices[0].message.content, usage
        except Exception as e:
            return f"[OpenAI Error: {e}]", None
    # 2. Local Path
    else:
        tech_name = MODEL_MAP.get(model_choice)
        if not tech_name:
            return "[Error: Model Map Failed]", None
        url = f"{API_URL_ROOT}/generate"
        # Flatten history for Ollama
        hist = ""
        sys_msg = "You are a helpful assistant."
        for m in messages:
            if m['role'] == 'system':
                sys_msg = m['content']
            elif m['role'] == 'user':
                hist += f"User: {m['content']}\n"
            elif m['role'] == 'assistant':
                hist += f"Assistant: {m['content']}\n"
        hist += "Assistant: "
        try:
            r = requests.post(url, json={"text": hist, "persona": sys_msg, "max_tokens": max_tokens, "model": tech_name}, timeout=300)
            if r.status_code == 200:
                d = r.json()
                return d.get("response", ""), d.get("usage", {"input": 0, "output": 0})
            return f"[Local Error {r.status_code}]", None
        except Exception as e:
            return f"[Conn Error: {e}]", None


def update_sidebar_metrics():
    # Helper to safely update metrics if placeholder exists.
    # metric_placeholder is a module-level global set in the sidebar below;
    # it is None until the sidebar has rendered.
    if metric_placeholder:
        stats = tracker.get_daily_stats()
        u_stats = stats["users"].get(st.session_state.username, {"input": 0, "output": 0})
        metric_placeholder.metric("My Tokens Today", u_stats["input"] + u_stats["output"])


# --- LOGIN ---
if "authentication_status" not in st.session_state or st.session_state["authentication_status"] is None:
    login_tab, register_tab = st.tabs(["🔑 Login", "📝 Register"])
    with login_tab:
        if tracker.check_login():
            # Session Isolation Logic: wipe chat history and keys when a
            # different user logs in on the same browser session.
            if "last_user" in st.session_state and st.session_state.last_user != st.session_state.username:
                st.session_state.messages = []
                st.session_state.user_openai_key = None
            st.session_state.last_user = st.session_state.username
            tracker.download_user_db(st.session_state.username)
            st.rerun()
    with register_tab:
        st.header("Create Account")
        with st.form("reg_form"):
            new_user = st.text_input("Username")
            new_name = st.text_input("Display Name")
            new_email = st.text_input("Email")
            new_pwd = st.text_input("Password", type="password")
            invite = st.text_input("Invitation Passcode")
            if st.form_submit_button("Register"):
                success, msg = tracker.register_user(new_email, new_user, new_name, new_pwd, invite)
                if success:
                    st.success(msg)
                else:
                    st.error(msg)

if not st.session_state.get("authentication_status"):
    st.stop()

# --- SIDEBAR ---
metric_placeholder = None
with st.sidebar:
    st.header("👤 User Profile")
    st.write(f"Welcome, **{st.session_state.name}**")
    st.header("📊 Usage Tracker")
    metric_placeholder = st.empty()

    # Admin Tools
    if "admin" in st.session_state.roles:
        st.divider()
        st.header("🛡️ Admin Tools")
        log_path = tracker.get_log_path()
        if log_path.exists():
            with open(log_path, "r") as f:
                log_data = f.read()
            st.download_button(
                label="📥 Download Usage Logs",
                data=log_data,
                file_name=f"usage_log_{datetime.now().strftime('%Y-%m-%d')}.json",
                mime="application/json"
            )
    st.divider()

    # Model Selector (shares MODEL_MAP with query_model_universal)
    st.header("🧠 Intelligence")
    opts = list(MODEL_MAP.keys())
    model_captions = ["Slower, free, private" for _ in opts]

    # Vision Key Input (User or Admin)
    is_admin = "admin" in st.session_state.roles
    user_key = None
    if not is_admin:
        user_key = st.text_input(
            "🔓 Unlock GPT-4o (Enter API Key)",
            type="password",
            key=f"key_{st.session_state.username}",
            help="Required for Vision Mode and GPT-4o."
        )
        if user_key:
            st.session_state.user_openai_key = user_key
            st.caption("✅ Key Active")
        else:
            st.session_state.user_openai_key = None
    else:
        # Admin defaults to system key, but we ensure state is clean
        st.session_state.user_openai_key = None

    # Unlock GPT-4o option
    if is_admin or st.session_state.get("user_openai_key"):
        opts.append("GPT-4o (Omni)")
        model_captions.append("Fast, smart, sends data to OpenAI")

    model_choice = st.radio("Select Model:", opts, captions=model_captions, key="model_selector_radio")
    st.info(f"Connected to: **{model_choice}**")
    st.divider()
    if st.session_state.authenticator:
        st.session_state.authenticator.logout(location='sidebar')
    st.divider()
    st.subheader("🔧 System Diagnostics")
    if st.button("Run Integration Test"):
        with st.spinner("Running diagnostics..."):
            # Create a buffer to capture the text that would normally be printed
            f = io.StringIO()
            # Redirect 'print' statements to our buffer instead of the console
            try:
                with contextlib.redirect_stdout(f):
                    run_tests()
                # Display the result in a code block for easy reading
                st.success("Tests Completed")
                st.code(f.getvalue(), language="text")
            except Exception as e:
                st.error(f"Test Execution Failed: {e}")
    update_sidebar_metrics()

# --- MAIN APP ---
st.title("⚓ Navy AI Toolkit")
tab1, tab2 = st.tabs(["💬 Chat Playground", "📂 Knowledge & Tools"])

# === TAB 1: CHAT ===
with tab1:
    st.header("Discussion & Analysis")
    if "messages" not in st.session_state:
        st.session_state.messages = []
    c1, c2 = st.columns([3, 1])
    with c1:
        st.caption(f"Active Model: **{st.session_state.get('model_selector_radio', 'Granite')}**")
    with c2:
        use_rag = st.toggle("Enable Knowledge Base", value=False)
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])
    if prompt := st.chat_input("Input command..."):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)
        # RAG Search
        context_txt = ""
Default System Prompt (No RAG) sys_p = "You are a helpful AI assistant." if use_rag: with st.spinner("Searching Knowledge Base..."): docs = rag_engine.search_knowledge_base(prompt, st.session_state.username) if docs: # 2. Strict System Prompt (With RAG) # We relax the strictness slightly to allow for inference, # while still demanding evidence. sys_p = ( "You are a Navy Document Analyst. " "You must answer the user's question based PRIMARILY on the provided Context. " "If the Context contains the answer, output it clearly. " "If the Context does NOT contain the answer, simply state: " "'I cannot find that specific information in the documents provided.'" ) # 3. XML-Formatted Context Construction # This helps the model "see" the start and end of each chunk clearly. for i, d in enumerate(docs): src = d.metadata.get('source', 'Unknown') context_txt += f"\n{d.page_content}\n\n" # 4. Construct Final User Payload if context_txt: final_prompt = ( f"User Question: {prompt}\n\n" f"\n{context_txt}\n\n\n" "Instruction: Answer the question using the context above." 
) else: final_prompt = prompt # Generation with st.chat_message("assistant"): with st.spinner("Thinking..."): # Memory Window hist = [{"role":"system", "content":sys_p}] + st.session_state.messages[-6:-1] + [{"role":"user", "content":final_prompt}] resp, usage = query_model_universal(hist, 2000, model_choice, st.session_state.get("user_openai_key")) st.markdown(resp) if usage: m_name = "GPT-4o" if "GPT-4o" in model_choice else model_choice.split()[0] tracker.log_usage(m_name, usage["input"], usage["output"]) update_sidebar_metrics() st.session_state.messages.append({"role": "assistant", "content": resp}) if use_rag and context_txt: with st.expander("📚 View Context Used"): st.text(context_txt) # === TAB 2: KNOWLEDGE & TOOLS === with tab2: st.header("Document Processor") c1, c2 = st.columns([1, 1]) with c1: uploaded_file = st.file_uploader("Upload File (PDF, PPT, Doc, Text)", type=["pdf", "docx", "pptx", "txt", "md"]) with c2: use_vision = st.toggle("👁️ Enable Vision Mode", help="Use GPT-4o to read diagrams/tables. Requires API Key.") if use_vision and "GPT-4o" not in opts: st.warning("Vision requires OpenAI Access.") if uploaded_file: # Save temp temp_path = rag_engine.save_uploaded_file(uploaded_file) # ACTION BAR col_a, col_b, col_c = st.columns(3) # 1. ADD TO DB (With Strategy Selection) with col_a: chunk_strategy = st.selectbox( "Chunking Strategy", ["paragraph", "token"], # Removed 'page' as it is not implemented in new engine yet help="Paragraph: Standard. Token: Dense text.", key="chunk_selector" ) if st.button("📥 Add to Knowledge Base", type="primary"): with st.spinner("Ingesting..."): # Note: New engine uses internal Tesseract OCR, not GPT-4o Vision # so we don't pass vision flags or keys here anymore. ok, msg = rag_engine.ingest_file( file_path=temp_path, username=st.session_state.username, strategy=chunk_strategy ) if ok: tracker.upload_user_db(st.session_state.username) # Auto-Sync st.success(msg) else: st.error(msg) # 2. 
SUMMARIZE with col_b: # Spacer to align buttons visually since col_a has a selectbox st.write("") st.write("") if st.button("📝 Summarize Document"): with st.spinner("Reading & Summarizing..."): key = st.session_state.get("user_openai_key") or OPENAI_KEY # Extract raw text first class FileObj: def __init__(self, p, n): self.path=p; self.name=n def read(self): with open(self.path, "rb") as f: return f.read() # Extraction raw = doc_loader.extract_text_from_file( FileObj(temp_path, uploaded_file.name), use_vision=use_vision, api_key=key ) # Call LLM prompt = f"Summarize this document into a key executive brief:\n\n{raw[:20000]}" # Truncate for safety msgs = [{"role":"user", "content": prompt}] summ, usage = query_model_universal(msgs, 1000, model_choice, st.session_state.get("user_openai_key")) st.subheader("Summary Result") st.markdown(summ) if usage: m_name = "GPT-4o" if "GPT-4o" in model_choice else model_choice.split()[0] tracker.log_usage(m_name, usage["input"], usage["output"]) update_sidebar_metrics() # 3. FLATTEN with col_c: # Spacer to align buttons st.write("") st.write("") # We use a session state variable to store the result so it persists for the "Index" step if "flattened_result" not in st.session_state: st.session_state.flattened_result = None if st.button("📄 Flatten Context"): with st.spinner("Flattening..."): key = st.session_state.get("user_openai_key") or OPENAI_KEY # A. Extract with open(temp_path, "rb") as f: class Wrapper: def __init__(self, data, n): self.data=data; self.name=n def read(self): return self.data raw = doc_loader.extract_text_from_file( Wrapper(f.read(), uploaded_file.name), use_vision=use_vision, api_key=key ) # B. Parse proc = OutlineProcessor(raw) items = proc.parse() # C. Flatten out_txt = [] bar = st.progress(0) for i, item in enumerate(items): p = f"Context: {item['context']}\nTarget: {item['target']}\nRewrite as one sentence." 
m = [{"role":"user", "content": p}] res, _ = query_model_universal(m, 300, model_choice, st.session_state.get("user_openai_key")) out_txt.append(res) bar.progress((i+1)/len(items)) # D. Store Result in Session State final_flattened_text = "\n".join(out_txt) st.session_state.flattened_result = { "text": final_flattened_text, "source": f"{uploaded_file.name}_flat" } st.rerun() # Refresh to show the new result/buttons # Display Result & Index Option if st.session_state.flattened_result: res = st.session_state.flattened_result st.success("Flattening Complete!") st.text_area("Result", res["text"], height=200) # The New Button if st.button("📥 Index This Flattened Version"): with st.spinner("Indexing Flattened Text..."): ok, msg = rag_engine.process_and_add_text( res["text"], res["source"], st.session_state.username ) if ok: tracker.upload_user_db(st.session_state.username) # Sync! st.success(msg) else: st.error(msg) st.divider() # DB MANAGER st.subheader("Database Management") docs = rag_engine.list_documents(st.session_state.username) if docs: for d in docs: c1, c2 = st.columns([4,1]) c1.text(f"📄 {d['filename']} ({d['chunks']} chunks)") if c2.button("🗑️", key=d['source']): rag_engine.delete_document(st.session_state.username, d['source']) tracker.upload_user_db(st.session_state.username) st.rerun() else: st.info("Database Empty.")