File size: 17,164 Bytes
0dbbd2b
 
18580d4
 
0dbbd2b
 
 
 
2bb7e0c
0dbbd2b
 
 
18580d4
 
 
 
 
 
 
0dbbd2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18580d4
0dbbd2b
 
 
 
 
 
 
 
 
 
 
 
 
18580d4
 
 
 
 
0dbbd2b
 
 
 
 
 
 
 
 
18580d4
a502523
18580d4
2bb7e0c
 
 
 
 
 
18580d4
 
 
 
 
 
a502523
18580d4
 
 
 
 
 
c85c7c1
 
9a7f8ee
c85c7c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c87abe1
 
 
 
 
 
 
17d32d3
c87abe1
c85c7c1
 
 
 
 
 
 
9a7f8ee
c85c7c1
 
 
 
0dbbd2b
18580d4
 
 
 
 
 
 
 
 
 
 
 
0dbbd2b
18580d4
 
0dbbd2b
18580d4
 
 
0dbbd2b
18580d4
0dbbd2b
18580d4
 
0dbbd2b
18580d4
 
 
 
 
 
 
 
0dbbd2b
18580d4
 
0dbbd2b
 
18580d4
c85c7c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18580d4
0dbbd2b
 
18580d4
 
 
0dbbd2b
 
 
18580d4
 
 
 
 
 
 
 
 
 
c85c7c1
18580d4
 
2bb7e0c
 
0dbbd2b
18580d4
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
import os
import json
import logging
import traceback
from typing import Dict, List, Any, Tuple, Optional, TypedDict, Annotated, Literal, cast
from datetime import datetime
import uuid
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

# Configure logging.
# NOTE: this runs at import time and configures the *root* logger at DEBUG,
# so merely importing this module affects logging for the whole process.
# Each record is formatted as "<time> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-wide logger used by the state factory and the workflow class below.
logger = logging.getLogger("sales_analysis_workflow")

# Import the agents
from agents.planning_agent import PlanningAgent, AnalysisPlan
from agents.data_agent import DataAgent, DataRequest, DataSource
from agents.analytics_agent import AnalyticsAgent, AnalysisRequest, AnalysisResult
from agents.qa_agent import QAAgent, ValidationRequest, ValidationResult
from agents.insights_agent import InsightsAgent, InsightRequest, InsightCard

# Define the workflow state
class WorkflowState(TypedDict):
    """Shared state dictionary handed from node to node by the workflow graph.

    Each node in SalesAnalysisWorkflow mutates this dict in place and returns
    it; later nodes read what earlier nodes stored.
    """
    # Alert text that triggered the run; given to the planning agent and
    # reused as ``original_problem`` in validation and insight requests.
    alert: str
    # Name of the stage the run is in (each node sets it to the next stage
    # when it finishes), or "complete" / "error".
    status: Literal["planning", "data_collection", "analysis", "validation", "insights", "complete", "error"]
    # Plan produced by the planning node; None until planning has run.
    plan: Optional[AnalysisPlan]
    # One request per entry of the plan's required_data_sources.
    data_requests: List[DataRequest]
    # Result of DataAgent.get_data_for_analysis; its keys are passed on to
    # analysis and validation requests as the list of data source names.
    data_sources: Dict[str, DataSource]
    # One request per entry of the plan's analysis_approaches.
    analysis_requests: List[AnalysisRequest]
    # Analysis results keyed by the request_id of the originating request.
    analysis_results: Dict[str, AnalysisResult]
    # One validation request per analysis result (request_id == analysis id).
    validation_requests: List[ValidationRequest]
    # Validation results keyed by the same id as the analysis they validate.
    validation_results: Dict[str, ValidationResult]
    # One insight request per validated analysis.
    insight_requests: List[InsightRequest]
    # Insight cards keyed by the request_id of the originating request.
    insight_cards: Dict[str, InsightCard]
    # Error message recorded when a node catches an exception; None otherwise.
    error: Optional[str]
    # Chronological log entries, each a dict with "timestamp", "step" and
    # "message" keys.
    logs: List[Dict[str, Any]]
    # Values returned by InsightsAgent.generate_visualizations, accumulated
    # across insight cards (presumably file paths -- confirm against the agent).
    visualizations: List[str]

