File size: 17,230 Bytes
30ba4e2
 
 
 
2b2cad6
30ba4e2
 
 
 
 
 
 
 
 
2b2cad6
 
 
 
 
30ba4e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a56df83
30ba4e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b2cad6
 
30ba4e2
 
 
 
 
 
 
 
 
 
 
 
 
 
2b2cad6
30ba4e2
 
 
 
 
 
 
 
 
 
2b2cad6
 
 
30ba4e2
2b2cad6
 
 
 
 
 
 
 
30ba4e2
2b2cad6
 
30ba4e2
2b2cad6
 
30ba4e2
2b2cad6
 
 
 
 
 
 
 
 
 
 
30ba4e2
2b2cad6
 
 
 
 
 
 
30ba4e2
2b2cad6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30ba4e2
2b2cad6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30ba4e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
import os
import json
import pandas as pd
import numpy as np
import sys
from typing import Dict, List, Any, Tuple, Optional
from pydantic import BaseModel, Field
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import re
import matplotlib.pyplot as plt
import seaborn as sns
from io import StringIO
import logging

# Configure process-wide logging (INFO and above, timestamped records) and
# create the named logger that every component in this module writes to.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("analytics_agent")

class AnalysisRequest(BaseModel):
    """Structure for an analysis request.

    All fields are required except ``parameters``, which defaults to ``None``
    and is therefore declared ``Optional`` so that passing ``None`` explicitly
    validates as well as omitting it.
    """
    # Identifier of the request; used to derive result ids and figure file names.
    request_id: str
    # Free-text statement of what should be analysed.
    description: str
    # Ids of the data sources the request refers to.
    data_sources: List[str]
    # Label for the kind of analysis requested (the test harness uses "time_series").
    analysis_type: str
    # Optional extra settings forwarded to the code-generation prompt.
    parameters: Optional[Dict[str, Any]] = None
    # Why the analysis is wanted; forwarded to the code-generation prompt.
    purpose: str

class AnalysisResult(BaseModel):
    """Structure for analysis results.

    Every field that defaults to ``None`` is declared ``Optional`` so that the
    annotation matches the default and an explicit ``None`` passes validation.
    """
    # Identifier of the result.
    result_id: str
    # Human-readable title of the analysis.
    name: str
    # Description copied from the originating request.
    description: str
    # Label for the kind of analysis that was performed.
    analysis_type: str
    # Python source that was generated for the analysis.
    code: str
    # References to generated figures (file names or figure labels).
    visualizations: Optional[List[str]] = None
    # Findings; each entry is a dict (the prompt asks for finding/details/impact keys).
    insights: Optional[List[Dict[str, Any]]] = None
    # Named numeric metrics produced by the analysis.
    metrics: Optional[Dict[str, float]] = None
    # Free-form details about any model that was fitted.
    model_details: Optional[Dict[str, Any]] = None
    # Share attributed to each factor (the prompt example uses fractions summing to 1).
    attribution: Optional[Dict[str, float]] = None
    # Confidence score reported for the analysis.
    confidence: Optional[float] = None

class AnalyticsAgent:
    """Agent responsible for data analysis and modeling.

    The agent asks an LLM to write a ``run_analysis(data_sources)`` function,
    executes that code in-process against the supplied data, and wraps the
    outcome in an ``AnalysisResult``. Whenever code generation or execution
    fails, a placeholder "default analysis" is returned instead of raising.
    """

    # Fenced block tagged python / python3 / py (any letter case). The word
    # boundary keeps the tag itself out of the captured code.
    _PYTHON_FENCE = re.compile(r'```(?:python3?|py)\b\s*(.*?)\s*```', re.DOTALL | re.IGNORECASE)
    # Fallback: any fenced block at all.
    _GENERIC_FENCE = re.compile(r'```\s*(.*?)\s*```', re.DOTALL)

    def __init__(self, model: str = "claude-3-7-sonnet-20250219", temperature: float = 0.1):
        """Initialize the analytics agent.

        Args:
            model: Anthropic model name passed to ``ChatAnthropic``.
            temperature: Sampling temperature passed to ``ChatAnthropic``.

        Raises:
            ValueError: If ``ANTHROPIC_API_KEY`` is not set in the environment.
        """
        # Set up Claude API client
        api_key = os.getenv("ANTHROPIC_API_KEY")
        if not api_key:
            raise ValueError("ANTHROPIC_API_KEY not found in environment variables")

        self.llm = ChatAnthropic(
            model=model,
            anthropic_api_key=api_key,
            temperature=temperature
        )

        # Create analysis code generation prompt.
        # NOTE: the template uses f-string formatting, so every literal brace in
        # the example code below is doubled ({{ / }}). Single braces would be
        # parsed as template variables and make the prompt impossible to fill.
        self.analysis_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert data scientist specializing in pharmaceutical sales analysis.
