cryogenic22 commited on
Commit
2bb7e0c
·
verified ·
1 Parent(s): f85e2e6

Update workflows/sales_analysis.py

Browse files
Files changed (1) hide show
  1. workflows/sales_analysis.py +281 -53
workflows/sales_analysis.py CHANGED
@@ -6,8 +6,7 @@ from typing import Dict, List, Any, Tuple, Optional, TypedDict, Annotated, Liter
6
  from datetime import datetime
7
  import uuid
8
  from pydantic import BaseModel, Field
9
- import langgraph.graph as lg
10
- from langgraph.graph import END, StateGraph
11
  from langchain_anthropic import ChatAnthropic
12
  from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
13
 
@@ -76,12 +75,12 @@ class SalesAnalysisWorkflow:
76
  logger.info("Initializing SalesAnalysisWorkflow")
77
 
78
  try:
79
- # Initialize agents with comprehensive error handling
80
- self.planning_agent = self._safe_agent_init(PlanningAgent, "Planning")
81
- self.data_agent = self._safe_agent_init(DataAgent, "Data", db_path)
82
- self.analytics_agent = self._safe_agent_init(AnalyticsAgent, "Analytics")
83
- self.qa_agent = self._safe_agent_init(QAAgent, "QA")
84
- self.insights_agent = self._safe_agent_init(InsightsAgent, "Insights")
85
 
86
  # Create the workflow graph
87
  logger.info("Building workflow graph")
@@ -94,32 +93,6 @@ class SalesAnalysisWorkflow:
94
  logger.error(traceback.format_exc())
95
  raise
96
 
97
- def _safe_agent_init(self, agent_class, agent_name, *args, **kwargs):
98
- """
99
- Safely initialize an agent with logging and error handling
100
-
101
- Args:
102
- agent_class (type): The agent class to initialize
103
- agent_name (str): Name of the agent for logging
104
- *args: Positional arguments for agent initialization
105
- **kwargs: Keyword arguments for agent initialization
106
-
107
- Returns:
108
- Initialized agent
109
-
110
- Raises:
111
- Exception if agent initialization fails
112
- """
113
- try:
114
- logger.info(f"Initializing {agent_name} Agent")
115
- agent = agent_class(*args, **kwargs)
116
- logger.info(f"{agent_name} Agent initialized successfully")
117
- return agent
118
- except Exception as e:
119
- logger.error(f"Failed to initialize {agent_name} Agent: {e}")
120
- logger.error(traceback.format_exc())
121
- raise RuntimeError(f"{agent_name} Agent initialization failed: {str(e)}")
122
-
123
  def _build_workflow(self) -> StateGraph:
124
  """Build the workflow graph with comprehensive logging"""
125
  logger.info("Building workflow graph")
@@ -128,12 +101,277 @@ class SalesAnalysisWorkflow:
128
  # Create the state graph
129
  workflow = StateGraph(WorkflowState)
130
 
131
- # Add nodes for each agent
132
- workflow.add_node("planning", self.planning_node)
133
- workflow.add_node("data_collection", self.data_collection_node)
134
- workflow.add_node("analysis", self.analysis_node)
135
- workflow.add_node("validation", self.validation_node)
136
- workflow.add_node("insights", self.insights_node)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
137
 
138
  # Define the workflow edges
139
  workflow.add_edge("planning", "data_collection")
@@ -142,14 +380,10 @@ class SalesAnalysisWorkflow:
142
  workflow.add_edge("validation", "insights")
143
  workflow.add_edge("insights", END)
144
 
145
- # Add error handling
146
- workflow.add_node("handle_error", self.handle_error_node)
147
-
148
  # Add conditional edges for error handling
149
  def route_on_error(state):
150
- logger.warning(f"Routing to error handler. Current status: {state['status']}")
151
  return "handle_error" if state["status"] == "error" else None
152
-
153
  workflow.add_conditional_edges("planning", route_on_error)
154
  workflow.add_conditional_edges("data_collection", route_on_error)
155
  workflow.add_conditional_edges("analysis", route_on_error)
