cryogenic22 commited on
Commit
0dbbd2b
·
verified ·
1 Parent(s): c98e890

Create workflows/sales_analysis.py

Browse files
Files changed (1) hide show
  1. workflows/sales_analysis.py +384 -0
workflows/sales_analysis.py ADDED
@@ -0,0 +1,384 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ from typing import Dict, List, Any, Tuple, Optional, TypedDict, Annotated, Literal, cast
4
+ from datetime import datetime
5
+ import uuid
6
+ from pydantic import BaseModel, Field
7
+ import langgraph.graph as lg
8
+ from langgraph.graph import END, StateGraph
9
+ from langchain_anthropic import ChatAnthropic
10
+ from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
11
+
12
+ # Import the agents
13
+ from agents.planning_agent import PlanningAgent, AnalysisPlan
14
+ from agents.data_agent import DataAgent, DataRequest, DataSource
15
+ from agents.analytics_agent import AnalyticsAgent, AnalysisRequest, AnalysisResult
16
+ from agents.qa_agent import QAAgent, ValidationRequest, ValidationResult
17
+ from agents.insights_agent import InsightsAgent, InsightRequest, InsightCard
18
+
19
+ # Define the workflow state
20
+ class WorkflowState(TypedDict):
21
+ alert: str
22
+ status: Literal["planning", "data_collection", "analysis", "validation", "insights", "complete", "error"]
23
+ plan: Optional[AnalysisPlan]
24
+ data_requests: List[DataRequest]
25
+ data_sources: Dict[str, DataSource]
26
+ analysis_requests: List[AnalysisRequest]
27
+ analysis_results: Dict[str, AnalysisResult]
28
+ validation_requests: List[ValidationRequest]
29
+ validation_results: Dict[str, ValidationResult]
30
+ insight_requests: List[InsightRequest]
31
+ insight_cards: Dict[str, InsightCard]
32
+ error: Optional[str]
33
+ logs: List[Dict[str, Any]]
34
+ visualizations: List[str]
35
+
36
+ # Function to initialize workflow state
37
+ def create_initial_state(alert: str) -> WorkflowState:
38
+ """Create initial workflow state from alert"""
39
+ return WorkflowState(
40
+ alert=alert,
41
+ status="planning",
42
+ plan=None,
43
+ data_requests=[],
44
+ data_sources={},
45
+ analysis_requests=[],
46
+ analysis_results={},
47
+ validation_requests=[],
48
+ validation_results={},
49
+ insight_requests=[],
50
+ insight_cards={},
51
+ error=None,
52
+ logs=[],
53
+ visualizations=[]
54
+ )
55
+
56
+ # Define the agent nodes
57
+ class SalesAnalysisWorkflow:
58
+ """Orchestrates the sales analysis workflow with multiple agents"""
59
+
60
+ def __init__(self, db_path: str = "data/pharma_db.sqlite"):
61
+ """Initialize the workflow with agents"""
62
+ # Initialize agents
63
+ self.planning_agent = PlanningAgent()
64
+ self.data_agent = DataAgent(db_path=db_path)
65
+ self.analytics_agent = AnalyticsAgent()
66
+ self.qa_agent = QAAgent()
67
+ self.insights_agent = InsightsAgent()
68
+
69
+ # Create the workflow graph
70
+ self.workflow = self._build_workflow()
71
+
72
+ def _build_workflow(self) -> StateGraph:
73
+ """Build the workflow graph"""
74
+ # Create the state graph
75
+ workflow = StateGraph(WorkflowState)
76
+
77
+ # Add nodes for each agent
78
+ workflow.add_node("planning", self.planning_node)
79
+ workflow.add_node("data_collection", self.data_collection_node)
80
+ workflow.add_node("analysis", self.analysis_node)
81
+ workflow.add_node("validation", self.validation_node)
82
+ workflow.add_node("insights", self.insights_node)
83
+
84
+ # Define the workflow edges
85
+ workflow.add_edge("planning", "data_collection")
86
+ workflow.add_edge("data_collection", "analysis")
87
+ workflow.add_edge("analysis", "validation")
88
+ workflow.add_edge("validation", "insights")
89
+ workflow.add_edge("insights", END)
90
+
91
+ # Add error handling
92
+ workflow.add_node("handle_error", self.handle_error_node)
93
+ workflow.add_edge_condition(
94
+ "planning", "handle_error",
95
+ lambda x: x["status"] == "error"
96
+ )
97
+ workflow.add_edge_condition(
98
+ "data_collection", "handle_error",
99
+ lambda x: x["status"] == "error"
100
+ )
101
+ workflow.add_edge_condition(
102
+ "analysis", "handle_error",
103
+ lambda x: x["status"] == "error"
104
+ )
105
+ workflow.add_edge_condition(
106
+ "validation", "handle_error",
107
+ lambda x: x["status"] == "error"
108
+ )
109
+ workflow.add_edge_condition(
110
+ "insights", "handle_error",
111
+ lambda x: x["status"] == "error"
112
+ )
113
+ workflow.add_edge("handle_error", END)
114
+
115
+ # Set the entry point
116
+ workflow.set_entry_point("planning")
117
+
118
+ return workflow.compile()
119
+
120
+ def planning_node(self, state: WorkflowState) -> WorkflowState:
121
+ """Planning agent node"""
122
+ try:
123
+ # Log the step
124
+ state["logs"].append({
125
+ "timestamp": datetime.now().isoformat(),
126
+ "step": "planning",
127
+ "message": "Planning analysis approach"
128
+ })
129
+
130
+ # Update status
131
+ state["status"] = "planning"
132
+
133
+ # Create analysis plan
134
+ analysis_plan, plan_dict = self.planning_agent.create_analysis_plan(state["alert"])
135
+
136
+ # Store the plan in the state
137
+ state["plan"] = analysis_plan
138
+
139
+ # Create data requests from plan
140
+ data_requests = []
141
+ for i, data_source in enumerate(analysis_plan.required_data_sources):
142
+ data_requests.append(DataRequest(
143
+ request_id=f"data_{i}",
144
+ description=data_source["purpose"],
145
+ tables=[data_source["table"]],
146
+ purpose=data_source["purpose"]
147
+ ))
148
+
149
+ # Store data requests in the state
150
+ state["data_requests"] = data_requests
151
+
152
+ # Update status for next step
153
+ state["status"] = "data_collection"
154
+
155
+ return state
156
+ except Exception as e:
157
+ # Handle error
158
+ state["status"] = "error"
159
+ state["error"] = f"Planning error: {str(e)}"
160
+ return state
161
+
162
+ def data_collection_node(self, state: WorkflowState) -> WorkflowState:
163
+ """Data collection agent node"""
164
+ try:
165
+ # Log the step
166
+ state["logs"].append({
167
+ "timestamp": datetime.now().isoformat(),
168
+ "step": "data_collection",
169
+ "message": f"Collecting data from {len(state['data_requests'])} sources"
170
+ })
171
+
172
+ # Update status
173
+ state["status"] = "data_collection"
174
+
175
+ # Process data requests
176
+ data_sources = self.data_agent.get_data_for_analysis(state["data_requests"])
177
+
178
+ # Store data sources in the state
179
+ state["data_sources"] = data_sources
180
+
181
+ # Create analysis requests based on plan
182
+ analysis_requests = []
183
+ if state["plan"]:
184
+ analysis_approaches = state["plan"].analysis_approaches
185
+ for i, approach in enumerate(analysis_approaches):
186
+ analysis_requests.append(AnalysisRequest(
187
+ request_id=f"analysis_{i}",
188
+ description=approach["purpose"],
189
+ data_sources=list(data_sources.keys()),
190
+ analysis_type=approach["type"],
191
+ purpose=approach["purpose"]
192
+ ))
193
+
194
+ # Store analysis requests in the state
195
+ state["analysis_requests"] = analysis_requests
196
+
197
+ # Update status for next step
198
+ state["status"] = "analysis"
199
+
200
+ return state
201
+ except Exception as e:
202
+ # Handle error
203
+ state["status"] = "error"
204
+ state["error"] = f"Data collection error: {str(e)}"
205
+ return state
206
+
207
+ def analysis_node(self, state: WorkflowState) -> WorkflowState:
208
+ """Analytics agent node"""
209
+ try:
210
+ # Log the step
211
+ state["logs"].append({
212
+ "timestamp": datetime.now().isoformat(),
213
+ "step": "analysis",
214
+ "message": f"Performing {len(state['analysis_requests'])} analysis tasks"
215
+ })
216
+
217
+ # Update status
218
+ state["status"] = "analysis"
219
+
220
+ # Process analysis requests
221
+ analysis_results = {}
222
+ for request in state["analysis_requests"]:
223
+ result = self.analytics_agent.perform_analysis(request, state["data_sources"])
224
+ analysis_results[request.request_id] = result
225
+
226
+ # Store analysis results in the state
227
+ state["analysis_results"] = analysis_results
228
+
229
+ # Create validation requests
230
+ validation_requests = []
231
+ for analysis_id, result in analysis_results.items():
232
+ validation_requests.append(ValidationRequest(
233
+ request_id=analysis_id,
234
+ original_problem=state["alert"],
235
+ analysis_results=result.model_dump(),
236
+ data_sources=list(state["data_sources"].keys())
237
+ ))
238
+
239
+ # Store validation requests in the state
240
+ state["validation_requests"] = validation_requests
241
+
242
+ # Update status for next step
243
+ state["status"] = "validation"
244
+
245
+ return state
246
+ except Exception as e:
247
+ # Handle error
248
+ state["status"] = "error"
249
+ state["error"] = f"Analysis error: {str(e)}"
250
+ return state
251
+
252
+ def validation_node(self, state: WorkflowState) -> WorkflowState:
253
+ """QA agent node"""
254
+ try:
255
+ # Log the step
256
+ state["logs"].append({
257
+ "timestamp": datetime.now().isoformat(),
258
+ "step": "validation",
259
+ "message": f"Validating {len(state['validation_requests'])} analysis results"
260
+ })
261
+
262
+ # Update status
263
+ state["status"] = "validation"
264
+
265
+ # Process validation requests
266
+ validation_results = {}
267
+ for request in state["validation_requests"]:
268
+ result = self.qa_agent.validate_analysis(request, state["data_sources"])
269
+ validation_results[request.request_id] = result
270
+
271
+ # Store validation results in the state
272
+ state["validation_results"] = validation_results
273
+
274
+ # Create insight requests
275
+ insight_requests = []
276
+ for analysis_id, validation in validation_results.items():
277
+ analysis_result = state["analysis_results"][analysis_id]
278
+ insight_requests.append(InsightRequest(
279
+ request_id=analysis_id,
280
+ original_problem=state["alert"],
281
+ analysis_results=analysis_result.model_dump(),
282
+ validation_results=validation.model_dump(),
283
+ target_audience="executive"
284
+ ))
285
+
286
+ # Store insight requests in the state
287
+ state["insight_requests"] = insight_requests
288
+
289
+ # Update status for next step
290
+ state["status"] = "insights"
291
+
292
+ return state
293
+ except Exception as e:
294
+ # Handle error
295
+ state["status"] = "error"
296
+ state["error"] = f"Validation error: {str(e)}"
297
+ return state
298
+
299
+ def insights_node(self, state: WorkflowState) -> WorkflowState:
300
+ """Insights agent node"""
301
+ try:
302
+ # Log the step
303
+ state["logs"].append({
304
+ "timestamp": datetime.now().isoformat(),
305
+ "step": "insights",
306
+ "message": f"Generating insights from {len(state['insight_requests'])} validated analyses"
307
+ })
308
+
309
+ # Update status
310
+ state["status"] = "insights"
311
+
312
+ # Process insight requests
313
+ insight_cards = {}
314
+ visualizations = []
315
+ for request in state["insight_requests"]:
316
+ # Generate insight card
317
+ insight_card = self.insights_agent.generate_insights(request)
318
+ insight_cards[request.request_id] = insight_card
319
+
320
+ # Generate visualizations for the insight card
321
+ viz_files = self.insights_agent.generate_visualizations(insight_card, state["data_sources"])
322
+ visualizations.extend(viz_files)
323
+
324
+ # Store insight cards and visualizations in the state
325
+ state["insight_cards"] = insight_cards
326
+ state["visualizations"] = visualizations
327
+
328
+ # Update status for completion
329
+ state["status"] = "complete"
330
+
331
+ # Log completion
332
+ state["logs"].append({
333
+ "timestamp": datetime.now().isoformat(),
334
+ "step": "complete",
335
+ "message": f"Analysis workflow completed with {len(insight_cards)} insight cards"
336
+ })
337
+
338
+ return state
339
+ except Exception as e:
340
+ # Handle error
341
+ state["status"] = "error"
342
+ state["error"] = f"Insights error: {str(e)}"
343
+ return state
344
+
345
+ def handle_error_node(self, state: WorkflowState) -> WorkflowState:
346
+ """Error handling node"""
347
+ # Log the error
348
+ state["logs"].append({
349
+ "timestamp": datetime.now().isoformat(),
350
+ "step": "error",
351
+ "message": f"Workflow error: {state['error']}"
352
+ })
353
+
354
+ return state
355
+
356
+ def run_workflow(self, alert: str) -> WorkflowState:
357
+ """Run the workflow with a given alert"""
358
+ # Create initial state
359
+ initial_state = create_initial_state(alert)
360
+
361
+ # Run the workflow
362
+ final_state = self.workflow.invoke(initial_state)
363
+
364
+ return final_state
365
+
366
+ # For testing
367
+ if __name__ == "__main__":
368
+ # Set API key for testing
369
+ os.environ["ANTHROPIC_API_KEY"] = "your_api_key_here"
370
+
371
+ # Create workflow
372
+ workflow = SalesAnalysisWorkflow(db_path="data/pharma_db.sqlite")
373
+
374
+ # Run workflow with test alert
375
+ alert = "Sales of DrugX down 15% in Northeast region over past 30 days compared to forecast."
376
+ result = workflow.run_workflow(alert)
377
+
378
+ # Print workflow results
379
+ print(f"Workflow status: {result['status']}")
380
+ if result["status"] == "error":
381
+ print(f"Error: {result['error']}")
382
+ else:
383
+ print(f"Generated {len(result['insight_cards'])} insight cards")
384
+ print(f"Generated {len(result['visualizations'])} visualizations")