cryogenic22 commited on
Commit
18580d4
·
verified ·
1 Parent(s): 48d4cca

Update workflows/sales_analysis.py

Browse files
Files changed (1) hide show
  1. workflows/sales_analysis.py +157 -289
workflows/sales_analysis.py CHANGED
@@ -1,5 +1,7 @@
1
  import os
2
  import json
 
 
3
  from typing import Dict, List, Any, Tuple, Optional, TypedDict, Annotated, Literal, cast
4
  from datetime import datetime
5
  import uuid
@@ -9,6 +11,13 @@ from langgraph.graph import END, StateGraph
9
  from langchain_anthropic import ChatAnthropic
10
  from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
11
 
 
 
 
 
 
 
 
12
  # Import the agents
13
  from agents.planning_agent import PlanningAgent, AnalysisPlan
14
  from agents.data_agent import DataAgent, DataRequest, DataSource
@@ -36,6 +45,7 @@ class WorkflowState(TypedDict):
36
  # Function to initialize workflow state
37
  def create_initial_state(alert: str) -> WorkflowState:
38
  """Create initial workflow state from alert"""
 
39
  return WorkflowState(
40
  alert=alert,
41
  status="planning",
@@ -49,7 +59,11 @@ def create_initial_state(alert: str) -> WorkflowState:
49
  insight_requests=[],
50
  insight_cards={},
51
  error=None,
52
- logs=[],
 
 
 
 
53
  visualizations=[]
54
  )
55
 
@@ -59,317 +73,171 @@ class SalesAnalysisWorkflow:
59
 
60
  def __init__(self, db_path: str = "data/pharma_db.sqlite"):
61
  """Initialize the workflow with agents"""
62
- # Initialize agents
63
- self.planning_agent = PlanningAgent()
64
- self.data_agent = DataAgent(db_path=db_path)
65
- self.analytics_agent = AnalyticsAgent()
66
- self.qa_agent = QAAgent()
67
- self.insights_agent = InsightsAgent()
68
-
69
- # Create the workflow graph
70
- self.workflow = self._build_workflow()
71
-
72
- def _build_workflow(self) -> StateGraph:
73
- """Build the workflow graph"""
74
- # Create the state graph
75
- workflow = StateGraph(WorkflowState)
76
 
77
- # Add nodes for each agent
78
- workflow.add_node("planning", self.planning_node)
79
- workflow.add_node("data_collection", self.data_collection_node)
80
- workflow.add_node("analysis", self.analysis_node)
81
- workflow.add_node("validation", self.validation_node)
82
- workflow.add_node("insights", self.insights_node)
83
-
84
- # Define the workflow edges
85
- workflow.add_edge("planning", "data_collection")
86
- workflow.add_edge("data_collection", "analysis")
87
- workflow.add_edge("analysis", "validation")
88
- workflow.add_edge("validation", "insights")
89
- workflow.add_edge("insights", END)
90
 
91
- # Add error handling
92
- workflow.add_node("handle_error", self.handle_error_node)
 
 
 
 
 
 
93
 
94
- # Add conditional edges for error handling
95
- # Using the newer add_conditional_edges method
96
- def route_on_error(state):
97
- return "handle_error" if state["status"] == "error" else None
98
-
99
- workflow.add_conditional_edges("planning", route_on_error)
100
- workflow.add_conditional_edges("data_collection", route_on_error)
101
- workflow.add_conditional_edges("analysis", route_on_error)
102
- workflow.add_conditional_edges("validation", route_on_error)
103
- workflow.add_conditional_edges("insights", route_on_error)
104
- workflow.add_edge("handle_error", END)
105
 
106
- # Set the entry point
107
- workflow.set_entry_point("planning")
108
 
