Nyha15 committed on
Commit
76fd075
·
1 Parent(s): 88eb801

Added LLM

Browse files
Files changed (1) hide show
  1. app.py +160 -214
app.py CHANGED
@@ -1,32 +1,33 @@
1
  """
2
- Data Analyst Duo MCP Implementation - Full Working Version
3
- Supports loading any CSV over HTTP(S), including:
4
- - Default cereal dataset
5
- - Seaborn diamonds.csv
6
- - FiveThirtyEight candy-data.csv
7
  """
8
 
9
  import os
10
  import json
11
- import datetime
12
- import logging
13
  import uuid
 
 
14
  from io import StringIO
15
 
16
  import pandas as pd
17
  import numpy as np
18
  import requests
19
  import gradio as gr
 
20
 
21
- # Configure logging
 
 
 
 
 
22
  logging.basicConfig(
23
  level=logging.INFO,
24
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
25
  )
26
- logger = logging.getLogger(__name__)
27
-
28
- # ============== MCP Protocol Implementation ==============
29
 
 
30
  class MCPMessage:
31
  def __init__(self, sender, message_type, content):
32
  self.id = str(uuid.uuid4())
@@ -36,31 +37,27 @@ class MCPMessage:
36
  self.timestamp = datetime.datetime.now().isoformat()
37
 
38
  def to_dict(self):
39
- return {
40
- "id": self.id,
41
- "sender": self.sender,
42
- "message_type": self.message_type,
43
- "content": self.content,
44
- "timestamp": self.timestamp
45
- }
46
 
47
  class MCPTool:
48
- def __init__(self, name, description, function):
49
  self.name = name
50
  self.description = description
51
- self.function = function
52
 
53
  def execute(self, params):
54
- return self.function(params)
55
 
56
  class MCPAgent:
57
  def __init__(self, name, description):
58
  self.name = name
59
  self.description = description
60
  self.tools = {}
61
- self.message_queue = []
62
  self.peers = {}
63
- self.message_history = []
 
64
 
65
  def register_tool(self, tool):
66
  self.tools[tool.name] = tool
@@ -68,220 +65,169 @@ class MCPAgent:
68
  def connect(self, peer):
69
  self.peers[peer.name] = peer
70
 
71
- def send_message(self, receiver, message_type, content):
72
- if receiver not in self.peers:
73
- raise ValueError(f"Peer {receiver} not found")
74
- msg = MCPMessage(self.name, message_type, content)
75
- self.message_history.append({"type": "sent", "message": msg.to_dict()})
76
- self.peers[receiver].receive_message(msg)
77
- logger.info(f"{self.name} β†’ {receiver}: {message_type}")
78
  return msg.to_dict()
79
 
80
- def receive_message(self, message):
81
- self.message_queue.append(message)
82
- self.message_history.append({"type": "received", "message": message.to_dict()})
83
- logger.info(f"{self.name} received {message.message_type} from {message.sender}")
84
-
85
- def process_messages(self):
86
- responses = []
87
- while self.message_queue:
88
- msg = self.message_queue.pop(0)
89
- resp = self.handle_message(msg)
90
- responses.append(resp)
91
- return responses
92
 
93
- def get_message_history(self):
94
- return self.message_history
 
 
95
 
96
  def handle_message(self, message):
97
- raise NotImplementedError("Override in subclass")
98
 
99
- # ============== Compute Agent ==============
 
100
 
 
101
  class ComputeAgent(MCPAgent):
102
- def __init__(self, name="ComputeAgent"):
103
- super().__init__(name, "Loads and computes on datasets")
104
- self.dataframe = None
105
-
106
- # Tools
107
- self.register_tool(MCPTool("load_dataset", "Load a dataset from URL", self._load_dataset))
108
- self.register_tool(MCPTool("compute_statistics", "Compute basic statistics", self._compute_statistics))
109
- self.register_tool(MCPTool("compute_correlation", "Compute correlation matrix", self._compute_correlation))
110
 
