# Navy AI Toolkit — Streamlit application (Hugging Face Space)
| import streamlit as st | |
| import requests | |
| import os | |
| import re | |
| import io | |
| import contextlib | |
| import zipfile | |
| import tracker | |
| import rag_engine | |
| import doc_loader | |
| from openai import OpenAI | |
| from datetime import datetime | |
| from test_integration import run_tests | |
# --- CONFIGURATION ---
# NOTE: st.set_page_config must be the first Streamlit call in the script.
st.set_page_config(page_title="Navy AI Toolkit", page_icon="β", layout="wide")
# Root URL of the local inference API (used by the non-OpenAI model route).
API_URL_ROOT = os.getenv("API_URL")
# System-wide OpenAI key; used as the fallback when a user supplies no key.
OPENAI_KEY = os.getenv("OPENAI_API_KEY")
# --- INITIALIZATION ---
# "admin" membership in roles gates the sidebar Admin Tools section below.
if "roles" not in st.session_state:
    st.session_state.roles = []
| # --- FLATTENER LOGIC (Integrated) --- | |
class OutlineProcessor:
    """Parses indented text outlines for the Flattener tool.

    The raw text is split into logical list items (soft-wrapped
    continuation lines are merged into their parent item), then the
    indentation structure is walked to pair each item with the
    " > "-joined path of its ancestors.
    """

    # Matches a list marker at line start: "1.", "a.", "-" or "*"
    # followed by whitespace. Compiled once instead of on every call.
    _LIST_ITEM_RE = re.compile(r"^\s*(\d+\.|[a-zA-Z]\.|-|\*)\s+")

    def __init__(self, file_content):
        # splitlines() handles \n, \r\n and \r uniformly, so Windows
        # files no longer carry stray '\r' characters into the items.
        self.raw_lines = file_content.splitlines()

    def _is_list_item(self, line):
        """Return True if *line* begins with a recognised list marker."""
        return bool(self._LIST_ITEM_RE.match(line))

    def _merge_multiline_items(self):
        """Collapse wrapped continuation lines into the preceding item."""
        merged_lines = []
        for line in self.raw_lines:
            stripped = line.strip()
            if not stripped:
                continue  # drop blank lines entirely
            if not merged_lines:
                merged_lines.append(line)
                continue
            if not self._is_list_item(line):
                # Continuation of the previous item: glue on with a space.
                merged_lines[-1] = merged_lines[-1].rstrip() + " " + stripped
            else:
                merged_lines.append(line)
        return merged_lines

    def parse(self):
        """Return a list of {'context': ancestor path, 'target': item text}.

        'context' is the " > "-joined chain of ancestor items (determined
        by indentation depth), or "ROOT" for top-level items.
        """
        clean_lines = self._merge_multiline_items()
        stack = []    # ancestors of the current line, shallowest first
        results = []
        for line in clean_lines:
            stripped = line.strip()
            indent = len(line) - len(line.lstrip())
            # Pop siblings and deeper entries until the stack top is a
            # strict ancestor (smaller indent) of the current line.
            while stack and stack[-1]['indent'] >= indent:
                stack.pop()
            stack.append({'indent': indent, 'text': stripped})
            if len(stack) > 1:
                context_str = " > ".join([item['text'] for item in stack[:-1]])
            else:
                context_str = "ROOT"
            results.append({"context": context_str, "target": stripped})
        return results
