import streamlit as st
import requests
import os
import re
import io
import contextlib
import zipfile
import tracker
import rag_engine
import doc_loader
from openai import OpenAI
from datetime import datetime
from test_integration import run_tests
# --- CONFIGURATION ---
st.set_page_config(page_title="Navy AI Toolkit", page_icon="⚓", layout="wide")
# Backend endpoint and OpenAI key come from the environment; either may be
# None — downstream code returns explicit "[Error: ...]" strings in that case.
API_URL_ROOT = os.getenv("API_URL")
OPENAI_KEY = os.getenv("OPENAI_API_KEY")
# --- INITIALIZATION ---
# Roles list gates admin-only UI below; presumably populated at login by
# tracker — TODO confirm.
if "roles" not in st.session_state:
    st.session_state.roles = []
# --- FLATTENER LOGIC (Integrated) ---
class OutlineProcessor:
    """Parses plain-text outlines for the Flattener tool.

    Splits raw text into lines, merges wrapped continuation lines into
    their parent list items, then walks the indentation to produce a flat
    list of ``{"context": ..., "target": ...}`` records, where ``context``
    is the " > "-joined chain of ancestor items (or ``"ROOT"`` for
    top-level items).
    """

    # Matches numbered ("1."), lettered ("a."), or bulleted ("-" / "*")
    # list markers followed by whitespace. Compiled once at class level
    # instead of re-building the pattern on every line.
    _ITEM_RE = re.compile(r"^\s*(\d+\.|[a-zA-Z]\.|-|\*)\s+")

    def __init__(self, file_content):
        # Keep original lines (with leading whitespace) — the parser uses
        # indentation depth to infer the outline hierarchy.
        self.raw_lines = file_content.split('\n')

    def _is_list_item(self, line):
        """Return True if *line* starts with a recognized list marker."""
        return bool(self._ITEM_RE.match(line))

    def _merge_multiline_items(self):
        """Collapse wrapped continuation lines into the preceding item.

        Blank lines are dropped. A non-blank line that does not start with
        a list marker is treated as a continuation and appended (space-
        separated) to the previous merged line.
        """
        merged_lines = []
        for line in self.raw_lines:
            stripped = line.strip()
            if not stripped:
                continue  # drop blank lines entirely
            if not merged_lines:
                # First non-blank line always seeds the list, marker or not.
                merged_lines.append(line)
                continue
            if not self._is_list_item(line):
                # Continuation: glue onto the previous item.
                merged_lines[-1] = merged_lines[-1].rstrip() + " " + stripped
            else:
                merged_lines.append(line)
        return merged_lines

    def parse(self):
        """Return a list of {"context", "target"} dicts, one per item.

        Uses a stack of (indent, text) entries: items at an equal or
        deeper indent than the current line are popped so the stack holds
        only the line's ancestors.
        """
        clean_lines = self._merge_multiline_items()
        stack = []    # ancestors of the current item, shallowest first
        results = []
        for line in clean_lines:
            stripped = line.strip()
            indent = len(line) - len(line.lstrip())
            # Pop siblings and deeper entries until only ancestors remain.
            while stack and stack[-1]['indent'] >= indent:
                stack.pop()
            stack.append({'indent': indent, 'text': stripped})
            if len(stack) > 1:
                context_str = " > ".join(item['text'] for item in stack[:-1])
            else:
                context_str = "ROOT"
            results.append({"context": context_str, "target": stripped})
        return results