111
- def _load_dataset(self, params):
112
  url = params.get("url", "").strip()
113
- # default cereal dataset
114
  if not url or url.lower() == "default":
115
  url = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/cereal.csv"
116
  try:
117
- # fetch via pandas
118
- self.dataframe = pd.read_csv(url)
119
- return {
120
- "status": "success",
121
- "rows": self.dataframe.shape[0],
122
- "columns": list(self.dataframe.columns),
123
- "preview": self.dataframe.head(5).to_dict(orient="records")
124
- }
125
  except Exception as e:
126
- logger.exception("Error loading dataset")
127
- return {"status": "error", "message": str(e)}
128
 
129
- def _compute_statistics(self, params):
130
- if self.dataframe is None:
131
- return {"status": "error", "message": "No dataset loaded"}
132
  try:
133
- cols = params.get("columns", list(self.dataframe.select_dtypes(include=[np.number]).columns))
134
- stats = self.dataframe[cols].describe().to_dict()
135
- return {"status": "success", "statistics": stats}
136
  except Exception as e:
137
- logger.exception("Error computing statistics")
138
- return {"status": "error", "message": str(e)}
139
 
140
- def _compute_correlation(self, params):
141
- if self.dataframe is None:
142
- return {"status": "error", "message": "No dataset loaded"}
143
  try:
144
- cols = params.get("columns", list(self.dataframe.select_dtypes(include=[np.number]).columns))
145
- corr = self.dataframe[cols].corr().to_dict()
146
- return {"status": "success", "correlation_matrix": corr}
147
  except Exception as e:
148
- logger.exception("Error computing correlation")
149
- return {"status": "error", "message": str(e)}
150
-
151
- def handle_message(self, message):
152
- mtype = message.message_type
153
- content = message.content
154
- if mtype == "request_data_load":
155
- result = self._load_dataset(content)
156
- return self.send_message(message.sender, "data_load_result", result)
157
- elif mtype == "request_statistics":
158
- result = self._compute_statistics(content)
159
- return self.send_message(message.sender, "statistics_result", result)
160
- elif mtype == "request_correlation":
161
- result = self._compute_correlation(content)
162
- return self.send_message(message.sender, "correlation_result", result)
163
- else:
164
- return {"status": "error", "message": f"Unknown message type {mtype}"}
165
-
166
- # ============== Interpret Agent ==============
167
-
168
  class InterpretAgent(MCPAgent):
169
- def __init__(self, name="InterpretAgent"):
170
- super().__init__(name, "Interprets and reports on results")
171
- self.dataset_info = None
172
- self.statistics = None
173
- self.correlation = None
174
-
175
- self.register_tool(MCPTool("interpret_statistics", "", self._interpret_statistics))
176
- self.register_tool(MCPTool("interpret_correlation", "", self._interpret_correlation))
177
- self.register_tool(MCPTool("generate_report", "", self._generate_report))
178
-
179
- def _interpret_statistics(self, params):
180
- stats = self.statistics.get("statistics", {})
181
- insights = []
182
- for col, vals in stats.items():
183
- if "mean" in vals:
184
- insights.append(f"{col} avg = {vals['mean']:.2f}")
185
  if "min" in vals and "max" in vals:
