# services/agents/master_orchestrator.py
"""
MasterLLM Orchestrator Agent - Coordinates subordinate agents

This is a TRUE CrewAI agent with delegation capabilities.
Does NOT inherit from BaseUtilityAgent (different purpose).
"""

import os
import json
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone

from crewai import Agent, Task, Crew

from services.agents.message_dispatcher import MessageDispatcher, AgentMessage

logger = logging.getLogger("agent.masterllm")


class MasterOrchestratorAgent:
    """
    Master orchestrator agent with delegation capabilities.

    Responsibilities:
    - Plan creation and versioning
    - Task delegation to subordinate agents
    - Response evaluation
    - Plan modification based on feedback
    - Output rejection/correction

    Does NOT perform domain tasks directly.
    """

    # Confidence assumed when a response carries no usable confidence value.
    _FALLBACK_CONFIDENCE = 0.8
    # Threshold used when acceptance criteria are given without "min_confidence".
    _CRITERIA_MIN_CONFIDENCE = 0.7
    # Strict lower bound applied when no acceptance criteria are given at all.
    _DEFAULT_ACCEPT_ABOVE = 0.6

    def __init__(self, model: Optional[str] = None):
        """
        Initialize MasterLLM orchestrator.

        Args:
            model: LLM model to use (defaults to the AGENT_MODEL env var,
                then to "gemini/gemini-2.0-flash-exp")

        Raises:
            ValueError: If a Gemini model is selected and neither
                GEMINI_API_KEY nor GOOGLE_API_KEY is set.
        """
        self.name = "masterllm"
        self.model = model or os.getenv("AGENT_MODEL", "gemini/gemini-2.0-flash-exp")
        self.dispatcher = MessageDispatcher()
        self.plan_versions: List[Dict[str, Any]] = []
        self.rejections: List[Dict[str, Any]] = []

        # Compare case-insensitively so AGENT_LOG_LEVEL=debug also enables
        # verbose agent output.
        verbose = os.getenv("AGENT_LOG_LEVEL", "INFO").upper() == "DEBUG"

        # Create CrewAI agent with DELEGATION ENABLED
        self.agent = Agent(
            role="Master Orchestrator and Planning Agent",
            goal="Coordinate subordinate agents to accomplish complex document processing tasks",
            # Prompt text is kept exactly as it appears in the source,
            # including the single line break before the last sentence.
            backstory=(
                "You are MasterLLM, an expert orchestrator agent responsible for planning, "
                "delegating, and coordinating work across specialized document processing agents. "
                "You do NOT perform document processing tasks yourself - you delegate to specialists. "
                "You create plans, assign tasks, evaluate outputs, and make decisions based on agent feedback. \n"
                "You are critical and thorough, willing to reject poor outputs and request corrections."
            ),
            allow_delegation=True,  # CRITICAL: Enable delegation
            verbose=verbose,
            llm=self._create_llm(),
        )

        logger.info("MasterLLM Orchestrator initialized (delegation: ENABLED)")

    def _create_llm(self) -> str:
        """
        Return the model identifier that is handed to the CrewAI Agent.

        The model string is returned unchanged; per the original author's
        notes CrewAI >=0.80.0 accepts model strings directly and performs the
        LLM initialization itself (not verified here).

        Returns:
            The configured model string.

        Raises:
            ValueError: If the model identifier names a Gemini model and
                neither GEMINI_API_KEY nor GOOGLE_API_KEY is set.
        """
        # Only Gemini models need the Gemini/Google key. Requiring it for any
        # other model made a valid non-Gemini AGENT_MODEL fail at start-up.
        if "gemini" in str(self.model).lower():
            if not (os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")):
                raise ValueError("GEMINI_API_KEY or GOOGLE_API_KEY not found in environment")

        return self.model

    def create_plan(self, description: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create an execution plan and record it as the next plan version.

        Args:
            description: Natural language description of the plan
            context: Additional context (file info, user request, etc.);
                stored in the plan by reference, not copied

        Returns:
            Plan dictionary with version number, description, creation
            timestamp (UTC, ISO 8601), context and an empty "steps" list.
            The same dict object is appended to ``self.plan_versions``.
        """
        version = len(self.plan_versions) + 1

        plan = {
            "version": version,
            "description": description,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "context": context,
            "steps": [],
        }

        self.plan_versions.append(plan)
        logger.info(f"Plan v{version} created: {description}")

        return plan

    def delegate_task(
        self,
        agent_name: str,
        task_description: str,
        task_input: Dict[str, Any],
    ) -> AgentMessage:
        """
        Delegate a task to a subordinate agent.

        The task is first registered through ``dispatcher.send_task`` and the
        resulting message is then passed to ``dispatcher.execute_task``.

        Args:
            agent_name: Name of the agent (e.g., "extract_text")
            task_description: Human-readable task description
            task_input: Input parameters for the agent

        Returns:
            Response message returned by ``dispatcher.execute_task``
        """
        # Send task via dispatcher
        task_message = self.dispatcher.send_task(
            from_agent=self.name,
            to_agent=agent_name,
            task={
                "description": task_description,
                "input": task_input,
            },
        )

        # Execute task (synchronous, per the original author's note)
        response = self.dispatcher.execute_task(task_message)

        logger.info(f"Delegation to {agent_name} completed: {response.content.get('status')}")

        return response

    @staticmethod
    def _extract_confidence(content: Dict[str, Any]) -> Any:
        """
        Pull a usable confidence value out of a response payload.

        Lookup order: top-level "confidence", then
        ``content["agent_metadata"]["confidence"]``, then the 0.8 fallback.
        ``None`` is treated as "not provided". Numbers are returned unchanged;
        any other value is converted with ``float()`` and skipped (with a
        warning) when that conversion fails.
        """
        metadata = content.get("agent_metadata")
        # agent_metadata may be present but None (or not a dict at all).
        nested = metadata.get("confidence") if isinstance(metadata, dict) else None

        for raw in (content.get("confidence"), nested):
            if raw is None:
                continue
            if isinstance(raw, (int, float)):
                return raw
            try:
                return float(raw)
            except (TypeError, ValueError):
                logger.warning(f"Ignoring non-numeric confidence value: {raw!r}")

        return MasterOrchestratorAgent._FALLBACK_CONFIDENCE

    def evaluate_response(
        self,
        response: AgentMessage,
        acceptance_criteria: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Evaluate an agent's response.

        A response whose content has ``status == "failed"`` is always
        rejected with confidence 0.0. Otherwise the confidence is compared
        against ``acceptance_criteria["min_confidence"]`` (default 0.7,
        inclusive) when criteria are given, or must be strictly greater than
        0.6 when they are not.

        Args:
            response: Response message from agent
            acceptance_criteria: Optional criteria for acceptance

        Returns:
            Evaluation dict with accepted (bool), confidence, reason
        """
        # A response without content is evaluated like an empty payload
        # instead of raising AttributeError.
        content = response.content or {}

        # Check for errors
        if content.get("status") == "failed":
            return {
                "accepted": False,
                "confidence": 0.0,
                "reason": f"Agent execution failed: {content.get('error')}",
            }

        # Extract confidence if available
        confidence = self._extract_confidence(content)

        # Default evaluation (can be enhanced with criteria)
        if acceptance_criteria:
            min_confidence = acceptance_criteria.get(
                "min_confidence", self._CRITERIA_MIN_CONFIDENCE
            )
            accepted = confidence >= min_confidence
            reason = f"Confidence {confidence} vs threshold {min_confidence}"
        else:
            # Default: accept if confidence > 0.6
            accepted = confidence > self._DEFAULT_ACCEPT_ABOVE
            reason = f"Default threshold check (confidence: {confidence})"

        logger.info(f"Response evaluated: {response.from_agent} - Accepted: {accepted} ({reason})")

        return {
            "accepted": accepted,
            "confidence": confidence,
            "reason": reason,
        }

    def reject_output(
        self,
        agent_name: str,
        message_id: str,
        reason: str,
    ) -> AgentMessage:
        """
        Reject an agent's output and send rejection message.

        The rejection is sent through ``dispatcher.send_rejection`` and also
        recorded in ``self.rejections`` together with the timestamp of the
        rejection message.

        Args:
            agent_name: Name of agent whose output is rejected
            message_id: ID of the message being rejected
            reason: Reason for rejection

        Returns:
            Rejection message
        """
        rejection_msg = self.dispatcher.send_rejection(
            from_agent=self.name,
            to_agent=agent_name,
            original_message_id=message_id,
            reason=reason,
        )

        # Track rejection
        self.rejections.append({
            "agent": agent_name,
            "message_id": message_id,
            "reason": reason,
            "timestamp": rejection_msg.timestamp,
        })

        logger.warning(f"Output rejected: {agent_name} - {reason}")

        return rejection_msg

    def modify_plan(
        self,
        description: str,
        reason: str,
        modifications: List[str],
    ) -> Dict[str, Any]:
        """
        Modify the current plan (create new version).

        Earlier versions are left untouched; a new plan dict that points back
        to the most recent version is appended to ``self.plan_versions``.

        Args:
            description: Description of the modified plan
            reason: Why the plan was modified
            modifications: List of changes made

        Returns:
            New plan version. ``previous_version`` is None when no plan
            existed before this call.
        """
        version = len(self.plan_versions) + 1
        previous_version = self.plan_versions[-1] if self.plan_versions else None

        plan = {
            "version": version,
            "description": description,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "previous_version": previous_version["version"] if previous_version else None,
            "modification_reason": reason,
            "modifications": modifications,
            "steps": [],
        }

        self.plan_versions.append(plan)
        logger.info(f"Plan modified: v{previous_version['version'] if previous_version else 0} → v{version} ({reason})")

        return plan

    def get_execution_summary(self) -> Dict[str, Any]:
        """
        Get summary of the entire orchestration execution.

        Returns:
            Summary with orchestrator name, model, plan versions, message
            count and conversation log from the dispatcher, recorded
            rejections, and the UTC timestamp at which the summary was built.
        """
        return {
            "orchestrator": self.name,
            "model": self.model,
            "plan_versions": self.plan_versions,
            "total_messages": len(self.dispatcher.message_log.messages),
            "agent_messages": self.dispatcher.get_conversation_log(),
            "rejections": self.rejections,
            "execution_timestamp": datetime.now(timezone.utc).isoformat(),
        }