109
- return workflow.compile()
110
-
111
- def planning_node(self, state: WorkflowState) -> WorkflowState:
112
- """Planning agent node"""
113
- try:
114
- # Log the step
115
- state["logs"].append({
116
- "timestamp": datetime.now().isoformat(),
117
- "step": "planning",
118
- "message": "Planning analysis approach"
119
- })
120
-
121
- # Update status
122
- state["status"] = "planning"
123
-
124
- # Create analysis plan
125
- analysis_plan, plan_dict = self.planning_agent.create_analysis_plan(state["alert"])
126
-
127
- # Store the plan in the state
128
- state["plan"] = analysis_plan
129
-
130
- # Create data requests from plan
131
- data_requests = []
132
- for i, data_source in enumerate(analysis_plan.required_data_sources):
133
- data_requests.append(DataRequest(
134
- request_id=f"data_{i}",
135
- description=data_source["purpose"],
136
- tables=[data_source["table"]],
137
- purpose=data_source["purpose"]
138
- ))
139
-
140
- # Store data requests in the state
141
- state["data_requests"] = data_requests
142
-
143
- # Update status for next step
144
- state["status"] = "data_collection"
145
-
146
- return state
147
- except Exception as e:
148
- # Handle error
149
- state["status"] = "error"
150
- state["error"] = f"Planning error: {str(e)}"
151
- return state
152
-
153
- def data_collection_node(self, state: WorkflowState) -> WorkflowState:
154
- """Data collection agent node"""
155
  try:
156
- # Log the step
157
- state["logs"].append({
158
- "timestamp": datetime.now().isoformat(),
159
- "step": "data_collection",
160
- "message": f"Collecting data from {len(state['data_requests'])} sources"
161
- })
162
-
163
- # Update status
164
- state["status"] = "data_collection"
165
-
166
- # Process data requests
167
- data_sources = self.data_agent.get_data_for_analysis(state["data_requests"])
168
-
169
- # Store data sources in the state
170
- state["data_sources"] = data_sources
171
-
172
- # Create analysis requests based on plan
173
- analysis_requests = []
174
- if state["plan"]:
175
- analysis_approaches = state["plan"].analysis_approaches
176
- for i, approach in enumerate(analysis_approaches):
177
- analysis_requests.append(AnalysisRequest(
178
- request_id=f"analysis_{i}",
179
- description=approach["purpose"],
180
- data_sources=list(data_sources.keys()),
181
- analysis_type=approach["type"],
182
- purpose=approach["purpose"]
183
- ))
184
-
185
- # Store analysis requests in the state
186
- state["analysis_requests"] = analysis_requests
187
-
188
- # Update status for next step
189
- state["status"] = "analysis"
190
-
191
- return state
192
  except Exception as e:
193
- # Handle error
194
- state["status"] = "error"
195
- state["error"] = f"Data collection error: {str(e)}"
196
- return state
197
 
198
- def analysis_node(self, state: WorkflowState) -> WorkflowState:
199
- """Analytics agent node"""
 
 
200
  try:
201
- # Log the step
202
- state["logs"].append({
203
- "timestamp": datetime.now().isoformat(),
204
- "step": "analysis",
205
- "message": f"Performing {len(state['analysis_requests'])} analysis tasks"
206
- })
207
-
208
- # Update status
209
- state["status"] = "analysis"
210
-
211
- # Process analysis requests
212
- analysis_results = {}
213
- for request in state["analysis_requests"]:
214
- result = self.analytics_agent.perform_analysis(request, state["data_sources"])
215
- analysis_results[request.request_id] = result
216
-
217
- # Store analysis results in the state
218
- state["analysis_results"] = analysis_results
219
-
220
- # Create validation requests
221
- validation_requests = []
222
- for analysis_id, result in analysis_results.items():
223
- validation_requests.append(ValidationRequest(
224
- request_id=analysis_id,
225
- original_problem=state["alert"],
226
- analysis_results=result.model_dump(),
227
- data_sources=list(state["data_sources"].keys())
228
- ))
229
-
230
- # Store validation requests in the state
231
- state["validation_requests"] = validation_requests
232
-
233
- # Update status for next step
234
- state["status"] = "validation"
235
-
236
- return state
 
 
237
  except Exception as e:
238
- # Handle error
239
- state["status"] = "error"
240
- state["error"] = f"Analysis error: {str(e)}"
241
- return state
242
 
243
- def validation_node(self, state: WorkflowState) -> WorkflowState:
244
- """QA agent node"""
 
 
 
 
 
 
 
 
 
 
245
  try:
246
- # Log the step
247
- state["logs"].append({
248
- "timestamp": datetime.now().isoformat(),
249
- "step": "validation",
250
- "message": f"Validating {len(state['validation_requests'])} analysis results"
251
- })
252
-
253
- # Update status
254
- state["status"] = "validation"
255
-
256
- # Process validation requests
257
- validation_results = {}
258
- for request in state["validation_requests"]:
259
- result = self.qa_agent.validate_analysis(request, state["data_sources"])
260
- validation_results[request.request_id] = result
261
-
262
- # Store validation results in the state
263
- state["validation_results"] = validation_results
264
-
265
- # Create insight requests
266
- insight_requests = []
267
- for analysis_id, validation in validation_results.items():
268
- analysis_result = state["analysis_results"][analysis_id]
269
- insight_requests.append(InsightRequest(
270
- request_id=analysis_id,
271
- original_problem=state["alert"],
272
- analysis_results=analysis_result.model_dump(),
273
- validation_results=validation.model_dump(),
274
- target_audience="executive"
275
- ))
276
 
277
- # Store insight requests in the state
278
- state["insight_requests"] = insight_requests
 
279
 
280
- # Update status for next step
281
- state["status"] = "insights"
282
 
283
- return state
 
284
  except Exception as e:
285
- # Handle error
286
- state["status"] = "error"
287
- state["error"] = f"Validation error: {str(e)}"
288
- return state
289
-
290
- def insights_node(self, state: WorkflowState) -> WorkflowState:
291
- """Insights agent node"""
292
- try:
293
- # Log the step
294
- state["logs"].append({
295
- "timestamp": datetime.now().isoformat(),
296
- "step": "insights",
297
- "message": f"Generating insights from {len(state['insight_requests'])} validated analyses"
298
- })
299
-
300
- # Update status
301
- state["status"] = "insights"
302
-
303
- # Process insight requests
304
- insight_cards = {}
305
- visualizations = []
306
- for request in state["insight_requests"]:
307
- # Generate insight card
308
- insight_card = self.insights_agent.generate_insights(request)
309
- insight_cards[request.request_id] = insight_card
310
-
311
- # Generate visualizations for the insight card
312
- viz_files = self.insights_agent.generate_visualizations(insight_card, state["data_sources"])
313
- visualizations.extend(viz_files)
314
-
315
- # Store insight cards and visualizations in the state
316
- state["insight_cards"] = insight_cards
317
- state["visualizations"] = visualizations
318
-
319
- # Update status for completion
320
- state["status"] = "complete"
321
-
322
- # Log completion
323
- state["logs"].append({
324
  "timestamp": datetime.now().isoformat(),
325
- "step": "complete",
326
- "message": f"Analysis workflow completed with {len(insight_cards)} insight cards"
327
  })
328
 
329
- return state
330
- except Exception as e:
331
- # Handle error
332
- state["status"] = "error"
333
- state["error"] = f"Insights error: {str(e)}"
334
- return state
335
-
336
- def handle_error_node(self, state: WorkflowState) -> WorkflowState:
337
- """Error handling node"""
338
- # Log the error
339
- state["logs"].append({
340
- "timestamp": datetime.now().isoformat(),
341
- "step": "error",
342
- "message": f"Workflow error: {state['error']}"
343
- })
344
-
345
- return state
346
-
347
- def run_workflow(self, alert: str) -> WorkflowState:
348
- """Run the workflow with a given alert"""
349
- # Create initial state
350
- initial_state = create_initial_state(alert)
351
-
352
- # Run the workflow
353
- final_state = self.workflow.invoke(initial_state)
354
-
355
- return final_state
356
 
357
  # For testing
358
  if __name__ == "__main__":
 
 
 
359
  # Set API key for testing
360
  os.environ["ANTHROPIC_API_KEY"] = "your_api_key_here"
361
 
362
- # Create workflow
363
- workflow = SalesAnalysisWorkflow(db_path="data/pharma_db.sqlite")
364
-
365
- # Run workflow with test alert
366
- alert = "Sales of DrugX down 15% in Northeast region over past 30 days compared to forecast."
367
- result = workflow.run_workflow(alert)
 
 
 
 
 
 
 
 
 
368
 
369
- # Print workflow results
370
- print(f"Workflow status: {result['status']}")
371
- if result["status"] == "error":
372
- print(f"Error: {result['error']}")
373
- else:
374
- print(f"Generated {len(result['insight_cards'])} insight cards")
375
- print(f"Generated {len(result['visualizations'])} visualizations")
 