186
- insights.append(f"{col} ranges {vals['min']:.2f}–{vals['max']:.2f}")
187
- return {"status": "success", "insights": insights[:3], "summary": "Stats interpreted"}
188
-
189
- def _interpret_correlation(self, params):
190
- return {"status": "success", "insights": ["Correlation matrix computed"], "summary": ""}
191
-
192
- def _generate_report(self, params):
193
- sections = []
194
- if self.dataset_info:
195
- sections.append({
196
- "title": "Overview",
197
- "content": f"{self.dataset_info['rows']} rows Γ— {len(self.dataset_info['columns'])} cols"
198
- })
199
- sections.append({"title": "Conclusion", "content": "Analysis complete."})
200
- return {
201
- "status": "success",
202
- "report": {
203
- "title": params.get("report_title", "Report"),
204
- "sections": sections
205
- }
206
- }
207
-
208
- def handle_message(self, message):
209
- mtype = message.message_type
210
- content = message.content
211
- if mtype == "data_load_result":
212
- self.dataset_info = content
213
- return self.send_message(message.sender, "ack", {"status": "loaded"})
214
- elif mtype == "statistics_result":
215
- self.statistics = content
216
- interp = self._interpret_statistics({})
217
- return self.send_message(message.sender, "statistics_interpretation", interp)
218
- elif mtype == "correlation_result":
219
- self.correlation = content
220
- interp = self._interpret_correlation({})
221
- return self.send_message(message.sender, "correlation_interpretation", interp)
222
- elif mtype == "request_report":
223
- report = self._generate_report(content)
224
- return self.send_message(message.sender, "report_result", report)
225
- else:
226
- return {"status": "error", "message": f"Unknown message type {mtype}"}
227
-
228
- # ============== Main Workflow ==============
229
-
 
 
 
 
 
 
230
  class DataAnalystDuo:
231
  def __init__(self):
232
- self.compute_agent = ComputeAgent()
233
- self.interpret_agent = InterpretAgent()
234
- self.compute_agent.connect(self.interpret_agent)
235
- self.interpret_agent.connect(self.compute_agent)
236
-
237
- def run_analysis(self, dataset_url="default"):
238
- # 1. Load
239
- self.interpret_agent.send_message("ComputeAgent", "request_data_load", {"url": dataset_url})
240
- self.compute_agent.process_messages(); self.interpret_agent.process_messages()
241
- # 2. Stats
242
- self.interpret_agent.send_message("ComputeAgent", "request_statistics", {})
243
- self.compute_agent.process_messages(); self.interpret_agent.process_messages()
244
- # 3. Corr
245
- self.interpret_agent.send_message("ComputeAgent", "request_correlation", {})
246
- self.compute_agent.process_messages(); self.interpret_agent.process_messages()
247
- # 4. Report
248
- self.compute_agent.send_message("InterpretAgent", "request_report", {"report_title": "Analysis Report"})
249
- self.interpret_agent.process_messages(); self.compute_agent.process_messages()
250
-
251
- return {
252
- "compute": self.compute_agent.get_message_history(),
253
- "interpret": self.interpret_agent.get_message_history()
254
- }
255
-
256
- # ============== Gradio Interface ==============
257
-
258
- def format_json(data):
259
- return json.dumps(data, indent=2) if isinstance(data, (dict, list)) else str(data)
260
-
261
- def run_analysis(dataset_url):
262
- duo = DataAnalystDuo()
263
- histories = duo.run_analysis(dataset_url.strip())
264
 
265
- all_msgs = []
266
- for side in ["compute", "interpret"]:
267
- for entry in histories[side]:
268
- msg = entry["message"]
269
- line = (f"[{msg['timestamp']}] {msg['sender']} "
270
- f"{entry['type'].upper()} {msg['message_type']}\n"
271
- f"{format_json(msg['content'])}\n\n" + "-"*60 + "\n")
272
- all_msgs.append((msg['timestamp'], line))
273
- all_msgs.sort(key=lambda x: x[0])
274
- return "\n".join(line for _, line in all_msgs)
275
 
276
- with gr.Blocks(title="Data Analyst Duo MCP") as app:
277
- gr.Markdown("## Data Analyst Duo β€” Load any CSV URL")
278
- input_box = gr.Textbox(
279
- label="Dataset URL",
280
- placeholder="e.g. https://raw.githubusercontent.com/.../diamonds.csv"
281
- )
282
- run_btn = gr.Button("Run")
283
- output_box = gr.Textbox(label="MCP Flow", lines=25)
284
- run_btn.click(fn=run_analysis, inputs=input_box, outputs=output_box)
285
 
286
- if __name__ == "__main__":
287
- app.launch()
 
1
  """
2
+ Data Analyst Duo MCP with OpenAI Integration
 
 
 
 
3
  """
4
 
5
  import os
6
  import json
 
 
