# masterllm/services/agents/master_orchestrator.py
# NOTE: snapshot scraped from a Hugging Face file page; upstream commit
# 2610f53 by "stellar413" ("solved chatllm problems").
# services/agents/master_orchestrator.py
"""
MasterLLM Orchestrator Agent - Coordinates subordinate agents
This is a TRUE CrewAI agent with delegation capabilities.
Does NOT inherit from BaseUtilityAgent (different purpose).
"""
import os
import json
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone
from crewai import Agent, Task, Crew
from services.agents.message_dispatcher import MessageDispatcher, AgentMessage
logger = logging.getLogger("agent.masterllm")


class MasterOrchestratorAgent:
    """
    Master orchestrator agent with delegation capabilities.

    Responsibilities:
        - Plan creation and versioning
        - Task delegation to subordinate agents
        - Response evaluation
        - Plan modification based on feedback
        - Output rejection/correction

    Does NOT perform domain tasks directly — it coordinates the
    specialist agents that do.
    """

    # Confidence assumed when an agent response carries no confidence field.
    FALLBACK_CONFIDENCE = 0.8
    # Acceptance threshold used when no explicit criteria are supplied.
    DEFAULT_ACCEPT_THRESHOLD = 0.6

    def __init__(self, model: Optional[str] = None):
        """
        Initialize the MasterLLM orchestrator.

        Args:
            model: LLM model identifier. Defaults to the AGENT_MODEL env var,
                then to "gemini/gemini-2.0-flash-exp".
        """
        self.name = "masterllm"
        self.model = model or os.getenv("AGENT_MODEL", "gemini/gemini-2.0-flash-exp")
        self.dispatcher = MessageDispatcher()
        # One entry per plan version (v1, v2, ...), newest last.
        self.plan_versions: List[Dict[str, Any]] = []
        # Every rejected agent output, surfaced in get_execution_summary().
        self.rejections: List[Dict[str, Any]] = []
        # Create the CrewAI agent with DELEGATION ENABLED: this agent
        # assigns work to specialists instead of doing it itself.
        self.agent = Agent(
            role="Master Orchestrator and Planning Agent",
            goal="Coordinate subordinate agents to accomplish complex document processing tasks",
            backstory="""You are MasterLLM, an expert orchestrator agent responsible for
planning, delegating, and coordinating work across specialized document processing agents.
You do NOT perform document processing tasks yourself - you delegate to specialists.
You create plans, assign tasks, evaluate outputs, and make decisions based on agent feedback.
You are critical and thorough, willing to reject poor outputs and request corrections.""",
            allow_delegation=True,  # CRITICAL: enable delegation
            verbose=os.getenv("AGENT_LOG_LEVEL", "INFO") == "DEBUG",
            llm=self._create_llm(),
        )
        logger.info("MasterLLM Orchestrator initialized (delegation: ENABLED)")

    def _create_llm(self) -> str:
        """
        Return the LLM spec handed to CrewAI.

        CrewAI >=0.80.0 accepts a model string directly and initializes the
        Gemini backend internally, so no wrapper object is needed.

        Returns:
            The model identifier string.

        Raises:
            ValueError: If neither GEMINI_API_KEY nor GOOGLE_API_KEY is set.
        """
        if not (os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")):
            raise ValueError("GEMINI_API_KEY or GOOGLE_API_KEY not found in environment")
        return self.model

    def create_plan(
        self,
        description: str,
        context: Dict[str, Any],
        steps: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """
        Create a new execution plan version.

        Args:
            description: Natural language description of the plan.
            context: Additional context (file info, user request, etc.).
            steps: Optional initial plan steps; defaults to an empty list.

        Returns:
            Plan dict with "version", "description", "created_at",
            "context" and "steps" keys.
        """
        version = len(self.plan_versions) + 1
        plan = {
            "version": version,
            "description": description,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "context": context,
            # Copy so later mutation of the caller's list cannot alter the plan.
            "steps": list(steps) if steps else [],
        }
        self.plan_versions.append(plan)
        # Lazy %-args: only rendered when the INFO level is enabled.
        logger.info("Plan v%s created: %s", version, description)
        return plan

    def delegate_task(
        self,
        agent_name: str,
        task_description: str,
        task_input: Dict[str, Any],
    ) -> "AgentMessage":
        """
        Delegate a task to a subordinate agent and wait for its response.

        Args:
            agent_name: Name of the target agent (e.g., "extract_text").
            task_description: Human-readable task description.
            task_input: Input parameters for the agent.

        Returns:
            Response message from the agent.
        """
        # Send the task through the dispatcher...
        task_message = self.dispatcher.send_task(
            from_agent=self.name,
            to_agent=agent_name,
            task={"description": task_description, "input": task_input},
        )
        # ...then execute it synchronously.
        response = self.dispatcher.execute_task(task_message)
        logger.info("Delegation to %s completed: %s", agent_name, response.content.get("status"))
        return response

    def evaluate_response(
        self,
        response: "AgentMessage",
        acceptance_criteria: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Evaluate a subordinate agent's response.

        Args:
            response: Response message from the agent.
            acceptance_criteria: Optional criteria dict; supports
                "min_confidence" (default 0.7 when the dict is given).

        Returns:
            Dict with "accepted" (bool), "confidence" (float), "reason" (str).
        """
        content = response.content
        # Hard failure reported by the agent: reject outright.
        if content.get("status") == "failed":
            return {
                "accepted": False,
                "confidence": 0.0,
                "reason": f"Agent execution failed: {content.get('error')}",
            }
        # Confidence may live at the top level or under agent_metadata.
        raw_confidence = content.get(
            "confidence",
            content.get("agent_metadata", {}).get("confidence", self.FALLBACK_CONFIDENCE),
        )
        try:
            confidence = float(raw_confidence)
        except (TypeError, ValueError):
            # Unparseable confidence: treat as zero instead of crashing on the
            # numeric comparison below.
            confidence = 0.0
        if acceptance_criteria:
            min_confidence = acceptance_criteria.get("min_confidence", 0.7)
            accepted = confidence >= min_confidence
            reason = f"Confidence {confidence} vs threshold {min_confidence}"
        else:
            accepted = confidence > self.DEFAULT_ACCEPT_THRESHOLD
            reason = f"Default threshold check (confidence: {confidence})"
        logger.info(
            "Response evaluated: %s - Accepted: %s (%s)",
            response.from_agent, accepted, reason,
        )
        return {"accepted": accepted, "confidence": confidence, "reason": reason}

    def reject_output(
        self,
        agent_name: str,
        message_id: str,
        reason: str,
    ) -> "AgentMessage":
        """
        Reject an agent's output and send it a rejection message.

        Args:
            agent_name: Name of the agent whose output is rejected.
            message_id: ID of the message being rejected.
            reason: Reason for rejection.

        Returns:
            The rejection message that was sent.
        """
        rejection_msg = self.dispatcher.send_rejection(
            from_agent=self.name,
            to_agent=agent_name,
            original_message_id=message_id,
            reason=reason,
        )
        # Track the rejection for the execution summary.
        self.rejections.append({
            "agent": agent_name,
            "message_id": message_id,
            "reason": reason,
            "timestamp": rejection_msg.timestamp,
        })
        logger.warning("Output rejected: %s - %s", agent_name, reason)
        return rejection_msg

    def modify_plan(
        self,
        description: str,
        reason: str,
        modifications: List[str],
    ) -> Dict[str, Any]:
        """
        Modify the current plan by creating a new version.

        Args:
            description: Description of the modified plan.
            reason: Why the plan was modified.
            modifications: List of changes made.

        Returns:
            The new plan version dict, linked to its predecessor via
            "previous_version" (None when this is the first version).
        """
        version = len(self.plan_versions) + 1
        previous = self.plan_versions[-1] if self.plan_versions else None
        plan = {
            "version": version,
            "description": description,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "previous_version": previous["version"] if previous else None,
            "modification_reason": reason,
            "modifications": modifications,
            "steps": [],
        }
        self.plan_versions.append(plan)
        logger.info(
            "Plan modified: v%s → v%s (%s)",
            previous["version"] if previous else 0, version, reason,
        )
        return plan

    def get_execution_summary(self) -> Dict[str, Any]:
        """
        Summarize the entire orchestration run.

        Returns:
            Dict with the orchestrator identity/model, all plan versions,
            message counts and conversation log from the dispatcher,
            rejections, and an execution timestamp.
        """
        return {
            "orchestrator": self.name,
            "model": self.model,
            "plan_versions": self.plan_versions,
            "total_messages": len(self.dispatcher.message_log.messages),
            "agent_messages": self.dispatcher.get_conversation_log(),
            "rejections": self.rejections,
            "execution_timestamp": datetime.now(timezone.utc).isoformat(),
        }