| # --- HELPER FUNCTIONS --- | |
def query_model_universal(messages, max_tokens, model_choice, user_key=None):
    """Route a chat-completion request to OpenAI or the local backend.

    Returns a (response_text, usage) tuple, where usage is a
    {"input": int, "output": int} dict or None. All failures are
    reported as bracketed strings in the response slot, never raised.
    """
    # --- OpenAI route ---
    if "GPT-4o" in model_choice:
        api_key = user_key or OPENAI_KEY
        if not api_key:
            return "[Error: No OpenAI API Key]", None
        try:
            resp = OpenAI(api_key=api_key).chat.completions.create(
                model="gpt-4o",
                max_tokens=max_tokens,
                messages=messages,
                temperature=0.3,
            )
            token_counts = {
                "input": resp.usage.prompt_tokens,
                "output": resp.usage.completion_tokens,
            }
            return resp.choices[0].message.content, token_counts
        except Exception as e:
            return f"[OpenAI Error: {e}]", None

    # --- Local route ---
    model_map = {
        "Granite 4 (IBM)": "granite4:latest",
        "Llama 3.2 (Meta)": "llama3.2:latest",
        "Gemma 3 (Google)": "gemma3:latest",
    }
    tech_name = model_map.get(model_choice)
    if not tech_name:
        return "[Error: Model Map Failed]", None

    # Flatten the structured chat history into one prompt string for the
    # local backend; the last system message (if any) becomes the persona.
    sys_msg = "You are a helpful assistant."
    turns = []
    for m in messages:
        role = m['role']
        if role == 'system':
            sys_msg = m['content']
        elif role == 'user':
            turns.append(f"User: {m['content']}\n")
        elif role == 'assistant':
            turns.append(f"Assistant: {m['content']}\n")
    hist = "".join(turns) + "Assistant: "

    try:
        r = requests.post(
            f"{API_URL_ROOT}/generate",
            json={"text": hist, "persona": sys_msg, "max_tokens": max_tokens, "model": tech_name},
            timeout=300,
        )
        if r.status_code == 200:
            payload = r.json()
            return payload.get("response", ""), payload.get("usage", {"input": 0, "output": 0})
        return f"[Local Error {r.status_code}]", None
    except Exception as e:
        return f"[Conn Error: {e}]", None
def update_sidebar_metrics():
    """Refresh the sidebar "My Tokens Today" metric for the current user.

    No-op when the sidebar placeholder has not been created yet.
    """
    if not metric_placeholder:
        return
    daily = tracker.get_daily_stats()
    totals = daily["users"].get(st.session_state.username, {"input": 0, "output": 0})
    metric_placeholder.metric("My Tokens Today", totals["input"] + totals["output"])
# --- LOGIN ---
# Gate the whole app behind authentication: show Login/Register tabs until
# tracker.check_login() succeeds, then rerun into the main UI.
if "authentication_status" not in st.session_state or st.session_state["authentication_status"] is None:
    login_tab, register_tab = st.tabs(["π Login", "π Register"])
    with login_tab:
        if tracker.check_login():
            # Session Isolation Logic: if a different user logs in within the
            # same browser session, wipe the previous user's chat and API key.
            if "last_user" in st.session_state and st.session_state.last_user != st.session_state.username:
                st.session_state.messages = []
                st.session_state.user_openai_key = None
            st.session_state.last_user = st.session_state.username
            # Pull this user's knowledge-base DB before entering the app.
            tracker.download_user_db(st.session_state.username)
            st.rerun()
    with register_tab:
        st.header("Create Account")
        with st.form("reg_form"):
            new_user = st.text_input("Username")
            new_name = st.text_input("Display Name")
            new_email = st.text_input("Email")
            new_pwd = st.text_input("Password", type="password")
            invite = st.text_input("Invitation Passcode")
            if st.form_submit_button("Register"):
                # tracker.register_user returns (success: bool, message: str).
                success, msg = tracker.register_user(new_email, new_user, new_name, new_pwd, invite)
                if success:
                    st.success(msg)
                else:
                    st.error(msg)
