# app.py """ ContextPilot Gradio App ======================= This is the main application file for ContextPilot - an autonomous context engineering system for LLM conversations. ARCHITECTURE OVERVIEW --------------------- ContextPilot uses a two-LLM architecture to optimize token usage: 1. CONTEXT_LLM (smaller, efficient): - Configured in context_pilot_workflow.py - Used for topic detection and context management decisions - Example: openai/gpt-oss-20b 2. RESPONSE_LLM (capable, quality): - Configured here in app.py - Used for generating actual responses to user queries - Example: openai/gpt-oss-120b TWO CONTEXT MODES ----------------- ContextPilot supports two modes for storing conversation context: SUMMARY MODE (default): - Stores a summary + key facts for each topic - When returning to a topic, the summary is injected into system prompt - Most token-efficient option - Best for: general conversations, FAQ-style interactions - Trade-off: some detail may be lost in summarization Flow: 1. User message → Topic detection (no change) 2. Curated context = System prompt + topic summary + session messages + user message 3. Response LLM generates answer 4. Session messages buffered for continuity 5. On topic change → LLM generates summary → stored in contexts FULL MODE: - Stores the complete message history for each topic - When returning to a topic, full history is restored as chat messages - Maximum context preservation - Best for: technical discussions, debugging, detailed Q&A - Trade-off: uses more tokens when returning to topics with long history Flow: 1. User message → Topic detection (no change) 2. Curated context = System prompt + stored full history + session messages + user message 3. Response LLM generates answer 4. Session messages buffered (full user/assistant messages) 5. On topic change → full session history → stored in contexts CONTEXT SUMMARY EXTRACTION -------------------------- The response LLM is instructed to generate a compact context summary at the end of each response using special tags: Q: user question | A: brief answer This summary is: - Extracted and hidden from the user - Used for efficient context storage - Helps the topic detection LLM understand conversation flow TOKEN SAVINGS TRACKING ---------------------- The app tracks and displays: - Curated Tokens: What was sent to the response LLM - Full Context Tokens: What would be sent without curation - Tokens Saved: The difference (your savings!) - Detection Tokens: Tokens used by topic detection LLM """ import gradio as gr import asyncio import os import sys import json import time from pathlib import Path from dotenv import load_dotenv from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from openai import OpenAI load_dotenv() # ============================================================================= # Configuration # ============================================================================= CONTEXT_STORE_PATH = Path(__file__).parent / ".context_store.json" # LLM Client for generating responses (more capable model) # RESPONSE_LLM: Used for generating actual responses (higher quality) # CONTEXT_LLM: Used for topic detection in workflow (cheaper, configured there) RESPONSE_LLM_MODEL = os.getenv("RESPONSE_LLM", "openai/gpt-oss-120b") NEBIUS_BASE_URL = os.getenv("NEBIUS_BASE_URL") NEBIUS_API_KEY = os.getenv("NEBIUS_API_KEY") # Validate required environment variables if not NEBIUS_BASE_URL: print("WARNING: NEBIUS_BASE_URL not set. Response generation will fail.") if not NEBIUS_API_KEY: print("WARNING: NEBIUS_API_KEY not set. Response generation will fail.") print(f"[ContextPilot] Response LLM: {RESPONSE_LLM_MODEL}") print(f"[ContextPilot] API Base: {NEBIUS_BASE_URL}") print(f"[ContextPilot] API Key set: {bool(NEBIUS_API_KEY)}") # Threshold for summarizing responses before storing in context (in characters) # Responses longer than this will be summarized to reduce context size SUMMARIZE_THRESHOLD = int(os.getenv("SUMMARIZE_THRESHOLD", "500")) llm_client = OpenAI( api_key=NEBIUS_API_KEY, base_url=NEBIUS_BASE_URL, ) LLM_MODEL = RESPONSE_LLM_MODEL # Context summary tag that the LLM will use CONTEXT_SUMMARY_TAG = "" CONTEXT_SUMMARY_END_TAG = "" def add_context_summary_instruction(messages: list[dict]) -> list[dict]: """ Add instruction to the system prompt asking the LLM to generate a compact context summary at the end of its response. """ if not messages: return messages instruction = ( "\n\n[IMPORTANT: At the very end of your response, add a brief context line in this exact format:\n" f"{CONTEXT_SUMMARY_TAG}Q: | A: {CONTEXT_SUMMARY_END_TAG}\n" "This helps track conversation context. The user won't see this tag.]" ) # Clone messages and modify system prompt modified = [] for msg in messages: if msg.get("role") == "system": modified.append({ "role": "system", "content": msg.get("content", "") + instruction }) else: modified.append(msg) return modified def extract_context_summary(response: str) -> tuple[str, str]: """ Extract the context summary from the response and return (clean_response, context_summary). """ if CONTEXT_SUMMARY_TAG not in response: # No summary tag, create a simple one return response, "" try: start = response.index(CONTEXT_SUMMARY_TAG) end = response.index(CONTEXT_SUMMARY_END_TAG) + len(CONTEXT_SUMMARY_END_TAG) context_summary = response[start + len(CONTEXT_SUMMARY_TAG):end - len(CONTEXT_SUMMARY_END_TAG)].strip() clean_response = (response[:start] + response[end:]).strip() return clean_response, context_summary except ValueError: return response, "" def summarize_for_context(text: str, max_chars: int = 300) -> str: """ Summarize long text before storing in context. This keeps the context small for the smaller context detection model. Fallback when LLM doesn't generate a context summary tag. """ if len(text) <= SUMMARIZE_THRESHOLD: return text try: result = llm_client.chat.completions.create( model=LLM_MODEL, messages=[ { "role": "system", "content": f"Summarize the following text in under {max_chars} characters. " "Keep the key information and main points. Be concise." }, {"role": "user", "content": text} ], max_tokens=150, ) summary = result.choices[0].message.content.strip() return f"[Summary] {summary}" except Exception as e: # If summarization fails, truncate instead return text[:max_chars] + "..." # ============================================================================= # Context Store Helpers # ============================================================================= def clear_context_store(): """Clear the persistent context store.""" if CONTEXT_STORE_PATH.exists(): CONTEXT_STORE_PATH.unlink() # Returns: chatbot, current_messages, curated, full, saved_this, pct, detection, topic, switches, stored, contexts, logs return ([], [], 0, 0, 0, 0, 0, "", 0, 0, [], "🗑️ All contexts cleared!") def get_current_mode() -> str: """Get the current context mode from store.""" if CONTEXT_STORE_PATH.exists(): try: store = json.loads(CONTEXT_STORE_PATH.read_text()) return store.get("mode", "summary") except (json.JSONDecodeError, IOError): pass return "summary" def set_context_mode(mode: str): """Set the context mode and clear all contexts.""" # Create fresh store with new mode store = { "contexts": {}, "current_topic": None, "mode": mode, "current_session_messages": [], "stats": {"total_tokens": 0, "tokens_saved": 0, "context_switches": 0, "cumulative_full_tokens": 0, "cumulative_tokens_saved": 0} } CONTEXT_STORE_PATH.write_text(json.dumps(store, indent=2)) return store def load_current_contexts(): """Load current contexts from disk for display.""" if CONTEXT_STORE_PATH.exists(): try: store = json.loads(CONTEXT_STORE_PATH.read_text()) contexts = store.get("contexts", {}) stats = store.get("stats", {}) current_topic = store.get("current_topic", "") stored_contexts_data = [ { "topic": topic, "summary": ctx.get("summary", ""), "key_facts": ctx.get("key_facts", []), "tokens": ctx.get("tokens", 0), "is_current": topic == current_topic } for topic, ctx in contexts.items() ] return ( current_topic, stats.get("total_tokens", 0), stats.get("tokens_saved", 0), stats.get("context_switches", 0), len(contexts), stored_contexts_data, ) except (json.JSONDecodeError, IOError): pass return "", 0, 0, 0, 0, [] # ============================================================================= # MCP Client (for context curation) # ============================================================================= async def call_context_curator(message: str, chat_history: list) -> dict: """Call the MCP server to curate context (detect topic, build messages).""" # Pass current environment to subprocess so it gets the API keys env = os.environ.copy() server_params = StdioServerParameters( command=sys.executable, args=[os.path.join(os.path.dirname(__file__), "mcp_server.py")], env=env, # Pass environment variables to subprocess ) async with stdio_client(server_params, errlog=sys.stderr) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() result = await session.call_tool("context-pilot", { "run_args": { "msg": message, "chat_history": chat_history } }) # Extract text content from MCP response if result.content and len(result.content) > 0: for content in result.content: if hasattr(content, 'text') and content.text: try: return json.loads(content.text) except json.JSONDecodeError as e: print(f"JSON decode error: {e}") print(f"Raw content: {content.text[:500] if len(content.text) > 500 else content.text}") return {"error": f"Invalid JSON response: {e}"} print(f"No valid content in MCP response: {result}") return {"error": "No content in MCP response"} # ============================================================================= # LLM Response Generation # ============================================================================= def generate_response_stream(curated_messages: list[dict]): """Generate streaming LLM response from curated messages.""" # Add context summary instruction to get compact context from LLM messages_with_instruction = add_context_summary_instruction(curated_messages) try: stream = llm_client.chat.completions.create( model=LLM_MODEL, messages=messages_with_instruction, stream=True, ) for chunk in stream: if chunk.choices and chunk.choices[0].delta.content: yield chunk.choices[0].delta.content except Exception as e: yield f"Error generating response: {e}" # ============================================================================= # Chat Handler # ============================================================================= async def curate_context(message: str, history: list) -> tuple[list[dict], list, dict]: """ Curate context for the message (async). Returns curated_messages, decisions, and stats. """ # Convert Gradio history format chat_history = [h for h in history if isinstance(h, dict)] # Call context curator curation_result = await call_context_curator(message, chat_history) if not curation_result or "error" in curation_result: error_msg = curation_result.get("error", "Unknown error") if curation_result else "No response" print(f"Curation error: {error_msg}") return [], [f"⚠️ Error: {error_msg}"], {"logs": [f"Error: {error_msg}"]} return ( curation_result.get("curated_messages", []), curation_result.get("decisions", []), curation_result.get("stats", {}) ) def append_to_session_buffer(message: str, response: str): """ Append the exchange to the session buffer. This is a temporary buffer for the current topic's conversation. When topic changes, the workflow's save_context tool will move this buffer to permanent storage. Both modes buffer messages so the LLM has context for continuation. """ if not CONTEXT_STORE_PATH.exists(): return try: store = json.loads(CONTEXT_STORE_PATH.read_text()) if "current_session_messages" not in store: store["current_session_messages"] = [] # Store user message and response store["current_session_messages"].append({"role": "user", "content": message}) store["current_session_messages"].append({"role": "assistant", "content": response}) CONTEXT_STORE_PATH.write_text(json.dumps(store, indent=2)) except (json.JSONDecodeError, IOError) as e: print(f"Error appending to session buffer: {e}") def count_tokens(text: str) -> int: """Estimate token count.""" return len(text) // 4 if text else 0 def respond_stream(message: str, chat_history: list): """ Streaming chat handler: 1. Curate context (non-streaming) 2. Stream LLM response 3. In full mode, append messages to session store """ if not message.strip(): yield (chat_history, [], 0, 0, 0, 0, 0, "", 0, 0, [], "") return # Step 1: Curate context (blocking) curated_messages, decisions, stats = asyncio.run( curate_context(message, chat_history) ) if not curated_messages: chat_history.append({"role": "user", "content": message}) chat_history.append({"role": "assistant", "content": "Error: Could not curate context"}) yield (chat_history, [], 0, 0, 0, 0, 0, "", 0, 0, [], "") return # Add user message to UI chat chat_history.append({"role": "user", "content": message}) # Add decisions as assistant message if any if decisions: decisions_text = "\n".join(decisions) chat_history.append({"role": "assistant", "content": decisions_text}) # Step 2: Stream LLM response chat_history.append({"role": "assistant", "content": ""}) logs = "\n".join(stats.get("logs", [])) stored_contexts_data = stats.get("stored_contexts_data", []) # Collect full response for session messages full_response = "" # Stream the response for chunk in generate_response_stream(curated_messages): chat_history[-1]["content"] += chunk full_response += chunk yield ( chat_history, curated_messages, # Current messages sent to LLM stats.get("curated_tokens", 0), stats.get("full_context_tokens", 0), stats.get("tokens_saved_this_request", 0), stats.get("savings_percent", 0), stats.get("detection_tokens", 0), stats.get("current_topic", ""), stats.get("context_switches", 0), stats.get("stored_contexts", 0), stored_contexts_data, logs, ) # After streaming completes: # 1. Extract context summary from response (LLM generates this) # 2. Clean the response for display # 3. Store the compact context summary instead of full response clean_response, context_summary = extract_context_summary(full_response) # Update the chat history with clean response (remove context summary tag) if context_summary: chat_history[-1]["content"] = clean_response # Final yield with cleaned response yield ( chat_history, curated_messages, stats.get("curated_tokens", 0), stats.get("full_context_tokens", 0), stats.get("tokens_saved_this_request", 0), stats.get("savings_percent", 0), stats.get("detection_tokens", 0), stats.get("current_topic", ""), stats.get("context_switches", 0), stats.get("stored_contexts", 0), stored_contexts_data, logs, ) # In full mode, buffer the exchange for when topic changes # In summary mode, we don't buffer - LLM generates summary on topic change append_to_session_buffer(message, clean_response) # ============================================================================= # Gradio Interface # ============================================================================= with gr.Blocks() as demo: gr.Markdown( """ # 🧭 ContextPilot ### Autonomous Context Engineering for LLM Conversations ContextPilot automatically detects topic shifts and manages conversation context to provide more relevant responses. """ ) # Mode toggle section with gr.Row(): with gr.Column(scale=3): mode_radio = gr.Radio( choices=["summary", "full"], value=get_current_mode(), label="Context Mode", info="Summary: saves key facts only | Full: saves complete message history", interactive=True, ) with gr.Column(scale=1): mode_status = gr.Textbox( value=f"Current mode: {get_current_mode()}", label="Status", interactive=False, show_label=False, ) with gr.Row(): with gr.Column(scale=3): chatbot = gr.Chatbot(label="Chat", height=450) with gr.Row(): msg = gr.Textbox( label="Message", placeholder="Type your message here...", scale=4, show_label=False, ) submit = gr.Button("Send", variant="primary", scale=1) with gr.Row(): clear = gr.Button("Clear Chat") clear_contexts = gr.Button("🗑️ Clear All Contexts", variant="stop") with gr.Accordion("📨 Current Request (Messages Sent to LLM)", open=False): current_messages_display = gr.JSON(label="Curated Messages", show_label=False) with gr.Column(scale=1): with gr.Tabs(): with gr.Tab("💰 Token Savings"): gr.Markdown("### Response LLM (This Request)") with gr.Row(): curated_tokens = gr.Number(label="Curated (sent)", interactive=False) full_tokens = gr.Number(label="Full (would be)", interactive=False) with gr.Row(): saved_this = gr.Number(label="Tokens Saved", interactive=False) savings_pct = gr.Number(label="Savings %", interactive=False) gr.Markdown("### Topic Detection") detection_tokens = gr.Number(label="Detection Tokens", interactive=False) with gr.Tab("📊 Stats"): current_topic = gr.Textbox(label="Current Topic", interactive=False) context_switches = gr.Number(label="Context Switches", interactive=False) stored_contexts = gr.Number(label="Stored Contexts", interactive=False) with gr.Tab("📚 Stored Contexts"): contexts_display = gr.JSON(label="Context Store", show_label=False) refresh_contexts = gr.Button("🔄 Refresh") with gr.Tab("📋 Logs"): logs_display = gr.Textbox( label="Workflow Logs", interactive=False, lines=12, show_label=False, ) # Event handlers def respond(message: str, chat_history: list): """Wrapper that clears input and starts streaming.""" # Return empty message immediately to clear input # The actual response is handled by respond_stream for result in respond_stream(message, chat_history): yield ("",) + result def clear_chat(): return ([], [], 0, 0, 0, 0, 0, "", 0, 0, [], "") def refresh_contexts_display(): """Refresh the contexts display from disk.""" current, total, saved, switches, count, contexts = load_current_contexts() return current, switches, count, contexts def switch_mode(new_mode: str): """Switch context mode and clear all data.""" set_context_mode(new_mode) status = f"✅ Switched to {new_mode} mode. All contexts cleared." # Return cleared state for all outputs return ( status, # mode_status [], # chatbot [], # current_messages_display 0, 0, 0, 0, # token stats this request (curated, full, saved, pct) 0, # detection_tokens "", # current_topic 0, 0, # switches, stored [], # contexts_display f"🔄 Mode switched to: {new_mode}", # logs_display ) # Wire up events outputs = [ msg, chatbot, current_messages_display, curated_tokens, full_tokens, saved_this, savings_pct, detection_tokens, current_topic, context_switches, stored_contexts, contexts_display, logs_display ] clear_outputs = [ chatbot, current_messages_display, curated_tokens, full_tokens, saved_this, savings_pct, detection_tokens, current_topic, context_switches, stored_contexts, contexts_display, logs_display ] msg.submit(respond, [msg, chatbot], outputs) submit.click(respond, [msg, chatbot], outputs) clear.click(clear_chat, None, clear_outputs) clear_contexts.click(clear_context_store, None, clear_outputs) refresh_contexts.click(refresh_contexts_display, None, [current_topic, context_switches, stored_contexts, contexts_display]) # Mode switch clears everything mode_outputs = [ mode_status, chatbot, current_messages_display, curated_tokens, full_tokens, saved_this, savings_pct, detection_tokens, current_topic, context_switches, stored_contexts, contexts_display, logs_display ] mode_radio.change(switch_mode, [mode_radio], mode_outputs) if __name__ == "__main__": demo.launch()