# --- HELPER FUNCTIONS ---
def query_model_universal(messages, max_tokens, model_choice, user_key=None):
    """Unified router for both Chat and Tools.

    Sends a chat-style ``messages`` list either to OpenAI (when the chosen
    model name contains "GPT-4o") or to the local Ollama-backed API.
    Returns ``(response_text, usage)`` where ``usage`` is a dict with
    "input"/"output" token counts, or ``None`` when an error occurred
    (the response text then carries a bracketed error message).
    """
    # --- OpenAI branch ---------------------------------------------------
    if "GPT-4o" in model_choice:
        key = user_key or OPENAI_KEY
        if not key:
            return "[Error: No OpenAI API Key]", None
        client = OpenAI(api_key=key)
        try:
            resp = client.chat.completions.create(
                model="gpt-4o",
                max_tokens=max_tokens,
                messages=messages,
                temperature=0.3,
            )
            usage = {
                "input": resp.usage.prompt_tokens,
                "output": resp.usage.completion_tokens,
            }
            return resp.choices[0].message.content, usage
        except Exception as e:
            return f"[OpenAI Error: {e}]", None

    # --- Local (Ollama) branch -------------------------------------------
    model_map = {
        "Granite 4 (IBM)": "granite4:latest",
        "Llama 3.2 (Meta)": "llama3.2:latest",
        "Gemma 3 (Google)": "gemma3:latest",
    }
    tech_name = model_map.get(model_choice)
    if tech_name is None:
        return "[Error: Model Map Failed]", None

    url = f"{API_URL_ROOT}/generate"
    # Flatten the chat history into a single prompt string for Ollama;
    # the last system message (if any) becomes the persona.
    sys_msg = "You are a helpful assistant."
    parts = []
    for m in messages:
        role = m['role']
        if role == 'system':
            sys_msg = m['content']
        elif role == 'user':
            parts.append(f"User: {m['content']}\n")
        elif role == 'assistant':
            parts.append(f"Assistant: {m['content']}\n")
    parts.append("Assistant: ")
    hist = "".join(parts)

    try:
        r = requests.post(
            url,
            json={"text": hist, "persona": sys_msg, "max_tokens": max_tokens, "model": tech_name},
            timeout=300,
        )
        if r.status_code == 200:
            d = r.json()
            return d.get("response", ""), d.get("usage", {"input": 0, "output": 0})
        return f"[Local Error {r.status_code}]", None
    except Exception as e:
        return f"[Conn Error: {e}]", None
def update_sidebar_metrics():
    """Refresh the sidebar "My Tokens Today" metric for the current user."""
    # Helper to safely update metrics if placeholder exists.
    # NOTE(review): reads the module-level `metric_placeholder` assigned in the
    # sidebar section; calling this before that assignment would raise
    # NameError. Truthiness of the Streamlit placeholder object is assumed
    # here — confirm, `is not None` would be more explicit.
    if metric_placeholder:
        stats = tracker.get_daily_stats()
        # Users with no activity today fall back to zeroed counters.
        u_stats = stats["users"].get(st.session_state.username, {"input":0, "output":0})
        metric_placeholder.metric("My Tokens Today", u_stats["input"] + u_stats["output"])
# --- LOGIN ---
# Gate the whole app behind authentication: show login/register tabs until
# tracker sets authentication_status in session state.
if "authentication_status" not in st.session_state or st.session_state["authentication_status"] is None:
    login_tab, register_tab = st.tabs(["🔑 Login", "📝 Register"])
    with login_tab:
        # tracker.check_login() presumably renders the login form and returns
        # truthy on successful auth — TODO confirm against tracker module.
        if tracker.check_login():
            # Session Isolation Logic: if a different account logs in on the
            # same browser session, wipe the previous user's chat and API key.
            if "last_user" in st.session_state and st.session_state.last_user != st.session_state.username:
                st.session_state.messages = []
                st.session_state.user_openai_key = None
            st.session_state.last_user = st.session_state.username
            # Pull this user's knowledge-base DB from remote storage.
            tracker.download_user_db(st.session_state.username)
            st.rerun()
    with register_tab:
        st.header("Create Account")
        with st.form("reg_form"):
            new_user = st.text_input("Username")
            new_name = st.text_input("Display Name")
            new_email = st.text_input("Email")
            new_pwd = st.text_input("Password", type="password")
            invite = st.text_input("Invitation Passcode")
            if st.form_submit_button("Register"):
                # register_user returns (ok, human-readable message).
                success, msg = tracker.register_user(new_email, new_user, new_name, new_pwd, invite)
                if success:
                    st.success(msg)
                else:
                    st.error(msg)