@@ -208,12 +442,6 @@ class SalesAnalysisWorkflow:
208
 
209
  return error_state
210
 
211
- # Keep the existing node methods, adding more logging
212
- # (planning_node, data_collection_node, etc.) would follow the same pattern
213
- # of adding comprehensive logging and error tracking
214
-
215
- # Keep the rest of the existing implementation the same
216
-
217
  # For testing
218
  if __name__ == "__main__":
219
  # Configure logging to show all details
@@ -235,8 +463,8 @@ if __name__ == "__main__":
235
  if result["status"] == "error":
236
  print(f"Error: {result['error']}")
237
  else:
238
- print(f"Generated {len(result['insight_cards'])} insight cards")
239
- print(f"Generated {len(result['visualizations'])} visualizations")
240
 
241
  except Exception as e:
242
  print(f"Test run failed: {e}")
 
6
  from datetime import datetime
7
  import uuid
8
  from pydantic import BaseModel, Field
9
+ from langgraph.graph import StateGraph, END
 
10
  from langchain_anthropic import ChatAnthropic
11
  from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
12
 
 
75
  logger.info("Initializing SalesAnalysisWorkflow")
76
 
77
  try:
78
+ # Initialize agents
79
+ self.planning_agent = PlanningAgent()
80
+ self.data_agent = DataAgent(db_path=db_path)
81
+ self.analytics_agent = AnalyticsAgent()
82
+ self.qa_agent = QAAgent()
83
+ self.insights_agent = InsightsAgent()
84
 
85
  # Create the workflow graph
86
  logger.info("Building workflow graph")
 
93
  logger.error(traceback.format_exc())
94
  raise
95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
  def _build_workflow(self) -> StateGraph:
97
  """Build the workflow graph with comprehensive logging"""
98
  logger.info("Building workflow graph")
 
101
  # Create the state graph
102
  workflow = StateGraph(WorkflowState)
103
 
