cryogenic22's picture
Update agents/executor.py
9a091e0 verified
"""
Executor agent for pharmaceutical data management.
This agent executes SQL queries and reports on the results.
"""
import time
import json
from typing import Dict, Any, List
from anthropic.types import MessageParam
def executor_agent(anthropic_client, db, state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Agent that executes the SQL queries and reports results.

    Runs every query found under ``state["sql_queries"]``, sends the
    JSON-encoded outcomes to Claude together with the prior conversation,
    and returns the state updates derived from Claude's report.

    Args:
        anthropic_client: The Anthropic client for calling Claude API
        db: The database connection
        state: Current state of the agent workflow; the "messages" and
            "sql_queries" entries are read from it

    Returns:
        Updated state values only (not the entire state).
        On success the keys are "messages", "execution_results",
        "confidence_scores", "status" and "current_agent".
        On any failure (query execution, serialization or the API call)
        the keys are "messages" (carrying the error text),
        "current_agent" and "status" set to "error".
    """
    # Get current messages and SQL queries; an explicit None is treated as empty
    messages = state.get("messages") or []
    sql_queries = state.get("sql_queries") or []

    # System message as a string
    system_message = """
You are an AI assistant specializing in executing and validating pharmaceutical data pipelines.
Your job is to execute the SQL queries and report on the results.
For each query:
1. Execute it and capture the results
2. Validate the data (check for issues, errors, unexpected results)
3. Provide a clear summary of what was accomplished
Tag your complete response with EXECUTION_COMPLETE when all queries are processed.
"""

    try:
        # Execute the queries inside the try so that an unexpected failure is
        # reported through the error-state update instead of escaping.
        execution_results = _execute_queries(db, sql_queries)

        # Fraction of queries that succeeded; 0 when nothing was executed
        if execution_results:
            succeeded = sum(1 for r in execution_results if r["success"])
            success_rate = succeeded / len(execution_results)
        else:
            success_rate = 0

        # Prepare context for Claude. Result summaries may carry whatever the
        # database layer reported as its error, which is not guaranteed to be
        # JSON-serializable, so such values are deliberately stringified.
        execution_context = json.dumps(execution_results, indent=2, default=str)

        # Format messages for the Anthropic API; entries that are not dicts
        # with both "role" and "content" are skipped
        formatted_messages = [
            MessageParam(role=msg["role"], content=msg["content"])
            for msg in messages
            if isinstance(msg, dict) and "role" in msg and "content" in msg
        ]

        # Add final user message with context
        formatted_messages.append(
            MessageParam(
                role="user",
                content=f"Here are the execution results of the SQL queries. Please analyze and report on them.\n\n{execution_context}"
            )
        )

        # Call Claude API with system parameter separately
        response = anthropic_client.messages.create(
            model="claude-3-7-sonnet-20250219",
            system=system_message,
            messages=formatted_messages,
            max_tokens=2000
        )

        # Extract the response and strip the completion tag
        agent_response = response.content[0].text
        clean_response = agent_response.replace("EXECUTION_COMPLETE", "").strip()

        # Computed once so the per-field score and the overall average agree
        intent_understanding = 0.95  # High by default for demo
        plan_quality = 0.85 if success_rate > 0.8 else 0.6

        # Return only the STATE UPDATES, not the entire state
        return {
            # A single new message; per the original note the workflow is
            # expected to combine it with the existing list (operator.add)
            "messages": [{"role": "assistant", "content": clean_response}],
            "execution_results": {
                "queries_executed": len(execution_results),
                "success_rate": success_rate,
                "details": execution_results,
                "summary": clean_response,
                "completed_at": time.time()
            },
            "confidence_scores": {
                "intent_understanding": intent_understanding,
                "plan_quality": plan_quality,
                "execution_success": success_rate,
                "overall": (intent_understanding + plan_quality + success_rate) / 3
            },
            "status": "complete",
            "current_agent": "complete"
        }
    except Exception as e:
        # Handle any errors - return only state updates
        print(f"Error in executor_agent: {str(e)}")
        return {
            "messages": [{"role": "assistant", "content": f"I encountered an error: {str(e)}"}],
            "current_agent": "executor_agent",
            "status": "error"
        }
def _execute_queries(db, sql_queries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Execute SQL queries against the database.

    Each query is run on its own through ``db.execute_query``. A failure in
    one query is recorded in that query's entry and never stops the rest
    of the batch.

    Args:
        db: The database connection; must provide ``execute_query(sql)``.
            The returned mapping is treated as failed when it contains an
            "error" key, and its optional "data" entry is counted as rows.
        sql_queries: List of SQL queries to execute, each a dict with a
            "sql" key and, normally, a "name" key. ``None`` is treated as
            an empty list.

    Returns:
        List of execution results, one dict per query, with the keys
        "query_name", "success", "result_summary", "row_count" and
        "executed_at".
    """
    execution_results = []
    for position, query in enumerate(sql_queries or [], start=1):
        # Resolve the name before the try block: the error path needs it too,
        # and looking it up inside the handler would raise a second time for
        # a query that has no "name" (or is not a dict at all).
        query_name = f"query_{position}"
        if isinstance(query, dict):
            query_name = query.get("name", query_name)

        try:
            # Execute the query
            result = db.execute_query(query["sql"])
            failed = "error" in result
            # A present-but-None "data" counts as zero rows rather than
            # turning a successful query into a failure.
            data = result.get("data")
            entry = {
                "query_name": query_name,
                "success": not failed,
                "result_summary": result["error"] if failed else "Successfully processed data",
                "row_count": len(data) if data is not None else 0,
                "executed_at": time.time()
            }
        except Exception as e:
            # Handle execution errors (including malformed query entries)
            entry = {
                "query_name": query_name,
                "success": False,
                "result_summary": f"Error: {str(e)}",
                "row_count": 0,
                "executed_at": time.time()
            }
        execution_results.append(entry)
    return execution_results