cryogenic22 commited on
Commit
87fe343
·
verified ·
1 Parent(s): 1bcd3df

Update agents/sql_generator.py

Browse files
Files changed (1) hide show
  1. agents/sql_generator.py +38 -62
agents/sql_generator.py CHANGED
@@ -5,11 +5,10 @@ This agent converts a data pipeline plan into executable SQL queries.
5
 
6
  import time
7
  from typing import Dict, Any, List
8
- from agents.state import AgentState
9
- from anthropic import Anthropic
10
  from anthropic.types import MessageParam
11
 
12
- def sql_generator_agent(anthropic_client: Anthropic, state: AgentState) -> AgentState:
13
  """
14
  Agent that generates SQL queries based on the pipeline plan.
15
 
@@ -22,7 +21,6 @@ def sql_generator_agent(anthropic_client: Anthropic, state: AgentState) -> Agent
22
  """
23
  # Get current messages and pipeline plan
24
  messages = state.get("messages", [])
25
- current_agent = state.get("current_agent", ["sql_generator_agent"])
26
  pipeline_plan = state.get("pipeline_plan", {})
27
 
28
  # Add agent-specific instructions
@@ -50,68 +48,46 @@ def sql_generator_agent(anthropic_client: Anthropic, state: AgentState) -> Agent
50
  - Data products: DP_SALES_DASHBOARD, DP_HCP_TARGETING
51
  """
52
 
53
- # Convert messages to the format expected by Anthropic API
54
- prompt_messages = []
55
- for msg in messages:
56
- prompt_messages.append(MessageParam(
57
- role=msg["role"],
58
- content=msg["content"]
59
- ))
60
-
61
- # Add final user message with context
62
- prompt_messages.append(
63
- MessageParam(
64
- role="user",
65
- content=f"Based on this pipeline plan, generate the SQL queries needed. {context}"
66
- )
67
  )
68
 
69
- try:
70
- # Call Claude API
71
- response = anthropic_client.messages.create(
72
- model="claude-3-7-sonnet-20250219",
73
- system=system_message,
74
- messages=prompt_messages,
75
- max_tokens=3000
76
- )
77
-
78
- # Extract the response
79
- agent_response = response.content[0].text
80
-
81
- # Check if SQL generation is complete
82
- sql_complete = "SQL_COMPLETE" in agent_response
83
-
84
- # Prepare the new state updates
85
- new_state = state.copy()
86
-
87
- # Add agent's response to messages
88
- new_state["messages"].append({
89
- "role": "assistant",
90
- "content": agent_response.replace("SQL_COMPLETE", "").strip()
91
- })
92
-
93
  # Extract SQL queries from the response
94
- if sql_complete:
95
- sql_queries = _extract_sql_queries(agent_response)
96
-
97
- new_state["sql_queries"] = sql_queries
98
- # Use .append() to add next agent to the list
99
- new_state["current_agent"].append("executor_agent")
100
- else:
101
- # Use .append() to add current agent if SQL generation is not complete
102
- new_state["current_agent"].append("sql_generator_agent")
103
 
104
- return new_state
105
-
106
- except Exception as e:
107
- # Handle any errors
108
- print(f"Error in sql_generator_agent: {str(e)}")
109
- return {
110
- "messages": state.get("messages", []) + [
111
- {"role": "assistant", "content": f"I encountered an error: {str(e)}"}
112
- ],
113
- "current_agent": current_agent + ["sql_generator_agent"]
114
- }
 
 
115
 
116
  def _extract_sql_queries(response: str) -> List[Dict[str, Any]]:
117
  """
 
5
 
6
  import time
7
  from typing import Dict, Any, List
8
+ from langgraph.graph.message import add_messages
 
9
  from anthropic.types import MessageParam
10
 
11
+ def sql_generator_agent(anthropic_client, state: Dict[str, Any]) -> Dict[str, Any]:
12
  """
13
  Agent that generates SQL queries based on the pipeline plan.
14
 
 
21
  """
22
  # Get current messages and pipeline plan
23
  messages = state.get("messages", [])
 
24
  pipeline_plan = state.get("pipeline_plan", {})
25
 
26
  # Add agent-specific instructions
 
48
  - Data products: DP_SALES_DASHBOARD, DP_HCP_TARGETING
49
  """
50
 
51
+ # Prepare prompt for Claude
52
+ prompt_messages = [
53
+ MessageParam(role="system", content=system_message),
54
+ *[MessageParam(role=m["role"], content=m["content"]) for m in messages],
55
+ MessageParam(role="user", content=f"Based on this pipeline plan, generate the SQL queries needed. {context}")
56
+ ]
57
+
58
+ # Call Claude API
59
+ response = anthropic_client.messages.create(
60
+ model="claude-3-7-sonnet-20250219",
61
+ messages=prompt_messages,
62
+ max_tokens=3000
 
 
63
  )
64
 
65
+ # Extract the response
66
+ agent_response = response.content[0].text
67
+
68
+ # Check if SQL generation is complete
69
+ sql_complete = "SQL_COMPLETE" in agent_response
70
+
71
+ # Update state based on SQL completeness
72
+ new_state = state.copy()
73
+ if sql_complete:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74
  # Extract SQL queries from the response
75
+ # In a production system, you would parse the SQL more carefully
76
+ sql_queries = _extract_sql_queries(agent_response)
 
 
 
 
 
 
 
77
 
78
+ new_state["sql_queries"] = sql_queries
79
+ new_state["current_agent"] = "executor_agent"
80
+ else:
81
+ # Need more information or work, stay with SQL generator agent
82
+ new_state["current_agent"] = "sql_generator_agent"
83
+
84
+ # Add agent's response to messages
85
+ new_messages = add_messages(state, [
86
+ {"role": "assistant", "content": agent_response.replace("SQL_COMPLETE", "").strip()}
87
+ ])
88
+ new_state["messages"] = new_messages
89
+
90
+ return new_state
91
 
92
  def _extract_sql_queries(response: str) -> List[Dict[str, Any]]:
93
  """