File size: 5,415 Bytes
9bf4daf
 
 
 
 
 
 
2364cf6
9bf4daf
 
f798709
9bf4daf
 
 
 
 
 
 
 
 
9a091e0
9bf4daf
f798709
b1e2bae
 
9bf4daf
9a091e0
9bf4daf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e31ceda
 
21aa8e8
e31ceda
 
21aa8e8
 
 
f798709
21aa8e8
e31ceda
21aa8e8
 
 
 
 
 
e31ceda
9a091e0
e31ceda
 
 
 
 
 
 
 
 
 
9a091e0
 
e31ceda
9a091e0
 
 
 
 
e31ceda
 
9a091e0
e31ceda
 
 
9a091e0
e31ceda
 
 
 
9a091e0
e31ceda
 
 
 
 
 
 
9a091e0
 
e31ceda
9a091e0
e31ceda
 
9a091e0
e31ceda
9a091e0
 
 
 
 
9bf4daf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""
Executor agent for pharmaceutical data management.
This agent executes SQL queries and reports on the results.
"""

import time
import json
from typing import Dict, Any, List
from anthropic.types import MessageParam

def executor_agent(anthropic_client, db, state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Agent that executes the SQL queries and reports results.
    
    Args:
        anthropic_client: The Anthropic client for calling Claude API
        db: The database connection
        state: Current state of the agent workflow
    
    Returns:
        Updated state values only (not the entire state)
    """
    # Get current messages and SQL queries
    messages = state.get("messages", [])
    sql_queries = state.get("sql_queries", [])
    
    # System message as a string
    system_message = """
    You are an AI assistant specializing in executing and validating pharmaceutical data pipelines.
    Your job is to execute the SQL queries and report on the results.
    
    For each query:
    1. Execute it and capture the results
    2. Validate the data (check for issues, errors, unexpected results)
    3. Provide a clear summary of what was accomplished
    
    Tag your complete response with EXECUTION_COMPLETE when all queries are processed.
    """
    
    # Execute the queries
    execution_results = _execute_queries(db, sql_queries)
    
    # Calculate overall success
    success_rate = sum(1 for r in execution_results if r["success"]) / len(execution_results) if execution_results else 0
    
    # Prepare context for Claude
    execution_context = json.dumps(execution_results, indent=2)
    
    # Format messages for the Anthropic API
    formatted_messages = []
    for msg in messages:
        if isinstance(msg, dict) and "role" in msg and "content" in msg:
            formatted_messages.append(MessageParam(
                role=msg["role"],
                content=msg["content"]
            ))
    
    # Add final user message with context
    formatted_messages.append(
        MessageParam(
            role="user", 
            content=f"Here are the execution results of the SQL queries. Please analyze and report on them.\n\n{execution_context}"
        )
    )
    
    try:
        # Call Claude API with system parameter separately
        response = anthropic_client.messages.create(
            model="claude-3-7-sonnet-20250219",
            system=system_message,
            messages=formatted_messages,
            max_tokens=2000
        )
        
        # Extract the response
        agent_response = response.content[0].text
        
        # Clean response text
        clean_response = agent_response.replace("EXECUTION_COMPLETE", "").strip()
        
        # Return only the STATE UPDATES, not the entire state
        result = {}
        
        # Add a new message to the list (will be combined with existing via operator.add)
        result["messages"] = [{"role": "assistant", "content": clean_response}]
        
        # Update execution results
        result["execution_results"] = {
            "queries_executed": len(execution_results),
            "success_rate": success_rate,
            "details": execution_results,
            "summary": clean_response,
            "completed_at": time.time()
        }
        
        # Calculate confidence scores
        result["confidence_scores"] = {
            "intent_understanding": 0.95,  # High by default for demo
            "plan_quality": 0.85 if success_rate > 0.8 else 0.6,
            "execution_success": success_rate,
            "overall": (0.95 + (0.85 if success_rate > 0.8 else 0.6) + success_rate) / 3
        }
        
        # Update status
        result["status"] = "complete"
        result["current_agent"] = "complete"
        
        return result
    
    except Exception as e:
        # Handle any errors - return only state updates
        print(f"Error in executor_agent: {str(e)}")
        return {
            "messages": [{"role": "assistant", "content": f"I encountered an error: {str(e)}"}],
            "current_agent": "executor_agent",
            "status": "error"
        }

def _execute_queries(db, sql_queries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Execute SQL queries against the database.
    
    Args:
        db: The database connection
        sql_queries: List of SQL queries to execute
    
    Returns:
        List of execution results
    """
    execution_results = []
    for query in sql_queries:
        try:
            # Execute the query
            result = db.execute_query(query["sql"])
            
            # Add to results
            execution_results.append({
                "query_name": query["name"],
                "success": "error" not in result,
                "result_summary": "Successfully processed data" if "error" not in result else result["error"],
                "row_count": len(result.get("data", [])) if "data" in result else 0,
                "executed_at": time.time()
            })
        except Exception as e:
            # Handle execution errors
            execution_results.append({
                "query_name": query["name"],
                "success": False,
                "result_summary": f"Error: {str(e)}",
                "row_count": 0,
                "executed_at": time.time()
            })
    
    return execution_results