# Function to initialize workflow state
def create_initial_state(alert: str) -> WorkflowState:
    """Build the starting state for a workflow run triggered by ``alert``.

    Every collection starts empty, ``plan`` and ``error`` start as None, the
    status is "planning", and the log already holds one "init" entry.

    Args:
        alert: Description of the sales alert to analyse.

    Returns:
        A fresh WorkflowState; no containers are shared between calls.
    """
    logger.info(f"Creating initial state for alert: {alert}")

    # First log entry, timestamped at creation time.
    init_entry: Dict[str, Any] = {
        "timestamp": datetime.now().isoformat(),
        "step": "init",
        "message": f"Workflow initialized with alert: {alert}"
    }

    # A TypedDict is a plain dict at runtime, so a literal is equivalent to
    # calling WorkflowState(...).
    state: WorkflowState = {
        "alert": alert,
        "status": "planning",
        "plan": None,
        "data_requests": [],
        "data_sources": {},
        "analysis_requests": [],
        "analysis_results": {},
        "validation_requests": [],
        "validation_results": {},
        "insight_requests": [],
        "insight_cards": {},
        "error": None,
        "logs": [init_entry],
        "visualizations": [],
    }
    return state

# Define the agent nodes
class SalesAnalysisWorkflow:
    """Orchestrates the sales analysis workflow with multiple agents.

    The compiled graph is a straight pipeline:

        planning -> data_collection -> analysis -> validation -> insights -> END

    Errors are handled inside the node functions rather than by graph
    branching. When a node catches an exception it marks the state as
    ``status == "error"``, stores the message in ``state["error"]`` and
    records it in ``state["logs"]``. Every later node sees the error status
    and returns the state untouched, so the failure reaches the caller
    instead of being overwritten by downstream nodes.
    """

    def __init__(self, db_path: str = "data/pharma_db.sqlite"):
        """Create the five agents and compile the workflow graph.

        Args:
            db_path: Path handed to DataAgent as its ``db_path``.

        Raises:
            Exception: Whatever agent construction or graph building raises;
                it is logged with its traceback and re-raised unchanged.
        """
        logger.info("Initializing SalesAnalysisWorkflow")

        try:
            # Initialize agents
            self.planning_agent = PlanningAgent()
            self.data_agent = DataAgent(db_path=db_path)
            self.analytics_agent = AnalyticsAgent()
            self.qa_agent = QAAgent()
            self.insights_agent = InsightsAgent()

            # Create the workflow graph (_build_workflow logs its own progress)
            self.workflow = self._build_workflow()

            logger.info("SalesAnalysisWorkflow initialized successfully")

        except Exception as e:
            logger.error(f"Workflow initialization failed: {e}")
            logger.error(traceback.format_exc())
            raise

    def _build_workflow(self) -> StateGraph:
        """Build and compile the linear workflow graph.

        Each edge is registered exactly once. No ``handle_error`` graph node
        is registered: nothing would route to it, and failures are recorded
        by calling ``handle_error_node`` directly from the nodes.
        NOTE(review): some LangGraph releases are believed to reject
        unreachable nodes or duplicate edges at build time -- confirm against
        the pinned version.

        Returns:
            The object returned by ``StateGraph.compile()``.

        Raises:
            Exception: Any graph construction error, logged and re-raised.
        """
        logger.info("Building workflow graph")

        try:
            workflow = StateGraph(WorkflowState)

            # Pipeline stages in execution order.
            pipeline = [
                ("planning", self.planning_node),
                ("data_collection", self.data_collection_node),
                ("analysis", self.analysis_node),
                ("validation", self.validation_node),
                ("insights", self.insights_node),
            ]
            for name, node in pipeline:
                workflow.add_node(name, node)

            # Chain each stage to the next one; the last stage goes to END.
            names = [name for name, _ in pipeline]
            for source, target in zip(names, names[1:] + [END]):
                workflow.add_edge(source, target)

            # Set the entry point
            workflow.set_entry_point("planning")

            logger.info("Workflow graph built successfully")
            return workflow.compile()

        except Exception as e:
            logger.error(f"Workflow graph construction failed: {e}")
            logger.error(traceback.format_exc())
            raise

    def run_workflow(self, alert: str) -> WorkflowState:
        """
        Run the workflow with a given alert, with comprehensive logging

        A failure inside a node does not raise: it comes back as a state
        whose ``status`` is "error" and whose ``error`` holds the message.
        If invoking the graph itself raises, a fresh state marked as an
        error is returned instead.

        Args:
            alert (str): Description of the sales alert

        Returns:
            WorkflowState: Final state of the workflow
        """
        logger.info(f"Starting workflow run with alert: {alert}")

        try:
            # Create initial state
            initial_state = create_initial_state(alert)

            # Run the workflow
            logger.info("Invoking workflow")
            final_state = self.workflow.invoke(initial_state)

            logger.info(f"Workflow completed. Final status: {final_state['status']}")

            return final_state

        except Exception as e:
            logger.error(f"Workflow execution failed: {e}")
            logger.error(traceback.format_exc())

            # Create an error state
            error_state = create_initial_state(alert)
            error_state["status"] = "error"
            error_state["error"] = str(e)
            error_state["logs"].append({
                "timestamp": datetime.now().isoformat(),
                "step": "error",
                "message": f"Workflow execution failed: {e}"
            })

            return error_state

    # ------------------------------------------------------------------
    # Shared helpers for the node functions
    # ------------------------------------------------------------------
    @staticmethod
    def _has_failed(state: WorkflowState) -> bool:
        """Return True when an earlier node already put the run in error."""
        return state["status"] == "error"

    @staticmethod
    def _log_step(state: WorkflowState, step: str, message: str) -> None:
        """Append a timestamped entry to the state's log."""
        state["logs"].append({
            "timestamp": datetime.now().isoformat(),
            "step": step,
            "message": message
        })

    def _fail(self, state: WorkflowState, label: str, exc: Exception) -> WorkflowState:
        """Mark the state as failed in the ``label`` step and record the error.

        Must be called from inside an ``except`` block so the traceback is
        available to ``logger.exception``.
        """
        logger.exception("%s step failed", label)
        state["status"] = "error"
        state["error"] = f"{label} error: {str(exc)}"
        return self.handle_error_node(state)

    # ------------------------------------------------------------------
    # Agent nodes
    # ------------------------------------------------------------------
    def planning_node(self, state: WorkflowState) -> WorkflowState:
        """Planning agent node: create the plan and derive the data requests.

        Stores the plan in ``state["plan"]`` and one DataRequest per entry of
        the plan's ``required_data_sources`` in ``state["data_requests"]``.
        On any exception the state is marked as an error instead of raising.
        """
        try:
            self._log_step(state, "planning", "Planning analysis approach")
            state["status"] = "planning"

            # The second element of the returned pair is not needed here.
            analysis_plan, _ = self.planning_agent.create_analysis_plan(state["alert"])
            state["plan"] = analysis_plan

            # Each required data source is indexed with "table" and "purpose".
            state["data_requests"] = [
                DataRequest(
                    request_id=f"data_{i}",
                    description=data_source["purpose"],
                    tables=[data_source["table"]],
                    purpose=data_source["purpose"]
                )
                for i, data_source in enumerate(analysis_plan.required_data_sources)
            ]

            # Update status for next step
            state["status"] = "data_collection"
            return state
        except Exception as e:
            return self._fail(state, "Planning", e)

    def data_collection_node(self, state: WorkflowState) -> WorkflowState:
        """Data agent node: fetch the data and derive the analysis requests.

        Does nothing when the run has already failed. Otherwise stores the
        data agent's result in ``state["data_sources"]`` and one
        AnalysisRequest per entry of the plan's ``analysis_approaches`` in
        ``state["analysis_requests"]`` (none when there is no plan).
        """
        if self._has_failed(state):
            return state
        try:
            self._log_step(
                state,
                "data_collection",
                f"Collecting data from {len(state['data_requests'])} sources"
            )
            state["status"] = "data_collection"

            data_sources = self.data_agent.get_data_for_analysis(state["data_requests"])
            state["data_sources"] = data_sources

            plan = state["plan"]
            approaches = plan.analysis_approaches if plan else []
            # Every analysis request is offered all collected data sources.
            state["analysis_requests"] = [
                AnalysisRequest(
                    request_id=f"analysis_{i}",
                    description=approach["purpose"],
                    data_sources=list(data_sources.keys()),
                    analysis_type=approach["type"],
                    purpose=approach["purpose"]
                )
                for i, approach in enumerate(approaches)
            ]

            # Update status for next step
            state["status"] = "analysis"
            return state
        except Exception as e:
            return self._fail(state, "Data collection", e)

    def analysis_node(self, state: WorkflowState) -> WorkflowState:
        """Analytics agent node: run each analysis and queue its validation.

        Does nothing when the run has already failed. Results are stored in
        ``state["analysis_results"]`` keyed by request id, and one
        ValidationRequest per result goes into ``state["validation_requests"]``.
        """
        if self._has_failed(state):
            return state
        try:
            self._log_step(
                state,
                "analysis",
                f"Performing {len(state['analysis_requests'])} analysis tasks"
            )
            state["status"] = "analysis"

            analysis_results = {
                request.request_id: self.analytics_agent.perform_analysis(
                    request, state["data_sources"]
                )
                for request in state["analysis_requests"]
            }
            state["analysis_results"] = analysis_results

            # A validation request reuses the id of the analysis it checks.
            state["validation_requests"] = [
                ValidationRequest(
                    request_id=analysis_id,
                    original_problem=state["alert"],
                    analysis_results=result.model_dump(),
                    data_sources=list(state["data_sources"].keys())
                )
                for analysis_id, result in analysis_results.items()
            ]

            # Update status for next step
            state["status"] = "validation"
            return state
        except Exception as e:
            return self._fail(state, "Analysis", e)

    def validation_node(self, state: WorkflowState) -> WorkflowState:
        """QA agent node: validate each analysis and queue insight requests.

        Does nothing when the run has already failed. Validation results are
        stored in ``state["validation_results"]`` keyed by request id, and
        one InsightRequest per validated analysis (target audience
        "executive") goes into ``state["insight_requests"]``.
        """
        if self._has_failed(state):
            return state
        try:
            self._log_step(
                state,
                "validation",
                f"Validating {len(state['validation_requests'])} analysis results"
            )
            state["status"] = "validation"

            validation_results = {
                request.request_id: self.qa_agent.validate_analysis(
                    request, state["data_sources"]
                )
                for request in state["validation_requests"]
            }
            state["validation_results"] = validation_results

            # Pair each validation with the analysis result of the same id.
            state["insight_requests"] = [
                InsightRequest(
                    request_id=analysis_id,
                    original_problem=state["alert"],
                    analysis_results=state["analysis_results"][analysis_id].model_dump(),
                    validation_results=validation.model_dump(),
                    target_audience="executive"
                )
                for analysis_id, validation in validation_results.items()
            ]

            # Update status for next step
            state["status"] = "insights"
            return state
        except Exception as e:
            return self._fail(state, "Validation", e)

    def insights_node(self, state: WorkflowState) -> WorkflowState:
        """Insights agent node: produce insight cards and visualizations.

        Does nothing when the run has already failed. On success the cards
        are stored in ``state["insight_cards"]`` keyed by request id, the
        visualization outputs in ``state["visualizations"]``, and the status
        becomes "complete".
        """
        if self._has_failed(state):
            return state
        try:
            self._log_step(
                state,
                "insights",
                f"Generating insights from {len(state['insight_requests'])} validated analyses"
            )
            state["status"] = "insights"

            insight_cards = {}
            visualizations = []
            for request in state["insight_requests"]:
                # Generate insight card
                insight_card = self.insights_agent.generate_insights(request)
                insight_cards[request.request_id] = insight_card

                # Generate visualizations for the insight card
                visualizations.extend(
                    self.insights_agent.generate_visualizations(
                        insight_card, state["data_sources"]
                    )
                )

            # Results are stored only after every request has succeeded.
            state["insight_cards"] = insight_cards
            state["visualizations"] = visualizations

            state["status"] = "complete"
            self._log_step(
                state,
                "complete",
                f"Analysis workflow completed with {len(insight_cards)} insight cards"
            )
            return state
        except Exception as e:
            return self._fail(state, "Insights", e)

    def handle_error_node(self, state: WorkflowState) -> WorkflowState:
        """Record the current ``state["error"]`` in the state's log.

        Called directly by the nodes when they fail; it is not a node of the
        compiled graph.
        """
        self._log_step(state, "error", f"Workflow error: {state['error']}")
        return state

# For testing
if __name__ == "__main__":
    # Configure logging to show all details
    logging.basicConfig(level=logging.DEBUG)

    # Provide a placeholder API key for testing, but never overwrite a real
    # key that is already present in the environment.
    os.environ.setdefault("ANTHROPIC_API_KEY", "your_api_key_here")

    try:
        # Create workflow
        workflow = SalesAnalysisWorkflow(db_path="data/pharma_db.sqlite")

        # Run workflow with test alert
        alert = "Sales of DrugX down 15% in Northeast region over past 30 days compared to forecast."
        result = workflow.run_workflow(alert)

        # Print workflow results
        print(f"Workflow status: {result['status']}")
        if result['status'] == "error":
            print(f"Error: {result['error']}")
        else:
            print(f"Generated {len(result.get('insight_cards', {}))} insight cards")
            print(f"Generated {len(result.get('visualizations', []))} visualizations")

    except Exception as e:
        print(f"Test run failed: {e}")
        print(traceback.format_exc())