cryogenic22 commited on
Commit
9bf4daf
·
verified ·
1 Parent(s): e4905e2

Create agents/executor.py

Browse files
Files changed (1) hide show
  1. agents/executor.py +133 -0
agents/executor.py ADDED
@@ -0,0 +1,133 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Executor agent for pharmaceutical data management.
3
+ This agent executes SQL queries and reports on the results.
4
+ """
5
+
6
+ import time
7
+ import json
8
+ from typing import Dict, Any, List
9
+ from langgraph.graph.message import add_messages
10
+ from anthropic.types import MessageParam
11
+
12
+ def executor_agent(anthropic_client, db, state: Dict[str, Any]) -> Dict[str, Any]:
13
+ """
14
+ Agent that executes the SQL queries and reports results.
15
+
16
+ Args:
17
+ anthropic_client: The Anthropic client for calling Claude API
18
+ db: The database connection
19
+ state: Current state of the agent workflow
20
+
21
+ Returns:
22
+ Updated state
23
+ """
24
+ # Get current messages and SQL queries
25
+ messages = state["messages"]
26
+ sql_queries = state["sql_queries"]
27
+
28
+ # Add agent-specific instructions
29
+ system_message = """
30
+ You are an AI assistant specializing in executing and validating pharmaceutical data pipelines.
31
+ Your job is to execute the SQL queries and report on the results.
32
+
33
+ For each query:
34
+ 1. Execute it and capture the results
35
+ 2. Validate the data (check for issues, errors, unexpected results)
36
+ 3. Provide a clear summary of what was accomplished
37
+
38
+ Tag your complete response with EXECUTION_COMPLETE when all queries are processed.
39
+ """
40
+
41
+ # Execute the queries
42
+ execution_results = _execute_queries(db, sql_queries)
43
+
44
+ # Calculate overall success
45
+ success_rate = sum(1 for r in execution_results if r["success"]) / len(execution_results) if execution_results else 0
46
+
47
+ # Prepare context for Claude
48
+ execution_context = json.dumps(execution_results, indent=2)
49
+
50
+ # Prepare prompt for Claude
51
+ prompt_messages = [
52
+ MessageParam(role="system", content=system_message),
53
+ *[MessageParam(role=m["role"], content=m["content"]) for m in messages],
54
+ MessageParam(role="user", content=f"Here are the execution results of the SQL queries. Please analyze and report on them.\n\n{execution_context}")
55
+ ]
56
+
57
+ # Call Claude API
58
+ response = anthropic_client.messages.create(
59
+ model="claude-3-7-sonnet-20250219",
60
+ messages=prompt_messages,
61
+ max_tokens=2000
62
+ )
63
+
64
+ # Extract the response
65
+ agent_response = response.content[0].text
66
+
67
+ # Update state with execution results
68
+ new_state = state.copy()
69
+ new_state["execution_results"] = {
70
+ "queries_executed": len(execution_results),
71
+ "success_rate": success_rate,
72
+ "details": execution_results,
73
+ "summary": agent_response.replace("EXECUTION_COMPLETE", "").strip(),
74
+ "completed_at": time.time()
75
+ }
76
+
77
+ # Calculate confidence scores
78
+ confidence_scores = {
79
+ "intent_understanding": 0.95, # High by default for demo
80
+ "plan_quality": 0.85 if success_rate > 0.8 else 0.6,
81
+ "execution_success": success_rate,
82
+ "overall": (0.95 + (0.85 if success_rate > 0.8 else 0.6) + success_rate) / 3
83
+ }
84
+ new_state["confidence_scores"] = confidence_scores
85
+
86
+ # Update status
87
+ new_state["status"] = "complete"
88
+ new_state["current_agent"] = "complete"
89
+
90
+ # Add agent's response to messages
91
+ new_messages = add_messages(state, [
92
+ {"role": "assistant", "content": agent_response.replace("EXECUTION_COMPLETE", "").strip()}
93
+ ])
94
+ new_state["messages"] = new_messages
95
+
96
+ return new_state
97
+
98
+ def _execute_queries(db, sql_queries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
99
+ """
100
+ Execute SQL queries against the database.
101
+
102
+ Args:
103
+ db: The database connection
104
+ sql_queries: List of SQL queries to execute
105
+
106
+ Returns:
107
+ List of execution results
108
+ """
109
+ execution_results = []
110
+ for query in sql_queries:
111
+ try:
112
+ # Execute the query
113
+ result = db.execute_query(query["sql"])
114
+
115
+ # Add to results
116
+ execution_results.append({
117
+ "query_name": query["name"],
118
+ "success": "error" not in result,
119
+ "result_summary": "Successfully processed data" if "error" not in result else result["error"],
120
+ "row_count": len(result.get("data", [])) if "data" in result else 0,
121
+ "executed_at": time.time()
122
+ })
123
+ except Exception as e:
124
+ # Handle execution errors
125
+ execution_results.append({
126
+ "query_name": query["name"],
127
+ "success": False,
128
+ "result_summary": f"Error: {str(e)}",
129
+ "row_count": 0,
130
+ "executed_at": time.time()
131
+ })
132
+
133
+ return execution_results