Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import pandas as pd | |
| import numpy as np | |
| import sys | |
| from typing import Dict, List, Any, Tuple, Optional | |
| from pydantic import BaseModel, Field | |
| from langchain_anthropic import ChatAnthropic | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
| import re | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from io import StringIO | |
| import logging | |
# Configure process-wide logging once at import time: INFO and above, with
# each record showing timestamp, logger name, severity and message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by everything in this file.
logger = logging.getLogger("analytics_agent")
class AnalysisRequest(BaseModel):
    """Structure for an analysis request.

    ``parameters`` is the only optional field; every other field must be
    supplied by the caller.
    """

    # Identifier of the request; used to derive result ids and figure file names.
    request_id: str
    # Free-text description of the analysis; forwarded to the LLM prompt.
    description: str
    # Names of the data sources involved (presumably keys of the data-source
    # mapping handed to the agent -- verify against the caller).
    data_sources: List[str]
    # Kind of analysis to run (e.g. "time_series"); forwarded to the LLM prompt.
    analysis_type: str
    # Optional free-form settings. Declared Optional so that an explicit
    # ``parameters=None`` validates instead of contradicting the annotation.
    parameters: Optional[Dict[str, Any]] = None
    # Business purpose of the analysis; forwarded to the LLM prompt.
    purpose: str
class AnalysisResult(BaseModel):
    """Structure for analysis results.

    The first five fields are required. Every remaining field defaults to
    ``None`` and is therefore declared ``Optional`` so that the annotation
    matches the default and an explicit ``None`` passes validation.
    """

    # Identifier of this result.
    result_id: str
    # Human-readable title of the analysis.
    name: str
    # Description copied from the originating request.
    description: str
    # Kind of analysis that was run.
    analysis_type: str
    # Python source that produced (or was meant to produce) the result.
    code: str
    # References to generated figures (labels and/or saved file names).
    visualizations: Optional[List[str]] = None
    # Findings, each a free-form dict (the prompt asks for
    # "finding" / "details" / "impact" keys).
    insights: Optional[List[Dict[str, Any]]] = None
    # Named numeric metrics produced by the analysis.
    metrics: Optional[Dict[str, float]] = None
    # Details about any fitted model; not populated anywhere in this file.
    model_details: Optional[Dict[str, Any]] = None
    # Share attributed to each factor (e.g. {"factor1": 0.65}).
    attribution: Optional[Dict[str, float]] = None
    # Confidence in the analysis as reported by the analysis code.
    confidence: Optional[float] = None
class AnalyticsAgent:
    """Agent responsible for data analysis and modeling.

    The agent asks an Anthropic chat model to write analysis code for a
    request, executes that code against the supplied data sources and wraps
    the outcome in an ``AnalysisResult``. Whenever code generation or
    execution does not yield a usable result, a placeholder "default"
    analysis is returned instead of raising.
    """

    def __init__(self):
        """Initialize the analytics agent.

        Raises:
            ValueError: If ``ANTHROPIC_API_KEY`` is not set in the environment.
        """
        # Set up Claude API client
        api_key = os.getenv("ANTHROPIC_API_KEY")
        if not api_key:
            raise ValueError("ANTHROPIC_API_KEY not found in environment variables")
        self.llm = ChatAnthropic(
            model="claude-3-7-sonnet-20250219",
            anthropic_api_key=api_key,
            temperature=0.1
        )
        # Create analysis code generation prompt.
        # NOTE: the template uses f-string style placeholders, so every literal
        # brace in the example dictionary below is doubled ("{{" / "}}").
        # Single braces would be parsed as template variables and make
        # ``invoke`` fail because those "variables" are never supplied.
        self.analysis_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert data scientist specializing in pharmaceutical sales analysis.
Your task is to generate Python code to analyze data based on specific requirements.

For each analysis request:
1. Generate clear, efficient pandas and numpy code
2. Include appropriate data visualization with matplotlib/seaborn
3. Apply statistical methods relevant to the analysis type
4. Add detailed comments explaining your approach
5. Extract and highlight key insights from the analysis

The analysis should be thorough and focused on addressing the specific business question.
Make sure to handle potential data issues and explain your assumptions.