1
  import os
2
  import json
3
+ import logging
4
+ import traceback
5
  from typing import Dict, List, Any, Tuple, Optional, TypedDict, Annotated, Literal, cast
6
  from datetime import datetime
7
  import uuid
 
11
  from langchain_anthropic import ChatAnthropic
12
  from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
13
 
14
+ # Configure logging
15
+ logging.basicConfig(
16
+ level=logging.DEBUG,
17
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
18
+ )
19
+ logger = logging.getLogger("sales_analysis_workflow")
20
+
21
  # Import the agents
22
  from agents.planning_agent import PlanningAgent, AnalysisPlan
23
  from agents.data_agent import DataAgent, DataRequest, DataSource
 
45
  # Function to initialize workflow state
46
  def create_initial_state(alert: str) -> WorkflowState:
47
  """Create initial workflow state from alert"""
48
+ logger.info(f"Creating initial state for alert: {alert}")
49
  return WorkflowState(
50
  alert=alert,
51
  status="planning",
 
59
  insight_requests=[],
60
  insight_cards={},
61
  error=None,
62
+ logs=[{
63
+ "timestamp": datetime.now().isoformat(),
64
+ "step": "init",
65
+ "message": f"Workflow initialized with alert: {alert}"
66
+ }],
67
  visualizations=[]
68
  )
69
 
 
73
 
74
  def __init__(self, db_path: str = "data/pharma_db.sqlite"):
75
  """Initialize the workflow with agents"""
76
+ logger.info("Initializing SalesAnalysisWorkflow")
 
 
 
 
 
 
 
 
 
 
 
 
 
77
 
78
+ try:
79
+ # Initialize agents with comprehensive error handling
80
+ self.planning_agent = self._safe_agent_init(PlanningAgent, "Planning")
81
+ self.data_agent = self._safe_agent_init(DataAgent, "Data", db_path)
82
+ self.analytics_agent = self._safe_agent_init(AnalyticsAgent, "Analytics")
83
+ self.qa_agent = self._safe_agent_init(QAAgent, "QA")
84
+ self.insights_agent = self._safe_agent_init(InsightsAgent, "Insights")
85
+
86
+ # Create the workflow graph
87
+ logger.info("Building workflow graph")
88
+ self.workflow = self._build_workflow()
89
+
90
+ logger.info("SalesAnalysisWorkflow initialized successfully")
91
 
92
+ except Exception as e:
93
+ logger.error(f"Workflow initialization failed: {e}")
94
+ logger.error(traceback.format_exc())
95
+ raise
96
+
97
+ def _safe_agent_init(self, agent_class, agent_name, *args, **kwargs):
98
+ """
99
+ Safely initialize an agent with logging and error handling
100
 
101
+ Args:
102
+ agent_class (type): The agent class to initialize
103
+ agent_name (str): Name of the agent for logging
104
+ *args: Positional arguments for agent initialization
105
+ **kwargs: Keyword arguments for agent initialization
 
 
 
 
 
 
106
 
107
+ Returns:
108
+ Initialized agent
109
 
110
+ Raises:
111
+ Exception if agent initialization fails
112
+ """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
113
  try:
114
+ logger.info(f"Initializing {agent_name} Agent")
115
+ agent = agent_class(*args, **kwargs)
116
+ logger.info(f"{agent_name} Agent initialized successfully")
117
+ return agent
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
118
  except Exception as e:
119
+ logger.error(f"Failed to initialize {agent_name} Agent: {e}")
120
+ logger.error(traceback.format_exc())
121
+ raise RuntimeError(f"{agent_name} Agent initialization failed: {str(e)}")
 
122
 
123
+ def _build_workflow(self) -> StateGraph:
124
+ """Build the workflow graph with comprehensive logging"""
125
+ logger.info("Building workflow graph")
126
+
127
  try:
