File size: 8,504 Bytes
6df13ef
 
2610f53
6df13ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2610f53
 
 
 
6df13ef
2610f53
 
 
 
 
 
 
 
 
 
 
6df13ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7ea9284
 
 
 
 
 
 
 
 
 
 
 
6df13ef
 
7ea9284
6df13ef
 
 
7ea9284
 
6df13ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7ea9284
6df13ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# services/agents/base_agent.py
"""
Base class for all utility agents with logging and CrewAI integration.
"""
import hashlib
import json
import logging
import os
import re
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional

from crewai import Agent, Task, Crew

# Configure logging
logging.basicConfig(
    level=os.getenv("AGENT_LOG_LEVEL", "INFO"),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)


class BaseUtilityAgent(ABC):
    """
    Base class for all utility agents.

    Each agent:
    - Wraps one utility function from /utilities
    - Uses a CrewAI Agent with LiteLLM/Gemini to validate/enhance the
      deterministic utility result (the utility is always called first)
    - Logs all executions with structured metadata (hashes, timing, success)
    - Exposes the mandatory run(input: dict) -> dict interface
    """

    def __init__(
        self,
        name: str,
        role: str,
        goal: str,
        backstory: str,
        utility_function: Callable,
        model: Optional[str] = None
    ):
        """
        Initialize the agent.

        Args:
            name: Agent identifier (e.g., "extract_text")
            role: Agent's role description
            goal: Agent's primary goal
            backstory: Agent's backstory for context
            utility_function: The original utility function to wrap
            model: LLM model to use (defaults to env AGENT_MODEL)

        Raises:
            ValueError: If no Gemini/Google API key is present (via _create_llm).
        """
        self.name = name
        self.utility_function = utility_function
        self.model = model or os.getenv("AGENT_MODEL", "gemini/gemini-2.0-flash-exp")
        self.logger = logging.getLogger(f"agent.{name}")

        # Create CrewAI agent with LiteLLM
        self.agent = Agent(
            role=role,
            goal=goal,
            backstory=backstory,
            allow_delegation=False,  # CRITICAL: No delegation in Phase 1
            verbose=os.getenv("AGENT_LOG_LEVEL", "INFO") == "DEBUG",
            llm=self._create_llm()
        )

    def _create_llm(self) -> str:
        """
        Return the LLM identifier compatible with CrewAI >=0.80.0.

        CrewAI >=0.80.0 accepts a model string of the form "gemini/<model-name>"
        directly (native Gemini support via google-generativeai) and resolves
        GEMINI_API_KEY / GOOGLE_API_KEY from the environment itself, so no
        wrapper object is needed.

        Raises:
            ValueError: If neither GEMINI_API_KEY nor GOOGLE_API_KEY is set.
        """
        # Fail fast at construction time rather than on the first LLM call.
        if not os.getenv("GEMINI_API_KEY") and not os.getenv("GOOGLE_API_KEY"):
            raise ValueError("GEMINI_API_KEY or GOOGLE_API_KEY not found in environment")

        # Return the model string - CrewAI will handle the LLM initialization.
        return self.model

    def _hash_data(self, data: Any) -> str:
        """Return a short (16 hex chars) SHA256 hash of data for log correlation."""
        # sort_keys makes the hash stable regardless of dict insertion order;
        # default=str lets non-JSON types (datetimes, paths, ...) be hashed too.
        json_str = json.dumps(data, sort_keys=True, default=str)
        return hashlib.sha256(json_str.encode()).hexdigest()[:16]

    def _log_execution(
        self,
        input_data: Dict[str, Any],
        output_data: Dict[str, Any],
        execution_time: float,
        success: bool,
        error: Optional[str] = None
    ):
        """
        Log one agent execution with structured metadata.

        Args:
            input_data: The input passed to the utility (hashed, not logged raw)
            output_data: The result dict (hashed only on success)
            execution_time: Wall-clock duration in seconds
            success: Whether the execution completed without raising
            error: Error message when success is False
        """
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "agent_name": self.name,
            "model_used": self.model,
            "input_hash": self._hash_data(input_data),
            "output_hash": self._hash_data(output_data) if success else None,
            "execution_time_ms": round(execution_time * 1000, 2),
            "success": success,
            "error": error
        }

        # Lazy %-style args so the message is only formatted if the level is enabled.
        if success:
            self.logger.info("Agent execution: %s", json.dumps(log_entry))
        else:
            self.logger.error("Agent execution failed: %s", json.dumps(log_entry))

    @abstractmethod
    def _prepare_task_description(self, input_data: Dict[str, Any]) -> str:
        """
        Prepare the task description for the CrewAI agent.

        This method should be implemented by each concrete agent
        to translate the input dict into a natural language task.

        Args:
            input_data: Input dictionary from caller

        Returns:
            Task description string for the agent
        """
        pass

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute the agent with the given input.

        This is the MANDATORY interface contract.

        Args:
            input_data: Input dictionary specific to the utility, or a task
                message of the form {"description": "...", "input": {...}}
                from the MasterOrchestrator.

        Returns:
            Dictionary with:
                - Original utility output fields
                - confidence: float (0-1)
                - agent_metadata: execution details

        Raises:
            RuntimeError: Wrapping any exception raised during execution,
                chained from the original cause.
        """
        start_time = datetime.now(timezone.utc)

        try:
            # Handle task message structure from MasterOrchestrator
            # Task messages have structure: {"description": "...", "input": {...}}
            # We need to extract the actual input for the utility
            if "input" in input_data and "description" in input_data:
                # This is a task message from MasterOrchestrator
                actual_input = input_data["input"]
                task_description = input_data["description"]
            else:
                # Direct call (backward compatibility)
                actual_input = input_data
                task_description = None

            # Step 1: Call the original utility function.
            # The utility is authoritative; this preserves backward
            # compatibility and correctness regardless of what the LLM says.
            utility_result = self.utility_function(actual_input)

            # Step 2: Create a CrewAI task for the agent to validate/enhance
            # the result. The agent does not replace the utility - it adds
            # intelligence on top.
            if not task_description:
                task_description = self._prepare_task_description(actual_input)

            task = Task(
                description=task_description,
                agent=self.agent,
                expected_output="Validation summary and confidence score"
            )

            # Step 3: Execute the agent task (single agent, single task).
            crew = Crew(
                agents=[self.agent],
                tasks=[task],
                verbose=False
            )

            # Agent provides validation/confidence
            agent_output = crew.kickoff()

            # Step 4: Combine utility result with agent metadata
            execution_time = (datetime.now(timezone.utc) - start_time).total_seconds()

            result = {
                **utility_result,  # Original utility output
                "confidence": self._extract_confidence(str(agent_output)),
                "agent_metadata": {
                    "agent_name": self.name,
                    "model": self.model,
                    "execution_time_ms": round(execution_time * 1000, 2),
                    "validation": str(agent_output)[:200]  # Truncated for brevity
                }
            }

            # Step 5: Log execution
            self._log_execution(actual_input, result, execution_time, True)

            return result

        except Exception as e:
            execution_time = (datetime.now(timezone.utc) - start_time).total_seconds()
            error_msg = str(e)

            # Log failure (raw input_data: actual_input may not be bound yet)
            self._log_execution(input_data, {}, execution_time, False, error_msg)

            # Re-raise with context
            raise RuntimeError(f"Agent {self.name} failed: {error_msg}") from e

    def _extract_confidence(self, agent_output: str) -> float:
        """
        Extract a confidence score in [0, 1] from agent output.

        Default implementation looks for patterns like "confidence: 0.95";
        subclasses can override for custom extraction. The parsed value is
        clamped so outputs like "confidence: 95" cannot escape the 0-1
        range documented in run().
        """
        # Look for confidence pattern
        match = re.search(r'confidence[:\s]+([0-9.]+)', agent_output.lower())
        if match:
            try:
                value = float(match.group(1))
            except ValueError:
                # e.g. "1.2.3" matches the character class but is not a float
                pass
            else:
                return min(max(value, 0.0), 1.0)

        # Default to high confidence if utility succeeded
        return 0.9