# Halt the script for anyone still unauthenticated.
if not st.session_state.get("authentication_status"): st.stop()
# --- SIDEBAR ---
metric_placeholder = None  # assigned below; read by update_sidebar_metrics()
with st.sidebar:
    st.header("👤 User Profile")
    st.write(f"Welcome, **{st.session_state.name}**")
    st.header("📊 Usage Tracker")
    metric_placeholder = st.empty()
    # Admin Tools: raw usage-log download, admins only.
    if "admin" in st.session_state.roles:
        st.divider()
        st.header("🛡️ Admin Tools")
        log_path = tracker.get_log_path()
        if log_path.exists():
            with open(log_path, "r") as f:
                log_data = f.read()
            st.download_button(
                label="📥 Download Usage Logs",
                data=log_data,
                file_name=f"usage_log_{datetime.now().strftime('%Y-%m-%d')}.json",
                mime="application/json"
            )
    st.divider()
    # Model Selector
    st.header("🧠 Intelligence")
    # NOTE(review): this model_map duplicates the one inside
    # query_model_universal — keep the two in sync (or hoist to a shared
    # module-level constant).
    model_map = {
        "Granite 4 (IBM)": "granite4:latest",
        "Llama 3.2 (Meta)": "llama3.2:latest",
        "Gemma 3 (Google)": "gemma3:latest"
    }
    opts = list(model_map.keys())
    model_captions = ["Slower, free, private" for _ in opts]
    # Vision Key Input (User or Admin): non-admins must supply their own
    # OpenAI key to unlock GPT-4o; admins use the system key implicitly.
    is_admin = "admin" in st.session_state.roles
    user_key = None
    if not is_admin:
        user_key = st.text_input(
            "🔓 Unlock GPT-4o (Enter API Key)",
            type="password",
            key=f"key_{st.session_state.username}",  # per-user widget key
            help="Required for Vision Mode and GPT-4o."
        )
        if user_key:
            st.session_state.user_openai_key = user_key
            st.caption("✅ Key Active")
        else:
            st.session_state.user_openai_key = None
    else:
        # Admin defaults to system key, but we ensure state is clean
        st.session_state.user_openai_key = None
    # GPT-4o only appears as an option for admins or key-holders.
    if is_admin or st.session_state.get("user_openai_key"):
        opts.append("GPT-4o (Omni)")
        model_captions.append("Fast, smart, sends data to OpenAI")
    model_choice = st.radio("Select Model:", opts, captions=model_captions, key="model_selector_radio")
    st.info(f"Connected to: **{model_choice}**")
    st.divider()
    # NOTE(review): direct attribute access raises if 'authenticator' was
    # never placed in session state — presumably tracker.check_login sets it;
    # confirm, else use st.session_state.get("authenticator").
    if st.session_state.authenticator:
        st.session_state.authenticator.logout(location='sidebar')
    st.divider()
    st.subheader("🔧 System Diagnostics")
    if st.button("Run Integration Test"):
        with st.spinner("Running diagnostics..."):
            # Create a buffer to capture the text that would normally be printed
            f = io.StringIO()
            # Redirect 'print' statements to our buffer instead of the console
            try:
                with contextlib.redirect_stdout(f):
                    run_tests()
                # Display the result in a code block for easy reading
                st.success("Tests Completed")
                st.code(f.getvalue(), language="text")
            except Exception as e:
                st.error(f"Test Execution Failed: {e}")