# Hard stop: nothing below renders for unauthenticated sessions.
if not st.session_state.get("authentication_status"): st.stop()
# --- SIDEBAR ---
# Module-level placeholder so update_sidebar_metrics() can refresh it later.
metric_placeholder = None
with st.sidebar:
    st.header("π€ User Profile")
    st.write(f"Welcome, **{st.session_state.name}**")
    st.header("π Usage Tracker")
    metric_placeholder = st.empty()
    # Admin Tools (visible only when the login flow granted the "admin" role)
    if "admin" in st.session_state.roles:
        st.divider()
        st.header("π‘οΈ Admin Tools")
        log_path = tracker.get_log_path()
        if log_path.exists():
            with open(log_path, "r") as f:
                log_data = f.read()
            st.download_button(
                label="π₯ Download Usage Logs",
                data=log_data,
                file_name=f"usage_log_{datetime.now().strftime('%Y-%m-%d')}.json",
                mime="application/json"
            )
        st.divider()
    # Model Selector
    st.header("π§ Intelligence")
    # NOTE(review): this map duplicates the one inside query_model_universal;
    # keep the two in sync when adding models.
    model_map = {
        "Granite 4 (IBM)": "granite4:latest",
        "Llama 3.2 (Meta)": "llama3.2:latest",
        "Gemma 3 (Google)": "gemma3:latest"
    }
    opts = list(model_map.keys())
    model_captions = ["Slower, free, private" for _ in opts]
    # Vision Key Input (User or Admin)
    is_admin = "admin" in st.session_state.roles
    user_key = None
    if not is_admin:
        # Non-admins must supply their own OpenAI key to unlock GPT-4o/Vision.
        user_key = st.text_input(
            "π Unlock GPT-4o (Enter API Key)",
            type="password",
            key=f"key_{st.session_state.username}",
            help="Required for Vision Mode and GPT-4o."
        )
        if user_key:
            st.session_state.user_openai_key = user_key
            st.caption("β Key Active")
        else:
            st.session_state.user_openai_key = None
    else:
        # Admin defaults to system key, but we ensure state is clean
        st.session_state.user_openai_key = None
    # Unlock GPT-4o option: admins always; others only with an active key.
    if is_admin or st.session_state.get("user_openai_key"):
        opts.append("GPT-4o (Omni)")
        model_captions.append("Fast, smart, sends data to OpenAI")
    model_choice = st.radio("Select Model:", opts, captions=model_captions, key="model_selector_radio")
    st.info(f"Connected to: **{model_choice}**")
    st.divider()
    if st.session_state.authenticator:
        st.session_state.authenticator.logout(location='sidebar')
    st.divider()
    st.subheader("π§ System Diagnostics")
    if st.button("Run Integration Test"):
        with st.spinner("Running diagnostics..."):
            # Create a buffer to capture the text that would normally be printed
            f = io.StringIO()
            # Redirect 'print' statements to our buffer instead of the console
            try:
                with contextlib.redirect_stdout(f):
                    run_tests()
                # Display the result in a code block for easy reading
                st.success("Tests Completed")
                st.code(f.getvalue(), language="text")
            except Exception as e:
                st.error(f"Test Execution Failed: {e}")