Format your response with a code block:

```python
# Analysis code
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

def run_analysis(data_sources):
    # Your analysis code here
    # Return results as a dictionary
    return {{
        "insights": [
            {{"finding": "Key finding 1", "details": "Explanation", "impact": "Business impact"}},
            # More insights...
        ],
        "metrics": {{
            "metric1": value1,
            "metric2": value2,
            # More metrics...
        }},
        "visualizations": ["fig1", "fig2"],  # References to generated figures
        "attribution": {{
            "factor1": 0.65,  # 65% attribution to factor1
            "factor2": 0.25,  # 25% attribution to factor2
            "factor3": 0.10  # 10% attribution to factor3
        }},
        "confidence": 0.95  # 95% confidence in the analysis
    }}
```

After the code block, explain your analytical approach and any assumptions.
"""),
            ("human", """
Analysis Request: {description}

Available data sources:
{data_sources}

Analysis type: {analysis_type}
Parameters: {parameters}
Purpose: {purpose}

Please generate Python code to perform this analysis.
""")
        ])
        # Set up the analysis chain: prompt -> model -> plain string
        self.analysis_chain = (
            self.analysis_prompt
            | self.llm
            | StrOutputParser()
        )
        # In-memory storage for analysis artifacts (saved figure file names)
        self.analysis_artifacts = {}
        logger.info("Analytics Agent initialized successfully")

    def extract_python_from_response(self, response: str) -> str:
        """Extract Python code from an LLM response.

        Args:
            response: Raw text returned by the model.

        Returns:
            The stripped contents of the first ```python fenced block, or of
            the first generic ``` fenced block when no python-tagged block
            exists, or ``""`` when the response has no fenced block at all.
        """
        # Prefer an explicitly python-tagged block, then fall back to any block.
        for pattern in (r'```python\s*(.*?)\s*```', r'```\s*(.*?)\s*```'):
            python_match = re.search(pattern, response, re.DOTALL)
            if python_match:
                return python_match.group(1).strip()
        # If all else fails, return empty string
        logger.warning("No code block found in response")
        return ""

    def extract_insights_from_code_output(self, output: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], Dict[str, float], float]:
        """Extract insights, attribution, and confidence from code output.

        Args:
            output: Dictionary returned by generated analysis code.

        Returns:
            ``(insights, attribution, confidence)``; keys that are missing
            fall back to ``[]``, ``{}`` and ``0.0`` respectively.
        """
        insights = output.get("insights", [])
        attribution = output.get("attribution", {})
        confidence = output.get("confidence", 0.0)
        return insights, attribution, confidence

    def generate_default_analysis(self, request: AnalysisRequest, data_sources: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a default analysis output when code execution fails.

        Args:
            request: The request the placeholder is produced for; only its
                ``description`` is read.
            data_sources: Accepted for signature compatibility; not used.

        Returns:
            A dictionary with ``insights``, ``attribution``, ``metrics``,
            ``visualizations`` and ``confidence`` keys holding placeholders.
        """
        logger.info(f"Generating default analysis for {request.description}")
        # Create default insights based on request description
        insights = [
            {
                "finding": f"Analysis of {request.description}",
                "details": "Default analysis created due to execution issues",
                "impact": "Recommend manual investigation"
            }
        ]
        # Create default attribution
        attribution = {"unknown_factors": 1.0}
        # Default metrics
        metrics = {"analysis_completion": 0.0}
        return {
            "insights": insights,
            "attribution": attribution,
            "metrics": metrics,
            "visualizations": [],
            "confidence": 0.5
        }

    def _describe_data_sources(self, data_sources: Dict[str, Any]) -> str:
        """Build the textual data-source summary embedded in the prompt."""
        sections = []
        for source_id, source in data_sources.items():
            df = getattr(source, "content", None)
            if df is None:
                logger.warning(f"Data source {source_id} has no content attribute or content is None")
                continue
            # Column labels are not guaranteed to be strings (e.g. integer
            # labels), so convert them before joining.
            column_names = ', '.join(map(str, df.columns))
            sections.append(
                f"Data source '{source_id}' ({source.name}):\n"
                f"- Shape: {df.shape[0]} rows, {df.shape[1]} columns\n"
                f"- Columns: {column_names}\n"
                f"- Sample data:\n{df.head(3).to_string()}\n\n"
            )
        return "".join(sections)

    @staticmethod
    def _exec_with_captured_stdout(python_code: str, namespace: Dict[str, Any]) -> str:
        """Execute ``python_code`` in ``namespace`` and return what it printed.

        ``sys.stdout`` is always restored, even when the code raises; the
        exception itself propagates to the caller.
        """
        stdout_backup = sys.stdout
        captured = StringIO()
        sys.stdout = captured
        try:
            # SECURITY: this runs model-generated code with full interpreter
            # privileges. Only use it in a trusted or sandboxed environment.
            exec(python_code, namespace)
        finally:
            sys.stdout = stdout_backup
        return captured.getvalue()

    def _execute_analysis_code(self, python_code: str, request: AnalysisRequest, data_sources: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Run generated code and collect its output, or return None if unusable.

        ``None`` is returned when there is no data source with content, when
        the code defines no ``run_analysis`` or when ``run_analysis`` does not
        return a dict. Exceptions raised by the generated code propagate.
        """
        # Prepare data sources for the analysis: keep only those with content
        analysis_data_sources = {
            src_id: src.content
            for src_id, src in data_sources.items()
            if getattr(src, "content", None) is not None
        }
        if not analysis_data_sources:
            logger.warning("No valid data sources found for analysis")
            return None

        # Namespace with access to pandas, numpy, etc.
        local_namespace = {
            "pd": pd,
            "np": np,
            "plt": plt,
            "sns": sns,
            "data_sources": analysis_data_sources
        }
        logger.info("Executing analysis code")
        print_output = self._exec_with_captured_stdout(python_code, local_namespace)
        logger.debug(f"Code execution output: {print_output}")

        # Look for a run_analysis function and execute it
        if "run_analysis" not in local_namespace:
            logger.warning("No run_analysis function found in generated code")
            return None
        logger.info("Running analysis function")
        analysis_output = local_namespace["run_analysis"](analysis_data_sources)
        if not isinstance(analysis_output, dict):
            logger.warning(f"run_analysis returned non-dict type: {type(analysis_output)}")
            return None

        # Copy the reported references so the generated code's own list is
        # not mutated when saved figure file names are appended below.
        reported = analysis_output.get("visualizations", [])
        visualizations = list(reported) if isinstance(reported, (list, tuple)) else []
        # Save any figure left at module level by the generated code to a PNG
        # in the current working directory and record its file name.
        for var_name, var_value in list(local_namespace.items()):
            if isinstance(var_value, plt.Figure):
                fig_filename = f"figure_{request.request_id}_{var_name}.png"
                var_value.savefig(fig_filename)
                self.analysis_artifacts[fig_filename] = fig_filename
                visualizations.append(fig_filename)

        return {
            "insights": analysis_output.get("insights", []),
            "attribution": analysis_output.get("attribution", {}),
            "confidence": analysis_output.get("confidence", 0.0),
            "metrics": analysis_output.get("metrics", {}),
            "visualizations": visualizations,
        }

    def perform_analysis(self, request: AnalysisRequest, data_sources: Dict[str, Any]) -> AnalysisResult:
        """Perform analysis based on request and return results.

        Args:
            request: The analysis to perform.
            data_sources: Mapping of source id to an object exposing
                ``content`` (used as a DataFrame: ``shape``, ``columns``,
                ``head``) and ``name``.

        Returns:
            An ``AnalysisResult``. Failures are not raised: if generation or
            execution does not produce a usable output, the result carries
            the default analysis; if anything else fails, an "(Error)" result
            is returned.
        """
        logger.info(f"Analytics Agent: Performing {request.analysis_type} analysis - {request.description}")
        try:
            # Format the request for the prompt
            request_data = {
                "description": request.description,
                "data_sources": self._describe_data_sources(data_sources),
                "analysis_type": request.analysis_type,
                "parameters": json.dumps(request.parameters, indent=2) if request.parameters else "None",
                "purpose": request.purpose
            }
            # Generate analysis code
            logger.info("Generating analysis code")
            response = self.analysis_chain.invoke(request_data)
            # Extract Python code
            python_code = self.extract_python_from_response(response)

            analysis = None
            if not python_code:
                logger.warning("No analysis code generated. Using default analysis.")
            else:
                try:
                    analysis = self._execute_analysis_code(python_code, request, data_sources)
                except Exception as e:
                    # Generated code is untrusted and may fail in any way;
                    # log it and fall back to the default analysis below.
                    logger.error(f"Analysis execution error: {e}", exc_info=True)
                    logger.error(f"Python code that failed: {python_code}")
            if analysis is None:
                analysis = self.generate_default_analysis(request, data_sources)

            insights = analysis["insights"]
            attribution = analysis["attribution"]
            # Ensure we have at least one insight
            if not insights:
                insights = [{"finding": "No specific insights found", "details": "Analysis completed but no significant patterns were identified", "impact": "No immediate action required"}]
            # Ensure we have attribution
            if not attribution:
                attribution = {"unattributed_factors": 1.0}

            # Create analysis result
            result = AnalysisResult(
                result_id=f"analysis_{request.request_id}",
                name=f"Analysis of {request.description}",
                description=request.description,
                analysis_type=request.analysis_type,
                code=python_code,
                visualizations=analysis["visualizations"],
                insights=insights,
                metrics=analysis["metrics"],
                attribution=attribution,
                confidence=analysis["confidence"]
            )
            logger.info(f"Analysis for {request.description} completed successfully")
            return result
        except Exception as e:
            logger.error(f"Error in perform_analysis: {e}", exc_info=True)
            # Create a fallback analysis result on error
            default_analysis = self.generate_default_analysis(request, data_sources)
            return AnalysisResult(
                result_id=f"analysis_{request.request_id}",
                name=f"Analysis of {request.description} (Error)",
                description=request.description,
                analysis_type=request.analysis_type,
                code="# Error during analysis",
                insights=default_analysis["insights"],
                metrics=default_analysis["metrics"],
                attribution=default_analysis["attribution"],
                confidence=default_analysis["confidence"]
            )
# For testing: run one end-to-end analysis against a small mock data set.
if __name__ == "__main__":
    from dataclasses import dataclass

    # Only install the placeholder when no key is exported, so that a real
    # key already present in the environment is not clobbered.
    os.environ.setdefault("ANTHROPIC_API_KEY", "your_api_key_here")

    # Create mock data for testing: twelve monthly rows for one region.
    # NOTE(review): recent pandas releases deprecate the 'M' alias in favour
    # of 'ME'; kept as-is here to stay compatible with older versions.
    test_df = pd.DataFrame({
        'date': pd.date_range(start='2023-01-01', periods=12, freq='M'),
        'region': ['Northeast'] * 12,
        'sales': [100, 110, 105, 115, 120, 115, 110, 105, 95, 85, 80, 70],
        'target': [100, 105, 110, 115, 120, 125, 130, 135, 140, 145, 150, 155]
    })

    # Create mock data source. The @dataclass decorator generates the
    # keyword-accepting __init__ used just below; without it the constructor
    # call raises TypeError.
    @dataclass
    class MockDataSource:
        content: pd.DataFrame
        name: str

    data_sources = {
        "sales_data": MockDataSource(content=test_df, name="Monthly sales data")
    }

    # Create mock analysis request exposing the attributes the agent reads
    class MockAnalysisRequest:
        def __init__(self):
            self.request_id = "test"
            self.description = "Sales trend analysis for the Northeast region"
            self.data_sources = ["sales_data"]
            self.analysis_type = "time_series"
            self.parameters = {"detect_anomalies": True}
            self.purpose = "Identify factors causing the sales decline"

    agent = AnalyticsAgent()
    result = agent.perform_analysis(MockAnalysisRequest(), data_sources)
    print(f"Generated code:\n{result.code}")
    print(f"Insights: {json.dumps(result.insights, indent=2)}")
    print(f"Attribution: {json.dumps(result.attribution, indent=2)}")