# Populate the metric placeholder now that it exists.
update_sidebar_metrics()
# --- MAIN APP ---
st.title("⚓ Navy AI Toolkit")
tab1, tab2 = st.tabs(["💬 Chat Playground", "📂 Knowledge & Tools"])
# === TAB 1: CHAT ===
with tab1:
    st.header("Discussion & Analysis")
    if "messages" not in st.session_state: st.session_state.messages = []
    c1, c2 = st.columns([3, 1])
    with c1: st.caption(f"Active Model: **{st.session_state.get('model_selector_radio', 'Granite')}**")
    with c2: use_rag = st.toggle("Enable Knowledge Base", value=False)
    # Replay stored chat history on every rerun.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]): st.markdown(msg["content"])
    if prompt := st.chat_input("Input command..."):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"): st.markdown(prompt)
        # RAG Search
        context_txt = ""
        # 1. Default System Prompt (No RAG)
        sys_p = "You are a helpful AI assistant."
        if use_rag:
            with st.spinner("Searching Knowledge Base..."):
                docs = rag_engine.search_knowledge_base(prompt, st.session_state.username)
                if docs:
                    # 2. Strict System Prompt (With RAG)
                    # We relax the strictness slightly to allow for inference,
                    # while still demanding evidence.
                    sys_p = (
                        "You are a Navy Document Analyst. "
                        "You must answer the user's question based PRIMARILY on the provided Context. "
                        "If the Context contains the answer, output it clearly. "
                        "If the Context does NOT contain the answer, simply state: "
                        "'I cannot find that specific information in the documents provided.'"
                    )
                    # 3. Context construction: concatenate retrieved chunks.
                    # NOTE(review): `src` is computed but never used, and no
                    # source tags are emitted around chunks despite the old
                    # "XML-Formatted" intent — confirm whether tags were
                    # meant to wrap each chunk here.
                    for i, d in enumerate(docs):
                        src = d.metadata.get('source', 'Unknown')
                        context_txt += f"\n{d.page_content}\n\n"
        # 4. Construct Final User Payload: question first, then context block.
        if context_txt:
            final_prompt = (
                f"User Question: {prompt}\n\n"
                f"\n{context_txt}\n\n\n"
                "Instruction: Answer the question using the context above."
            )
        else:
            final_prompt = prompt
        # Generation
        with st.chat_message("assistant"):
            with st.spinner("Thinking..."):
                # Memory Window: last few prior turns, excluding the user
                # message just appended (it is re-sent as final_prompt,
                # possibly augmented with RAG context).
                hist = [{"role":"system", "content":sys_p}] + st.session_state.messages[-6:-1] + [{"role":"user", "content":final_prompt}]
                resp, usage = query_model_universal(hist, 2000, model_choice, st.session_state.get("user_openai_key"))
                st.markdown(resp)
                if usage:
                    # Short model label for the usage log (e.g. "Granite").
                    m_name = "GPT-4o" if "GPT-4o" in model_choice else model_choice.split()[0]
                    tracker.log_usage(m_name, usage["input"], usage["output"])
                    update_sidebar_metrics()
        st.session_state.messages.append({"role": "assistant", "content": resp})
        if use_rag and context_txt:
            with st.expander("📚 View Context Used"):
                st.text(context_txt)
# === TAB 2: KNOWLEDGE & TOOLS ===
with tab2:
    st.header("Document Processor")
    c1, c2 = st.columns([1, 1])
    with c1:
        uploaded_file = st.file_uploader("Upload File (PDF, PPT, Doc, Text)", type=["pdf", "docx", "pptx", "txt", "md"])
    with c2:
        use_vision = st.toggle("👁️ Enable Vision Mode", help="Use GPT-4o to read diagrams/tables. Requires API Key.")
    # GPT-4o is only in `opts` when unlocked in the sidebar (admin or key).
    if use_vision and "GPT-4o" not in opts:
        st.warning("Vision requires OpenAI Access.")
    if uploaded_file:
        # Save temp copy so downstream loaders can work from a filesystem path.
        temp_path = rag_engine.save_uploaded_file(uploaded_file)
        # ACTION BAR
        col_a, col_b, col_c = st.columns(3)
        # 1. ADD TO DB (With Strategy Selection)
        with col_a:
            chunk_strategy = st.selectbox(
                "Chunking Strategy",
                ["paragraph", "token"], # Removed 'page' as it is not implemented in new engine yet
                help="Paragraph: Standard. Token: Dense text.",
                key="chunk_selector"
            )
            if st.button("📥 Add to Knowledge Base", type="primary"):
                with st.spinner("Ingesting..."):
                    # Note: New engine uses internal Tesseract OCR, not GPT-4o Vision
                    # so we don't pass vision flags or keys here anymore.
                    ok, msg = rag_engine.ingest_file(
                        file_path=temp_path,
                        username=st.session_state.username,
                        strategy=chunk_strategy
                    )
                    if ok:
                        tracker.upload_user_db(st.session_state.username) # Auto-Sync
                        st.success(msg)
                    else:
                        st.error(msg)
        # 2. SUMMARIZE
        with col_b:
            # Spacer to align buttons visually since col_a has a selectbox
            st.write("")
            st.write("")
            if st.button("📝 Summarize Document"):
                with st.spinner("Reading & Summarizing..."):
                    key = st.session_state.get("user_openai_key") or OPENAI_KEY
                    # Minimal file-like shim: doc_loader presumably needs only
                    # .name and .read() — TODO confirm against doc_loader.
                    class FileObj:
                        def __init__(self, p, n): self.path=p; self.name=n
                        def read(self):
                            with open(self.path, "rb") as f: return f.read()
                    # Extraction
                    raw = doc_loader.extract_text_from_file(
                        FileObj(temp_path, uploaded_file.name),
                        use_vision=use_vision, api_key=key
                    )
                    # Call LLM
                    prompt = f"Summarize this document into a key executive brief:\n\n{raw[:20000]}" # Truncate for safety
                    msgs = [{"role":"user", "content": prompt}]
                    summ, usage = query_model_universal(msgs, 1000, model_choice, st.session_state.get("user_openai_key"))
                    st.subheader("Summary Result")
                    st.markdown(summ)
                    if usage:
                        # Short model label for the usage log.
                        m_name = "GPT-4o" if "GPT-4o" in model_choice else model_choice.split()[0]
                        tracker.log_usage(m_name, usage["input"], usage["output"])
                        update_sidebar_metrics()