# Initial render of the token metric (placeholder is set by now).
update_sidebar_metrics()
# --- MAIN APP ---
st.title("β Navy AI Toolkit")
tab1, tab2 = st.tabs(["π¬ Chat Playground", "π Knowledge & Tools"])
# === TAB 1: CHAT ===
with tab1:
    st.header("Discussion & Analysis")
    if "messages" not in st.session_state: st.session_state.messages = []
    c1, c2 = st.columns([3, 1])
    with c1: st.caption(f"Active Model: **{st.session_state.get('model_selector_radio', 'Granite')}**")
    with c2: use_rag = st.toggle("Enable Knowledge Base", value=False)
    # Replay the stored conversation so it survives Streamlit reruns.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]): st.markdown(msg["content"])
    if prompt := st.chat_input("Input command..."):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"): st.markdown(prompt)
        # RAG Search
        context_txt = ""
        # 1. Default System Prompt (No RAG)
        sys_p = "You are a helpful AI assistant."
        if use_rag:
            with st.spinner("Searching Knowledge Base..."):
                docs = rag_engine.search_knowledge_base(prompt, st.session_state.username)
                if docs:
                    # 2. Strict System Prompt (With RAG)
                    # We relax the strictness slightly to allow for inference,
                    # while still demanding evidence.
                    sys_p = (
                        "You are a Navy Document Analyst. "
                        "You must answer the user's question based PRIMARILY on the provided Context. "
                        "If the Context contains the answer, output it clearly. "
                        "If the Context does NOT contain the answer, simply state: "
                        "'I cannot find that specific information in the documents provided.'"
                    )
                    # 3. XML-Formatted Context Construction
                    # This helps the model "see" the start and end of each chunk clearly.
                    for i, d in enumerate(docs):
                        src = d.metadata.get('source', 'Unknown')
                        context_txt += f"<document index='{i+1}' source='{src}'>\n{d.page_content}\n</document>\n"
        # 4. Construct Final User Payload
        if context_txt:
            final_prompt = (
                f"User Question: {prompt}\n\n"
                f"<context>\n{context_txt}\n</context>\n\n"
                "Instruction: Answer the question using the context above."
            )
        else:
            final_prompt = prompt
        # Generation
        with st.chat_message("assistant"):
            with st.spinner("Thinking..."):
                # Memory Window: system prompt + last few stored turns
                # (excluding the just-appended user message at [-1]) + the
                # RAG-augmented prompt.
                hist = [{"role":"system", "content":sys_p}] + st.session_state.messages[-6:-1] + [{"role":"user", "content":final_prompt}]
                resp, usage = query_model_universal(hist, 2000, model_choice, st.session_state.get("user_openai_key"))
                st.markdown(resp)
                if usage:
                    # Log as "GPT-4o" or the local model's first word (e.g. "Granite").
                    m_name = "GPT-4o" if "GPT-4o" in model_choice else model_choice.split()[0]
                    tracker.log_usage(m_name, usage["input"], usage["output"])
                    update_sidebar_metrics()
        st.session_state.messages.append({"role": "assistant", "content": resp})
        if use_rag and context_txt:
            with st.expander("π View Context Used"):
                st.text(context_txt)