104
+ # Define node methods directly in the workflow
105
+ def planning_node(state):
106
+ """Planning agent node"""
107
+ try:
108
+ logger.info("Entering Planning Node")
109
+
110
+ state["logs"].append({
111
+ "timestamp": datetime.now().isoformat(),
112
+ "step": "planning",
113
+ "message": "Planning analysis approach"
114
+ })
115
+
116
+ # Update status
117
+ state["status"] = "planning"
118
+
119
+ # Create analysis plan
120
+ analysis_plan, plan_dict = self.planning_agent.create_analysis_plan(state["alert"])
121
+
122
+ # Store the plan in the state
123
+ state["plan"] = analysis_plan
124
+
125
+ # Create data requests from plan
126
+ data_requests = []
127
+ for i, data_source in enumerate(analysis_plan.required_data_sources):
128
+ data_requests.append(DataRequest(
129
+ request_id=f"data_{i}",
130
+ description=data_source["purpose"],
131
+ tables=[data_source["table"]],
132
+ purpose=data_source["purpose"]
133
+ ))
134
+
135
+ # Store data requests in the state
136
+ state["data_requests"] = data_requests
137
+
138
+ # Update status for next step
139
+ state["status"] = "data_collection"
140
+
141
+ return state
142
+ except Exception as e:
143
+ logger.error(f"Planning node error: {e}")
144
+ logger.error(traceback.format_exc())
145
+
146
+ # Update state to error
147
+ state["status"] = "error"
148
+ state["error"] = f"Planning error: {str(e)}"
149
+
150
+ return state
151
+
152
+ def data_collection_node(state):
153
+ """Data collection agent node"""
154
+ try:
155
+ logger.info("Entering Data Collection Node")
156
+
157
+ state["logs"].append({
158
+ "timestamp": datetime.now().isoformat(),
159
+ "step": "data_collection",
160
+ "message": f"Collecting data from {len(state['data_requests'])} sources"
161
+ })
162
+
163
+ # Update status
164
+ state["status"] = "data_collection"
165
+
166
+ # Process data requests
167
+ data_sources = self.data_agent.get_data_for_analysis(state["data_requests"])
168
+
169
+ # Store data sources in the state
170
+ state["data_sources"] = data_sources
171
+
172
+ # Create analysis requests based on plan
173
+ analysis_requests = []
174
+ if state["plan"]:
175
+ analysis_approaches = state["plan"].analysis_approaches
176
+ for i, approach in enumerate(analysis_approaches):
177
+ analysis_requests.append(AnalysisRequest(
178
+ request_id=f"analysis_{i}",
179
+ description=approach["purpose"],
180
+ data_sources=list(data_sources.keys()),
181
+ analysis_type=approach["type"],
182
+ purpose=approach["purpose"]
183
+ ))
184
+
185
+ # Store analysis requests in the state
186
+ state["analysis_requests"] = analysis_requests
187
+
188
+ # Update status for next step
189
+ state["status"] = "analysis"
190
+
191
+ return state
192
+ except Exception as e:
193
+ logger.error(f"Data collection node error: {e}")
194
+ logger.error(traceback.format_exc())
195
+
196
+ # Update state to error
197
+ state["status"] = "error"
198
+ state["error"] = f"Data collection error: {str(e)}"
199
+
200
+ return state
201
+
202
+ def analysis_node(state):
203
+ """Analytics agent node"""
204
+ try:
205
+ logger.info("Entering Analysis Node")
206
+
207
+ state["logs"].append({
208
+ "timestamp": datetime.now().isoformat(),
209
+ "step": "analysis",
210
+ "message": f"Performing {len(state['analysis_requests'])} analysis tasks"
211
+ })
212
+
213
+ # Update status
214
+ state["status"] = "analysis"
215
+
216
+ # Process analysis requests
217
+ analysis_results = {}
218
+ for request in state["analysis_requests"]:
219
+ result = self.analytics_agent.perform_analysis(request, state["data_sources"])
220
+ analysis_results[request.request_id] = result
221
+
222
+ # Store analysis results in the state
223
+ state["analysis_results"] = analysis_results
224
+
225
+ # Create validation requests
226
+ validation_requests = []
227
+ for analysis_id, result in analysis_results.items():
228
+ validation_requests.append(ValidationRequest(
229
+ request_id=analysis_id,
230
+ original_problem=state["alert"],
231
+ analysis_results=result.model_dump(),
232
+ data_sources=list(state["data_sources"].keys())
233
+ ))
234
+
235
+ # Store validation requests in the state
236
+ state["validation_requests"] = validation_requests
237
+
238
+ # Update status for next step
239
+ state["status"] = "validation"
240
+
241
+ return state
242
+ except Exception as e:
243
+ logger.error(f"Analysis node error: {e}")
244
+ logger.error(traceback.format_exc())
245
+
246
+ # Update state to error
247
+ state["status"] = "error"
248
+ state["error"] = f"Analysis error: {str(e)}"
249
+
250
+ return state
251
+
252
+ def validation_node(state):
253
+ """QA agent node"""
254
+ try:
255
+ logger.info("Entering Validation Node")
256
+
257
+ state["logs"].append({
258
+ "timestamp": datetime.now().isoformat(),
259
+ "step": "validation",
260
+ "message": f"Validating {len(state['validation_requests'])} analysis results"
261
+ })
262
+
263
+ # Update status
264
+ state["status"] = "validation"
265
+
266
+ # Process validation requests
267
+ validation_results = {}
268
+ for request in state["validation_requests"]:
269
+ result = self.qa_agent.validate_analysis(request, state["data_sources"])
270
+ validation_results[request.request_id] = result
271
+
272
+ # Store validation results in the state
273
+ state["validation_results"] = validation_results
274
+
275
+ # Create insight requests
276
+ insight_requests = []
277
+ for analysis_id, validation in validation_results.items():
278
+ analysis_result = state["analysis_results"][analysis_id]
279
+ insight_requests.append(InsightRequest(
280
+ request_id=analysis_id,
281
+ original_problem=state["alert"],
282
+ analysis_results=analysis_result.model_dump(),
283
+ validation_results=validation.model_dump(),
284
+ target_audience="executive"
285
+ ))
286
+
287
+ # Store insight requests in the state
288
+ state["insight_requests"] = insight_requests
289
+
290
+ # Update status for next step
291
+ state["status"] = "insights"
292
+
293
+ return state
294
+ except Exception as e:
295
+ logger.error(f"Validation node error: {e}")
296
+ logger.error(traceback.format_exc())
297
+
298
+ # Update state to error
299
+ state["status"] = "error"
300
+ state["error"] = f"Validation error: {str(e)}"
301
+
302
+ return state
303
+
304
+ def insights_node(state):
305
+ """Insights agent node"""
306
+ try:
307
+ logger.info("Entering Insights Node")
308
+
309
+ state["logs"].append({
310
+ "timestamp": datetime.now().isoformat(),
311
+ "step": "insights",
312
+ "message": f"Generating insights from {len(state['insight_requests'])} validated analyses"
313
+ })
314
+
315
+ # Update status
316
+ state["status"] = "insights"
317
+
318
+ # Process insight requests
319
+ insight_cards = {}
320
+ visualizations = []
321
+ for request in state["insight_requests"]:
322
+ # Generate insight card
323
+ insight_card = self.insights_agent.generate_insights(request)
324
+ insight_cards[request.request_id] = insight_card
325
+
326
+ # Generate visualizations for the insight card
327
+ viz_files = self.insights_agent.generate_visualizations(insight_card, state["data_sources"])
328
+ visualizations.extend(viz_files)
329
+
330
+ # Store insight cards and visualizations in the state
331
+ state["insight_cards"] = insight_cards
332
+ state["visualizations"] = visualizations
333
+
334
+ # Update status for completion
335
+ state["status"] = "complete"
336
+
337
+ # Log completion
338
+ state["logs"].append({
339
+ "timestamp": datetime.now().isoformat(),
340
+ "step": "complete",
341
+ "message": f"Analysis workflow completed with {len(insight_cards)} insight cards"
342
+ })
343
+
344
+ return state
345
+ except Exception as e:
346
+ logger.error(f"Insights node error: {e}")
347
+ logger.error(traceback.format_exc())
348
+
349
+ # Update state to error
350
+ state["status"] = "error"
351
+ state["error"] = f"Insights error: {str(e)}"
352
+
353
+ return state
354
+
355
+ def handle_error_node(state):
356
+ """Error handling node"""
357
+ logger.error(f"Workflow error: {state['error']}")
358
+
359
+ # Log the error
360
+ state["logs"].append({
361
+ "timestamp": datetime.now().isoformat(),
362
+ "step": "error",
363
+ "message": f"Workflow error: {state['error']}"
364
+ })
365
+
366
+ return state
367
+
368
+ # Add nodes to the workflow
369
+ workflow.add_node("planning", planning_node)
370
+ workflow.add_node("data_collection", data_collection_node)
371
+ workflow.add_node("analysis", analysis_node)
372
+ workflow.add_node("validation", validation_node)
373
+ workflow.add_node("insights", insights_node)
374
+ workflow.add_node("handle_error", handle_error_node)
375
 
376
  # Define the workflow edges
377
  workflow.add_edge("planning", "data_collection")
 
380
  workflow.add_edge("validation", "insights")
381
  workflow.add_edge("insights", END)
382
 
 
 
 
383
  # Add conditional edges for error handling
384
  def route_on_error(state):
 
385
  return "handle_error" if state["status"] == "error" else None
386
+
387
  workflow.add_conditional_edges("planning", route_on_error)
388
  workflow.add_conditional_edges("data_collection", route_on_error)
389
  workflow.add_conditional_edges("analysis", route_on_error)
 
442
 
443
  return error_state
444
 
 
 
 
 
 
 
445
  # For testing
446
  if __name__ == "__main__":
447
  # Configure logging to show all details
 
463
  if result["status"] == "error":
464
  print(f"Error: {result['error']}")
465
  else:
466
+ print(f"Generated {len(result.get('insight_cards', {}))} insight cards")
467
+ print(f"Generated {len(result.get('visualizations', []))} visualizations")
468
 
469
  except Exception as e:
470
  print(f"Test run failed: {e}")