# 3. FLATTEN
with col_c:
# Spacer to align buttons
st.write("")
st.write("")
# We use a session state variable to store the result so it persists for the "Index" step
if "flattened_result" not in st.session_state:
st.session_state.flattened_result = None
if st.button("📄 Flatten Context"):
with st.spinner("Flattening..."):
key = st.session_state.get("user_openai_key") or OPENAI_KEY
# A. Extract
with open(temp_path, "rb") as f:
class Wrapper:
def __init__(self, data, n): self.data=data; self.name=n
def read(self): return self.data
raw = doc_loader.extract_text_from_file(
Wrapper(f.read(), uploaded_file.name), use_vision=use_vision, api_key=key
)
# B. Parse
proc = OutlineProcessor(raw)
items = proc.parse()
# C. Flatten
out_txt = []
bar = st.progress(0)
for i, item in enumerate(items):
p = f"Context: {item['context']}\nTarget: {item['target']}\nRewrite as one sentence."
m = [{"role":"user", "content": p}]
res, _ = query_model_universal(m, 300, model_choice, st.session_state.get("user_openai_key"))
out_txt.append(res)
bar.progress((i+1)/len(items))
# D. Store Result in Session State
final_flattened_text = "\n".join(out_txt)
st.session_state.flattened_result = {
"text": final_flattened_text,
"source": f"{uploaded_file.name}_flat"
}
st.rerun() # Refresh to show the new result/buttons
# Display Result & Index Option
if st.session_state.flattened_result:
res = st.session_state.flattened_result
st.success("Flattening Complete!")
st.text_area("Result", res["text"], height=200)
# The New Button
if st.button("📥 Index This Flattened Version"):
with st.spinner("Indexing Flattened Text..."):
ok, msg = rag_engine.process_and_add_text(
res["text"],
res["source"],
st.session_state.username
)
if ok:
tracker.upload_user_db(st.session_state.username) # Sync!
st.success(msg)
else:
st.error(msg)
    st.divider()
    # DB MANAGER: list this user's indexed documents with per-doc delete.
    st.subheader("Database Management")
    docs = rag_engine.list_documents(st.session_state.username)
    if docs:
        for d in docs:
            c1, c2 = st.columns([4,1])
            c1.text(f"📄 {d['filename']} ({d['chunks']} chunks)")
            # Widget key is the document source path, keeping buttons unique.
            if c2.button("🗑️", key=d['source']):
                rag_engine.delete_document(st.session_state.username, d['source'])
                tracker.upload_user_db(st.session_state.username)
                st.rerun()
    else:
        st.info("Database Empty.")