MCP / app.py
Nyha15
Removed LLM call
bff9719
"""
app.py β€” Data Analyst Duo MCP (no OpenAI) Gradio Space
Shows preview table, stats, corr, plus full JSON histories, with rule-based interpretation.
"""
import os
import uuid
import logging
import datetime
import pandas as pd
import numpy as np
import gradio as gr
# β€”β€”β€” Logging β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s:%(name)s: %(message)s"
)
logger = logging.getLogger("DataAnalystDuo")
# β€”β€”β€” MCP Core β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
class MCPMessage:
def __init__(self, sender, message_type, content):
self.id = str(uuid.uuid4())
self.sender = sender
self.message_type = message_type
self.content = content
self.timestamp = datetime.datetime.now().isoformat()
def to_dict(self):
return {
"id": self.id,
"sender": self.sender,
"message_type": self.message_type,
"content": self.content,
"timestamp": self.timestamp,
}
class MCPTool:
def __init__(self, name, description, func):
self.name = name
self.description = description
self.func = func
def execute(self, params):
return self.func(params)
class MCPAgent:
def __init__(self, name, description):
self.name = name
self.description = description
self.tools = {}
self.peers = {}
self.queue = []
self.history = []
def register_tool(self, tool):
self.tools[tool.name] = tool
def connect(self, peer):
self.peers[peer.name] = peer
def send_message(self, to, mtype, content):
if to not in self.peers:
raise ValueError(f"Peer {to} not found")
msg = MCPMessage(self.name, mtype, content)
self.history.append({"type": "sent", "message": msg.to_dict()})
self.peers[to].receive(msg)
logger.info(f"{self.name} β†’ {to}: {mtype}")
return msg.to_dict()
def receive(self, msg):
self.queue.append(msg)
self.history.append({"type": "received", "message": msg.to_dict()})
logger.info(f"{self.name} received {msg.message_type} from {msg.sender}")
def process(self):
while self.queue:
msg = self.queue.pop(0)
self.handle_message(msg)
def handle_message(self, message):
raise NotImplementedError
def get_history(self):
return self.history
# β€”β€”β€” ComputeAgent β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
class ComputeAgent(MCPAgent):
def __init__(self):
super().__init__("ComputeAgent", "Loads & computes data")
self.df = None
self.register_tool(MCPTool("load_dataset", "Load CSV", self._load))
self.register_tool(MCPTool("compute_statistics", "Stats", self._stats))
self.register_tool(MCPTool("compute_correlation", "Corr", self._corr))
def _load(self, params):
url = params.get("url", "").strip()
if not url:
url = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/diamonds.csv"
try:
self.df = pd.read_csv(url)
return {
"status": "success",
"rows": self.df.shape[0],
"columns": list(self.df.columns),
"preview": self.df.head(5).to_dict(orient="records")
}
except Exception as e:
logger.exception("Load failed")
return {"status": "error", "message": str(e)}
def _stats(self, params):
if self.df is None:
return {"status": "error", "message": "No data loaded"}
cols = self.df.select_dtypes(include=[np.number]).columns
stats = self.df[cols].describe().to_dict()
return {"status": "success", "statistics": stats}
def _corr(self, params):
if self.df is None:
return {"status": "error", "message": "No data loaded"}
cols = self.df.select_dtypes(include=[np.number]).columns
corr = self.df[cols].corr().to_dict()
return {"status": "success", "correlation_matrix": corr}
def handle_message(self, m):
if m.message_type == "request_data_load":
res = self._load(m.content)
self.send_message(m.sender, "data_load_result", res)
elif m.message_type == "request_statistics":
res = self._stats(m.content)
self.send_message(m.sender, "statistics_result", res)
elif m.message_type == "request_correlation":
res = self._corr(m.content)
self.send_message(m.sender, "correlation_result", res)
# β€”β€”β€” InterpretAgent β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
class InterpretAgent(MCPAgent):
def __init__(self):
super().__init__("InterpretAgent", "Generates insights from stats & corr")
self.data_info = None
self.stats = None
self.corr = None
self.register_tool(MCPTool("interpret_statistics", "Rule-based stats insights", self._int_stats))
self.register_tool(MCPTool("interpret_correlation", "Rule-based corr insights", self._int_corr))
def _int_stats(self, params):
stats = self.stats.get("statistics", {})
insights = []
# Pick top 3 columns by range (max-min)
ranges = {col: vals.get("max", 0) - vals.get("min", 0) for col, vals in stats.items()}
top3 = sorted(ranges, key=ranges.get, reverse=True)[:3]
for col in top3:
vals = stats[col]
insights.append(f"{col}: mean={vals['mean']:.2f}, range=[{vals['min']:.2f}, {vals['max']:.2f}]")
return {"status": "success", "insights": insights, "summary": "Top 3 columns by range"}
def _int_corr(self, params):
cm = self.corr.get("correlation_matrix", {})
pairs = []
for c1, row in cm.items():
for c2, val in row.items():
if c1 != c2:
pairs.append((c1, c2, val))
# sort by absolute correlation
top3 = sorted(pairs, key=lambda x: abs(x[2]), reverse=True)[:3]
insights = [f"{c1} vs {c2}: corr={corr:.2f}" for c1, c2, corr in top3]
return {"status": "success", "insights": insights, "summary": "Top 3 correlated pairs"}
def handle_message(self, m):
if m.message_type == "data_load_result":
self.data_info = m.content
self.send_message(m.sender, "ack", {"status": "loaded"})
elif m.message_type == "statistics_result":
self.stats = m.content
res = self.tools["interpret_statistics"].execute({})
self.send_message(m.sender, "statistics_interpretation", res)
elif m.message_type == "correlation_result":
self.corr = m.content
res = self.tools["interpret_correlation"].execute({})
self.send_message(m.sender, "correlation_interpretation", res)
elif m.message_type == "request_report":
# assemble a simple markdown report
report_md = "## Analysis Report\n"
report_md += "### Stats Insights\n- " + "\n- ".join(self.tools["interpret_statistics"].execute({})["insights"]) + "\n"
report_md += "### Corr Insights\n- " + "\n- ".join(self.tools["interpret_correlation"].execute({})["insights"]) + "\n"
self.send_message(m.sender, "report_result", {"status": "success", "report_md": report_md})
# β€”β€”β€” Orchestration β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
class DataAnalystDuo:
def __init__(self):
self.C = ComputeAgent()
self.I = InterpretAgent()
self.C.connect(self.I)
self.I.connect(self.C)
def run(self, url):
self.I.send_message("ComputeAgent", "request_data_load", {"url": url})
self.C.process(); self.I.process()
self.I.send_message("ComputeAgent", "request_statistics", {})
self.C.process(); self.I.process()
self.I.send_message("ComputeAgent", "request_correlation", {})
self.C.process(); self.I.process()
self.C.send_message("InterpretAgent", "request_report", {})
self.I.process(); self.C.process()
hist_c = self.C.get_history()
hist_i = self.I.get_history()
load = next(m['message']['content'] for m in hist_c if m['message']['message_type']=='data_load_result')
stats = next(m['message']['content'] for m in hist_c if m['message']['message_type']=='statistics_result')
corr = next(m['message']['content'] for m in hist_c if m['message']['message_type']=='correlation_result')
preview_df = pd.DataFrame(load.get('preview', []))
# extract latest report
report = next(m['message']['content'] for m in hist_i if m['message']['message_type']=='report_result')
return preview_df, stats, corr, hist_c, hist_i, report['report_md']
# β€”β€”β€” Gradio app β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
def run_analysis(url: str):
return DataAnalystDuo().run(url)
demo = gr.Interface(
fn=run_analysis,
inputs=[gr.Textbox(label="CSV URL", placeholder="https://...")],
outputs=[
gr.Dataframe(label="Preview (first 5 rows)"),
gr.JSON(label="Statistics"),
gr.JSON(label="Correlation Matrix"),
gr.JSON(label="Compute History"),
gr.JSON(label="Interpret History"),
gr.Markdown(label="Analysis Report")
],
title="Data Analyst Duo",
description="Paste any CSV URL (e.g. diamonds.csv) to see data + stats + insights + report"
)
if __name__ == "__main__":
demo.launch(
server_name="0.0.0.0",
server_port=int(os.environ.get("PORT", 7860)),
share=True
)