# examples/demonstrate_orchestration.py
"""
Demonstration of MasterLLM Orchestrator with true agent-to-agent communication.

This script demonstrates:
1. MasterLLM creating an execution plan
2. Delegating tasks to subordinate agents
3. Evaluating agent responses
4. Rejecting/correcting outputs
5. Modifying the plan based on feedback
6. Synthesizing final results
"""

import json
import os
import sys
import traceback

# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from services.agents import MasterOrchestratorAgent


def _mark(ok):
    """Return a check mark when *ok* is truthy, a cross otherwise."""
    return "✓" if ok else "✗"


def _verify_agentic_flow(summary):
    """
    Derive the agentic-flow checks from an execution summary.

    Args:
        summary: Mapping with the keys ``agent_messages`` (iterable of
            mappings carrying ``to_agent`` and ``message_type``),
            ``plan_versions`` and ``rejections`` (sized collections).

    Returns:
        dict with ``distinct_agents_used`` (int), ``delegation_occurred``,
        ``plan_modified``, ``rejection_occurred`` and
        ``agentic_flow_verified`` (bools).
    """
    # Agents that were sent at least one message of type 'task'.
    task_targets = {
        msg['to_agent']
        for msg in summary['agent_messages']
        if msg['message_type'] == 'task'
    }
    delegation_occurred = bool(task_targets)
    plan_modified = len(summary['plan_versions']) > 1
    rejection_occurred = len(summary['rejections']) > 0

    return {
        "distinct_agents_used": len(task_targets),
        "delegation_occurred": delegation_occurred,
        "plan_modified": plan_modified,
        "rejection_occurred": rejection_occurred,
        # Computed from the individual checks rather than hard-coded, so the
        # report cannot claim a verified flow when one of the steps is missing.
        "agentic_flow_verified": (
            delegation_occurred and plan_modified and rejection_occurred
        ),
    }


