Spaces:
Runtime error
Runtime error
| """ | |
| Simplified Planning Agent for Pharmaceutical Analytics | |
| This version uses direct API calls instead of LangChain components | |
| """ | |
| import os | |
| import json | |
| import re | |
| from typing import Dict, List, Any, Tuple | |
| from pydantic import BaseModel, Field | |
| # Define analysis plan schema | |
| class AnalysisPlan(BaseModel): | |
| """Planning agent output with analysis plan details""" | |
| problem_statement: str = Field(description="Refined problem statement based on the alert") | |
| required_data_sources: List[Dict[str, str]] = Field( | |
| description="List of data sources needed with table name and purpose") | |
| analysis_approaches: List[Dict[str, str]] = Field( | |
| description="List of analytical approaches to be used with type and purpose") | |
| tasks: List[Dict[str, Any]] = Field( | |
| description="Ordered list of tasks to execute with dependencies") | |
| expected_insights: List[str] = Field( | |
| description="List of expected insights that would answer the problem") | |
| class PlanningAgent: | |
| """Agent responsible for planning the analysis workflow""" | |
| def __init__(self): | |
| """Initialize the planning agent""" | |
| api_key = os.getenv("ANTHROPIC_API_KEY") | |
| if not api_key: | |
| raise ValueError("ANTHROPIC_API_KEY not found in environment variables") | |
| self.api_key = api_key | |
| print("Planning Agent initialized successfully") | |
| def create_analysis_plan(self, alert_description: str) -> Tuple[AnalysisPlan, Dict]: | |
| """Generate an analysis plan based on the alert description""" | |
| print("Planning Agent: Creating analysis plan...") | |
| # Create the system prompt and user message | |
| system_prompt = """You are an expert pharmaceutical analytics planning agent. | |
| Your task is to create a detailed analysis plan to investigate sales anomalies. | |
| For pharmaceutical sales analysis: | |
| - Consider product performance, competitor activities, prescriber behavior | |
| - Include geographic, temporal, and demographic dimensions in your analysis | |
| - Consider both internal factors (supply, marketing) and external factors (market events, seasonality) | |
| Your output should be a complete JSON-formatted analysis plan following this structure: | |
| { | |
| "problem_statement": "Clear definition of the problem to solve", | |
| "required_data_sources": [ | |
| {"table": "sales", "purpose": "Core sales metrics analysis"}, | |
| {"table": "regions", "purpose": "Geographic segmentation"} | |
| ], | |
| "analysis_approaches": [ | |
| {"type": "time_series_decomposition", "purpose": "Separate trend from seasonality"}, | |
| {"type": "comparative_analysis", "purpose": "Compare performance across regions"} | |
| ], | |
| "tasks": [ | |
| { | |
| "id": 1, | |
| "name": "Data acquisition", | |
| "description": "Pull relevant data from sources", | |
| "agent": "data_agent", | |
| "dependencies": [], | |
| "expected_output": "Cleaned datasets for analysis" | |
| }, | |
| { | |
| "id": 2, | |
| "name": "Analysis execution", | |
| "description": "Perform statistical analysis", | |
| "agent": "analytics_agent", | |
| "dependencies": [1], | |
| "expected_output": "Analysis results" | |
| } | |
| ], | |
| "expected_insights": [ | |
| "Primary factors contributing to sales decline", | |
| "Regional variations in performance" | |
| ] | |
| } | |
| Be thorough but focus on creating a practical analysis workflow. | |
| """ | |
| user_message = f"Create an analysis plan for the following alert: {alert_description}" | |
| # Make direct API call to Claude | |
| try: | |
| import anthropic | |
| client = anthropic.Anthropic(api_key=self.api_key) | |
| # FIXED: Pass system as a parameter, not as a message | |
| response = client.messages.create( | |
| model="claude-3-haiku-20240307", | |
| max_tokens=2000, | |
| temperature=0.2, | |
| system=system_prompt, # Pass as system parameter | |
| messages=[ | |
| {"role": "user", "content": user_message} | |
| ] | |
| ) | |
| # Extract response content | |
| response_text = response.content[0].text | |
| # Extract JSON from the response | |
| plan_dict = self.extract_json_from_text(response_text) | |
| # Convert to Pydantic model for validation | |
| analysis_plan = AnalysisPlan.model_validate(plan_dict) | |
| return analysis_plan, plan_dict | |
| except Exception as e: | |
| print(f"Error creating analysis plan: {e}") | |
| raise | |
| def extract_json_from_text(self, text: str) -> Dict: | |
| """Extract JSON from text that might contain additional content""" | |
| try: | |
| # First try to parse the entire text as JSON | |
| return json.loads(text) | |
| except json.JSONDecodeError: | |
| # Try to find JSON block with regex | |
| json_pattern = r'```json\s*([\s\S]*?)\s*```' | |
| match = re.search(json_pattern, text) | |
| if match: | |
| try: | |
| return json.loads(match.group(1)) | |
| except json.JSONDecodeError: | |
| pass | |
| # Try to find anything that looks like JSON | |
| json_pattern = r'({[\s\S]*})' | |
| match = re.search(json_pattern, text) | |
| if match: | |
| try: | |
| return json.loads(match.group(1)) | |
| except json.JSONDecodeError: | |
| pass | |
| # If all extraction attempts fail | |
| raise ValueError(f"Could not extract JSON from response: {text}") | |
| # For testing | |
| if __name__ == "__main__": | |
| # Get API key from environment | |
| agent = PlanningAgent() | |
| alert = "Sales of DrugX down 15% in Northeast region over past 30 days compared to forecast." | |
| plan, plan_dict = agent.create_analysis_plan(alert) | |
| print(json.dumps(plan_dict, indent=2)) |