Your task is to generate Python code to analyze data based on specific requirements.

For each analysis request:
1. Generate clear, efficient pandas and numpy code
2. Include appropriate data visualization with matplotlib/seaborn
3. Apply statistical methods relevant to the analysis type
4. Add detailed comments explaining your approach
5. Extract and highlight key insights from the analysis

The analysis should be thorough and focused on addressing the specific business question.
Make sure to handle potential data issues and explain your assumptions.

Format your response with a code block:
```python
# Analysis code
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

def run_analysis(data_sources):
    # Your analysis code here
    
    # Return results as a dictionary
    return {{
        "insights": [
            {{"finding": "Key finding 1", "details": "Explanation", "impact": "Business impact"}},
            # More insights...
        ],
        "metrics": {{
            "metric1": value1,
            "metric2": value2,
            # More metrics...
        }},
        "visualizations": ["fig1", "fig2"],  # References to generated figures
        "attribution": {{
            "factor1": 0.65,  # 65% attribution to factor1
            "factor2": 0.25,  # 25% attribution to factor2
            "factor3": 0.10   # 10% attribution to factor3
        }},
        "confidence": 0.95  # 95% confidence in the analysis
    }}
```

After the code block, explain your analytical approach and any assumptions.
"""),
            ("human", """
Analysis Request: {description}

Available data sources:
{data_sources}

Analysis type: {analysis_type}

Parameters: {parameters}

Purpose: {purpose}

