Spaces:
Sleeping
Sleeping
| # examples/demonstrate_orchestration.py | |
| """ | |
| Demonstration of MasterLLM Orchestrator with true agent-to-agent communication. | |
| This script demonstrates: | |
| 1. MasterLLM creating an execution plan | |
| 2. Delegating tasks to subordinate agents | |
| 3. Evaluating agent responses | |
| 4. Rejecting/correcting outputs | |
| 5. Modifying the plan based on feedback | |
| 6. Synthesizing final results | |
| """ | |
| import json | |
| import os | |
| import sys | |
| # Add parent directory to path for imports | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from services.agents import MasterOrchestratorAgent | |
| def demonstrate_orchestration(): | |
| """ | |
| Demonstrate full orchestration flow with plan modification and rejection. | |
| """ | |
| print("=" * 80) | |
| print("MasterLLM Orchestrator Demonstration") | |
| print("=" * 80) | |
| print() | |
| # Initialize master orchestrator | |
| print("Initializing MasterLLM Orchestrator...") | |
| master = MasterOrchestratorAgent() | |
| print(f"β Master agent created: {master.name}") | |
| print(f" - Delegation enabled: {master.agent.allow_delegation}") | |
| print(f" - Model: {master.model}") | |
| print() | |
| # PHASE 1: Planning | |
| print("PHASE 1: Creating Initial Plan") | |
| print("-" * 80) | |
| plan_v1 = master.create_plan( | |
| description="Extract and analyze document content", | |
| context={ | |
| "file_path": "example_document.pdf", | |
| "user_request": "extract text and create summary" | |
| } | |
| ) | |
| print(f"β Plan v{plan_v1['version']} created") | |
| print(f" Description: {plan_v1['description']}") | |
| print() | |
| # PHASE 2: Delegation (delegate to 3 agents) | |
| print("PHASE 2: Delegating Tasks to Subordinate Agents") | |
| print("-" * 80) | |
| # Task 1: Extract text | |
| print("\n[Task 1] Delegating to extract_text agent...") | |
| response1 = master.delegate_task( | |
| agent_name="extract_text", | |
| task_description="Extract all text from the document", | |
| task_input={ | |
| "filename": "example_document.pdf", | |
| "temp_files": {"example_document.pdf": "/tmp/example_document.pdf"}, | |
| "start_page": 1, | |
| "end_page": 1 | |
| } | |
| ) | |
| print(f"β Response received from {response1.from_agent}") | |
| print(f" Status: {response1.content.get('status')}") | |
| print(f" Message ID: {response1.message_id}") | |
| # Task 2: Classify content | |
| print("\n[Task 2] Delegating to classify agent...") | |
| response2 = master.delegate_task( | |
| agent_name="classify", | |
| task_description="Classify the document type", | |
| task_input={ | |
| "text": "Sample document text for classification", | |
| "start_page": 1, | |
| "end_page": 1 | |
| } | |
| ) | |
| print(f"β Response received from {response2.from_agent}") | |
| print(f" Status: {response2.content.get('status')}") | |
| print(f" Message ID: {response2.message_id}") | |
| # Task 3: Summarize | |
| print("\n[Task 3] Delegating to summarize agent...") | |
| response3 = master.delegate_task( | |
| agent_name="summarize", | |
| task_description="Create a brief summary", | |
| task_input={ | |
| "text": "Sample document text to summarize", | |
| "start_page": 1, | |
| "end_page": 1 | |
| } | |
| ) | |
| print(f"β Response received from {response3.from_agent}") | |
| print(f" Status: {response3.content.get('status')}") | |
| print(f" Message ID: {response3.message_id}") | |
| print() | |
| # PHASE 3: Evaluation | |
| print("PHASE 3: Evaluating Agent Responses") | |
| print("-" * 80) | |
| eval1 = master.evaluate_response(response1, {"min_confidence": 0.7}) | |
| eval2 = master.evaluate_response(response2, {"min_confidence": 0.7}) | |
| eval3 = master.evaluate_response(response3, {"min_confidence": 0.7}) | |
| print(f"\n[Evaluation 1] extract_text: Accepted={eval1['accepted']}, Confidence={eval1['confidence']}") | |
| print(f" Reason: {eval1['reason']}") | |
| print(f"\n[Evaluation 2] classify: Accepted={eval2['accepted']}, Confidence={eval2['confidence']}") | |
| print(f" Reason: {eval2['reason']}") | |
| print(f"\n[Evaluation 3] summarize: Accepted={eval3['accepted']}, Confidence={eval3['confidence']}") | |
| print(f" Reason: {eval3['reason']}") | |
| print() | |
| # PHASE 4: Rejection (simulate rejecting one output) | |
| print("PHASE 4: Output Rejection") | |
| print("-" * 80) | |
| # Reject the classify output (for demonstration) | |
| print(f"\n[Rejection] Rejecting output from classify agent...") | |
| rejection = master.reject_output( | |
| agent_name="classify", | |
| message_id=response2.message_id, | |
| reason="Classification confidence too low for decision-making" | |
| ) | |
| print(f"β Rejection sent to {rejection.to_agent}") | |
| print(f" Reason: {rejection.content['reason']}") | |
| print(f" Rejected Message ID: {rejection.content['rejected_message_id']}") | |
| print() | |
| # PHASE 5: Plan Modification | |
| print("PHASE 5: Modifying Execution Plan") | |
| print("-" * 80) | |
| plan_v2 = master.modify_plan( | |
| description="Extract, verify, and analyze with enhanced validation", | |
| reason="Classification agent output was rejected due to low confidence", | |
| modifications=[ | |
| "Added validation step before classification", | |
| "Increased confidence threshold for classification", | |
| "Added fallback to NER if classification fails" | |
| ] | |
| ) | |
| print(f"\nβ Plan modified: v{plan_v1['version']} β v{plan_v2['version']}") | |
| print(f" Reason: {plan_v2['modification_reason']}") | |
| print(f" Modifications:") | |
| for mod in plan_v2['modifications']: | |
| print(f" β’ {mod}") | |
| print() | |
| # PHASE 6: Final Decision | |
| print("PHASE 6: Final Decision and Summary") | |
| print("-" * 80) | |
| summary = master.get_execution_summary() | |
| print(f"\nExecution Summary:") | |
| print(f" - Orchestrator: {summary['orchestrator']}") | |
| print(f" - Total Plans: {len(summary['plan_versions'])}") | |
| print(f" - Total Messages: {summary['total_messages']}") | |
| print(f" - Rejections: {len(summary['rejections'])}") | |
| print(f" - Timestamp: {summary['execution_timestamp']}") | |
| # Verify agentic flow | |
| print("\n" + "=" * 80) | |
| print("Agentic Flow Verification:") | |
| print("=" * 80) | |
| verification = { | |
| "distinct_agents_used": len(set([msg['to_agent'] for msg in summary['agent_messages'] if msg['message_type'] == 'task'])), | |
| "delegation_occurred": any(msg['message_type'] == 'task' for msg in summary['agent_messages']), | |
| "plan_modified": len(summary['plan_versions']) > 1, | |
| "rejection_occurred": len(summary['rejections']) > 0, | |
| "agentic_flow_verified": True | |
| } | |
| print(f"β Distinct agents used: {verification['distinct_agents_used']}") | |
| print(f"β Delegation occurred: {verification['delegation_occurred']}") | |
| print(f"β Plan modified: {verification['plan_modified']}") | |
| print(f"β Rejection occurred: {verification['rejection_occurred']}") | |
| print(f"\n{'β'} AGENTIC FLOW VERIFIED: {verification['agentic_flow_verified']}") | |
| # Output JSON report | |
| print("\n" + "=" * 80) | |
| print("JSON Report:") | |
| print("=" * 80) | |
| report = { | |
| "plan_versions": summary['plan_versions'], | |
| "agent_messages": summary['agent_messages'], | |
| "rejections": summary['rejections'], | |
| "final_decision": "Document processing completed with plan modification and quality control", | |
| "agentic_flow_verified": verification['agentic_flow_verified'] | |
| } | |
| print(json.dumps(report, indent=2)) | |
| return report | |
| if __name__ == "__main__": | |
| # Note: This requires USE_AGENTS=true in .env and valid GEMINI_API_KEY | |
| # For demonstration without actual API calls, agents will show error responses | |
| # but the orchestration flow will still be demonstrated | |
| try: | |
| result = demonstrate_orchestration() | |
| print("\nβ Demonstration completed successfully!") | |
| except Exception as e: | |
| print(f"\nβ Demonstration failed: {e}") | |
| import traceback | |
| traceback.print_exc() | |