cryogenic22 committed on
Commit
30ba4e2
·
verified ·
1 Parent(s): 4db5880

Create analytics_agent.py

Browse files
Files changed (1) hide show
  1. agents/analytics_agent.py +292 -0
agents/analytics_agent.py ADDED
@@ -0,0 +1,292 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import json
import os
import re
import sys
from io import StringIO
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from langchain_anthropic import ChatAnthropic
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
14
+
15
class AnalysisRequest(BaseModel):
    """Structure for an analysis request handed to the AnalyticsAgent.

    Fields that may legitimately be absent are declared Optional; the
    original annotated them as plain containers with a ``None`` default,
    which is a typing contradiction and rejected by strict validators.
    """

    request_id: str          # unique identifier for this request
    description: str         # human-readable description of the analysis
    data_sources: List[str]  # ids of the data sources to analyze
    analysis_type: str       # e.g. "time_series" — presumably a closed set; TODO confirm
    # Free-form analysis parameters; None means "no extra parameters"
    # (perform_analysis renders it as the string "None" in the prompt).
    parameters: Optional[Dict[str, Any]] = None
    purpose: str             # business question the analysis addresses
23
+
24
class AnalysisResult(BaseModel):
    """Structure for analysis results produced by the AnalyticsAgent.

    All result payloads are Optional because generation or execution of
    the analysis code can fail, in which case only the identifying fields
    and (possibly empty) defaults are populated. The original annotated
    these as plain containers with a ``None`` default, which is a typing
    contradiction and rejected by strict validators.
    """

    result_id: str       # derived as f"analysis_{request_id}"
    name: str            # display name of the analysis
    description: str     # copied from the originating request
    analysis_type: str   # copied from the originating request
    code: str            # the generated Python analysis code (may be "")
    visualizations: Optional[List[str]] = None   # saved figure filenames/refs
    insights: Optional[List[Dict[str, Any]]] = None  # finding/details/impact dicts
    metrics: Optional[Dict[str, float]] = None   # named numeric metrics
    model_details: Optional[Dict[str, Any]] = None   # not populated by perform_analysis
    attribution: Optional[Dict[str, float]] = None   # factor -> fractional attribution
    confidence: Optional[float] = None           # 0.0–1.0 confidence reported by the code
37
+
38
class AnalyticsAgent:
    """Agent responsible for data analysis and modeling.

    Uses Claude to generate pandas/numpy analysis code for a request,
    executes that code in a constrained namespace, and packages the
    resulting insights, metrics, attribution, and figures into an
    AnalysisResult.
    """

    def __init__(self):
        """Initialize the analytics agent.

        Raises:
            ValueError: if ANTHROPIC_API_KEY is not set in the environment.
        """
        # Set up Claude API client
        api_key = os.getenv("ANTHROPIC_API_KEY")
        if not api_key:
            raise ValueError("ANTHROPIC_API_KEY not found in environment variables")

        self.llm = ChatAnthropic(
            model="claude-3-haiku-20240307",
            anthropic_api_key=api_key,
            temperature=0.1
        )

        # Create analysis code generation prompt.
        # BUG FIX: literal braces in the example JSON are doubled ({{ }})
        # so ChatPromptTemplate does not parse them as template variables —
        # the original single-braced JSON made .invoke() fail with missing
        # variables like '"insights"'.
        self.analysis_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert data scientist specializing in pharmaceutical sales analysis.
Your task is to generate Python code to analyze data based on specific requirements.

For each analysis request:
1. Generate clear, efficient pandas and numpy code
2. Include appropriate data visualization with matplotlib/seaborn
3. Apply statistical methods relevant to the analysis type
4. Add detailed comments explaining your approach
5. Extract and highlight key insights from the analysis

The analysis should be thorough and focused on addressing the specific business question.
Make sure to handle potential data issues and explain your assumptions.

