Spaces:
Runtime error
Runtime error
File size: 5,415 Bytes
9bf4daf 2364cf6 9bf4daf f798709 9bf4daf 9a091e0 9bf4daf f798709 b1e2bae 9bf4daf 9a091e0 9bf4daf e31ceda 21aa8e8 e31ceda 21aa8e8 f798709 21aa8e8 e31ceda 21aa8e8 e31ceda 9a091e0 e31ceda 9a091e0 e31ceda 9a091e0 e31ceda 9a091e0 e31ceda 9a091e0 e31ceda 9a091e0 e31ceda 9a091e0 e31ceda 9a091e0 e31ceda 9a091e0 e31ceda 9a091e0 9bf4daf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
"""
Executor agent for pharmaceutical data management.
This agent executes SQL queries and reports on the results.
"""
import time
import json
from typing import Dict, Any, List
from anthropic.types import MessageParam
def executor_agent(anthropic_client, db, state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute the planned SQL queries and have Claude report on the outcome.

    The queries found in ``state["sql_queries"]`` are run through
    ``_execute_queries``; the per-query results are serialised to JSON and
    sent to Claude, together with the prior conversation, for analysis.

    Args:
        anthropic_client: The Anthropic client used to call the Messages API.
        db: The database connection, passed through to ``_execute_queries``.
        state: Current state of the agent workflow. ``"messages"`` and
            ``"sql_queries"`` are read from it; a missing or ``None`` value
            is treated as an empty list.

    Returns:
        Updated state values only (not the entire state). On success the
        keys are ``messages`` (one new assistant message),
        ``execution_results``, ``confidence_scores``, ``status`` and
        ``current_agent``. On any failure the keys are ``messages`` (an
        assistant message describing the error), ``current_agent`` set to
        ``"executor_agent"`` and ``status`` set to ``"error"``.
    """
    # ``or []`` also covers keys that are present but explicitly set to None.
    messages = state.get("messages") or []
    sql_queries = state.get("sql_queries") or []

    # System prompt, passed via the separate ``system`` parameter of the API.
    # The text is kept flush-left so the prompt carries no code indentation.
    system_message = """
You are an AI assistant specializing in executing and validating pharmaceutical data pipelines.
Your job is to execute the SQL queries and report on the results.
For each query:
1. Execute it and capture the results
2. Validate the data (check for issues, errors, unexpected results)
3. Provide a clear summary of what was accomplished
Tag your complete response with EXECUTION_COMPLETE when all queries are processed.
"""

    try:
        # Query execution and serialisation live inside the ``try`` so that a
        # failure here is reported through the same error-state update as a
        # failed API call, instead of escaping as an unhandled exception.
        execution_results = _execute_queries(db, sql_queries)

        # Fraction of queries that succeeded; 0 when nothing was executed.
        succeeded = sum(1 for r in execution_results if r["success"])
        success_rate = succeeded / len(execution_results) if execution_results else 0

        # ``default=str`` is deliberate: this JSON is only prompt text for the
        # model, so a non-JSON-native value (e.g. an exception object used as
        # an error payload) is stringified rather than aborting the run.
        execution_context = json.dumps(execution_results, indent=2, default=str)

        # Forward only well-formed prior messages to the API.
        formatted_messages = [
            MessageParam(role=msg["role"], content=msg["content"])
            for msg in messages
            if isinstance(msg, dict) and "role" in msg and "content" in msg
        ]
        # Final user turn carrying the execution results to analyse.
        formatted_messages.append(
            MessageParam(
                role="user",
                content=f"Here are the execution results of the SQL queries. Please analyze and report on them.\n\n{execution_context}",
            )
        )

        response = anthropic_client.messages.create(
            model="claude-3-7-sonnet-20250219",
            system=system_message,
            messages=formatted_messages,
            max_tokens=2000,
        )

        agent_response = response.content[0].text
        # The completion tag is a control marker, not part of the report.
        clean_response = agent_response.replace("EXECUTION_COMPLETE", "").strip()

        # Computed once and reused for both the individual and overall score.
        plan_quality = 0.85 if success_rate > 0.8 else 0.6

        # Return only the STATE UPDATES, not the entire state.
        return {
            # A single new message; the original comment notes it is combined
            # with the existing list via operator.add.
            "messages": [{"role": "assistant", "content": clean_response}],
            "execution_results": {
                "queries_executed": len(execution_results),
                "success_rate": success_rate,
                "details": execution_results,
                "summary": clean_response,
                "completed_at": time.time(),
            },
            "confidence_scores": {
                "intent_understanding": 0.95,  # High by default for demo
                "plan_quality": plan_quality,
                "execution_success": success_rate,
                "overall": (0.95 + plan_quality + success_rate) / 3,
            },
            "status": "complete",
            "current_agent": "complete",
        }
    except Exception as e:
        # Agent boundary: report the failure as a state update rather than
        # letting it propagate to the caller.
        print(f"Error in executor_agent: {str(e)}")
        return {
            "messages": [{"role": "assistant", "content": f"I encountered an error: {str(e)}"}],
            "current_agent": "executor_agent",
            "status": "error",
        }
def _execute_queries(db, sql_queries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Execute SQL queries against the database, one result entry per query.

    A failure of one query never aborts the batch: any exception raised
    while running or inspecting a query is recorded as a failed entry and
    processing continues with the next query.

    Args:
        db: The database connection; must provide ``execute_query(sql)``.
        sql_queries: List of query descriptions, each read via its ``"sql"``
            and ``"name"`` keys. ``None`` is treated as an empty list.

    Returns:
        List of execution results, in input order. Each entry has the keys
        ``query_name``, ``success``, ``result_summary``, ``row_count`` and
        ``executed_at``. An entry without a ``"name"`` is labelled
        ``query_<n>`` using its 1-based position in the input.
    """
    execution_results = []
    for position, query in enumerate(sql_queries or [], start=1):
        # Resolve the label up front and without indexing, so a malformed
        # entry cannot raise a second time from inside the error handler.
        fallback_name = f"query_{position}"
        if isinstance(query, dict):
            query_name = query.get("name", fallback_name)
        else:
            query_name = fallback_name

        try:
            result = db.execute_query(query["sql"])
            failed = "error" in result
            # A present-but-None "data" value counts as zero rows rather
            # than turning a successful query into a failure.
            rows = result.get("data") if "data" in result else None
            entry = {
                "query_name": query_name,
                "success": not failed,
                "result_summary": result["error"] if failed else "Successfully processed data",
                "row_count": len(rows) if rows is not None else 0,
                "executed_at": time.time(),
            }
        except Exception as e:
            # Best-effort batch: record the failure and move on.
            entry = {
                "query_name": query_name,
                "success": False,
                "result_summary": f"Error: {str(e)}",
                "row_count": 0,
                "executed_at": time.time(),
            }
        execution_results.append(entry)
    return execution_results