# app.py
"""
ContextPilot Gradio App
=======================

This is the main application file for ContextPilot - an autonomous context
engineering system for LLM conversations.

ARCHITECTURE OVERVIEW
---------------------
ContextPilot uses a two-LLM architecture to optimize token usage:

1. CONTEXT_LLM (smaller, efficient):
   - Configured in context_pilot_workflow.py
   - Used for topic detection and context management decisions
   - Example: openai/gpt-oss-20b

2. RESPONSE_LLM (capable, quality):
   - Configured here in app.py
   - Used for generating actual responses to user queries
   - Example: openai/gpt-oss-120b

TWO CONTEXT MODES
-----------------
ContextPilot supports two modes for storing conversation context:

SUMMARY MODE (default):
    - Stores a summary + key facts for each topic
    - When returning to a topic, the summary is injected into the system prompt
    - Most token-efficient option
    - Best for: general conversations, FAQ-style interactions
    - Trade-off: some detail may be lost in summarization
    Flow:
    1. User message → Topic detection (no topic change)
    2. Curated context = System prompt + topic summary + session messages + user message
    3. Response LLM generates answer
    4. Session messages buffered for continuity
    5. On topic change → LLM generates summary → stored in contexts

FULL MODE:
    - Stores the complete message history for each topic
    - When returning to a topic, full history is restored as chat messages
    - Maximum context preservation
    - Best for: technical discussions, debugging, detailed Q&A
    - Trade-off: uses more tokens when returning to topics with long history

    Flow:
    1. User message → Topic detection (no topic change)
    2. Curated context = System prompt + stored full history + session messages + user message
    3. Response LLM generates answer
    4. Session messages buffered (full user/assistant messages)
    5. On topic change → full session history → stored in contexts

    (An illustrative curated-messages sketch for both modes follows below.)
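    Illustrative example (a sketch, not the workflow's exact output): in summary
    mode the curated message list sent to the response LLM might look like

        [
            {"role": "system", "content": "You are a helpful assistant. Topic summary: two-week Japan trip in April, budget ~$3k."},
            {"role": "user", "content": "Which cities should I prioritize?"},
        ]

    In full mode the stored user/assistant messages for the topic are replayed
    ahead of the new user message instead of a summary line in the system prompt.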
CONTEXT SUMMARY EXTRACTION
--------------------------
The response LLM is instructed to generate a compact context summary at the
end of each response using special tags:

    <context_summary>Q: user question | A: brief answer</context_summary>

This summary is:
    - Extracted and hidden from the user
    - Used for efficient context storage
    - Helps the topic detection LLM understand conversation flow

TOKEN SAVINGS TRACKING
----------------------
The app tracks and displays:
    - Curated Tokens: What was sent to the response LLM
    - Full Context Tokens: What would be sent without curation
    - Tokens Saved: The difference (your savings!)
    - Detection Tokens: Tokens used by topic detection LLM
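    Worked example with illustrative numbers: if the curated request is 800
    tokens and the full, uncurated history would have been 3,200 tokens, then
    Tokens Saved = 3,200 - 800 = 2,400, i.e. 75% of the full context.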
| """ | |
| import gradio as gr | |
| import asyncio | |
| import os | |
| import sys | |
| import json | |
| import time | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| from mcp import ClientSession, StdioServerParameters | |
| from mcp.client.stdio import stdio_client | |
| from openai import OpenAI | |
| load_dotenv() | |
| # ============================================================================= | |
| # Configuration | |
| # ============================================================================= | |
| CONTEXT_STORE_PATH = Path(__file__).parent / ".context_store.json" | |
| # LLM Client for generating responses (more capable model) | |
| # RESPONSE_LLM: Used for generating actual responses (higher quality) | |
| # CONTEXT_LLM: Used for topic detection in workflow (cheaper, configured there) | |
| RESPONSE_LLM_MODEL = os.getenv("RESPONSE_LLM", "openai/gpt-oss-120b") | |
| NEBIUS_BASE_URL = os.getenv("NEBIUS_BASE_URL") | |
| NEBIUS_API_KEY = os.getenv("NEBIUS_API_KEY") | |
| # Validate required environment variables | |
| if not NEBIUS_BASE_URL: | |
| print("WARNING: NEBIUS_BASE_URL not set. Response generation will fail.") | |
| if not NEBIUS_API_KEY: | |
| print("WARNING: NEBIUS_API_KEY not set. Response generation will fail.") | |
| print(f"[ContextPilot] Response LLM: {RESPONSE_LLM_MODEL}") | |
| print(f"[ContextPilot] API Base: {NEBIUS_BASE_URL}") | |
| print(f"[ContextPilot] API Key set: {bool(NEBIUS_API_KEY)}") | |
| # Threshold for summarizing responses before storing in context (in characters) | |
| # Responses longer than this will be summarized to reduce context size | |
| SUMMARIZE_THRESHOLD = int(os.getenv("SUMMARIZE_THRESHOLD", "500")) | |
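# The environment variables read above can be supplied via a .env file picked up
# by load_dotenv(). A minimal sketch - the values below are placeholders, not
# real endpoints or keys:
#
#   RESPONSE_LLM=openai/gpt-oss-120b
#   NEBIUS_BASE_URL=https://api.example-inference-provider.com/v1
#   NEBIUS_API_KEY=sk-...your-key...
#   SUMMARIZE_THRESHOLD=500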
llm_client = OpenAI(
    api_key=NEBIUS_API_KEY,
    base_url=NEBIUS_BASE_URL,
)
LLM_MODEL = RESPONSE_LLM_MODEL

# Context summary tag that the LLM will use
CONTEXT_SUMMARY_TAG = "<context_summary>"
CONTEXT_SUMMARY_END_TAG = "</context_summary>"


def add_context_summary_instruction(messages: list[dict]) -> list[dict]:
    """
    Add an instruction to the system prompt asking the LLM to generate
    a compact context summary at the end of its response.
    """
    if not messages:
        return messages

    instruction = (
        "\n\n[IMPORTANT: At the very end of your response, add a brief context line in this exact format:\n"
        f"{CONTEXT_SUMMARY_TAG}Q: <user's question in 5-10 words> | A: <your answer in 10-20 words>{CONTEXT_SUMMARY_END_TAG}\n"
        "This helps track conversation context. The user won't see this tag.]"
    )

    # Clone messages and modify the system prompt
    modified = []
    for msg in messages:
        if msg.get("role") == "system":
            modified.append({
                "role": "system",
                "content": msg.get("content", "") + instruction
            })
        else:
            modified.append(msg)
    return modified


def extract_context_summary(response: str) -> tuple[str, str]:
    """
    Extract the context summary from the response and return
    (clean_response, context_summary).
    """
    if CONTEXT_SUMMARY_TAG not in response:
        # No summary tag present; return the response unchanged with an empty summary
        return response, ""
    try:
        start = response.index(CONTEXT_SUMMARY_TAG)
        end = response.index(CONTEXT_SUMMARY_END_TAG) + len(CONTEXT_SUMMARY_END_TAG)
        context_summary = response[start + len(CONTEXT_SUMMARY_TAG):end - len(CONTEXT_SUMMARY_END_TAG)].strip()
        clean_response = (response[:start] + response[end:]).strip()
        return clean_response, context_summary
    except ValueError:
        return response, ""
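# Illustrative use of extract_context_summary (hypothetical response text):
#
#   text = ("The Eiffel Tower is about 330 m tall.\n"
#           "<context_summary>Q: Eiffel Tower height? | A: about 330 metres</context_summary>")
#   clean, summary = extract_context_summary(text)
#   # clean   -> "The Eiffel Tower is about 330 m tall."
#   # summary -> "Q: Eiffel Tower height? | A: about 330 metres"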
def summarize_for_context(text: str, max_chars: int = 300) -> str:
    """
    Summarize long text before storing it in context.
    This keeps the stored context small for the smaller context detection model.
    Used as a fallback when the LLM doesn't generate a context summary tag.
    """
    if len(text) <= SUMMARIZE_THRESHOLD:
        return text
    try:
        result = llm_client.chat.completions.create(
            model=LLM_MODEL,
            messages=[
                {
                    "role": "system",
                    "content": f"Summarize the following text in under {max_chars} characters. "
                               "Keep the key information and main points. Be concise."
                },
                {"role": "user", "content": text}
            ],
            max_tokens=150,
        )
        summary = result.choices[0].message.content.strip()
        return f"[Summary] {summary}"
    except Exception as e:
        # If summarization fails, truncate instead
        print(f"[ContextPilot] Summarization failed, truncating instead: {e}")
        return text[:max_chars] + "..."


# =============================================================================
# Context Store Helpers
# =============================================================================

def clear_context_store():
    """Clear the persistent context store."""
    if CONTEXT_STORE_PATH.exists():
        CONTEXT_STORE_PATH.unlink()
    # Returns: chatbot, current_messages, curated, full, saved_this, pct, detection, topic, switches, stored, contexts, logs
    return ([], [], 0, 0, 0, 0, 0, "", 0, 0, [], "🗑️ All contexts cleared!")
def get_current_mode() -> str:
    """Get the current context mode from store."""
    if CONTEXT_STORE_PATH.exists():
        try:
            store = json.loads(CONTEXT_STORE_PATH.read_text())
            return store.get("mode", "summary")
        except (json.JSONDecodeError, IOError):
            pass
    return "summary"


def set_context_mode(mode: str):
    """Set the context mode and clear all contexts."""
    # Create fresh store with new mode
    store = {
        "contexts": {},
        "current_topic": None,
        "mode": mode,
        "current_session_messages": [],
        "stats": {"total_tokens": 0, "tokens_saved": 0, "context_switches": 0,
                  "cumulative_full_tokens": 0, "cumulative_tokens_saved": 0}
    }
    CONTEXT_STORE_PATH.write_text(json.dumps(store, indent=2))
    return store


def load_current_contexts():
    """Load current contexts from disk for display."""
    if CONTEXT_STORE_PATH.exists():
        try:
            store = json.loads(CONTEXT_STORE_PATH.read_text())
            contexts = store.get("contexts", {})
            stats = store.get("stats", {})
            current_topic = store.get("current_topic", "")
            stored_contexts_data = [
                {
                    "topic": topic,
                    "summary": ctx.get("summary", ""),
                    "key_facts": ctx.get("key_facts", []),
                    "tokens": ctx.get("tokens", 0),
                    "is_current": topic == current_topic
                }
                for topic, ctx in contexts.items()
            ]
            return (
                current_topic,
                stats.get("total_tokens", 0),
                stats.get("tokens_saved", 0),
                stats.get("context_switches", 0),
                len(contexts),
                stored_contexts_data,
            )
        except (json.JSONDecodeError, IOError):
            pass
    return "", 0, 0, 0, 0, []
# =============================================================================
# MCP Client (for context curation)
# =============================================================================

async def call_context_curator(message: str, chat_history: list) -> dict:
    """Call the MCP server to curate context (detect topic, build messages)."""
    # Pass current environment to subprocess so it gets the API keys
    env = os.environ.copy()
    server_params = StdioServerParameters(
        command=sys.executable,
        args=[os.path.join(os.path.dirname(__file__), "mcp_server.py")],
        env=env,  # Pass environment variables to subprocess
    )
    async with stdio_client(server_params, errlog=sys.stderr) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            result = await session.call_tool("context-pilot", {
                "run_args": {
                    "msg": message,
                    "chat_history": chat_history
                }
            })
            # Extract text content from MCP response
            if result.content and len(result.content) > 0:
                for content in result.content:
                    if hasattr(content, 'text') and content.text:
                        try:
                            return json.loads(content.text)
                        except json.JSONDecodeError as e:
                            print(f"JSON decode error: {e}")
                            print(f"Raw content: {content.text[:500]}")
                            return {"error": f"Invalid JSON response: {e}"}
            print(f"No valid content in MCP response: {result}")
            return {"error": "No content in MCP response"}
# =============================================================================
# LLM Response Generation
# =============================================================================

def generate_response_stream(curated_messages: list[dict]):
    """Generate streaming LLM response from curated messages."""
    # Add context summary instruction to get compact context from LLM
    messages_with_instruction = add_context_summary_instruction(curated_messages)
    try:
        stream = llm_client.chat.completions.create(
            model=LLM_MODEL,
            messages=messages_with_instruction,
            stream=True,
        )
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    except Exception as e:
        yield f"Error generating response: {e}"


# =============================================================================
# Chat Handler
# =============================================================================

async def curate_context(message: str, history: list) -> tuple[list[dict], list, dict]:
    """
    Curate context for the message (async).
    Returns curated_messages, decisions, and stats.
    """
    # Convert Gradio history format
    chat_history = [h for h in history if isinstance(h, dict)]

    # Call context curator
    curation_result = await call_context_curator(message, chat_history)

    if not curation_result or "error" in curation_result:
        error_msg = curation_result.get("error", "Unknown error") if curation_result else "No response"
        print(f"Curation error: {error_msg}")
        return [], [f"⚠️ Error: {error_msg}"], {"logs": [f"Error: {error_msg}"]}

    return (
        curation_result.get("curated_messages", []),
        curation_result.get("decisions", []),
        curation_result.get("stats", {})
    )
def append_to_session_buffer(message: str, response: str):
    """
    Append the exchange to the session buffer.
    This is a temporary buffer for the current topic's conversation.
    When topic changes, the workflow's save_context tool will move this
    buffer to permanent storage.
    Both modes buffer messages so the LLM has context for continuation.
    """
    if not CONTEXT_STORE_PATH.exists():
        return
    try:
        store = json.loads(CONTEXT_STORE_PATH.read_text())
        if "current_session_messages" not in store:
            store["current_session_messages"] = []
        # Store user message and response
        store["current_session_messages"].append({"role": "user", "content": message})
        store["current_session_messages"].append({"role": "assistant", "content": response})
        CONTEXT_STORE_PATH.write_text(json.dumps(store, indent=2))
    except (json.JSONDecodeError, IOError) as e:
        print(f"Error appending to session buffer: {e}")


def count_tokens(text: str) -> int:
    """Estimate token count."""
    return len(text) // 4 if text else 0
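# count_tokens uses a rough heuristic of ~4 characters per token rather than a
# real tokenizer, e.g. count_tokens("hello world, how are you?") -> 25 // 4 = 6.
# A tokenizer-backed count (e.g. tiktoken) would be more accurate but is not
# required for the savings display.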
def respond_stream(message: str, chat_history: list):
    """
    Streaming chat handler:
    1. Curate context (non-streaming)
    2. Stream the LLM response
    3. Append the exchange to the session buffer
    """
    if not message.strip():
        yield (chat_history, [], 0, 0, 0, 0, 0, "", 0, 0, [], "")
        return

    # Step 1: Curate context (blocking)
    curated_messages, decisions, stats = asyncio.run(
        curate_context(message, chat_history)
    )

    if not curated_messages:
        chat_history.append({"role": "user", "content": message})
        chat_history.append({"role": "assistant", "content": "Error: Could not curate context"})
        yield (chat_history, [], 0, 0, 0, 0, 0, "", 0, 0, [], "")
        return

    # Add user message to UI chat
    chat_history.append({"role": "user", "content": message})

    # Add decisions as an assistant message if any
    if decisions:
        decisions_text = "\n".join(decisions)
        chat_history.append({"role": "assistant", "content": decisions_text})

    # Step 2: Stream LLM response
    chat_history.append({"role": "assistant", "content": ""})

    logs = "\n".join(stats.get("logs", []))
    stored_contexts_data = stats.get("stored_contexts_data", [])

    # Collect full response for session messages
    full_response = ""

    # Stream the response
    for chunk in generate_response_stream(curated_messages):
        chat_history[-1]["content"] += chunk
        full_response += chunk
        yield (
            chat_history,
            curated_messages,  # Current messages sent to LLM
            stats.get("curated_tokens", 0),
            stats.get("full_context_tokens", 0),
            stats.get("tokens_saved_this_request", 0),
            stats.get("savings_percent", 0),
            stats.get("detection_tokens", 0),
            stats.get("current_topic", ""),
            stats.get("context_switches", 0),
            stats.get("stored_contexts", 0),
            stored_contexts_data,
            logs,
        )

    # After streaming completes:
    # 1. Extract context summary from response (LLM generates this)
    # 2. Clean the response for display
    # 3. Store the compact context summary instead of full response
    clean_response, context_summary = extract_context_summary(full_response)

    # Update the chat history with clean response (remove context summary tag)
    if context_summary:
        chat_history[-1]["content"] = clean_response

    # Final yield with cleaned response
    yield (
        chat_history,
        curated_messages,
        stats.get("curated_tokens", 0),
        stats.get("full_context_tokens", 0),
        stats.get("tokens_saved_this_request", 0),
        stats.get("savings_percent", 0),
        stats.get("detection_tokens", 0),
        stats.get("current_topic", ""),
        stats.get("context_switches", 0),
        stats.get("stored_contexts", 0),
        stored_contexts_data,
        logs,
    )

    # Buffer the exchange for the current topic. On topic change, the workflow
    # moves this buffer to permanent storage (full history in full mode,
    # summarized in summary mode).
    append_to_session_buffer(message, clean_response)
# =============================================================================
# Gradio Interface
# =============================================================================

with gr.Blocks() as demo:
    gr.Markdown(
        """
        # 🧠 ContextPilot
        ### Autonomous Context Engineering for LLM Conversations

        ContextPilot automatically detects topic shifts and manages conversation context
        to provide more relevant responses.
        """
    )

    # Mode toggle section
    with gr.Row():
        with gr.Column(scale=3):
            mode_radio = gr.Radio(
                choices=["summary", "full"],
                value=get_current_mode(),
                label="Context Mode",
                info="Summary: saves key facts only | Full: saves complete message history",
                interactive=True,
            )
        with gr.Column(scale=1):
            mode_status = gr.Textbox(
                value=f"Current mode: {get_current_mode()}",
                label="Status",
                interactive=False,
                show_label=False,
            )

    with gr.Row():
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(label="Chat", height=450, type="messages")  # history entries are {"role", "content"} dicts
            with gr.Row():
                msg = gr.Textbox(
                    label="Message",
                    placeholder="Type your message here...",
                    scale=4,
                    show_label=False,
                )
                submit = gr.Button("Send", variant="primary", scale=1)
            with gr.Row():
                clear = gr.Button("Clear Chat")
                clear_contexts = gr.Button("🗑️ Clear All Contexts", variant="stop")
            with gr.Accordion("📨 Current Request (Messages Sent to LLM)", open=False):
                current_messages_display = gr.JSON(label="Curated Messages", show_label=False)

        with gr.Column(scale=1):
            with gr.Tabs():
                with gr.Tab("💰 Token Savings"):
                    gr.Markdown("### Response LLM (This Request)")
                    with gr.Row():
                        curated_tokens = gr.Number(label="Curated (sent)", interactive=False)
                        full_tokens = gr.Number(label="Full (would be)", interactive=False)
                    with gr.Row():
                        saved_this = gr.Number(label="Tokens Saved", interactive=False)
                        savings_pct = gr.Number(label="Savings %", interactive=False)
                    gr.Markdown("### Topic Detection")
                    detection_tokens = gr.Number(label="Detection Tokens", interactive=False)

                with gr.Tab("📊 Stats"):
                    current_topic = gr.Textbox(label="Current Topic", interactive=False)
                    context_switches = gr.Number(label="Context Switches", interactive=False)
                    stored_contexts = gr.Number(label="Stored Contexts", interactive=False)

                with gr.Tab("📚 Stored Contexts"):
                    contexts_display = gr.JSON(label="Context Store", show_label=False)
                    refresh_contexts = gr.Button("🔄 Refresh")

                with gr.Tab("📜 Logs"):
                    logs_display = gr.Textbox(
                        label="Workflow Logs",
                        interactive=False,
                        lines=12,
                        show_label=False,
                    )
    # Event handlers
    def respond(message: str, chat_history: list):
        """Wrapper that clears input and starts streaming."""
        # Return empty message immediately to clear input
        # The actual response is handled by respond_stream
        for result in respond_stream(message, chat_history):
            yield ("",) + result

    def clear_chat():
        return ([], [], 0, 0, 0, 0, 0, "", 0, 0, [], "")

    def refresh_contexts_display():
        """Refresh the contexts display from disk."""
        current, total, saved, switches, count, contexts = load_current_contexts()
        return current, switches, count, contexts

    def switch_mode(new_mode: str):
        """Switch context mode and clear all data."""
        set_context_mode(new_mode)
        status = f"✅ Switched to {new_mode} mode. All contexts cleared."
        # Return cleared state for all outputs
        return (
            status,       # mode_status
            [],           # chatbot
            [],           # current_messages_display
            0, 0, 0, 0,   # token stats this request (curated, full, saved, pct)
            0,            # detection_tokens
            "",           # current_topic
            0, 0,         # switches, stored
            [],           # contexts_display
            f"🔄 Mode switched to: {new_mode}",  # logs_display
        )

    # Wire up events
    outputs = [
        msg, chatbot, current_messages_display,
        curated_tokens, full_tokens, saved_this, savings_pct,
        detection_tokens,
        current_topic, context_switches, stored_contexts,
        contexts_display, logs_display
    ]
    clear_outputs = [
        chatbot, current_messages_display,
        curated_tokens, full_tokens, saved_this, savings_pct,
        detection_tokens,
        current_topic, context_switches, stored_contexts,
        contexts_display, logs_display
    ]

    msg.submit(respond, [msg, chatbot], outputs)
    submit.click(respond, [msg, chatbot], outputs)
    clear.click(clear_chat, None, clear_outputs)
    clear_contexts.click(clear_context_store, None, clear_outputs)
    refresh_contexts.click(refresh_contexts_display, None, [current_topic, context_switches, stored_contexts, contexts_display])

    # Mode switch clears everything
    mode_outputs = [
        mode_status, chatbot, current_messages_display,
        curated_tokens, full_tokens, saved_this, savings_pct,
        detection_tokens,
        current_topic, context_switches, stored_contexts,
        contexts_display, logs_display
    ]
    mode_radio.change(switch_mode, [mode_radio], mode_outputs)


if __name__ == "__main__":
    demo.launch()
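# To try the app locally (a sketch; assumes the environment variables above are
# available, e.g. via a .env file, and that mcp_server.py sits next to app.py):
#
#   pip install gradio openai mcp python-dotenv
#   python app.py
#
# Gradio prints a local URL to open in the browser; the MCP server subprocess is
# started on demand for each curation call.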