Spaces:
Sleeping
Sleeping
# services/agents/master_orchestrator.py
"""
MasterLLM Orchestrator Agent - Coordinates subordinate agents.

This is a TRUE CrewAI agent with delegation capabilities.
Does NOT inherit from BaseUtilityAgent (different purpose).
"""
# Standard library
import os
import json
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone

# Third-party
from crewai import Agent, Task, Crew

# Project-local
from services.agents.message_dispatcher import MessageDispatcher, AgentMessage

# Module-level logger; the name is fixed (not __name__) so that log routing
# keyed on "agent.masterllm" keeps working wherever this module is imported from.
logger = logging.getLogger("agent.masterllm")
| class MasterOrchestratorAgent: | |
| """ | |
| Master orchestrator agent with delegation capabilities. | |
| Responsibilities: | |
| - Plan creation and versioning | |
| - Task delegation to subordinate agents | |
| - Response evaluation | |
| - Plan modification based on feedback | |
| - Output rejection/correction | |
| Does NOT perform domain tasks directly. | |
| """ | |
| def __init__(self, model: Optional[str] = None): | |
| """ | |
| Initialize MasterLLM orchestrator. | |
| Args: | |
| model: LLM model to use (defaults to AGENT_MODEL env var) | |
| """ | |
| self.name = "masterllm" | |
| self.model = model or os.getenv("AGENT_MODEL", "gemini/gemini-2.0-flash-exp") | |
| self.dispatcher = MessageDispatcher() | |
| self.plan_versions: List[Dict[str, Any]] = [] | |
| self.rejections: List[Dict[str, Any]] = [] | |
| # Create CrewAI agent with DELEGATION ENABLED | |
| self.agent = Agent( | |
| role="Master Orchestrator and Planning Agent", | |
| goal="Coordinate subordinate agents to accomplish complex document processing tasks", | |
| backstory="""You are MasterLLM, an expert orchestrator agent responsible for | |
| planning, delegating, and coordinating work across specialized document processing agents. | |
| You do NOT perform document processing tasks yourself - you delegate to specialists. | |
| You create plans, assign tasks, evaluate outputs, and make decisions based on agent feedback. | |
| You are critical and thorough, willing to reject poor outputs and request corrections.""", | |
| allow_delegation=True, # CRITICAL: Enable delegation | |
| verbose=os.getenv("AGENT_LOG_LEVEL", "INFO") == "DEBUG", | |
| llm=self._create_llm() | |
| ) | |
| logger.info(f"MasterLLM Orchestrator initialized (delegation: ENABLED)") | |
| def _create_llm(self): | |
| """Create LLM instance compatible with CrewAI ≥0.80.0""" | |
| # CrewAI ≥0.80.0 has native Gemini support via google-generativeai | |
| # We use the model string directly instead of a wrapper | |
| # CrewAI will handle the LLM initialization internally | |
| # Ensure API key is set | |
| if not os.getenv("GEMINI_API_KEY") and not os.getenv("GOOGLE_API_KEY"): | |
| raise ValueError("GEMINI_API_KEY or GOOGLE_API_KEY not found in environment") | |
| # Return the model string - CrewAI will handle it | |
| # CrewAI ≥0.80.0 accepts model strings directly | |
| return self.model | |
| def create_plan(self, description: str, context: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Create an execution plan. | |
| Args: | |
| description: Natural language description of the plan | |
| context: Additional context (file info, user request, etc.) | |
| Returns: | |
| Plan dictionary with version number and steps | |
| """ | |
| version = len(self.plan_versions) + 1 | |
| plan = { | |
| "version": version, | |
| "description": description, | |
| "created_at": datetime.now(timezone.utc).isoformat(), | |
| "context": context, | |
| "steps": [] | |
| } | |
| self.plan_versions.append(plan) | |
| logger.info(f"Plan v{version} created: {description}") | |
| return plan | |
| def delegate_task( | |
| self, | |
| agent_name: str, | |
| task_description: str, | |
| task_input: Dict[str, Any] | |
| ) -> AgentMessage: | |
| """ | |
| Delegate a task to a subordinate agent. | |
| Args: | |
| agent_name: Name of the agent (e.g., "extract_text") | |
| task_description: Human-readable task description | |
| task_input: Input parameters for the agent | |
| Returns: | |
| Response message from the agent | |
| """ | |
| # Send task via dispatcher | |
| task_message = self.dispatcher.send_task( | |
| from_agent=self.name, | |
| to_agent=agent_name, | |
| task={ | |
| "description": task_description, | |
| "input": task_input | |
| } | |
| ) | |
| # Execute task (synchronous) | |
| response = self.dispatcher.execute_task(task_message) | |
| logger.info(f"Delegation to {agent_name} completed: {response.content.get('status')}") | |
| return response | |
| def evaluate_response( | |
| self, | |
| response: AgentMessage, | |
| acceptance_criteria: Optional[Dict[str, Any]] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Evaluate an agent's response. | |
| Args: | |
| response: Response message from agent | |
| acceptance_criteria: Optional criteria for acceptance | |
| Returns: | |
| Evaluation dict with accepted (bool), confidence, reason | |
| """ | |
| content = response.content | |
| # Check for errors | |
| if content.get("status") == "failed": | |
| return { | |
| "accepted": False, | |
| "confidence": 0.0, | |
| "reason": f"Agent execution failed: {content.get('error')}" | |
| } | |
| # Extract confidence if available | |
| confidence = content.get("confidence", content.get("agent_metadata", {}).get("confidence", 0.8)) | |
| # Default evaluation (can be enhanced with criteria) | |
| if acceptance_criteria: | |
| min_confidence = acceptance_criteria.get("min_confidence", 0.7) | |
| accepted = confidence >= min_confidence | |
| reason = f"Confidence {confidence} vs threshold {min_confidence}" | |
| else: | |
| # Default: accept if confidence > 0.6 | |
| accepted = confidence > 0.6 | |
| reason = f"Default threshold check (confidence: {confidence})" | |
| logger.info(f"Response evaluated: {response.from_agent} - Accepted: {accepted} ({reason})") | |
| return { | |
| "accepted": accepted, | |
| "confidence": confidence, | |
| "reason": reason | |
| } | |
| def reject_output( | |
| self, | |
| agent_name: str, | |
| message_id: str, | |
| reason: str | |
| ) -> AgentMessage: | |
| """ | |
| Reject an agent's output and send rejection message. | |
| Args: | |
| agent_name: Name of agent whose output is rejected | |
| message_id: ID of the message being rejected | |
| reason: Reason for rejection | |
| Returns: | |
| Rejection message | |
| """ | |
| rejection_msg = self.dispatcher.send_rejection( | |
| from_agent=self.name, | |
| to_agent=agent_name, | |
| original_message_id=message_id, | |
| reason=reason | |
| ) | |
| # Track rejection | |
| self.rejections.append({ | |
| "agent": agent_name, | |
| "message_id": message_id, | |
| "reason": reason, | |
| "timestamp": rejection_msg.timestamp | |
| }) | |
| logger.warning(f"Output rejected: {agent_name} - {reason}") | |
| return rejection_msg | |
| def modify_plan( | |
| self, | |
| description: str, | |
| reason: str, | |
| modifications: List[str] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Modify the current plan (create new version). | |
| Args: | |
| description: Description of the modified plan | |
| reason: Why the plan was modified | |
| modifications: List of changes made | |
| Returns: | |
| New plan version | |
| """ | |
| version = len(self.plan_versions) + 1 | |
| previous_version = self.plan_versions[-1] if self.plan_versions else None | |
| plan = { | |
| "version": version, | |
| "description": description, | |
| "created_at": datetime.now(timezone.utc).isoformat(), | |
| "previous_version": previous_version["version"] if previous_version else None, | |
| "modification_reason": reason, | |
| "modifications": modifications, | |
| "steps": [] | |
| } | |
| self.plan_versions.append(plan) | |
| logger.info(f"Plan modified: v{previous_version['version'] if previous_version else 0} → v{version} ({reason})") | |
| return plan | |
| def get_execution_summary(self) -> Dict[str, Any]: | |
| """ | |
| Get summary of the entire orchestration execution. | |
| Returns: | |
| Summary with plans, messages, rejections, etc. | |
| """ | |
| return { | |
| "orchestrator": self.name, | |
| "model": self.model, | |
| "plan_versions": self.plan_versions, | |
| "total_messages": len(self.dispatcher.message_log.messages), | |
| "agent_messages": self.dispatcher.get_conversation_log(), | |
| "rejections": self.rejections, | |
| "execution_timestamp": datetime.now(timezone.utc).isoformat() | |
| } | |