ohmygaugh's picture
All major query types now work:
6422ca4
#!/usr/bin/env python3
"""
Streamlit MCP Monitor & Query Tester
A lightweight monitoring and testing interface for the agentic system.
All database access MUST go through MCP server - no direct connections allowed.
"""
import streamlit as st
import requests
import os
import json
import pandas as pd
from typing import Dict, Any
# --- Configuration ---
# Service endpoints; each can be overridden through the environment.
AGENT_URL = os.getenv("AGENT_URL", "http://agent:8001/query")
NEO4J_URL = os.getenv("NEO4J_URL", "http://neo4j:7474")
MCP_URL = os.getenv("MCP_URL", "http://mcp:8000/mcp")
# NOTE(review): hard-coded fallback key, presumably meant for local development
# only — set MCP_API_KEY explicitly in any shared deployment.
MCP_API_KEY = os.getenv("MCP_API_KEY", "dev-key-123")

st.set_page_config(page_title="GraphRAG Chat", page_icon="💬", layout="wide")

# --- Session State ---
# Seed per-session keys only if they are absent, so values stored by the UI
# survive reruns. A fresh list is built each time this statement executes.
for _state_key, _initial_value in (
    ("messages", []),
    ("schema_info", ""),
    ("current_results", None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# --- Helper Functions ---
def stream_agent_response(question: str):
    """Stream the agent's reply, yielding one JSON object per response line.

    POSTs ``{"question": question}`` to ``AGENT_URL`` with streaming enabled
    and decodes every non-empty line of the response body as JSON.

    Args:
        question: The user's natural-language question.

    Yields:
        dict: Each decoded JSON object. Lines that are not valid UTF-8 JSON,
        or that decode to a non-object value (string, list, number), are
        skipped. If the request fails at any point (connection error, timeout,
        non-2xx status, or an error while reading the stream), a single
        ``{"error": "..."}`` dict is yielded and the generator stops.
    """
    try:
        with requests.post(
            AGENT_URL, json={"question": question}, stream=True, timeout=300
        ) as response:
            response.raise_for_status()
            for raw_line in response.iter_lines():
                if not raw_line:
                    continue
                try:
                    message = json.loads(raw_line.decode("utf-8"))
                except ValueError:
                    # Both json.JSONDecodeError and UnicodeDecodeError subclass
                    # ValueError; a single bad line must not end the stream.
                    continue
                # The consumer uses chunk.get(...) / "error" in chunk, so only
                # JSON objects are meaningful here.
                if isinstance(message, dict):
                    yield message
    except requests.exceptions.RequestException as e:
        yield {"error": f"Failed to connect to agent: {e}"}
def _format_column_type(raw_type) -> str:
    """Return the display text for a schema record's ``type`` field.

    Records appear to carry ``type`` as a list of type names, of which the
    first is shown (NOTE(review): confirm against the MCP server). A plain
    string is shown whole rather than cut to its first character, and a
    missing, ``None`` or empty value becomes ``""`` instead of raising.
    """
    if isinstance(raw_type, (list, tuple)):
        return str(raw_type[0]) if raw_type else ""
    return "" if raw_type is None else str(raw_type)


def fetch_schema_info() -> str:
    """Fetch the database schema from the MCP server as Markdown for display.

    Calls the MCP ``discovery/get_relevant_schemas`` endpoint with an empty
    query and groups the returned column records by ``database.table``.

    Returns:
        Markdown with one ``**database.table**:`` line per table followed by
        ``- column (type)`` bullets. Never raises for expected failures;
        instead returns a message string:
        ``"No schema information found."`` when no usable schemas come back,
        ``"Error from MCP: ..."`` when the server reports a non-success
        status, and ``"Could not fetch schema: ..."`` on network, HTTP or
        JSON-decoding errors.
    """
    try:
        response = requests.post(
            f"{MCP_URL}/discovery/get_relevant_schemas",
            headers={"x-api-key": MCP_API_KEY, "Content-Type": "application/json"},
            json={"query": ""},
            # Without a timeout a hung MCP server would block the Streamlit
            # script run indefinitely.
            timeout=60,
        )
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError: body is not JSON (older requests versions raise a plain
        # ValueError here rather than a RequestException subclass).
        return f"Could not fetch schema: {e}"

    if not isinstance(data, dict):
        return "Error from MCP: Unknown error"
    if data.get("status") != "success":
        return f"Error from MCP: {data.get('message', 'Unknown error')}"

    # Group "column (type)" labels by "database.table", in first-seen order.
    tables: dict[str, list[str]] = {}
    for record in data.get("schemas") or []:
        if not isinstance(record, dict):
            continue
        table_key = f"{record.get('database', '')}.{record.get('table', '')}"
        label = f"{record.get('name', '')} ({_format_column_type(record.get('type'))})"
        tables.setdefault(table_key, []).append(label)

    if not tables:
        return "No schema information found."

    return "".join(
        f"**{table}**:\n" + "".join(f"- {col}\n" for col in columns)
        for table, columns in tables.items()
    )
@st.cache_data(ttl=600)
def get_cached_schema():
    """Return fetch_schema_info()'s Markdown, memoised by Streamlit for 600 s.

    Error-message strings returned by fetch_schema_info are cached as well.
    """
    schema_markdown = fetch_schema_info()
    return schema_markdown
@st.cache_data(ttl=10)
def check_service_health(service_name: str, url: str) -> bool:
    """Report whether ``url`` answers an HTTP GET within two seconds.

    ``service_name`` is not used by the probe itself; it only becomes part of
    the ``st.cache_data`` key. Results are memoised for 10 seconds.

    Returns:
        True when the status is 200 or 401 (a 401 still means the server
        answered, it just wants credentials); False for any other status or
        if the request raises anything.
    """
    try:
        status = requests.get(url, timeout=2).status_code
    except Exception:
        return False
    return status in (200, 401)
# --- UI Components ---
def _mcp_health_url() -> str:
    """Return the MCP server's health-check URL, derived from ``MCP_URL``.

    Keeps the scheme and ``host:port`` of ``MCP_URL`` and replaces the path
    with ``/health``; the default ``http://mcp:8000/mcp`` therefore gives
    ``http://mcp:8000/health``. Falls back to that URL when ``MCP_URL`` has no
    scheme or host. NOTE(review): assumes the health endpoint lives at the
    server root rather than under the MCP path prefix.
    """
    from urllib.parse import urlsplit

    parts = urlsplit(MCP_URL)
    if not (parts.scheme and parts.netloc):
        return "http://mcp:8000/health"
    return f"{parts.scheme}://{parts.netloc}/health"


def display_sidebar():
    """Render the sidebar: schema viewer, service status and chat reset.

    The schema is (re)fetched only when "Refresh Schema" is clicked; otherwise
    the Markdown stored in ``st.session_state.schema_info`` is shown.
    """
    with st.sidebar:
        st.title("🗄️ Database Schema")
        if st.button("🔄 Refresh Schema"):
            # Clears *all* st.cache_data entries (health checks included) so
            # get_cached_schema really queries the MCP server again.
            st.cache_data.clear()
            st.session_state.schema_info = get_cached_schema()
        st.markdown(st.session_state.schema_info)
        st.markdown("---")

        st.title("🔌 Service Status")
        try:
            neo4j_status = "✅ Online" if check_service_health("Neo4j", NEO4J_URL) else "❌ Offline"
            # Derived from MCP_URL so overriding the MCP host also changes
            # which server is health-checked.
            mcp_status = "✅ Online" if check_service_health("MCP", _mcp_health_url()) else "❌ Offline"
        except Exception:
            # Best-effort display: a failing probe must not break the sidebar.
            neo4j_status = "❓ Unknown"
            mcp_status = "❓ Unknown"
        st.markdown(f"**Neo4j:** {neo4j_status}")
        st.markdown(f"**MCP Server:** {mcp_status}")
        st.markdown("---")

        if st.button("🗑️ Clear Chat History"):
            st.session_state.messages = []
            st.rerun()
def extract_sql_results(observation_content: str) -> pd.DataFrame | None:
"""Extract SQL results from execute_query tool observation."""
try:
if "execute_query" not in observation_content or "returned:" not in observation_content:
return None
# Look for JSON results in the observation
if "Query returned" in observation_content and "rows:" in observation_content:
# Extract the table format from the text
lines = observation_content.split('\n')
table_start = -1
for i, line in enumerate(lines):
if "Query returned" in line and "rows:" in line:
table_start = i + 1
break
if table_start >= 0 and table_start < len(lines):
# Find the table data
table_lines = []
for i in range(table_start, len(lines)):
line = lines[i].strip()
if line and not line.startswith("Final Answer"):
if "|" in line: # Table format
table_lines.append(line)
elif line.startswith("PT") or line.startswith("DIAB") or line.startswith("NEURO"): # Data row
table_lines.append(line)
elif line.startswith("Final Answer"):
break
if len(table_lines) >= 2: # Headers + at least one data row
# Parse headers
headers = [h.strip() for h in table_lines[0].split('|')]
# Parse data rows
data_rows = []
for line in table_lines[1:]:
if "and" in line and "more rows" in line:
break
row_values = [v.strip() for v in line.split('|')]
if len(row_values) == len(headers):
data_rows.append(row_values)
if data_rows:
return pd.DataFrame(data_rows, columns=headers)
except Exception:
pass
return None
def _render_dataframe_with_download(df: pd.DataFrame, key: str) -> None:
    """Show ``df`` as a table followed by a "Download CSV" button.

    ``key`` must be unique among buttons rendered in the same run. Streamlit
    derives an automatic ID from the widget's parameters, so unkeyed buttons
    with the same label/file name can collide with a duplicate-ID error.
    """
    st.dataframe(df, use_container_width=True)
    st.download_button(
        label="📥 Download CSV",
        data=df.to_csv(index=False),
        file_name="query_results.csv",
        mime="text/csv",
        key=key,
    )


def _render_chat_history() -> None:
    """Replay every stored chat message, including any attached result table."""
    for idx, message in enumerate(st.session_state.messages):
        with st.chat_message(message["role"]):
            st.markdown(message["content"])
            if message.get("dataframe") is not None:
                # Index-based key keeps each history button unique.
                _render_dataframe_with_download(message["dataframe"], key=f"download_history_{idx}")


def _stream_assistant_reply(prompt: str) -> tuple[str, pd.DataFrame | None]:
    """Stream the agent's answer to ``prompt`` into the current chat bubble.

    Returns:
        The accumulated Markdown transcript, plus the table parsed from the
        most recent observation that contained one (or ``None``).
    """
    full_response = ""
    response_box = st.empty()
    sql_results_df = None
    for chunk in stream_agent_response(prompt):
        if not isinstance(chunk, dict):
            continue  # only JSON objects carry "type"/"content"
        if "error" in chunk:
            full_response = str(chunk["error"])
            response_box.error(full_response)
            break
        content = chunk.get("content")
        if content is None:
            content = ""
        chunk_type = chunk.get("type")
        if chunk_type == "thought":
            full_response += f"🤔 *{content}*\n\n"
        elif chunk_type == "observation":
            full_response += f"{content}\n\n"
            df = extract_sql_results(content)
            if df is not None:
                sql_results_df = df  # later tables replace earlier ones
        elif chunk_type == "final_answer":
            full_response += f"**Final Answer:**\n{content}"
        response_box.markdown(full_response)
    return full_response, sql_results_df


def main():
    """Render the app: sidebar, chat history, and the reply to a new question."""
    display_sidebar()
    st.title("💬 GraphRAG Conversational Agent")
    st.markdown("Ask questions about the life sciences dataset. The agent's thought process will be shown below.")

    _render_chat_history()

    if prompt := st.chat_input("Ask your question here..."):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        with st.chat_message("assistant"):
            full_response, sql_results_df = _stream_assistant_reply(prompt)
            if sql_results_df is not None:
                st.markdown("---")
                st.markdown("### 📊 Query Results")
                # "download_<n>" never clashes with "download_history_<i>".
                _render_dataframe_with_download(
                    sql_results_df, key=f"download_{len(st.session_state.messages)}"
                )

        st.session_state.messages.append({
            "role": "assistant",
            "content": full_response,
            "dataframe": sql_results_df,
        })


if __name__ == "__main__":
    main()