""" app.py — Data Analyst Duo MCP (no OpenAI) Gradio Space Shows preview table, stats, corr, plus full JSON histories, with rule-based interpretation. """ import os import uuid import logging import datetime import pandas as pd import numpy as np import gradio as gr # ——— Logging —————————————————————————————————————————————— logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s:%(name)s: %(message)s" ) logger = logging.getLogger("DataAnalystDuo") # ——— MCP Core —————————————————————————————————————————————— class MCPMessage: def __init__(self, sender, message_type, content): self.id = str(uuid.uuid4()) self.sender = sender self.message_type = message_type self.content = content self.timestamp = datetime.datetime.now().isoformat() def to_dict(self): return { "id": self.id, "sender": self.sender, "message_type": self.message_type, "content": self.content, "timestamp": self.timestamp, } class MCPTool: def __init__(self, name, description, func): self.name = name self.description = description self.func = func def execute(self, params): return self.func(params) class MCPAgent: def __init__(self, name, description): self.name = name self.description = description self.tools = {} self.peers = {} self.queue = [] self.history = [] def register_tool(self, tool): self.tools[tool.name] = tool def connect(self, peer): self.peers[peer.name] = peer def send_message(self, to, mtype, content): if to not in self.peers: raise ValueError(f"Peer {to} not found") msg = MCPMessage(self.name, mtype, content) self.history.append({"type": "sent", "message": msg.to_dict()}) self.peers[to].receive(msg) logger.info(f"{self.name} → {to}: {mtype}") return msg.to_dict() def receive(self, msg): self.queue.append(msg) self.history.append({"type": "received", "message": msg.to_dict()}) logger.info(f"{self.name} received {msg.message_type} from {msg.sender}") def process(self): while self.queue: msg = self.queue.pop(0) self.handle_message(msg) def handle_message(self, message): raise NotImplementedError def get_history(self): return self.history # ——— ComputeAgent —————————————————————————————————————————————— class ComputeAgent(MCPAgent): def __init__(self): super().__init__("ComputeAgent", "Loads & computes data") self.df = None self.register_tool(MCPTool("load_dataset", "Load CSV", self._load)) self.register_tool(MCPTool("compute_statistics", "Stats", self._stats)) self.register_tool(MCPTool("compute_correlation", "Corr", self._corr)) def _load(self, params): url = params.get("url", "").strip() if not url: url = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/diamonds.csv" try: self.df = pd.read_csv(url) return { "status": "success", "rows": self.df.shape[0], "columns": list(self.df.columns), "preview": self.df.head(5).to_dict(orient="records") } except Exception as e: logger.exception("Load failed") return {"status": "error", "message": str(e)} def _stats(self, params): if self.df is None: return {"status": "error", "message": "No data loaded"} cols = self.df.select_dtypes(include=[np.number]).columns stats = self.df[cols].describe().to_dict() return {"status": "success", "statistics": stats} def _corr(self, params): if self.df is None: return {"status": "error", "message": "No data loaded"} cols = self.df.select_dtypes(include=[np.number]).columns corr = self.df[cols].corr().to_dict() return {"status": "success", "correlation_matrix": corr} def handle_message(self, m): if m.message_type == "request_data_load": res = self._load(m.content) self.send_message(m.sender, "data_load_result", res) elif m.message_type == "request_statistics": res = self._stats(m.content) self.send_message(m.sender, "statistics_result", res) elif m.message_type == "request_correlation": res = self._corr(m.content) self.send_message(m.sender, "correlation_result", res) # ——— InterpretAgent —————————————————————————————————————————————— class InterpretAgent(MCPAgent): def __init__(self): super().__init__("InterpretAgent", "Generates insights from stats & corr") self.data_info = None self.stats = None self.corr = None self.register_tool(MCPTool("interpret_statistics", "Rule-based stats insights", self._int_stats)) self.register_tool(MCPTool("interpret_correlation", "Rule-based corr insights", self._int_corr)) def _int_stats(self, params): stats = self.stats.get("statistics", {}) insights = [] # Pick top 3 columns by range (max-min) ranges = {col: vals.get("max", 0) - vals.get("min", 0) for col, vals in stats.items()} top3 = sorted(ranges, key=ranges.get, reverse=True)[:3] for col in top3: vals = stats[col] insights.append(f"{col}: mean={vals['mean']:.2f}, range=[{vals['min']:.2f}, {vals['max']:.2f}]") return {"status": "success", "insights": insights, "summary": "Top 3 columns by range"} def _int_corr(self, params): cm = self.corr.get("correlation_matrix", {}) pairs = [] for c1, row in cm.items(): for c2, val in row.items(): if c1 != c2: pairs.append((c1, c2, val)) # sort by absolute correlation top3 = sorted(pairs, key=lambda x: abs(x[2]), reverse=True)[:3] insights = [f"{c1} vs {c2}: corr={corr:.2f}" for c1, c2, corr in top3] return {"status": "success", "insights": insights, "summary": "Top 3 correlated pairs"} def handle_message(self, m): if m.message_type == "data_load_result": self.data_info = m.content self.send_message(m.sender, "ack", {"status": "loaded"}) elif m.message_type == "statistics_result": self.stats = m.content res = self.tools["interpret_statistics"].execute({}) self.send_message(m.sender, "statistics_interpretation", res) elif m.message_type == "correlation_result": self.corr = m.content res = self.tools["interpret_correlation"].execute({}) self.send_message(m.sender, "correlation_interpretation", res) elif m.message_type == "request_report": # assemble a simple markdown report report_md = "## Analysis Report\n" report_md += "### Stats Insights\n- " + "\n- ".join(self.tools["interpret_statistics"].execute({})["insights"]) + "\n" report_md += "### Corr Insights\n- " + "\n- ".join(self.tools["interpret_correlation"].execute({})["insights"]) + "\n" self.send_message(m.sender, "report_result", {"status": "success", "report_md": report_md}) # ——— Orchestration ————————————————————————————————————————————— class DataAnalystDuo: def __init__(self): self.C = ComputeAgent() self.I = InterpretAgent() self.C.connect(self.I) self.I.connect(self.C) def run(self, url): self.I.send_message("ComputeAgent", "request_data_load", {"url": url}) self.C.process(); self.I.process() self.I.send_message("ComputeAgent", "request_statistics", {}) self.C.process(); self.I.process() self.I.send_message("ComputeAgent", "request_correlation", {}) self.C.process(); self.I.process() self.C.send_message("InterpretAgent", "request_report", {}) self.I.process(); self.C.process() hist_c = self.C.get_history() hist_i = self.I.get_history() load = next(m['message']['content'] for m in hist_c if m['message']['message_type']=='data_load_result') stats = next(m['message']['content'] for m in hist_c if m['message']['message_type']=='statistics_result') corr = next(m['message']['content'] for m in hist_c if m['message']['message_type']=='correlation_result') preview_df = pd.DataFrame(load.get('preview', [])) # extract latest report report = next(m['message']['content'] for m in hist_i if m['message']['message_type']=='report_result') return preview_df, stats, corr, hist_c, hist_i, report['report_md'] # ——— Gradio app ————————————————————————————————————————————— def run_analysis(url: str): return DataAnalystDuo().run(url) demo = gr.Interface( fn=run_analysis, inputs=[gr.Textbox(label="CSV URL", placeholder="https://...")], outputs=[ gr.Dataframe(label="Preview (first 5 rows)"), gr.JSON(label="Statistics"), gr.JSON(label="Correlation Matrix"), gr.JSON(label="Compute History"), gr.JSON(label="Interpret History"), gr.Markdown(label="Analysis Report") ], title="Data Analyst Duo", description="Paste any CSV URL (e.g. diamonds.csv) to see data + stats + insights + report" ) if __name__ == "__main__": demo.launch( server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)), share=True )