# === TAB 2: KNOWLEDGE & TOOLS ===
with tab2:
    st.header("Document Processor")
    c1, c2 = st.columns([1, 1])
    with c1:
        uploaded_file = st.file_uploader("Upload File (PDF, PPT, Doc, Text)", type=["pdf", "docx", "pptx", "txt", "md"])
    with c2:
        use_vision = st.toggle("ποΈ Enable Vision Mode", help="Use GPT-4o to read diagrams/tables. Requires API Key.")
        # GPT-4o is only present in opts when an OpenAI key is available.
        if use_vision and "GPT-4o" not in opts:
            st.warning("Vision requires OpenAI Access.")
    if uploaded_file:
        # Save temp
        temp_path = rag_engine.save_uploaded_file(uploaded_file)
        # ACTION BAR
        col_a, col_b, col_c = st.columns(3)
        # 1. ADD TO DB (With Strategy Selection)
        with col_a:
            chunk_strategy = st.selectbox(
                "Chunking Strategy",
                ["paragraph", "token"],  # Removed 'page' as it is not implemented in new engine yet
                help="Paragraph: Standard. Token: Dense text.",
                key="chunk_selector"
            )
            if st.button("π₯ Add to Knowledge Base", type="primary"):
                with st.spinner("Ingesting..."):
                    # Note: New engine uses internal Tesseract OCR, not GPT-4o Vision
                    # so we don't pass vision flags or keys here anymore.
                    ok, msg = rag_engine.ingest_file(
                        file_path=temp_path,
                        username=st.session_state.username,
                        strategy=chunk_strategy
                    )
                    if ok:
                        tracker.upload_user_db(st.session_state.username)  # Auto-Sync
                        st.success(msg)
                    else:
                        st.error(msg)
        # 2. SUMMARIZE
        with col_b:
            # Spacer to align buttons visually since col_a has a selectbox
            st.write("")
            st.write("")
            if st.button("π Summarize Document"):
                with st.spinner("Reading & Summarizing..."):
                    key = st.session_state.get("user_openai_key") or OPENAI_KEY
                    # Extract raw text first: minimal file-like shim
                    # (doc_loader presumably needs only .read() and .name —
                    # confirm against doc_loader).
                    class FileObj:
                        def __init__(self, p, n): self.path=p; self.name=n
                        def read(self):
                            with open(self.path, "rb") as f: return f.read()
                    # Extraction
                    raw = doc_loader.extract_text_from_file(
                        FileObj(temp_path, uploaded_file.name),
                        use_vision=use_vision, api_key=key
                    )
                    # Call LLM
                    prompt = f"Summarize this document into a key executive brief:\n\n{raw[:20000]}"  # Truncate for safety
                    msgs = [{"role":"user", "content": prompt}]
                    summ, usage = query_model_universal(msgs, 1000, model_choice, st.session_state.get("user_openai_key"))
                    st.subheader("Summary Result")
                    st.markdown(summ)
                    if usage:
                        m_name = "GPT-4o" if "GPT-4o" in model_choice else model_choice.split()[0]
                        tracker.log_usage(m_name, usage["input"], usage["output"])
                        update_sidebar_metrics()
        # 3. FLATTEN
        with col_c:
            # Spacer to align buttons
            st.write("")
            st.write("")
            # We use a session state variable to store the result so it persists for the "Index" step
            if "flattened_result" not in st.session_state:
                st.session_state.flattened_result = None
            if st.button("π Flatten Context"):
                with st.spinner("Flattening..."):
                    key = st.session_state.get("user_openai_key") or OPENAI_KEY
                    # A. Extract (in-memory file shim: .read() + .name)
                    with open(temp_path, "rb") as f:
                        class Wrapper:
                            def __init__(self, data, n): self.data=data; self.name=n
                            def read(self): return self.data
                        raw = doc_loader.extract_text_from_file(
                            Wrapper(f.read(), uploaded_file.name), use_vision=use_vision, api_key=key
                        )
                    # B. Parse
                    proc = OutlineProcessor(raw)
                    items = proc.parse()
                    # C. Flatten: one LLM call per outline item, rewriting it
                    # as a standalone sentence with its ancestor context.
                    out_txt = []
                    bar = st.progress(0)
                    for i, item in enumerate(items):
                        p = f"Context: {item['context']}\nTarget: {item['target']}\nRewrite as one sentence."
                        m = [{"role":"user", "content": p}]
                        res, _ = query_model_universal(m, 300, model_choice, st.session_state.get("user_openai_key"))
                        out_txt.append(res)
                        bar.progress((i+1)/len(items))
                    # D. Store Result in Session State
                    final_flattened_text = "\n".join(out_txt)
                    st.session_state.flattened_result = {
                        "text": final_flattened_text,
                        "source": f"{uploaded_file.name}_flat"
                    }
                    st.rerun()  # Refresh to show the new result/buttons
            # Display Result & Index Option
            if st.session_state.flattened_result:
                res = st.session_state.flattened_result
                st.success("Flattening Complete!")
                st.text_area("Result", res["text"], height=200)
                # The New Button
                if st.button("π₯ Index This Flattened Version"):
                    with st.spinner("Indexing Flattened Text..."):
                        ok, msg = rag_engine.process_and_add_text(
                            res["text"],
                            res["source"],
                            st.session_state.username
                        )
                        if ok:
                            tracker.upload_user_db(st.session_state.username)  # Sync!
                            st.success(msg)
                        else:
                            st.error(msg)
    st.divider()
    # DB MANAGER
    st.subheader("Database Management")
    docs = rag_engine.list_documents(st.session_state.username)
    if docs:
        for d in docs:
            c1, c2 = st.columns([4,1])
            c1.text(f"π {d['filename']} ({d['chunks']} chunks)")
            # Delete button keyed by source so each row's button is unique.
            if c2.button("ποΈ", key=d['source']):
                rag_engine.delete_document(st.session_state.username, d['source'])
                tracker.upload_user_db(st.session_state.username)
                st.rerun()
    else:
        st.info("Database Empty.")