import os
import json
import logging
import traceback
from typing import Dict, List, Any, Tuple, Optional, TypedDict, Annotated, Literal, cast
from datetime import datetime
import uuid

from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("sales_analysis_workflow")

# Import the agents
from agents.planning_agent import PlanningAgent, AnalysisPlan
from agents.data_agent import DataAgent, DataRequest, DataSource
from agents.analytics_agent import AnalyticsAgent, AnalysisRequest, AnalysisResult
from agents.qa_agent import QAAgent, ValidationRequest, ValidationResult
from agents.insights_agent import InsightsAgent, InsightRequest, InsightCard


# Define the workflow state
class WorkflowState(TypedDict):
    """Shared state passed between the workflow's graph nodes."""

    alert: str  # the sales alert text the run was started with
    # Current pipeline stage; "error" means a node failed and the run stops.
    status: Literal["planning", "data_collection", "analysis", "validation", "insights", "complete", "error"]
    plan: Optional[AnalysisPlan]  # set by the planning node
    data_requests: List[DataRequest]
    data_sources: Dict[str, DataSource]  # returned by DataAgent.get_data_for_analysis
    analysis_requests: List[AnalysisRequest]
    analysis_results: Dict[str, AnalysisResult]  # keyed by analysis request_id
    validation_requests: List[ValidationRequest]
    validation_results: Dict[str, ValidationResult]  # keyed by analysis request_id
    insight_requests: List[InsightRequest]
    insight_cards: Dict[str, InsightCard]  # keyed by analysis request_id
    error: Optional[str]  # "<Stage> error: <message>" once a node fails
    logs: List[Dict[str, Any]]  # entries with "timestamp", "step" and "message" keys
    visualizations: List[str]  # whatever InsightsAgent.generate_visualizations returns


# Function to initialize workflow state
def create_initial_state(alert: str) -> WorkflowState:
    """Create the initial workflow state for ``alert``.

    Args:
        alert: Description of the sales alert to analyse.

    Returns:
        A state in the "planning" stage with empty collections and a single
        "init" log entry.
    """
    logger.info(f"Creating initial state for alert: {alert}")
    return WorkflowState(
        alert=alert,
        status="planning",
        plan=None,
        data_requests=[],
        data_sources={},
        analysis_requests=[],
        analysis_results={},
        validation_requests=[],
        validation_results={},
        insight_requests=[],
        insight_cards={},
        error=None,
        logs=[{
            "timestamp": datetime.now().isoformat(),
            "step": "init",
            "message": f"Workflow initialized with alert: {alert}"
        }],
        visualizations=[]
    )