Format your response with a code block:
```python
# Analysis code
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

def run_analysis(data_sources):
    # Your analysis code here

    # Return results as a dictionary
    return {{
        "insights": [
            {{"finding": "Key finding 1", "details": "Explanation", "impact": "Business impact"}},
            # More insights...
        ],
        "metrics": {{
            "metric1": value1,
            "metric2": value2,
            # More metrics...
        }},
        "visualizations": ["fig1", "fig2"],  # References to generated figures
        "attribution": {{
            "factor1": 0.65,  # 65% attribution to factor1
            "factor2": 0.25,  # 25% attribution to factor2
            "factor3": 0.10   # 10% attribution to factor3
        }},
        "confidence": 0.95  # 95% confidence in the analysis
    }}
```

After the code block, explain your analytical approach and any assumptions.
"""),
            ("human", """
Analysis Request: {description}

Available data sources:
{data_sources}

Analysis type: {analysis_type}

Parameters: {parameters}

Purpose: {purpose}

Please generate Python code to perform this analysis.
""")
        ])

        # Set up the analysis chain: prompt -> LLM -> plain string
        self.analysis_chain = (
            self.analysis_prompt
            | self.llm
            | StrOutputParser()
        )

        # In-memory storage for analysis artifacts (saved figure filenames)
        self.analysis_artifacts = {}

    def extract_python_from_response(self, response: str) -> str:
        """Extract Python code from an LLM response.

        Tries a ```python fenced block first, then any generic fenced
        block; returns "" when neither is present.
        """
        # Extract Python between ```python and ``` markers
        python_match = re.search(r'```python\s*(.*?)\s*```', response, re.DOTALL)
        if python_match:
            return python_match.group(1).strip()

        # If not found with python tag, try generic code block
        python_match = re.search(r'```\s*(.*?)\s*```', response, re.DOTALL)
        if python_match:
            return python_match.group(1).strip()

        # If all else fails, return empty string
        return ""

    def extract_insights_from_code_output(self, output: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], Dict[str, float], float]:
        """Extract insights, attribution, and confidence from code output.

        Missing keys fall back to ([], {}, 0.0).
        """
        insights = output.get("insights", [])
        attribution = output.get("attribution", {})
        confidence = output.get("confidence", 0.0)

        return insights, attribution, confidence

    def perform_analysis(self, request: AnalysisRequest, data_sources: Dict[str, Any]) -> AnalysisResult:
        """Perform analysis based on request and return results.

        Args:
            request: the analysis request to fulfil.
            data_sources: mapping of source id -> object exposing
                ``.content`` (a pandas DataFrame) and ``.name``.

        Returns:
            AnalysisResult containing the generated code plus any
            insights, metrics, attribution, confidence, and saved-figure
            filenames recovered from executing that code. Payloads stay
            empty when generation or execution fails.
        """
        print(f"Analytics Agent: Performing {request.analysis_type} analysis - {request.description}")

        # Format data sources description for the prompt
        data_sources_desc = ""
        for source_id, source in data_sources.items():
            df = source.content
            data_sources_desc += f"Data source '{source_id}' ({source.name}):\n"
            data_sources_desc += f"- Shape: {df.shape[0]} rows, {df.shape[1]} columns\n"
            data_sources_desc += f"- Columns: {', '.join(df.columns)}\n"
            data_sources_desc += f"- Sample data:\n{df.head(3).to_string()}\n\n"

        # Format the request for the prompt
        request_data = {
            "description": request.description,
            "data_sources": data_sources_desc,
            "analysis_type": request.analysis_type,
            "parameters": json.dumps(request.parameters, indent=2) if request.parameters else "None",
            "purpose": request.purpose
        }

        # Generate analysis code
        response = self.analysis_chain.invoke(request_data)

        # Extract Python code
        python_code = self.extract_python_from_response(response)

        # Defaults used when generation or execution fails
        insights = []
        attribution = {}
        confidence = 0.0
        visualizations = []
        metrics = {}

        if not python_code:
            print("Warning: No analysis code generated.")
        else:
            try:
                # Prepare data sources for the analysis
                analysis_data_sources = {src_id: src.content for src_id, src in data_sources.items()}

                # Create a local namespace with access to pandas, numpy, etc.
                local_namespace = {
                    "pd": pd,
                    "np": np,
                    "plt": plt,
                    "sns": sns,
                    "data_sources": analysis_data_sources
                }

                # Capture print output from the generated code.
                # BUG FIX: restore stdout in a finally block — previously an
                # exception inside exec() left sys.stdout redirected, so the
                # error message below vanished into the dead StringIO.
                # SECURITY: exec() of LLM-generated code is inherently
                # dangerous; run only inside a sandboxed environment.
                original_stdout = sys.stdout
                sys.stdout = mystdout = StringIO()
                try:
                    exec(python_code, local_namespace)
                finally:
                    sys.stdout = original_stdout
                print_output = mystdout.getvalue()  # captured but unused; kept for debugging

                # Look for a run_analysis function and execute it
                if "run_analysis" in local_namespace:
                    analysis_output = local_namespace["run_analysis"](analysis_data_sources)
                    if isinstance(analysis_output, dict):
                        insights = analysis_output.get("insights", [])
                        attribution = analysis_output.get("attribution", {})
                        confidence = analysis_output.get("confidence", 0.0)
                        metrics = analysis_output.get("metrics", {})
                        visualizations = analysis_output.get("visualizations", [])

                # Persist any matplotlib figures left in the namespace and
                # record their filenames as visualization references.
                for var_name, var_value in local_namespace.items():
                    if isinstance(var_value, plt.Figure):
                        fig_filename = f"figure_{request.request_id}_{var_name}.png"
                        var_value.savefig(fig_filename)
                        self.analysis_artifacts[fig_filename] = fig_filename
                        visualizations.append(fig_filename)

            except Exception as e:
                # Best-effort boundary: a failed analysis still yields a
                # result object carrying the generated code.
                print(f"Analysis execution error: {e}")

        # Create analysis result
        result = AnalysisResult(
            result_id=f"analysis_{request.request_id}",
            name=f"Analysis of {request.description}",
            description=request.description,
            analysis_type=request.analysis_type,
            code=python_code,
            visualizations=visualizations,
            insights=insights,
            metrics=metrics,
            attribution=attribution,
            confidence=confidence
        )

        return result
250
+
251
# For testing
if __name__ == "__main__":
    import sys

    from dataclasses import dataclass

    # Set API key for testing
    os.environ["ANTHROPIC_API_KEY"] = "your_api_key_here"

    # Synthetic monthly figures: targets climb while sales decline.
    monthly = {
        'date': pd.date_range(start='2023-01-01', periods=12, freq='M'),
        'region': ['Northeast'] * 12,
        'sales': [100, 110, 105, 115, 120, 115, 110, 105, 95, 85, 80, 70],
        'target': [100, 105, 110, 115, 120, 125, 130, 135, 140, 145, 150, 155],
    }
    test_df = pd.DataFrame(monthly)

    # Minimal stand-in for a data-source object: just .content and .name.
    @dataclass
    class MockDataSource:
        content: pd.DataFrame
        name: str

    data_sources = {
        "sales_data": MockDataSource(content=test_df, name="Monthly sales data")
    }

    # Duck-typed stand-in for AnalysisRequest; class attributes are enough
    # since perform_analysis only reads these fields.
    class MockAnalysisRequest:
        request_id = "test"
        description = "Sales trend analysis for the Northeast region"
        data_sources = ["sales_data"]
        analysis_type = "time_series"
        parameters = {"detect_anomalies": True}
        purpose = "Identify factors causing the sales decline"

    agent = AnalyticsAgent()
    result = agent.perform_analysis(MockAnalysisRequest(), data_sources)
    print(f"Generated code:\n{result.code}")
    print(f"Insights: {json.dumps(result.insights, indent=2)}")
    print(f"Attribution: {json.dumps(result.attribution, indent=2)}")