7
  import uuid
8
+ import logging
9
+ import datetime
10
  from io import StringIO
11
 
12
  import pandas as pd
13
  import numpy as np
14
  import requests
15
  import gradio as gr
16
+ import openai
17
 
18
# --- OpenAI setup ---------------------------------------------------
# Fail fast at startup: every LLM-backed tool below depends on the key,
# so a missing key should surface immediately rather than mid-analysis.
openai.api_key = os.getenv("OPENAI_API_KEY")
if not openai.api_key:
    raise EnvironmentError("Missing OPENAI_API_KEY environment variable")

# --- Logging setup --------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
)
logger = logging.getLogger("DataAnalystDuo")
 
 
29
 
30
+ # β€”β€”β€” MCP Core β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
31
  class MCPMessage:
32
  def __init__(self, sender, message_type, content):
33
  self.id = str(uuid.uuid4())
 
37
  self.timestamp = datetime.datetime.now().isoformat()
38
 
39
  def to_dict(self):
40
+ return {"id": self.id, "sender": self.sender,
41
+ "message_type": self.message_type,
42
+ "content": self.content, "timestamp": self.timestamp}
 
 
 
 
43
 
44
class MCPTool:
    """A named, described capability that an MCP agent can execute.

    Wraps a plain callable taking a single ``params`` dict so that agents
    can register and look up tools uniformly by name.
    """

    def __init__(self, name, description, func):
        self.name = name                # registry key used by MCPAgent.register_tool
        self.description = description  # human-readable summary of the tool
        self.func = func                # callable(params: dict) -> result

    def execute(self, params):
        """Invoke the wrapped callable with *params* and return its result."""
        return self.func(params)
52
 
53
class MCPAgent:
    """Base class for message-passing agents.

    Holds a tool registry, a peer registry, an inbound message queue, and a
    full sent/received history.  Subclasses implement ``handle_message``.
    """

    def __init__(self, name, description):
        self.name = name
        self.description = description
        self.tools = {}    # tool name -> MCPTool
        self.peers = {}    # peer agent name -> agent instance
        self.queue = []    # inbound MCPMessage objects awaiting processing
        self.history = []  # {"type": "sent"|"received", "message": dict}

    def register_tool(self, tool):
        """Make *tool* available under its own name."""
        self.tools[tool.name] = tool

    def connect(self, peer):
        """Register *peer* so messages can be addressed to it by name."""
        self.peers[peer.name] = peer

    def send_message(self, to, mtype, content):
        """Build an MCPMessage, deliver it to peer *to*, and log/record it.

        Raises ValueError if *to* was never connected.  Returns the message
        as a dict.
        """
        if to not in self.peers:
            raise ValueError(f"Peer {to} not found")
        msg = MCPMessage(self.name, mtype, content)
        self.history.append({"type": "sent", "message": msg.to_dict()})
        self.peers[to].receive(msg)
        logger.info(f"{self.name} → {to}: {mtype}")
        return msg.to_dict()

    def receive(self, msg):
        """Queue an inbound message and record it in the history."""
        self.queue.append(msg)
        self.history.append({"type": "received", "message": msg.to_dict()})
        logger.info(f"{self.name} received {msg.message_type} from {msg.sender}")

    def process(self):
        """Drain the queue in FIFO order, dispatching to handle_message."""
        while self.queue:
            self.handle_message(self.queue.pop(0))

    def handle_message(self, message):
        """Subclass hook: react to one inbound message."""
        raise NotImplementedError

    def get_history(self):
        """Return the chronological sent/received history list."""
        return self.history
92
 
