""" Executor agent for pharmaceutical data management. This agent executes SQL queries and reports on the results. """ import time import json from typing import Dict, Any, List from anthropic.types import MessageParam def executor_agent(anthropic_client, db, state: Dict[str, Any]) -> Dict[str, Any]: """ Agent that executes the SQL queries and reports results. Args: anthropic_client: The Anthropic client for calling Claude API db: The database connection state: Current state of the agent workflow Returns: Updated state values only (not the entire state) """ # Get current messages and SQL queries messages = state.get("messages", []) sql_queries = state.get("sql_queries", []) # System message as a string system_message = """ You are an AI assistant specializing in executing and validating pharmaceutical data pipelines. Your job is to execute the SQL queries and report on the results. For each query: 1. Execute it and capture the results 2. Validate the data (check for issues, errors, unexpected results) 3. Provide a clear summary of what was accomplished Tag your complete response with EXECUTION_COMPLETE when all queries are processed. """ # Execute the queries execution_results = _execute_queries(db, sql_queries) # Calculate overall success success_rate = sum(1 for r in execution_results if r["success"]) / len(execution_results) if execution_results else 0 # Prepare context for Claude execution_context = json.dumps(execution_results, indent=2) # Format messages for the Anthropic API formatted_messages = [] for msg in messages: if isinstance(msg, dict) and "role" in msg and "content" in msg: formatted_messages.append(MessageParam( role=msg["role"], content=msg["content"] )) # Add final user message with context formatted_messages.append( MessageParam( role="user", content=f"Here are the execution results of the SQL queries. Please analyze and report on them.\n\n{execution_context}" ) ) try: # Call Claude API with system parameter separately response = anthropic_client.messages.create( model="claude-3-7-sonnet-20250219", system=system_message, messages=formatted_messages, max_tokens=2000 ) # Extract the response agent_response = response.content[0].text # Clean response text clean_response = agent_response.replace("EXECUTION_COMPLETE", "").strip() # Return only the STATE UPDATES, not the entire state result = {} # Add a new message to the list (will be combined with existing via operator.add) result["messages"] = [{"role": "assistant", "content": clean_response}] # Update execution results result["execution_results"] = { "queries_executed": len(execution_results), "success_rate": success_rate, "details": execution_results, "summary": clean_response, "completed_at": time.time() } # Calculate confidence scores result["confidence_scores"] = { "intent_understanding": 0.95, # High by default for demo "plan_quality": 0.85 if success_rate > 0.8 else 0.6, "execution_success": success_rate, "overall": (0.95 + (0.85 if success_rate > 0.8 else 0.6) + success_rate) / 3 } # Update status result["status"] = "complete" result["current_agent"] = "complete" return result except Exception as e: # Handle any errors - return only state updates print(f"Error in executor_agent: {str(e)}") return { "messages": [{"role": "assistant", "content": f"I encountered an error: {str(e)}"}], "current_agent": "executor_agent", "status": "error" } def _execute_queries(db, sql_queries: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Execute SQL queries against the database. Args: db: The database connection sql_queries: List of SQL queries to execute Returns: List of execution results """ execution_results = [] for query in sql_queries: try: # Execute the query result = db.execute_query(query["sql"]) # Add to results execution_results.append({ "query_name": query["name"], "success": "error" not in result, "result_summary": "Successfully processed data" if "error" not in result else result["error"], "row_count": len(result.get("data", [])) if "data" in result else 0, "executed_at": time.time() }) except Exception as e: # Handle execution errors execution_results.append({ "query_name": query["name"], "success": False, "result_summary": f"Error: {str(e)}", "row_count": 0, "executed_at": time.time() }) return execution_results