# Define the agent nodes
class SalesAnalysisWorkflow:
    """Orchestrates the sales analysis workflow with multiple agents.

    The compiled graph runs planning -> data_collection -> analysis ->
    validation -> insights. Each agent node catches its own exceptions and
    records them as ``status="error"``. After every node a router checks that
    status and diverts the run to ``handle_error`` (then END) instead of
    letting later nodes overwrite the failure.
    """

    def __init__(self, db_path: str = "data/pharma_db.sqlite"):
        """Initialize the agents and compile the workflow graph.

        Args:
            db_path: Database path passed through to ``DataAgent``.

        Raises:
            Exception: Any error raised while constructing the agents or the
                graph is logged and re-raised.
        """
        logger.info("Initializing SalesAnalysisWorkflow")
        try:
            # Initialize agents
            self.planning_agent = PlanningAgent()
            self.data_agent = DataAgent(db_path=db_path)
            self.analytics_agent = AnalyticsAgent()
            self.qa_agent = QAAgent()
            self.insights_agent = InsightsAgent()

            # Create the workflow graph
            self.workflow = self._build_workflow()
            logger.info("SalesAnalysisWorkflow initialized successfully")
        except Exception as e:
            logger.error(f"Workflow initialization failed: {e}")
            logger.error(traceback.format_exc())
            raise

    # ------------------------------------------------------------------
    # Graph construction
    # ------------------------------------------------------------------

    @staticmethod
    def _route_on_status(state: WorkflowState) -> str:
        """Route after a node: "error" if it marked the run failed, else "continue"."""
        return "error" if state.get("status") == "error" else "continue"

    def _build_workflow(self) -> Any:
        """Build and compile the workflow graph.

        Returns:
            The result of ``StateGraph.compile()`` (a runnable exposing
            ``invoke``), not the uncompiled ``StateGraph``.

        Raises:
            Exception: Any graph construction error is logged and re-raised.
        """
        logger.info("Building workflow graph")
        try:
            # Create the state graph
            workflow = StateGraph(WorkflowState)

            # Add nodes for each agent, plus the terminal error handler
            workflow.add_node("planning", self.planning_node)
            workflow.add_node("data_collection", self.data_collection_node)
            workflow.add_node("analysis", self.analysis_node)
            workflow.add_node("validation", self.validation_node)
            workflow.add_node("insights", self.insights_node)
            workflow.add_node("handle_error", self.handle_error_node)

            # Happy path, one hop per pair. Each hop is conditional: the nodes
            # record failures in state["status"] instead of raising, so with
            # plain edges the following nodes would keep running and
            # overwrite "error" (ultimately with "complete").
            happy_path = [
                ("planning", "data_collection"),
                ("data_collection", "analysis"),
                ("analysis", "validation"),
                ("validation", "insights"),
                ("insights", END),
            ]
            for source, next_step in happy_path:
                workflow.add_conditional_edges(
                    source,
                    self._route_on_status,
                    {"continue": next_step, "error": "handle_error"},
                )
            workflow.add_edge("handle_error", END)

            # Set the entry point
            workflow.set_entry_point("planning")

            compiled = workflow.compile()
            logger.info("Workflow graph built successfully")
            return compiled
        except Exception as e:
            logger.error(f"Workflow graph construction failed: {e}")
            logger.error(traceback.format_exc())
            raise

    # ------------------------------------------------------------------
    # Execution
    # ------------------------------------------------------------------

    def run_workflow(self, alert: str) -> WorkflowState:
        """Run the workflow for a given alert, with comprehensive logging.

        Args:
            alert (str): Description of the sales alert.

        Returns:
            WorkflowState: Final state of the workflow. If invoking the graph
            raises, this is a fresh initial state with ``status="error"``,
            the exception text in ``error`` and an extra "error" log entry.
        """
        logger.info(f"Starting workflow run with alert: {alert}")
        try:
            initial_state = create_initial_state(alert)

            logger.info("Invoking workflow")
            final_state = self.workflow.invoke(initial_state)

            logger.info(f"Workflow completed. Final status: {final_state['status']}")
            return final_state
        except Exception as e:
            logger.error(f"Workflow execution failed: {e}")
            logger.error(traceback.format_exc())

            # Create an error state
            error_state = create_initial_state(alert)
            error_state["status"] = "error"
            error_state["error"] = str(e)
            error_state["logs"].append({
                "timestamp": datetime.now().isoformat(),
                "step": "error",
                "message": f"Workflow execution failed: {e}"
            })
            return error_state

    # ------------------------------------------------------------------
    # Shared node helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _append_log(state: WorkflowState, step: str, message: str) -> None:
        """Append a timestamped entry to ``state["logs"]`` in place."""
        state["logs"].append({
            "timestamp": datetime.now().isoformat(),
            "step": step,
            "message": message
        })

    @staticmethod
    def _mark_failed(state: WorkflowState, prefix: str, exc: Exception) -> WorkflowState:
        """Record a node failure on ``state`` and log its traceback.

        Must be called from inside an ``except`` block so that
        ``logger.exception`` can attach the active traceback.

        Args:
            state: The workflow state to mark.
            prefix: Stage label, e.g. "Planning error".
            exc: The caught exception.

        Returns:
            ``state`` with ``status="error"`` and ``error="<prefix>: <exc>"``.
        """
        logger.exception(f"{prefix}: {exc}")
        state["status"] = "error"
        state["error"] = f"{prefix}: {exc}"
        return state

    # ------------------------------------------------------------------
    # Agent nodes
    # ------------------------------------------------------------------

    def planning_node(self, state: WorkflowState) -> WorkflowState:
        """Planning agent node: build the analysis plan and its data requests.

        Each entry in ``plan.required_data_sources`` is read through its
        "table" and "purpose" keys and becomes one ``DataRequest``.
        """
        try:
            self._append_log(state, "planning", "Planning analysis approach")
            state["status"] = "planning"

            # The second return value (plan as a dict) is not needed here
            analysis_plan, _ = self.planning_agent.create_analysis_plan(state["alert"])
            state["plan"] = analysis_plan

            state["data_requests"] = [
                DataRequest(
                    request_id=f"data_{i}",
                    description=data_source["purpose"],
                    tables=[data_source["table"]],
                    purpose=data_source["purpose"]
                )
                for i, data_source in enumerate(analysis_plan.required_data_sources)
            ]

            # Update status for next step
            state["status"] = "data_collection"
            return state
        except Exception as e:
            return self._mark_failed(state, "Planning error", e)

    def data_collection_node(self, state: WorkflowState) -> WorkflowState:
        """Data collection agent node: fetch data and derive analysis requests.

        One ``AnalysisRequest`` is created per plan analysis approach (read
        through its "type" and "purpose" keys). Each request references every
        collected data source. No requests are created when there is no plan.
        """
        try:
            self._append_log(
                state,
                "data_collection",
                f"Collecting data from {len(state['data_requests'])} sources"
            )
            state["status"] = "data_collection"

            data_sources = self.data_agent.get_data_for_analysis(state["data_requests"])
            state["data_sources"] = data_sources

            analysis_requests = []
            if state["plan"]:
                source_names = list(data_sources.keys())
                analysis_requests = [
                    AnalysisRequest(
                        request_id=f"analysis_{i}",
                        description=approach["purpose"],
                        data_sources=list(source_names),
                        analysis_type=approach["type"],
                        purpose=approach["purpose"]
                    )
                    for i, approach in enumerate(state["plan"].analysis_approaches)
                ]
            state["analysis_requests"] = analysis_requests

            # Update status for next step
            state["status"] = "analysis"
            return state
        except Exception as e:
            return self._mark_failed(state, "Data collection error", e)

    def analysis_node(self, state: WorkflowState) -> WorkflowState:
        """Analytics agent node: run every analysis request and queue validations."""
        try:
            self._append_log(
                state,
                "analysis",
                f"Performing {len(state['analysis_requests'])} analysis tasks"
            )
            state["status"] = "analysis"

            analysis_results = {
                request.request_id: self.analytics_agent.perform_analysis(request, state["data_sources"])
                for request in state["analysis_requests"]
            }
            state["analysis_results"] = analysis_results

            # One validation request per analysis, keyed by the same id so the
            # validation node can look the analysis result back up.
            state["validation_requests"] = [
                ValidationRequest(
                    request_id=analysis_id,
                    original_problem=state["alert"],
                    analysis_results=result.model_dump(),
                    data_sources=list(state["data_sources"].keys())
                )
                for analysis_id, result in analysis_results.items()
            ]

            # Update status for next step
            state["status"] = "validation"
            return state
        except Exception as e:
            return self._mark_failed(state, "Analysis error", e)

    def validation_node(self, state: WorkflowState) -> WorkflowState:
        """QA agent node: validate each analysis result and queue insight requests."""
        try:
            self._append_log(
                state,
                "validation",
                f"Validating {len(state['validation_requests'])} analysis results"
            )
            state["status"] = "validation"

            validation_results = {
                request.request_id: self.qa_agent.validate_analysis(request, state["data_sources"])
                for request in state["validation_requests"]
            }
            state["validation_results"] = validation_results

            state["insight_requests"] = [
                InsightRequest(
                    request_id=analysis_id,
                    original_problem=state["alert"],
                    analysis_results=state["analysis_results"][analysis_id].model_dump(),
                    validation_results=validation.model_dump(),
                    target_audience="executive"
                )
                for analysis_id, validation in validation_results.items()
            ]

            # Update status for next step
            state["status"] = "insights"
            return state
        except Exception as e:
            return self._mark_failed(state, "Validation error", e)

    def insights_node(self, state: WorkflowState) -> WorkflowState:
        """Insights agent node: build insight cards and visualizations, then finish."""
        try:
            self._append_log(
                state,
                "insights",
                f"Generating insights from {len(state['insight_requests'])} validated analyses"
            )
            state["status"] = "insights"

            insight_cards = {}
            visualizations = []
            for request in state["insight_requests"]:
                insight_card = self.insights_agent.generate_insights(request)
                insight_cards[request.request_id] = insight_card
                visualizations.extend(
                    self.insights_agent.generate_visualizations(insight_card, state["data_sources"])
                )

            state["insight_cards"] = insight_cards
            state["visualizations"] = visualizations

            # Update status for completion
            state["status"] = "complete"
            self._append_log(
                state,
                "complete",
                f"Analysis workflow completed with {len(insight_cards)} insight cards"
            )
            return state
        except Exception as e:
            return self._mark_failed(state, "Insights error", e)

    def handle_error_node(self, state: WorkflowState) -> WorkflowState:
        """Error handling node: reached when any agent node set status to "error".

        Appends an "error" log entry describing ``state["error"]`` and leaves
        ``status`` as "error" so callers can detect the failure.
        """
        logger.error(f"Workflow error: {state['error']}")
        self._append_log(state, "error", f"Workflow error: {state['error']}")
        state["status"] = "error"
        return state


# For testing
if __name__ == "__main__":
    # Logging is already configured at DEBUG level by the module-level
    # basicConfig call above; a second basicConfig call would be a no-op.

    # Only fall back to a placeholder key when none is configured, so a real
    # ANTHROPIC_API_KEY exported in the environment is never overwritten.
    if not os.environ.get("ANTHROPIC_API_KEY"):
        logger.warning("ANTHROPIC_API_KEY is not set; using a placeholder value for testing")
        os.environ["ANTHROPIC_API_KEY"] = "your_api_key_here"

    try:
        # Create workflow
        workflow = SalesAnalysisWorkflow(db_path="data/pharma_db.sqlite")

        # Run workflow with test alert
        alert = "Sales of DrugX down 15% in Northeast region over past 30 days compared to forecast."
        result = workflow.run_workflow(alert)

        # Print workflow results
        print(f"Workflow status: {result['status']}")
        if result['status'] == "error":
            print(f"Error: {result['error']}")
        else:
            print(f"Generated {len(result.get('insight_cards', {}))} insight cards")
            print(f"Generated {len(result.get('visualizations', []))} visualizations")
    except Exception as e:
        print(f"Test run failed: {e}")
        print(traceback.format_exc())