Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import logging | |
| import traceback | |
| from typing import Dict, List, Any, Tuple, Optional, TypedDict, Annotated, Literal, cast | |
| from datetime import datetime | |
| import uuid | |
| from pydantic import BaseModel, Field | |
| from langgraph.graph import StateGraph, END | |
| from langchain_anthropic import ChatAnthropic | |
| from langchain_core.messages import AIMessage, HumanMessage, SystemMessage | |
| # Configure logging | |
# Root logging: DEBUG verbosity, each record stamped with time, logger name
# and level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)

# Module-wide logger used by the workflow code in this file.
logger = logging.getLogger("sales_analysis_workflow")
| # Import the agents | |
| from agents.planning_agent import PlanningAgent, AnalysisPlan | |
| from agents.data_agent import DataAgent, DataRequest, DataSource | |
| from agents.analytics_agent import AnalyticsAgent, AnalysisRequest, AnalysisResult | |
| from agents.qa_agent import QAAgent, ValidationRequest, ValidationResult | |
| from agents.insights_agent import InsightsAgent, InsightRequest, InsightCard | |
| # Define the workflow state | |
class WorkflowState(TypedDict):
    """State dictionary that every workflow node receives, updates and returns."""

    # Description of the sales alert that triggered the run.
    alert: str
    # Stage marker; nodes set it on entry and again just before returning.
    status: Literal[
        "planning",
        "data_collection",
        "analysis",
        "validation",
        "insights",
        "complete",
        "error",
    ]
    # Plan produced by the planning node; None until that node has run.
    plan: Optional[AnalysisPlan]
    # Requests built by the planning node, one per required data source.
    data_requests: List[DataRequest]
    # Result of DataAgent.get_data_for_analysis(); its keys are passed on as
    # the data-source names of later requests.
    data_sources: Dict[str, DataSource]
    # Requests built by the data collection node, one per analysis approach.
    analysis_requests: List[AnalysisRequest]
    # Analysis results keyed by the originating request_id.
    analysis_results: Dict[str, AnalysisResult]
    # Requests built by the analysis node, one per analysis result.
    validation_requests: List[ValidationRequest]
    # Validation results keyed by request_id.
    validation_results: Dict[str, ValidationResult]
    # Requests built by the validation node, one per validation result.
    insight_requests: List[InsightRequest]
    # Insight cards keyed by request_id.
    insight_cards: Dict[str, InsightCard]
    # Error message of the failed step, or None when nothing has failed.
    error: Optional[str]
    # Chronological entries with "timestamp", "step" and "message" keys.
    logs: List[Dict[str, Any]]
    # Values returned by InsightsAgent.generate_visualizations()
    # (presumably output file paths -- TODO confirm against the agent).
    visualizations: List[str]
| # Function to initialize workflow state | |
def create_initial_state(alert: str) -> WorkflowState:
    """Build the state a workflow run starts from.

    Args:
        alert: Description of the sales alert to analyse.

    Returns:
        A state in the "planning" stage with no plan, empty request and
        result collections, no error, and a single "init" log entry.
    """
    logger.info(f"Creating initial state for alert: {alert}")

    # First entry of the run log, recording when and why the run started.
    init_entry: Dict[str, Any] = {
        "timestamp": datetime.now().isoformat(),
        "step": "init",
        "message": f"Workflow initialized with alert: {alert}",
    }

    fresh_state: WorkflowState = {
        "alert": alert,
        "status": "planning",
        "plan": None,
        "data_requests": [],
        "data_sources": {},
        "analysis_requests": [],
        "analysis_results": {},
        "validation_requests": [],
        "validation_results": {},
        "insight_requests": [],
        "insight_cards": {},
        "error": None,
        "logs": [init_entry],
        "visualizations": [],
    }
    return fresh_state
