Spaces:
Runtime error
Runtime error
| """ | |
| Executor agent for pharmaceutical data management. | |
| This agent executes SQL queries and reports on the results. | |
| """ | |
| import time | |
| import json | |
| from typing import Dict, Any, List | |
| from anthropic.types import MessageParam | |
| def executor_agent(anthropic_client, db, state: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Agent that executes the SQL queries and reports results. | |
| Args: | |
| anthropic_client: The Anthropic client for calling Claude API | |
| db: The database connection | |
| state: Current state of the agent workflow | |
| Returns: | |
| Updated state values only (not the entire state) | |
| """ | |
| # Get current messages and SQL queries | |
| messages = state.get("messages", []) | |
| sql_queries = state.get("sql_queries", []) | |
| # System message as a string | |
| system_message = """ | |
| You are an AI assistant specializing in executing and validating pharmaceutical data pipelines. | |
| Your job is to execute the SQL queries and report on the results. | |
| For each query: | |
| 1. Execute it and capture the results | |
| 2. Validate the data (check for issues, errors, unexpected results) | |
| 3. Provide a clear summary of what was accomplished | |
| Tag your complete response with EXECUTION_COMPLETE when all queries are processed. | |
| """ | |
| # Execute the queries | |
| execution_results = _execute_queries(db, sql_queries) | |
| # Calculate overall success | |
| success_rate = sum(1 for r in execution_results if r["success"]) / len(execution_results) if execution_results else 0 | |
| # Prepare context for Claude | |
| execution_context = json.dumps(execution_results, indent=2) | |
| # Format messages for the Anthropic API | |
| formatted_messages = [] | |
| for msg in messages: | |
| if isinstance(msg, dict) and "role" in msg and "content" in msg: | |
| formatted_messages.append(MessageParam( | |
| role=msg["role"], | |
| content=msg["content"] | |
| )) | |
| # Add final user message with context | |
| formatted_messages.append( | |
| MessageParam( | |
| role="user", | |
| content=f"Here are the execution results of the SQL queries. Please analyze and report on them.\n\n{execution_context}" | |
| ) | |
| ) | |
| try: | |
| # Call Claude API with system parameter separately | |
| response = anthropic_client.messages.create( | |
| model="claude-3-7-sonnet-20250219", | |
| system=system_message, | |
| messages=formatted_messages, | |
| max_tokens=2000 | |
| ) | |
| # Extract the response | |
| agent_response = response.content[0].text | |
| # Clean response text | |
| clean_response = agent_response.replace("EXECUTION_COMPLETE", "").strip() | |
| # Return only the STATE UPDATES, not the entire state | |
| result = {} | |
| # Add a new message to the list (will be combined with existing via operator.add) | |
| result["messages"] = [{"role": "assistant", "content": clean_response}] | |
| # Update execution results | |
| result["execution_results"] = { | |
| "queries_executed": len(execution_results), | |
| "success_rate": success_rate, | |
| "details": execution_results, | |
| "summary": clean_response, | |
| "completed_at": time.time() | |
| } | |
| # Calculate confidence scores | |
| result["confidence_scores"] = { | |
| "intent_understanding": 0.95, # High by default for demo | |
| "plan_quality": 0.85 if success_rate > 0.8 else 0.6, | |
| "execution_success": success_rate, | |
| "overall": (0.95 + (0.85 if success_rate > 0.8 else 0.6) + success_rate) / 3 | |
| } | |
| # Update status | |
| result["status"] = "complete" | |
| result["current_agent"] = "complete" | |
| return result | |
| except Exception as e: | |
| # Handle any errors - return only state updates | |
| print(f"Error in executor_agent: {str(e)}") | |
| return { | |
| "messages": [{"role": "assistant", "content": f"I encountered an error: {str(e)}"}], | |
| "current_agent": "executor_agent", | |
| "status": "error" | |
| } | |
| def _execute_queries(db, sql_queries: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | |
| """ | |
| Execute SQL queries against the database. | |
| Args: | |
| db: The database connection | |
| sql_queries: List of SQL queries to execute | |
| Returns: | |
| List of execution results | |
| """ | |
| execution_results = [] | |
| for query in sql_queries: | |
| try: | |
| # Execute the query | |
| result = db.execute_query(query["sql"]) | |
| # Add to results | |
| execution_results.append({ | |
| "query_name": query["name"], | |
| "success": "error" not in result, | |
| "result_summary": "Successfully processed data" if "error" not in result else result["error"], | |
| "row_count": len(result.get("data", [])) if "data" in result else 0, | |
| "executed_at": time.time() | |
| }) | |
| except Exception as e: | |
| # Handle execution errors | |
| execution_results.append({ | |
| "query_name": query["name"], | |
| "success": False, | |
| "result_summary": f"Error: {str(e)}", | |
| "row_count": 0, | |
| "executed_at": time.time() | |
| }) | |
| return execution_results |