Please generate Python code to perform this analysis.
""")
        ])

        # Set up the analysis chain
        self.analysis_chain = (
            self.analysis_prompt
            | self.llm
            | StrOutputParser()
        )

        # In-memory storage for analysis artifacts
        self.analysis_artifacts = {}

        logger.info("Analytics Agent initialized successfully")

    def extract_python_from_response(self, response: str) -> str:
        """Extract Python code from an LLM response.

        The first fenced block tagged ``python``, ``python3`` or ``py`` (case
        insensitive) wins; otherwise the first fenced block of any kind is
        used.

        Args:
            response: Raw text returned by the model.

        Returns:
            The stripped code, or an empty string when ``response`` is empty,
            not a string, or contains no fenced block.
        """
        if isinstance(response, str) and response:
            for fence in (self._PYTHON_FENCE, self._GENERIC_FENCE):
                found = fence.search(response)
                if found:
                    return found.group(1).strip()

        # If all else fails, return empty string
        logger.warning("No code block found in response")
        return ""

    def extract_insights_from_code_output(self, output: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], Dict[str, float], float]:
        """Extract insights, attribution, and confidence from code output.

        Args:
            output: Dictionary returned by a generated ``run_analysis``.

        Returns:
            ``(insights, attribution, confidence)``; a missing key yields
            ``[]``, ``{}`` and ``0.0`` respectively.
        """
        insights = output.get("insights", [])
        attribution = output.get("attribution", {})
        confidence = output.get("confidence", 0.0)

        return insights, attribution, confidence

    def generate_default_analysis(self, request: "AnalysisRequest", data_sources: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a default analysis output when code execution fails.

        Args:
            request: The request being processed; only ``description`` is read.
            data_sources: Unused; kept so the signature stays stable.

        Returns:
            A dict with ``insights``, ``attribution``, ``metrics``,
            ``visualizations`` and ``confidence`` placeholder values.
        """
        logger.info(f"Generating default analysis for {request.description}")

        return {
            "insights": [
                {
                    "finding": f"Analysis of {request.description}",
                    "details": "Default analysis created due to execution issues",
                    "impact": "Recommend manual investigation"
                }
            ],
            "attribution": {"unknown_factors": 1.0},
            "metrics": {"analysis_completion": 0.0},
            "visualizations": [],
            "confidence": 0.5
        }

    def _describe_data_sources(self, data_sources: Dict[str, Any]) -> str:
        """Render shape, columns and a three-row sample of each source for the prompt."""
        sections = []
        for source_id, source in data_sources.items():
            df = getattr(source, 'content', None)
            if df is None:
                logger.warning(f"Data source {source_id} has no content attribute or content is None")
                continue

            # Fall back to the id when a source carries no display name.
            source_name = getattr(source, 'name', source_id)
            # Column labels are not guaranteed to be strings, so convert before joining.
            column_names = ', '.join(map(str, df.columns))
            sections.append(
                f"Data source '{source_id}' ({source_name}):\n"
                f"- Shape: {df.shape[0]} rows, {df.shape[1]} columns\n"
                f"- Columns: {column_names}\n"
                f"- Sample data:\n{df.head(3).to_string()}\n\n"
            )
        return "".join(sections)

    def _execute_analysis_code(self, python_code: str, request: "AnalysisRequest", data_sources: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Run generated code and return its result dict, or ``None`` if unusable.

        ``None`` is returned when no source has content, when the code defines
        no callable ``run_analysis``, or when ``run_analysis`` returns a
        non-dict. Exceptions raised by the generated code propagate to the
        caller.
        """
        # Prepare data sources for the analysis
        analysis_data_sources = {
            src_id: src.content
            for src_id, src in data_sources.items()
            if getattr(src, 'content', None) is not None
        }
        if not analysis_data_sources:
            logger.warning("No valid data sources found for analysis")
            return None

        # Namespace the generated code runs in, with pandas, numpy, etc. preloaded.
        local_namespace = {
            "pd": pd,
            "np": np,
            "plt": plt,
            "sns": sns,
            "data_sources": analysis_data_sources
        }

        # Capture print output of the generated code. The finally clause
        # guarantees stdout is restored even when the code raises; otherwise
        # every later print in the process would vanish into the buffer.
        stdout_backup = sys.stdout
        sys.stdout = captured_stdout = StringIO()
        try:
            logger.info("Executing analysis code")
            # SECURITY(review): this executes model-generated code in-process
            # with full interpreter privileges. Only run it in an environment
            # where that is acceptable (e.g. a sandbox).
            exec(python_code, local_namespace)

            run_analysis = local_namespace.get("run_analysis")
            if not callable(run_analysis):
                logger.warning("No run_analysis function found in generated code")
                return None

            logger.info("Running analysis function")
            analysis_output = run_analysis(analysis_data_sources)
        finally:
            sys.stdout = stdout_backup
            logger.debug(f"Code execution output: {captured_stdout.getvalue()}")

        if not isinstance(analysis_output, dict):
            logger.warning(f"run_analysis returned non-dict type: {type(analysis_output)}")
            return None

        # Copy so that appending file names below never mutates the object
        # returned by the generated code; tolerate None or a single value.
        raw_visualizations = analysis_output.get("visualizations") or []
        if isinstance(raw_visualizations, (list, tuple)):
            visualizations = list(raw_visualizations)
        else:
            visualizations = [str(raw_visualizations)]

        # Save every Figure bound at module level of the generated code as a
        # PNG file and record the file name as an artifact.
        for var_name, var_value in list(local_namespace.items()):
            if isinstance(var_value, plt.Figure):
                fig_filename = f"figure_{request.request_id}_{var_name}.png"
                var_value.savefig(fig_filename)
                self.analysis_artifacts[fig_filename] = fig_filename
                visualizations.append(fig_filename)

        return {
            "insights": analysis_output.get("insights") or [],
            "attribution": analysis_output.get("attribution") or {},
            "confidence": analysis_output.get("confidence"),
            "metrics": analysis_output.get("metrics") or {},
            "visualizations": visualizations
        }

    def perform_analysis(self, request: "AnalysisRequest", data_sources: Dict[str, Any]) -> "AnalysisResult":
        """Perform analysis based on request and return results.

        Args:
            request: Describes the analysis; ``request_id``, ``description``,
                ``analysis_type``, ``parameters`` and ``purpose`` are read.
            data_sources: Maps source id to an object whose ``content`` is the
                data table and whose ``name`` is a display label.

        Returns:
            An ``AnalysisResult``. This method does not raise for generation
            or execution problems; it returns a default analysis instead (the
            result name gets an ``(Error)`` suffix when the failure happened
            outside code execution).
        """
        logger.info(f"Analytics Agent: Performing {request.analysis_type} analysis - {request.description}")

        try:
            # Format the request for the prompt. default=str keeps parameters
            # that are not JSON-native (dates, numpy scalars) from aborting
            # the whole analysis.
            if request.parameters:
                parameters_text = json.dumps(request.parameters, indent=2, default=str)
            else:
                parameters_text = "None"

            request_data = {
                "description": request.description,
                "data_sources": self._describe_data_sources(data_sources),
                "analysis_type": request.analysis_type,
                "parameters": parameters_text,
                "purpose": request.purpose
            }

            # Generate analysis code
            logger.info("Generating analysis code")
            response = self.analysis_chain.invoke(request_data)

            # Extract Python code
            python_code = self.extract_python_from_response(response)

            analysis_output = None
            if not python_code:
                logger.warning("No analysis code generated. Using default analysis.")
            else:
                try:
                    analysis_output = self._execute_analysis_code(python_code, request, data_sources)
                except Exception as e:
                    logger.error(f"Analysis execution error: {e}", exc_info=True)
                    logger.error(f"Python code that failed: {python_code}")

            # Single fallback point for every "no usable output" case.
            if analysis_output is None:
                analysis_output = self.generate_default_analysis(request, data_sources)

            # Ensure we have at least one insight
            insights = analysis_output.get("insights") or [{"finding": "No specific insights found", "details": "Analysis completed but no significant patterns were identified", "impact": "No immediate action required"}]

            # Ensure we have attribution
            attribution = analysis_output.get("attribution") or {"unattributed_factors": 1.0}

            confidence = analysis_output.get("confidence")
            if confidence is None:
                confidence = 0.0

            # Create analysis result
            result = AnalysisResult(
                result_id=f"analysis_{request.request_id}",
                name=f"Analysis of {request.description}",
                description=request.description,
                analysis_type=request.analysis_type,
                code=python_code,
                visualizations=analysis_output.get("visualizations") or [],
                insights=insights,
                metrics=analysis_output.get("metrics") or {},
                attribution=attribution,
                confidence=confidence
            )

            logger.info(f"Analysis for {request.description} completed successfully")
            return result

        except Exception as e:
            logger.error(f"Error in perform_analysis: {e}", exc_info=True)

            # Create a fallback analysis result on error
            default_analysis = self.generate_default_analysis(request, data_sources)

            return AnalysisResult(
                result_id=f"analysis_{request.request_id}",
                name=f"Analysis of {request.description} (Error)",
                description=request.description,
                analysis_type=request.analysis_type,
                code="# Error during analysis",
                visualizations=default_analysis["visualizations"],
                insights=default_analysis["insights"],
                metrics=default_analysis["metrics"],
                attribution=default_analysis["attribution"],
                confidence=default_analysis["confidence"]
            )

# For testing: manual smoke test that runs one analysis end to end.
if __name__ == "__main__":
    # Only fall back to the placeholder when no key is exported, so that a
    # real key already present in the environment is never overwritten.
    os.environ.setdefault("ANTHROPIC_API_KEY", "your_api_key_here")

    # Create mock data for testing: twelve monthly rows for a single region.
    # NOTE(review): recent pandas releases deprecate the 'M' alias in favour
    # of 'ME'; kept as 'M' here so older installations keep working - confirm
    # against the pinned pandas version.
    test_df = pd.DataFrame({
        'date': pd.date_range(start='2023-01-01', periods=12, freq='M'),
        'region': ['Northeast'] * 12,
        'sales': [100, 110, 105, 115, 120, 115, 110, 105, 95, 85, 80, 70],
        'target': [100, 105, 110, 115, 120, 125, 130, 135, 140, 145, 150, 155]
    })

    # Create mock data source exposing the two attributes the agent reads.
    from dataclasses import dataclass

    @dataclass
    class MockDataSource:
        content: pd.DataFrame
        name: str

    data_sources = {
        "sales_data": MockDataSource(content=test_df, name="Monthly sales data")
    }

    # Create mock analysis request with the attributes perform_analysis reads.
    class MockAnalysisRequest:
        def __init__(self):
            self.request_id = "test"
            self.description = "Sales trend analysis for the Northeast region"
            self.data_sources = ["sales_data"]
            self.analysis_type = "time_series"
            self.parameters = {"detect_anomalies": True}
            self.purpose = "Identify factors causing the sales decline"

    agent = AnalyticsAgent()
    result = agent.perform_analysis(MockAnalysisRequest(), data_sources)
    print(f"Generated code:\n{result.code}")
    # default=str is deliberate: generated code may return values json cannot
    # encode (e.g. numpy scalars), and this output is for eyeballing only.
    print(f"Insights: {json.dumps(result.insights, indent=2, default=str)}")
    print(f"Attribution: {json.dumps(result.attribution, indent=2, default=str)}")