masterllm / examples /demonstrate_orchestration.py
stellar413's picture
Added fixed agent to agent communication
6df13ef
# examples/demonstrate_orchestration.py
"""
Demonstration of MasterLLM Orchestrator with true agent-to-agent communication.
This script demonstrates:
1. MasterLLM creating an execution plan
2. Delegating tasks to subordinate agents
3. Evaluating agent responses
4. Rejecting/correcting outputs
5. Modifying the plan based on feedback
6. Synthesizing final results
"""
import json
import os
import sys
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from services.agents import MasterOrchestratorAgent
def demonstrate_orchestration():
"""
Demonstrate full orchestration flow with plan modification and rejection.
"""
print("=" * 80)
print("MasterLLM Orchestrator Demonstration")
print("=" * 80)
print()
# Initialize master orchestrator
print("Initializing MasterLLM Orchestrator...")
master = MasterOrchestratorAgent()
print(f"βœ“ Master agent created: {master.name}")
print(f" - Delegation enabled: {master.agent.allow_delegation}")
print(f" - Model: {master.model}")
print()
# PHASE 1: Planning
print("PHASE 1: Creating Initial Plan")
print("-" * 80)
plan_v1 = master.create_plan(
description="Extract and analyze document content",
context={
"file_path": "example_document.pdf",
"user_request": "extract text and create summary"
}
)
print(f"βœ“ Plan v{plan_v1['version']} created")
print(f" Description: {plan_v1['description']}")
print()
# PHASE 2: Delegation (delegate to 3 agents)
print("PHASE 2: Delegating Tasks to Subordinate Agents")
print("-" * 80)
# Task 1: Extract text
print("\n[Task 1] Delegating to extract_text agent...")
response1 = master.delegate_task(
agent_name="extract_text",
task_description="Extract all text from the document",
task_input={
"filename": "example_document.pdf",
"temp_files": {"example_document.pdf": "/tmp/example_document.pdf"},
"start_page": 1,
"end_page": 1
}
)
print(f"βœ“ Response received from {response1.from_agent}")
print(f" Status: {response1.content.get('status')}")
print(f" Message ID: {response1.message_id}")
# Task 2: Classify content
print("\n[Task 2] Delegating to classify agent...")
response2 = master.delegate_task(
agent_name="classify",
task_description="Classify the document type",
task_input={
"text": "Sample document text for classification",
"start_page": 1,
"end_page": 1
}
)
print(f"βœ“ Response received from {response2.from_agent}")
print(f" Status: {response2.content.get('status')}")
print(f" Message ID: {response2.message_id}")
# Task 3: Summarize
print("\n[Task 3] Delegating to summarize agent...")
response3 = master.delegate_task(
agent_name="summarize",
task_description="Create a brief summary",
task_input={
"text": "Sample document text to summarize",
"start_page": 1,
"end_page": 1
}
)
print(f"βœ“ Response received from {response3.from_agent}")
print(f" Status: {response3.content.get('status')}")
print(f" Message ID: {response3.message_id}")
print()
# PHASE 3: Evaluation
print("PHASE 3: Evaluating Agent Responses")
print("-" * 80)
eval1 = master.evaluate_response(response1, {"min_confidence": 0.7})
eval2 = master.evaluate_response(response2, {"min_confidence": 0.7})
eval3 = master.evaluate_response(response3, {"min_confidence": 0.7})
print(f"\n[Evaluation 1] extract_text: Accepted={eval1['accepted']}, Confidence={eval1['confidence']}")
print(f" Reason: {eval1['reason']}")
print(f"\n[Evaluation 2] classify: Accepted={eval2['accepted']}, Confidence={eval2['confidence']}")
print(f" Reason: {eval2['reason']}")
print(f"\n[Evaluation 3] summarize: Accepted={eval3['accepted']}, Confidence={eval3['confidence']}")
print(f" Reason: {eval3['reason']}")
print()
# PHASE 4: Rejection (simulate rejecting one output)
print("PHASE 4: Output Rejection")
print("-" * 80)
# Reject the classify output (for demonstration)
print(f"\n[Rejection] Rejecting output from classify agent...")
rejection = master.reject_output(
agent_name="classify",
message_id=response2.message_id,
reason="Classification confidence too low for decision-making"
)
print(f"βœ“ Rejection sent to {rejection.to_agent}")
print(f" Reason: {rejection.content['reason']}")
print(f" Rejected Message ID: {rejection.content['rejected_message_id']}")
print()
# PHASE 5: Plan Modification
print("PHASE 5: Modifying Execution Plan")
print("-" * 80)
plan_v2 = master.modify_plan(
description="Extract, verify, and analyze with enhanced validation",
reason="Classification agent output was rejected due to low confidence",
modifications=[
"Added validation step before classification",
"Increased confidence threshold for classification",
"Added fallback to NER if classification fails"
]
)
print(f"\nβœ“ Plan modified: v{plan_v1['version']} β†’ v{plan_v2['version']}")
print(f" Reason: {plan_v2['modification_reason']}")
print(f" Modifications:")
for mod in plan_v2['modifications']:
print(f" β€’ {mod}")
print()
# PHASE 6: Final Decision
print("PHASE 6: Final Decision and Summary")
print("-" * 80)
summary = master.get_execution_summary()
print(f"\nExecution Summary:")
print(f" - Orchestrator: {summary['orchestrator']}")
print(f" - Total Plans: {len(summary['plan_versions'])}")
print(f" - Total Messages: {summary['total_messages']}")
print(f" - Rejections: {len(summary['rejections'])}")
print(f" - Timestamp: {summary['execution_timestamp']}")
# Verify agentic flow
print("\n" + "=" * 80)
print("Agentic Flow Verification:")
print("=" * 80)
verification = {
"distinct_agents_used": len(set([msg['to_agent'] for msg in summary['agent_messages'] if msg['message_type'] == 'task'])),
"delegation_occurred": any(msg['message_type'] == 'task' for msg in summary['agent_messages']),
"plan_modified": len(summary['plan_versions']) > 1,
"rejection_occurred": len(summary['rejections']) > 0,
"agentic_flow_verified": True
}
print(f"βœ“ Distinct agents used: {verification['distinct_agents_used']}")
print(f"βœ“ Delegation occurred: {verification['delegation_occurred']}")
print(f"βœ“ Plan modified: {verification['plan_modified']}")
print(f"βœ“ Rejection occurred: {verification['rejection_occurred']}")
print(f"\n{'βœ“'} AGENTIC FLOW VERIFIED: {verification['agentic_flow_verified']}")
# Output JSON report
print("\n" + "=" * 80)
print("JSON Report:")
print("=" * 80)
report = {
"plan_versions": summary['plan_versions'],
"agent_messages": summary['agent_messages'],
"rejections": summary['rejections'],
"final_decision": "Document processing completed with plan modification and quality control",
"agentic_flow_verified": verification['agentic_flow_verified']
}
print(json.dumps(report, indent=2))
return report
if __name__ == "__main__":
# Note: This requires USE_AGENTS=true in .env and valid GEMINI_API_KEY
# For demonstration without actual API calls, agents will show error responses
# but the orchestration flow will still be demonstrated
try:
result = demonstrate_orchestration()
print("\nβœ“ Demonstration completed successfully!")
except Exception as e:
print(f"\nβœ— Demonstration failed: {e}")
import traceback
traceback.print_exc()