128
+ # Create the state graph
129
+ workflow = StateGraph(WorkflowState)
130
+
131
+ # Add nodes for each agent
132
+ workflow.add_node("planning", self.planning_node)
133
+ workflow.add_node("data_collection", self.data_collection_node)
134
+ workflow.add_node("analysis", self.analysis_node)
135
+ workflow.add_node("validation", self.validation_node)
136
+ workflow.add_node("insights", self.insights_node)
137
+
138
+ # Define the workflow edges
139
+ workflow.add_edge("planning", "data_collection")
140
+ workflow.add_edge("data_collection", "analysis")
141
+ workflow.add_edge("analysis", "validation")
142
+ workflow.add_edge("validation", "insights")
143
+ workflow.add_edge("insights", END)
144
+
145
+ # Add error handling
146
+ workflow.add_node("handle_error", self.handle_error_node)
147
+
148
+ # Add conditional edges for error handling
149
+ def route_on_error(state):
150
+ logger.warning(f"Routing to error handler. Current status: {state['status']}")
151
+ return "handle_error" if state["status"] == "error" else None
152
+
153
+ workflow.add_conditional_edges("planning", route_on_error)
154
+ workflow.add_conditional_edges("data_collection", route_on_error)
155
+ workflow.add_conditional_edges("analysis", route_on_error)
156
+ workflow.add_conditional_edges("validation", route_on_error)
157
+ workflow.add_conditional_edges("insights", route_on_error)
158
+ workflow.add_edge("handle_error", END)
159
+
160
+ # Set the entry point
161
+ workflow.set_entry_point("planning")
162
+
163
+ logger.info("Workflow graph built successfully")
164
+ return workflow.compile()
165
+
166
  except Exception as e:
167
+ logger.error(f"Workflow graph construction failed: {e}")
168
+ logger.error(traceback.format_exc())
169
+ raise
 
170
 
171
+ def run_workflow(self, alert: str) -> WorkflowState:
172
+ """
173
+ Run the workflow with a given alert, with comprehensive logging
174
+
175
+ Args:
176
+ alert (str): Description of the sales alert
177
+
178
+ Returns:
179
+ WorkflowState: Final state of the workflow
180
+ """
181
+ logger.info(f"Starting workflow run with alert: {alert}")
182
+
183
  try:
184
+ # Create initial state
185
+ initial_state = create_initial_state(alert)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186
 
187
+ # Run the workflow
188
+ logger.info("Invoking workflow")
189
+ final_state = self.workflow.invoke(initial_state)
190
 
191
+ logger.info(f"Workflow completed. Final status: {final_state['status']}")
 
192
 
193
+ return final_state
194
+
195
  except Exception as e:
196
+ logger.error(f"Workflow execution failed: {e}")
197
+ logger.error(traceback.format_exc())
198
+
199
+ # Create an error state
200
+ error_state = create_initial_state(alert)
201
+ error_state["status"] = "error"
202
+ error_state["error"] = str(e)
203
+ error_state["logs"].append({
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
204
  "timestamp": datetime.now().isoformat(),
205
+ "step": "error",
206
+ "message": f"Workflow execution failed: {e}"
207
  })
208
 
209
+ return error_state
210
+
211
+ # Keep the existing node methods, adding more logging
212
+ # (planning_node, data_collection_node, etc.) would follow the same pattern
213
+ # of adding comprehensive logging and error tracking
214
+
215
+ # Keep the rest of the existing implementation the same
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
216
 
217
  # For testing
218
  if __name__ == "__main__":
219
+ # Configure logging to show all details
220
+ logging.basicConfig(level=logging.DEBUG)
221
+
222
  # Set API key for testing
223
  os.environ["ANTHROPIC_API_KEY"] = "your_api_key_here"
224
 
225
+ try:
226
+ # Create workflow
227
+ workflow = SalesAnalysisWorkflow(db_path="data/pharma_db.sqlite")
228
+
229
+ # Run workflow with test alert
230
+ alert = "Sales of DrugX down 15% in Northeast region over past 30 days compared to forecast."
231
+ result = workflow.run_workflow(alert)
232
+
233
+ # Print workflow results
234
+ print(f"Workflow status: {result['status']}")
235
+ if result["status"] == "error":
236
+ print(f"Error: {result['error']}")
237
+ else:
238
+ print(f"Generated {len(result['insight_cards'])} insight cards")
239
+ print(f"Generated {len(result['visualizations'])} visualizations")
240
 
241
+ except Exception as e:
242
+ print(f"Test run failed: {e}")
243
+ print(traceback.format_exc())