93
+ # β€”β€”β€” ComputeAgent β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
94
class ComputeAgent(MCPAgent):
    """Agent that loads a CSV dataset over HTTP(S) and computes on it.

    Exposes three tools: dataset loading, descriptive statistics, and a
    correlation matrix.  All tool results are dicts carrying a ``status``
    key of "success" or "error".
    """

    def __init__(self):
        super().__init__("ComputeAgent", "Loads & computes data")
        self.df = None  # pandas DataFrame once a dataset is loaded
        self.register_tool(MCPTool("load_dataset", "Load CSV from URL", self._load))
        self.register_tool(MCPTool("compute_statistics", "Descriptive stats", self._stats))
        self.register_tool(MCPTool("compute_correlation", "Correlation matrix", self._corr))

    def _selected_columns(self, params):
        # Columns explicitly requested via params["columns"], otherwise all
        # numeric columns.  Restores the column-selection support the
        # previous revision offered.
        return params.get(
            "columns", list(self.df.select_dtypes(include=[np.number]).columns)
        )

    def _load(self, params):
        """Load a CSV from params['url']; empty or 'default' means the cereal dataset."""
        url = params.get("url", "").strip()
        if not url or url.lower() == "default":
            url = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/cereal.csv"
        try:
            self.df = pd.read_csv(url)
            return {"status": "success", "rows": self.df.shape[0],
                    "columns": list(self.df.columns),
                    "preview": self.df.head(5).to_dict(orient="records")}
        except Exception as e:
            logger.exception("Load failed")
            return {"status": "error", "message": str(e)}

    def _stats(self, params):
        """Describe the (optionally selected) numeric columns."""
        if self.df is None:
            return {"status": "error", "message": "No data loaded"}
        try:
            stats = self.df[self._selected_columns(params)].describe().to_dict()
            return {"status": "success", "statistics": stats}
        except Exception as e:
            logger.exception("Stats failed")
            return {"status": "error", "message": str(e)}

    def _corr(self, params):
        """Pearson correlation matrix of the (optionally selected) numeric columns."""
        if self.df is None:
            return {"status": "error", "message": "No data loaded"}
        try:
            corr = self.df[self._selected_columns(params)].corr().to_dict()
            return {"status": "success", "correlation_matrix": corr}
        except Exception as e:
            logger.exception("Corr failed")
            return {"status": "error", "message": str(e)}

    def handle_message(self, m):
        """Dispatch a request message to its tool and reply with the result.

        Unknown message types are logged instead of silently dropped.
        """
        dispatch = {
            "request_data_load": (self._load, "data_load_result"),
            "request_statistics": (self._stats, "statistics_result"),
            "request_correlation": (self._corr, "correlation_result"),
        }
        entry = dispatch.get(m.message_type)
        if entry is None:
            logger.warning("ComputeAgent: unknown message type %s", m.message_type)
            return {"status": "error", "message": f"Unknown message type {m.message_type}"}
        tool, reply_type = entry
        return self.send_message(m.sender, reply_type, tool(m.content))
+
148
+ # β€”β€”β€” InterpretAgent with LLM β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
 
 
 
 
 