| # Define the agent nodes | |
class SalesAnalysisWorkflow:
    """Orchestrate the multi-agent sales analysis pipeline as a LangGraph graph.

    Five agent nodes run in order: planning, data collection, analysis,
    validation and insights. Each node catches its own exceptions and records
    the failure by setting ``state["status"] = "error"`` and ``state["error"]``.
    A conditional edge after every node then diverts to ``handle_error``, so
    later nodes cannot run on incomplete data and overwrite the error status.
    """

    # Agent nodes in the order they run when every stage succeeds.
    _STAGES = ("planning", "data_collection", "analysis", "validation", "insights")

    def __init__(self, db_path: str = "data/pharma_db.sqlite"):
        """Create the agents and compile the workflow graph.

        Args:
            db_path: Forwarded to ``DataAgent`` as its ``db_path`` argument.

        Raises:
            Exception: Anything raised while constructing the agents or the
                graph; it is logged with its traceback and re-raised.
        """
        logger.info("Initializing SalesAnalysisWorkflow")
        try:
            self.planning_agent = PlanningAgent()
            self.data_agent = DataAgent(db_path=db_path)
            self.analytics_agent = AnalyticsAgent()
            self.qa_agent = QAAgent()
            self.insights_agent = InsightsAgent()

            self.workflow = self._build_workflow()
            logger.info("SalesAnalysisWorkflow initialized successfully")
        except Exception as e:
            logger.error(f"Workflow initialization failed: {e}")
            logger.error(traceback.format_exc())
            raise

    def _build_workflow(self) -> Any:
        """Assemble and compile the workflow graph.

        Returns:
            The object returned by ``StateGraph.compile()``; ``run_workflow``
            calls ``invoke`` on it.

        Raises:
            Exception: Anything raised by graph construction or compilation;
                it is logged with its traceback and re-raised.
        """
        logger.info("Building workflow graph")
        try:
            graph = StateGraph(WorkflowState)

            stage_handlers = {
                "planning": self.planning_node,
                "data_collection": self.data_collection_node,
                "analysis": self.analysis_node,
                "validation": self.validation_node,
                "insights": self.insights_node,
            }
            for stage in self._STAGES:
                graph.add_node(stage, stage_handlers[stage])
            graph.add_node("handle_error", self.handle_error_node)

            # After each stage, continue to the next one (END after the last)
            # unless the stage reported an error. Without this branch the
            # remaining stages would still run and replace status="error"
            # with their own status, hiding the failure from the caller.
            successors = list(self._STAGES[1:]) + [END]
            for stage, successor in zip(self._STAGES, successors):
                graph.add_conditional_edges(
                    stage,
                    self._route_on_status,
                    {"continue": successor, "error": "handle_error"},
                )
            graph.add_edge("handle_error", END)

            graph.set_entry_point("planning")
            logger.info("Workflow graph built successfully")
            return graph.compile()
        except Exception as e:
            logger.error(f"Workflow graph construction failed: {e}")
            logger.error(traceback.format_exc())
            raise

    @staticmethod
    def _route_on_status(state: "WorkflowState") -> str:
        """Return "error" when the last node failed, otherwise "continue"."""
        return "error" if state.get("status") == "error" else "continue"

    @staticmethod
    def _log_step(state: "WorkflowState", step: str, message: str) -> None:
        """Append a timestamped entry to the state's run log."""
        state["logs"].append({
            "timestamp": datetime.now().isoformat(),
            "step": step,
            "message": message,
        })

    @staticmethod
    def _fail(state: "WorkflowState", stage_label: str, exc: Exception) -> "WorkflowState":
        """Record a node failure in the state and return the state.

        Must be called from inside the ``except`` block that caught ``exc`` so
        the traceback of the active exception can be logged.
        """
        logger.error(f"{stage_label} error: {exc}")
        logger.error(traceback.format_exc())
        state["status"] = "error"
        state["error"] = f"{stage_label} error: {str(exc)}"
        return state

    def run_workflow(self, alert: str) -> "WorkflowState":
        """Run the whole workflow for one alert.

        Args:
            alert: Description of the sales alert.

        Returns:
            The final workflow state. If invoking the graph itself raises, a
            fresh state with ``status="error"``, the exception text in
            ``error`` and an "error" log entry is returned instead.
        """
        logger.info(f"Starting workflow run with alert: {alert}")
        try:
            initial_state = create_initial_state(alert)
            logger.info("Invoking workflow")
            final_state = self.workflow.invoke(initial_state)
            logger.info(f"Workflow completed. Final status: {final_state['status']}")
            return final_state
        except Exception as e:
            logger.error(f"Workflow execution failed: {e}")
            logger.error(traceback.format_exc())
            # The partially processed state is not available here, so report
            # the failure on a freshly initialised one.
            error_state = create_initial_state(alert)
            error_state["status"] = "error"
            error_state["error"] = str(e)
            self._log_step(error_state, "error", f"Workflow execution failed: {e}")
            return error_state

    def planning_node(self, state: "WorkflowState") -> "WorkflowState":
        """Planning node: create the analysis plan and derive data requests.

        On success stores ``plan`` and ``data_requests`` and sets the status
        to "data_collection"; on failure sets the status to "error".
        """
        try:
            self._log_step(state, "planning", "Planning analysis approach")
            state["status"] = "planning"

            # The agent returns a (plan, dict) pair; only the plan is used.
            analysis_plan, _ = self.planning_agent.create_analysis_plan(state["alert"])
            state["plan"] = analysis_plan

            # One data request per data source named in the plan.
            state["data_requests"] = [
                DataRequest(
                    request_id=f"data_{i}",
                    description=source["purpose"],
                    tables=[source["table"]],
                    purpose=source["purpose"],
                )
                for i, source in enumerate(analysis_plan.required_data_sources)
            ]

            state["status"] = "data_collection"
            return state
        except Exception as e:
            return self._fail(state, "Planning", e)

    def data_collection_node(self, state: "WorkflowState") -> "WorkflowState":
        """Data node: fetch the requested data and derive analysis requests.

        On success stores ``data_sources`` and ``analysis_requests`` and sets
        the status to "analysis"; on failure sets the status to "error".
        """
        try:
            self._log_step(
                state,
                "data_collection",
                f"Collecting data from {len(state['data_requests'])} sources",
            )
            state["status"] = "data_collection"

            data_sources = self.data_agent.get_data_for_analysis(state["data_requests"])
            state["data_sources"] = data_sources

            # One analysis request per approach in the plan; every request is
            # given the names of all collected data sources.
            analysis_requests = []
            if state["plan"]:
                source_names = list(data_sources.keys())
                analysis_requests = [
                    AnalysisRequest(
                        request_id=f"analysis_{i}",
                        description=approach["purpose"],
                        data_sources=source_names,
                        analysis_type=approach["type"],
                        purpose=approach["purpose"],
                    )
                    for i, approach in enumerate(state["plan"].analysis_approaches)
                ]
            state["analysis_requests"] = analysis_requests

            state["status"] = "analysis"
            return state
        except Exception as e:
            return self._fail(state, "Data collection", e)

    def analysis_node(self, state: "WorkflowState") -> "WorkflowState":
        """Analytics node: run every analysis and derive validation requests.

        On success stores ``analysis_results`` and ``validation_requests`` and
        sets the status to "validation"; on failure sets it to "error".
        """
        try:
            self._log_step(
                state,
                "analysis",
                f"Performing {len(state['analysis_requests'])} analysis tasks",
            )
            state["status"] = "analysis"

            analysis_results = {
                request.request_id: self.analytics_agent.perform_analysis(
                    request, state["data_sources"]
                )
                for request in state["analysis_requests"]
            }
            state["analysis_results"] = analysis_results

            # One validation request per result, reusing the analysis id.
            source_names = list(state["data_sources"].keys())
            state["validation_requests"] = [
                ValidationRequest(
                    request_id=analysis_id,
                    original_problem=state["alert"],
                    analysis_results=result.model_dump(),
                    data_sources=source_names,
                )
                for analysis_id, result in analysis_results.items()
            ]

            state["status"] = "validation"
            return state
        except Exception as e:
            return self._fail(state, "Analysis", e)

    def validation_node(self, state: "WorkflowState") -> "WorkflowState":
        """QA node: validate every analysis result and derive insight requests.

        On success stores ``validation_results`` and ``insight_requests`` and
        sets the status to "insights"; on failure sets it to "error".
        """
        try:
            self._log_step(
                state,
                "validation",
                f"Validating {len(state['validation_requests'])} analysis results",
            )
            state["status"] = "validation"

            validation_results = {
                request.request_id: self.qa_agent.validate_analysis(
                    request, state["data_sources"]
                )
                for request in state["validation_requests"]
            }
            state["validation_results"] = validation_results

            # Pair each validation with the analysis result of the same id.
            state["insight_requests"] = [
                InsightRequest(
                    request_id=analysis_id,
                    original_problem=state["alert"],
                    analysis_results=state["analysis_results"][analysis_id].model_dump(),
                    validation_results=validation.model_dump(),
                    target_audience="executive",
                )
                for analysis_id, validation in validation_results.items()
            ]

            state["status"] = "insights"
            return state
        except Exception as e:
            return self._fail(state, "Validation", e)

    def insights_node(self, state: "WorkflowState") -> "WorkflowState":
        """Insights node: generate insight cards and their visualizations.

        On success stores ``insight_cards`` and ``visualizations``, sets the
        status to "complete" and logs completion; on failure sets the status
        to "error".
        """
        try:
            self._log_step(
                state,
                "insights",
                f"Generating insights from {len(state['insight_requests'])} validated analyses",
            )
            state["status"] = "insights"

            insight_cards = {}
            visualizations = []
            for request in state["insight_requests"]:
                insight_card = self.insights_agent.generate_insights(request)
                insight_cards[request.request_id] = insight_card
                visualizations.extend(
                    self.insights_agent.generate_visualizations(
                        insight_card, state["data_sources"]
                    )
                )

            state["insight_cards"] = insight_cards
            state["visualizations"] = visualizations
            state["status"] = "complete"
            self._log_step(
                state,
                "complete",
                f"Analysis workflow completed with {len(insight_cards)} insight cards",
            )
            return state
        except Exception as e:
            return self._fail(state, "Insights", e)

    def handle_error_node(self, state: "WorkflowState") -> "WorkflowState":
        """Error node: add the recorded error message to the run log.

        Reached through the conditional edges whenever a stage leaves the
        state with ``status == "error"``; the status itself is left untouched.
        """
        self._log_step(state, "error", f"Workflow error: {state['error']}")
        return state
| # For testing | |
if __name__ == "__main__":
    # Manual smoke test: build the workflow, run one sample alert and print a
    # short summary of the outcome.

    # Show all log detail. logging.basicConfig() would do nothing here because
    # the root logger already has a handler from the import-time setup, so
    # set the root level directly.
    logging.getLogger().setLevel(logging.DEBUG)

    # Placeholder key for local testing only. setdefault() keeps a real key
    # that is already present in the environment instead of overwriting it.
    os.environ.setdefault("ANTHROPIC_API_KEY", "your_api_key_here")

    try:
        workflow = SalesAnalysisWorkflow(db_path="data/pharma_db.sqlite")

        alert = "Sales of DrugX down 15% in Northeast region over past 30 days compared to forecast."
        result = workflow.run_workflow(alert)

        print(f"Workflow status: {result['status']}")
        if result['status'] == "error":
            print(f"Error: {result['error']}")
        else:
            print(f"Generated {len(result.get('insight_cards', {}))} insight cards")
            print(f"Generated {len(result.get('visualizations', []))} visualizations")
    except Exception as e:
        print(f"Test run failed: {e}")
        print(traceback.format_exc())