def demonstrate_orchestration():
    """
    Demonstrate full orchestration flow with plan modification and rejection.

    Walks through six phases against a ``MasterOrchestratorAgent``: plan
    creation, delegation to three named agents, evaluation of the three
    responses, rejection of one response, plan modification, and a final
    summary. Progress is printed to stdout along the way.

    Returns:
        dict: The JSON-serialisable report that is also printed, with the keys
        ``plan_versions``, ``agent_messages``, ``rejections``,
        ``final_decision`` and ``agentic_flow_verified``.
    """
    print("=" * 80)
    print("MasterLLM Orchestrator Demonstration")
    print("=" * 80)
    print()

    # Initialize master orchestrator
    print("Initializing MasterLLM Orchestrator...")
    master = MasterOrchestratorAgent()
    print(f"✓ Master agent created: {master.name}")
    print(f" - Delegation enabled: {master.agent.allow_delegation}")
    print(f" - Model: {master.model}")
    print()

    # PHASE 1: Planning
    print("PHASE 1: Creating Initial Plan")
    print("-" * 80)
    plan_v1 = master.create_plan(
        description="Extract and analyze document content",
        context={
            "file_path": "example_document.pdf",
            "user_request": "extract text and create summary"
        }
    )
    print(f"✓ Plan v{plan_v1['version']} created")
    print(f" Description: {plan_v1['description']}")
    print()

    # PHASE 2: Delegation (delegate to 3 agents)
    print("PHASE 2: Delegating Tasks to Subordinate Agents")
    print("-" * 80)

    # Task 1: Extract text
    print("\n[Task 1] Delegating to extract_text agent...")
    response1 = master.delegate_task(
        agent_name="extract_text",
        task_description="Extract all text from the document",
        task_input={
            "filename": "example_document.pdf",
            "temp_files": {"example_document.pdf": "/tmp/example_document.pdf"},
            "start_page": 1,
            "end_page": 1
        }
    )
    print(f"✓ Response received from {response1.from_agent}")
    print(f" Status: {response1.content.get('status')}")
    print(f" Message ID: {response1.message_id}")

    # Task 2: Classify content
    print("\n[Task 2] Delegating to classify agent...")
    response2 = master.delegate_task(
        agent_name="classify",
        task_description="Classify the document type",
        task_input={
            "text": "Sample document text for classification",
            "start_page": 1,
            "end_page": 1
        }
    )
    print(f"✓ Response received from {response2.from_agent}")
    print(f" Status: {response2.content.get('status')}")
    print(f" Message ID: {response2.message_id}")

    # Task 3: Summarize
    print("\n[Task 3] Delegating to summarize agent...")
    response3 = master.delegate_task(
        agent_name="summarize",
        task_description="Create a brief summary",
        task_input={
            "text": "Sample document text to summarize",
            "start_page": 1,
            "end_page": 1
        }
    )
    print(f"✓ Response received from {response3.from_agent}")
    print(f" Status: {response3.content.get('status')}")
    print(f" Message ID: {response3.message_id}")
    print()

    # PHASE 3: Evaluation
    print("PHASE 3: Evaluating Agent Responses")
    print("-" * 80)
    eval1 = master.evaluate_response(response1, {"min_confidence": 0.7})
    eval2 = master.evaluate_response(response2, {"min_confidence": 0.7})
    eval3 = master.evaluate_response(response3, {"min_confidence": 0.7})
    print(f"\n[Evaluation 1] extract_text: Accepted={eval1['accepted']}, Confidence={eval1['confidence']}")
    print(f" Reason: {eval1['reason']}")
    print(f"\n[Evaluation 2] classify: Accepted={eval2['accepted']}, Confidence={eval2['confidence']}")
    print(f" Reason: {eval2['reason']}")
    print(f"\n[Evaluation 3] summarize: Accepted={eval3['accepted']}, Confidence={eval3['confidence']}")
    print(f" Reason: {eval3['reason']}")
    print()

    # PHASE 4: Rejection (simulate rejecting one output)
    print("PHASE 4: Output Rejection")
    print("-" * 80)

    # Reject the classify output (for demonstration)
    print("\n[Rejection] Rejecting output from classify agent...")
    rejection = master.reject_output(
        agent_name="classify",
        message_id=response2.message_id,
        reason="Classification confidence too low for decision-making"
    )
    print(f"✓ Rejection sent to {rejection.to_agent}")
    print(f" Reason: {rejection.content['reason']}")
    print(f" Rejected Message ID: {rejection.content['rejected_message_id']}")
    print()

    # PHASE 5: Plan Modification
    print("PHASE 5: Modifying Execution Plan")
    print("-" * 80)
    plan_v2 = master.modify_plan(
        description="Extract, verify, and analyze with enhanced validation",
        reason="Classification agent output was rejected due to low confidence",
        modifications=[
            "Added validation step before classification",
            "Increased confidence threshold for classification",
            "Added fallback to NER if classification fails"
        ]
    )
    print(f"\n✓ Plan modified: v{plan_v1['version']} → v{plan_v2['version']}")
    print(f" Reason: {plan_v2['modification_reason']}")
    print(" Modifications:")
    for mod in plan_v2['modifications']:
        print(f" • {mod}")
    print()

    # PHASE 6: Final Decision
    print("PHASE 6: Final Decision and Summary")
    print("-" * 80)
    summary = master.get_execution_summary()
    print("\nExecution Summary:")
    print(f" - Orchestrator: {summary['orchestrator']}")
    print(f" - Total Plans: {len(summary['plan_versions'])}")
    print(f" - Total Messages: {summary['total_messages']}")
    print(f" - Rejections: {len(summary['rejections'])}")
    print(f" - Timestamp: {summary['execution_timestamp']}")

    # Verify agentic flow
    print("\n" + "=" * 80)
    print("Agentic Flow Verification:")
    print("=" * 80)
    verification = _verify_agentic_flow(summary)
    # Each mark reflects the actual check result instead of always showing ✓.
    print(f"{_mark(verification['distinct_agents_used'])} Distinct agents used: {verification['distinct_agents_used']}")
    print(f"{_mark(verification['delegation_occurred'])} Delegation occurred: {verification['delegation_occurred']}")
    print(f"{_mark(verification['plan_modified'])} Plan modified: {verification['plan_modified']}")
    print(f"{_mark(verification['rejection_occurred'])} Rejection occurred: {verification['rejection_occurred']}")
    print(f"\n{_mark(verification['agentic_flow_verified'])} AGENTIC FLOW VERIFIED: {verification['agentic_flow_verified']}")

    # Output JSON report
    print("\n" + "=" * 80)
    print("JSON Report:")
    print("=" * 80)
    report = {
        "plan_versions": summary['plan_versions'],
        "agent_messages": summary['agent_messages'],
        "rejections": summary['rejections'],
        "final_decision": "Document processing completed with plan modification and quality control",
        "agentic_flow_verified": verification['agentic_flow_verified']
    }
    print(json.dumps(report, indent=2))
    return report


if __name__ == "__main__":
    # Note: This requires USE_AGENTS=true in .env and valid GEMINI_API_KEY
    # For demonstration without actual API calls, agents will show error responses
    # but the orchestration flow will still be demonstrated
    try:
        demonstrate_orchestration()
    except Exception as e:
        # Top-level boundary: report the failure with its traceback and signal
        # it to the calling shell through a non-zero exit status.
        print(f"\n✗ Demonstration failed: {e}")
        traceback.print_exc()
        sys.exit(1)
    else:
        print("\n✓ Demonstration completed successfully!")