# services/agents/base_agent.py
"""
Base class for all utility agents with logging and CrewAI integration.
"""

import hashlib
import json
import logging
import os
import re
import time
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional

from crewai import Agent, Task, Crew

# Configure logging.
# The level name is upper-cased because logging only recognises the canonical
# upper-case level names; without this AGENT_LOG_LEVEL=debug fails at import.
logging.basicConfig(
    level=os.getenv("AGENT_LOG_LEVEL", "INFO").upper(),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)


class BaseUtilityAgent(ABC):
    """
    Base class for all utility agents.

    Each agent:
    - Wraps one utility function from /utilities
    - Uses CrewAI Agent with LiteLLM/Gemini
    - Logs all executions with structured metadata
    - Exposes run(input: dict) -> dict interface

    Concrete agents must implement ``_prepare_task_description`` and may
    override ``_extract_confidence``.
    """

    # Matches "confidence: 0.95", "confidence 95%", "Confidence: .9" ...
    # The number is captured without trailing punctuation so that a sentence
    # ending such as "confidence: 0.95." still parses.
    _CONFIDENCE_PATTERN = re.compile(
        r'confidence[:\s]+([0-9]*\.?[0-9]+)\s*(%)?',
        re.IGNORECASE,
    )

    # Confidence reported when the agent output contains no parsable score.
    _DEFAULT_CONFIDENCE = 0.9

    def __init__(
        self,
        name: str,
        role: str,
        goal: str,
        backstory: str,
        utility_function: Callable,
        model: Optional[str] = None
    ):
        """
        Initialize the agent.

        Args:
            name: Agent identifier (e.g., "extract_text")
            role: Agent's role description
            goal: Agent's primary goal
            backstory: Agent's backstory for context
            utility_function: The original utility function to wrap
            model: LLM model to use (defaults to env AGENT_MODEL)

        Raises:
            ValueError: If neither GEMINI_API_KEY nor GOOGLE_API_KEY is set
                (raised by ``_create_llm``).
        """
        self.name = name
        self.utility_function = utility_function
        self.model = model or os.getenv("AGENT_MODEL", "gemini/gemini-2.0-flash-exp")
        self.logger = logging.getLogger(f"agent.{name}")

        # Create CrewAI agent with LiteLLM
        self.agent = Agent(
            role=role,
            goal=goal,
            backstory=backstory,
            allow_delegation=False,  # CRITICAL: No delegation in Phase 1
            # Case-insensitive so it agrees with the logging configuration.
            verbose=os.getenv("AGENT_LOG_LEVEL", "INFO").upper() == "DEBUG",
            llm=self._create_llm()
        )

    def _create_llm(self) -> str:
        """
        Create LLM instance compatible with CrewAI ≥0.80.0.

        Returns:
            The model string stored in ``self.model``; it is handed to the
            CrewAI ``Agent`` as its ``llm`` argument.

        Raises:
            ValueError: If neither GEMINI_API_KEY nor GOOGLE_API_KEY is set
                in the environment.
        """
        # CrewAI ≥0.80.0 has native Gemini support via google-generativeai.
        # We use the model string directly instead of a wrapper; CrewAI will
        # handle the LLM initialization internally.
        # The format is "gemini/<model-name>".
        # CrewAI will use the GEMINI_API_KEY or GOOGLE_API_KEY from environment.

        # Ensure API key is set
        if not os.getenv("GEMINI_API_KEY") and not os.getenv("GOOGLE_API_KEY"):
            raise ValueError("GEMINI_API_KEY or GOOGLE_API_KEY not found in environment")

        # Return the model string - CrewAI ≥0.80.0 accepts model strings directly
        return self.model

    def _hash_data(self, data: Any) -> str:
        """
        Create a short SHA256 hash of data for logging.

        The data is serialised to JSON with sorted keys (non-serialisable
        values are stringified) so equal payloads produce equal hashes.

        Args:
            data: Any value to fingerprint.

        Returns:
            The first 16 hex characters of the SHA256 digest.
        """
        json_str = json.dumps(data, sort_keys=True, default=str)
        return hashlib.sha256(json_str.encode()).hexdigest()[:16]

    def _log_execution(
        self,
        input_data: Dict[str, Any],
        output_data: Dict[str, Any],
        execution_time: float,
        success: bool,
        error: Optional[str] = None
    ):
        """
        Log agent execution with structured metadata.

        Only hashes of the input and output are logged, not the payloads.

        Args:
            input_data: Input passed to the utility.
            output_data: Result returned to the caller (ignored on failure).
            execution_time: Elapsed time in seconds.
            success: Whether the run completed; selects INFO vs ERROR level.
            error: Error message to record when ``success`` is False.
        """
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "agent_name": self.name,
            "model_used": self.model,
            "input_hash": self._hash_data(input_data),
            "output_hash": self._hash_data(output_data) if success else None,
            "execution_time_ms": round(execution_time * 1000, 2),
            "success": success,
            "error": error
        }

        if success:
            self.logger.info("Agent execution: %s", json.dumps(log_entry))
        else:
            self.logger.error("Agent execution failed: %s", json.dumps(log_entry))

    @abstractmethod
    def _prepare_task_description(self, input_data: Dict[str, Any]) -> str:
        """
        Prepare the task description for the CrewAI agent.

        This method should be implemented by each concrete agent
        to translate the input dict into a natural language task.

        Args:
            input_data: Input dictionary from caller

        Returns:
            Task description string for the agent
        """

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute the agent with the given input.

        This is the MANDATORY interface contract.

        Args:
            input_data: Input dictionary specific to the utility, or a task
                message of the form {"description": "...", "input": {...}}.

        Returns:
            Dictionary with:
            - Original utility output fields
            - confidence: float (0-1)
            - agent_metadata: execution details

        Raises:
            RuntimeError: If any step fails; the original exception is
                chained as ``__cause__``.
        """
        # Monotonic clock: immune to wall-clock adjustments during the run.
        started = time.perf_counter()

        # Until the payload is unwrapped, the raw argument is the best
        # description of the input; this keeps failure logs hashing the same
        # object that success logs do.
        actual_input = input_data

        try:
            # Handle task message structure from MasterOrchestrator.
            # Task messages have structure: {"description": "...", "input": {...}}
            # We need to extract the actual input for the utility.
            if "input" in input_data and "description" in input_data:
                # This is a task message from MasterOrchestrator
                actual_input = input_data["input"]
                task_description = input_data["description"]
            else:
                # Direct call (backward compatibility)
                task_description = None

            # Step 1: Call the original utility function.
            # This ensures backward compatibility and correctness.
            utility_result = self.utility_function(actual_input)

            # Step 2: Create a CrewAI task for the agent to validate/enhance
            # the result. The agent doesn't replace the utility - it adds
            # intelligence on top.
            if not task_description:
                task_description = self._prepare_task_description(actual_input)

            task = Task(
                description=task_description,
                agent=self.agent,
                expected_output="Validation summary and confidence score"
            )

            # Step 3: Execute the agent task
            crew = Crew(
                agents=[self.agent],
                tasks=[task],
                verbose=False
            )

            # Agent provides validation/confidence
            agent_output = crew.kickoff()
            validation_text = str(agent_output)

            # Step 4: Combine utility result with agent metadata
            execution_time = time.perf_counter() - started

            result = {
                **utility_result,  # Original utility output
                "confidence": self._extract_confidence(validation_text),
                "agent_metadata": {
                    "agent_name": self.name,
                    "model": self.model,
                    "execution_time_ms": round(execution_time * 1000, 2),
                    "validation": validation_text[:200]  # Truncated for brevity
                }
            }

            # Step 5: Log execution
            self._log_execution(actual_input, result, execution_time, True)

            return result

        except Exception as e:
            execution_time = time.perf_counter() - started
            error_msg = str(e)

            # Log failure
            self._log_execution(actual_input, {}, execution_time, False, error_msg)

            # Re-raise with context
            raise RuntimeError(f"Agent {self.name} failed: {error_msg}") from e

    def _extract_confidence(self, agent_output: str) -> float:
        """
        Extract confidence score from agent output.

        Default implementation looks for the first pattern like
        "confidence: 0.95" (case-insensitive). A value followed by "%" is
        converted to a fraction, and the result is clamped to [0, 1].
        Subclasses can override for custom extraction.

        Args:
            agent_output: Text produced by the agent.

        Returns:
            The parsed confidence in [0, 1], or 0.9 when no score is found.
        """
        match = self._CONFIDENCE_PATTERN.search(agent_output)
        if match is None:
            # Default to high confidence if utility succeeded
            return self._DEFAULT_CONFIDENCE

        # The pattern only captures digits with at most one decimal point,
        # so float() cannot fail here.
        value = float(match.group(1))
        if match.group(2):
            value /= 100.0

        # Enforce the documented 0-1 contract.
        return max(0.0, min(1.0, value))