149
class InterpretAgent(MCPAgent):
    """Agent that interprets computed results and writes reports via GPT-4.

    LLM calls are wrapped so that an OpenAI failure degrades to the
    registered rule-based tools instead of crashing the whole workflow.
    """

    def __init__(self):
        super().__init__("InterpretAgent", "Interprets & reports via LLM")
        self.data_info = None  # payload of the last data_load_result
        self.stats = None      # payload of the last statistics_result
        self.corr = None       # payload of the last correlation_result
        # tools for rule-based fallback
        self.register_tool(MCPTool("interpret_statistics", "", self._int_stats))
        self.register_tool(MCPTool("interpret_correlation", "", self._int_corr))
        # LLM-powered tools
        self.register_tool(MCPTool("llm_interpret", "GPT-4 insights", self._llm_interpret))
        self.register_tool(MCPTool("llm_report", "GPT-4 report", self._llm_report))

    def _int_stats(self, params):
        """Rule-based fallback: headline numbers lifted from describe() output."""
        ins = []
        for col, vals in self.stats.get("statistics", {}).items():
            if "mean" in vals:
                ins.append(f"{col} avg={vals['mean']:.2f}")
            if "min" in vals and "max" in vals:
                ins.append(f"{col} ∈ [{vals['min']:.2f},{vals['max']:.2f}]")
        return {"status": "success", "insights": ins[:3], "summary": "Rule-based insights"}

    def _int_corr(self, params):
        """Rule-based fallback for correlation results."""
        return {"status": "success", "insights": ["Correlation computed"], "summary": "Rule-based corr"}

    def _chat(self, prompt):
        # Single choke point for the OpenAI call; raises on any API failure.
        resp = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return resp.choices[0].message.content.strip()

    def _llm_interpret(self, params):
        """Ask GPT-4 for insights on params['data'] (default: the stored stats).

        Previously the prompt was always built from self.stats, so the
        correlation matrix was never actually shown to the LLM; passing the
        payload through params fixes that while staying backward-compatible.
        Falls back to the rule-based tools if the API call fails.
        """
        payload = params.get("data", self.stats)
        prompt = (
            "I have these statistics in JSON format:\n" +
            json.dumps(payload, indent=2) +
            "\nPlease summarize the top 3 insights in plain English.")
        try:
            text = self._chat(prompt)
        except Exception:
            logger.exception("LLM interpretation failed; using rule-based fallback")
            if payload is self.corr:
                return self._int_corr(params)
            return self._int_stats(params)
        lines = [l for l in text.split("\n") if l.strip()]
        return {"status": "success", "insights": lines, "summary": text}

    def _llm_report(self, params):
        """Ask GPT-4 for a Markdown report over everything gathered so far.

        Guards against a report being requested before any dataset was
        loaded, and returns an error dict (not an exception) on API failure.
        """
        info = self.data_info or {"rows": 0, "columns": []}
        prompt = (
            "Generate a concise analysis report in Markdown format based on the following:\n" +
            f"Dataset info: rows={info['rows']}, cols={len(info['columns'])}\n" +
            json.dumps(self.stats, indent=2) + "\n" +
            json.dumps(self.corr, indent=2)
        )
        try:
            return {"status": "success", "report_md": self._chat(prompt)}
        except Exception as e:
            logger.exception("LLM report failed")
            return {"status": "error", "message": str(e)}

    def handle_message(self, m):
        """Store incoming results and reply with interpretations/reports."""
        if m.message_type == "data_load_result":
            self.data_info = m.content
            self.send_message(m.sender, "ack", {"status": "loaded"})
        elif m.message_type == "statistics_result":
            self.stats = m.content
            self.send_message(m.sender, "llm_statistics_interpretation",
                              self._llm_interpret({}))
        elif m.message_type == "correlation_result":
            self.corr = m.content
            # interpret the correlation matrix itself, not the statistics
            self.send_message(m.sender, "llm_correlation_interpretation",
                              self._llm_interpret({"data": self.corr}))
        elif m.message_type == "request_report":
            self.send_message(m.sender, "report_result", self._llm_report(m.content))
        else:
            logger.warning("InterpretAgent: unknown message type %s", m.message_type)
215
+
216
+ # β€”β€”β€” Orchestration & Gradio β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
217
class DataAnalystDuo:
    """Wires the compute and interpret agents together and drives the flow."""

    def __init__(self):
        self.C = ComputeAgent()
        self.I = InterpretAgent()
        self.C.connect(self.I)
        self.I.connect(self.C)

    def run(self, url):
        """Run load → stats → correlation → report; return (compute, interpret) histories.

        Each step sends one request, lets the receiver drain its queue, then
        lets the sender drain any replies.
        """
        steps = [
            (self.I, "ComputeAgent", "request_data_load", {"url": url}),
            (self.I, "ComputeAgent", "request_statistics", {}),
            (self.I, "ComputeAgent", "request_correlation", {}),
            (self.C, "InterpretAgent", "request_report", {"report_title": "Analysis Report"}),
        ]
        for sender, receiver_name, mtype, content in steps:
            sender.send_message(receiver_name, mtype, content)
            sender.peers[receiver_name].process()
            sender.process()
        return self.C.get_history(), self.I.get_history()
232
 
 
 
 
 
 
 
 
 
 
233