# Orchestration Implementation Plan

## Overview

Replace the current single-hop triage agent with an orchestrator that maintains conversation-level state and supports multi-step workflows in which agents pass data to one another.

## Current Architecture Limitations

1. **Stateless agent calls** - No context is preserved between interactions
2. **Single-hop routing** - The triage agent can route to only one destination per request
3. **No data flow** - Agents cannot pass structured data to each other
4. **Limited understanding** - Routing relies on simple keyword matching rather than intent comprehension

## Proposed Architecture: Conversation-Level State Orchestration

### Core Components

#### 1. Conversation Context Model (`agent_system/context.py`)

```python
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Optional
from datetime import datetime
from enum import Enum
import uuid


class ArtifactType(str, Enum):
    SCRIPT = "script"
    VIDEO = "video"
    SEARCH_RESULTS = "search_results"
    CUSTOM = "custom"


class Artifact(BaseModel):
    """Any output from an agent that can be referenced later"""
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    type: ArtifactType
    content: Dict[str, Any]
    created_at: datetime = Field(default_factory=datetime.now)
    created_by: str  # agent name
    metadata: Dict[str, Any] = {}


class ConversationContext(BaseModel):
    """Maintains state throughout a conversation"""
    artifacts: Dict[str, Artifact] = {}  # id -> artifact
    last_artifact_by_type: Dict[ArtifactType, str] = {}  # type -> artifact_id
    workflow_history: List[Dict[str, Any]] = []

    def add_artifact(self, artifact: Artifact) -> None:
        """Store artifact and update type mapping"""
        self.artifacts[artifact.id] = artifact
        self.last_artifact_by_type[artifact.type] = artifact.id
        self.workflow_history.append({
            "action": "created_artifact",
            "artifact_id": artifact.id,
            "type": artifact.type,
            "timestamp": artifact.created_at
        })

    def get_latest(self, artifact_type: ArtifactType) -> Optional[Artifact]:
        """Get most recent artifact of a type"""
        artifact_id = self.last_artifact_by_type.get(artifact_type)
        return self.artifacts.get(artifact_id) if artifact_id else None

    def get_context_summary(self) -> Dict[str, Any]:
        """Summarize available artifacts for orchestrator"""
        return {
            "available_artifacts": [
                {
                    "id": a.id,
                    "type": a.type,
                    "created_by": a.created_by,
                    "summary": a.metadata.get("summary", "No summary")
                }
                for a in self.artifacts.values()
            ],
            # Avoid shadowing the built-ins `type` and `id`
            "last_artifacts": {
                artifact_type.value: artifact_id
                for artifact_type, artifact_id in self.last_artifact_by_type.items()
            }
        }
```

#### 2. Orchestrator Agent (Replace `triage_agent` in `agents.py`)

```python
orchestrator_agent = Agent(
    name="orchestrator",
    instructions="""
    You are an intelligent orchestrator that plans and executes multi-step workflows.

    CAPABILITIES:
    You have access to specialized agents as tools. You can:
    1. Call multiple agents in sequence
    2. Pass outputs from one agent as inputs to another
    3. Access previously created artifacts (scripts, videos, search results)
    4. Handle modifications to existing content

    PLANNING:
    For each user request:
    1. Understand what the user wants
    2. Check if it relates to existing artifacts
    3. Plan the sequence of operations needed
    4. Execute the plan by calling appropriate tools

    CONTEXT AWARENESS:
    You receive a context summary with each request showing:
    - available_artifacts: List of previously created content
    - last_artifacts: Most recent artifact of each type

    When users say "the script", "that video", etc., refer to the last_artifacts.

    EXAMPLES:
    - "Create a video script about today's weather"
      → search_web(query="today's weather [location]")
      → create_script(topic="weather", context=search_results)

    - "Now create a video from it"
      → create_video(script_id=last_script_id)

    - "Change the temperature to 75 degrees"
      → modify_script(script_id=last_script_id, modifications="change temperature to 75 degrees")

    Always think step by step and call the appropriate tools.
    """,
    tools=[
        # Agents exposed as tools - defined in agent_tools.py
    ]
)
```

#### 3. Agent Tools Factory (`agent_system/agent_tools.py`)

```python
# A plain (non-async) factory: it only builds and returns the async wrapper,
# so callers do not need to await it.
def create_agent_tool(agent: Agent, context: ConversationContext):
    """Factory to create context-aware tools from agents"""

    async def tool_wrapper(**kwargs) -> Dict[str, Any]:
        # Add context information to agent input
        if "context_artifacts" in kwargs:
            # Resolve artifact references; unknown ids are skipped
            artifacts_data = {}
            for artifact_id in kwargs.get("context_artifacts", []):
                if artifact_id in context.artifacts:
                    artifacts_data[artifact_id] = context.artifacts[artifact_id].content
            kwargs["artifacts_data"] = artifacts_data

        # Run the agent
        result = await Runner.run(agent, kwargs)

        # Extract and store output as artifact
        output = result.final_output

        # Infer the artifact type from the agent name
        artifact_type = ArtifactType.CUSTOM
        if "script" in agent.name:
            artifact_type = ArtifactType.SCRIPT
        elif "video" in agent.name:
            artifact_type = ArtifactType.VIDEO
        elif "search" in agent.name:
            artifact_type = ArtifactType.SEARCH_RESULTS

        artifact = Artifact(
            type=artifact_type,
            content=output if isinstance(output, dict) else {"data": output},
            created_by=agent.name,
            metadata={"summary": str(output)[:200]}
        )

        context.add_artifact(artifact)

        return {
            "artifact_id": artifact.id,
            "output": output
        }

    return tool_wrapper
```

#### 4. Updated Chat Interface (`ui/chat_interface.py`)

```python
import logging
from typing import Any, Dict, List, Optional

from agent_system.context import ConversationContext, Artifact
from agent_system.agent_tools import create_search_tool, create_script_tool, create_video_tool

# orchestrator_agent, Runner, get_auth_state_from_history, and
# process_orchestrator_response are assumed to be imported or defined
# elsewhere in this module.
logger = logging.getLogger(__name__)


class ChatState:
    """Maintains conversation state"""

    def __init__(self):
        self.context = ConversationContext()
        self.orchestrator = None
        self._setup_orchestrator()

    def _setup_orchestrator(self):
        """Initialize orchestrator with context-aware tools"""
        # Create tools that have access to context
        search_tool = create_search_tool(self.context)
        script_tool = create_script_tool(self.context)
        video_tool = create_video_tool(self.context)

        # Clone orchestrator and add tools
        self.orchestrator = orchestrator_agent.clone(
            tools=[search_tool, script_tool, video_tool]
        )


# Modify the chat_with_agents function
async def chat_with_agents(message: str, history: List[Dict[str, Any]], state: Optional[ChatState] = None):
    """
    Process user messages through the orchestrated agent system.

    Args:
        message: Current user message
        history: Conversation history
        state: Conversation state (created per session)
    """
    # Initialize state if not provided (first message)
    if state is None:
        state = ChatState()

    # Get authentication state (keep existing auth flow)
    auth_state = get_auth_state_from_history(history)

    # Handle authentication first (keep existing logic)
    if not auth_state.is_authenticated:
        # ... existing auth handling ...
        pass

    # For authenticated users, process through orchestrator
    try:
        # Prepare orchestrator input with context
        orchestrator_input = {
            "user_message": message,
            "context_summary": state.context.get_context_summary(),
            "conversation_history": history[-5:]  # Last 5 exchanges
        }

        # Run orchestrator
        result = await Runner.run_streamed(
            state.orchestrator,
            orchestrator_input
        )

        # Stream responses
        async for event in process_orchestrator_response(result, state):
            yield event

    except Exception as e:
        logger.error(f"Orchestration error: {e}")
        error_response = f"I encountered an error: {str(e)}"
        history.append({"role": "assistant", "content": error_response})
        yield history
```

#### 5. Gradio Interface Updates

```python
import gradio as gr


def create_chat_interface():
    """Create the Gradio chat interface with state management"""

    with gr.Blocks(title="AI Agent Chat") as demo:
        # Add state component
        chat_state = gr.State(ChatState())

        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    value=[],
                    elem_id="chatbot",
                    height=600
                )
                msg = gr.Textbox(
                    label="Message",
                    placeholder="Type your message here...",
                    lines=2
                )
                submit = gr.Button("Send", variant="primary")
                clear = gr.Button("Clear Chat")

            with gr.Column(scale=1):
                # Add context viewer
                gr.Markdown("### Current Context")
                context_display = gr.JSON(
                    value={},
                    label="Active Artifacts"
                )

        # Update handlers to pass state
        async def respond(message, chat_history, state):
            if not message:
                # An async generator cannot return a value; yield once and stop
                yield chat_history, "", state.context.get_context_summary()
                return

            chat_history.append({"role": "user", "content": message})

            updated_history = chat_history
            async for updated_history in chat_with_agents(message, chat_history, state):
                # Yield one value per output component (chatbot, msg, context_display)
                yield updated_history, "", state.context.get_context_summary()

            # Final refresh of the context display
            yield updated_history, "", state.context.get_context_summary()

        # Wire up events
        msg.submit(respond, [msg, chatbot, chat_state], [chatbot, msg, context_display])
        submit.click(respond, [msg, chatbot, chat_state], [chatbot, msg, context_display])

        # Clear preserves state but clears history
        clear.click(lambda s: ([], s), [chat_state], [chatbot, chat_state])

    return demo
```

## Implementation Steps

### Step 1: Create Context System

1. Create `agent_system/context.py` with the models above
2. Add necessary imports to `__init__.py`

### Step 2: Create Agent Tools

1. Create `agent_system/agent_tools.py`
2. Implement tool wrappers for each agent type (`create_search_tool`, `create_script_tool`, and `create_video_tool`, which the chat interface imports)
3. Ensure tools can access and store artifacts

### Step 3: Replace Triage with Orchestrator

1. In `agents.py`, comment out `triage_agent`
2. Add `orchestrator_agent` definition
3. Update imports and dependencies

### Step 4: Update Chat Interface

1. Add `ChatState` class to maintain conversation state
2. Modify `chat_with_agents` to use orchestrator
3. Update Gradio interface to show context

### Step 5: Update Agent Instructions

1. Modify existing agents to accept context artifacts
2. Ensure script agent can handle modifications
3. Ensure video agent can accept script artifacts

## Example Workflows

### Example 1: Data-Driven Script Creation

**User**: "Create a video script about today's mortgage rates"

1. Orchestrator recognizes need for current data
2. Calls: `search_web("today's mortgage rates")`
3. Stores search results as artifact
4. Calls: `create_script(topic="mortgage rates", context_artifacts=[search_artifact_id])`
5. Script agent receives real data and creates accurate script
6. Script stored as artifact

### Example 2: Script Modification

**User**: "Change the rate to 6.7%"

1. Orchestrator recognizes modification request
2. Identifies last script artifact from context
3. Calls: `create_script(original_script_id=last_script_id, modifications="change rate to 6.7%")`
4. Script agent modifies and returns updated script
5. New version stored as artifact

### Example 3: Multi-Step Video Creation

**User**: "Create a video about Apple's stock performance this week"

1. Search for Apple stock data
2. Create script with financial data
3. User: "Make it more upbeat"
4. Modify script tone
5. User: "Now create the video"
6. Create video from modified script

## Key Benefits

1. **Stateful Conversations** - Context is preserved throughout the session
2. **Multi-Step Workflows** - Agents can build on each other's outputs
3. **Flexible Modifications** - Any stored artifact can be referenced and modified
4. **Data-Driven Content** - Real data flows from search to content creation
5. **No Hardcoding** - Workflows are planned per request, so the system adapts to any domain or content type

## Testing Plan

1. Test basic workflows (search → script → video)
2. Test modification flows (create → modify → modify again)
3. Test artifact references ("the script", "that video")
4. Test error handling (missing artifacts, failed searches)
5. Test complex workflows (multiple searches, multiple scripts)

## Notes

- Authentication flow remains unchanged
- Existing agents need minimal modifications
- System is extensible - new agents can be added as tools
- Context viewer in UI helps users understand available artifacts
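
The modification and artifact-reference items in the testing plan can be exercised without running any agents. The sketch below is one possible starting point, not the project's actual test suite: it uses stdlib `dataclasses` as a stand-in for the pydantic models in `agent_system/context.py` so that it runs on its own, and the agent names (`search_agent`, `script_agent`) and rate values are placeholders invented for illustration.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional


class ArtifactType(str, Enum):
    SCRIPT = "script"
    VIDEO = "video"
    SEARCH_RESULTS = "search_results"
    CUSTOM = "custom"


@dataclass
class Artifact:
    """Stdlib stand-in for the pydantic Artifact model."""
    type: ArtifactType
    content: Dict[str, Any]
    created_by: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ConversationContext:
    """Stdlib stand-in for the pydantic ConversationContext model."""
    artifacts: Dict[str, Artifact] = field(default_factory=dict)
    last_artifact_by_type: Dict[ArtifactType, str] = field(default_factory=dict)
    workflow_history: List[Dict[str, Any]] = field(default_factory=list)

    def add_artifact(self, artifact: Artifact) -> None:
        # Store the artifact and mark it as the latest of its type
        self.artifacts[artifact.id] = artifact
        self.last_artifact_by_type[artifact.type] = artifact.id
        self.workflow_history.append(
            {"action": "created_artifact", "artifact_id": artifact.id}
        )

    def get_latest(self, artifact_type: ArtifactType) -> Optional[Artifact]:
        artifact_id = self.last_artifact_by_type.get(artifact_type)
        return self.artifacts.get(artifact_id) if artifact_id else None


def run_modification_flow() -> ConversationContext:
    """Simulate search -> script -> modify -> modify again with placeholder data."""
    ctx = ConversationContext()
    ctx.add_artifact(
        Artifact(ArtifactType.SEARCH_RESULTS, {"rate": "6.9%"}, "search_agent")
    )
    ctx.add_artifact(
        Artifact(ArtifactType.SCRIPT, {"text": "Rates are 6.9%"}, "script_agent")
    )
    # Each modification is stored as a new script artifact
    for rate in ("6.7%", "6.5%"):
        ctx.add_artifact(
            Artifact(ArtifactType.SCRIPT, {"text": f"Rates are {rate}"}, "script_agent")
        )
    return ctx


ctx = run_modification_flow()
# "the script" resolves to the newest version, while older versions stay stored
assert ctx.get_latest(ArtifactType.SCRIPT).content["text"] == "Rates are 6.5%"
assert len(ctx.artifacts) == 4
# A type with no artifacts resolves to None instead of raising
assert ctx.get_latest(ArtifactType.VIDEO) is None
```

Against the real models, the same assertions should carry over by importing `Artifact`, `ArtifactType`, and `ConversationContext` from `agent_system.context` and constructing artifacts with keyword arguments.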