| # Orchestration Implementation Plan |
|
|
| ## Overview |
|
|
| Replace the current single-hop triage agent with an intelligent orchestrator that maintains conversation-level state and enables multi-step workflows with data passing between agents. |
|
|
| ## Current Architecture Limitations |
|
|
| 1. **Stateless agent calls** - No context preservation between interactions |
| 2. **Single-hop routing** - Triage agent can only route to one destination |
| 3. **No data flow** - Agents cannot pass structured data to each other |
| 4. **Limited understanding** - Routing relies on simple keyword matching rather than intent comprehension |
|
|
| ## Proposed Architecture: Conversation-Level State Orchestration |
|
|
| ### Core Components |
|
|
| #### 1. Conversation Context Model (`agent_system/context.py`) |
| |
| ```python |
| import uuid |
| from datetime import datetime |
| from enum import Enum |
| from typing import Any, Dict, List, Optional |
| |
| from pydantic import BaseModel, Field  # Field was used below but never imported |
| |
| |
| class ArtifactType(str, Enum): |
|     SCRIPT = "script" |
|     VIDEO = "video" |
|     SEARCH_RESULTS = "search_results" |
|     CUSTOM = "custom" |
| |
| |
| class Artifact(BaseModel): |
|     """Any output from an agent that can be referenced later""" |
|     id: str = Field(default_factory=lambda: str(uuid.uuid4())) |
|     type: ArtifactType |
|     content: Dict[str, Any] |
|     created_at: datetime = Field(default_factory=datetime.now) |
|     created_by: str  # agent name |
|     metadata: Dict[str, Any] = Field(default_factory=dict) |
| |
| |
| class ConversationContext(BaseModel): |
|     """Maintains state throughout a conversation""" |
|     artifacts: Dict[str, Artifact] = Field(default_factory=dict)  # id -> artifact |
|     last_artifact_by_type: Dict[ArtifactType, str] = Field(default_factory=dict)  # type -> artifact_id |
|     workflow_history: List[Dict[str, Any]] = Field(default_factory=list) |
| |
|     def add_artifact(self, artifact: Artifact) -> None: |
|         """Store artifact and update type mapping""" |
|         self.artifacts[artifact.id] = artifact |
|         self.last_artifact_by_type[artifact.type] = artifact.id |
|         self.workflow_history.append({ |
|             "action": "created_artifact", |
|             "artifact_id": artifact.id, |
|             "type": artifact.type, |
|             "timestamp": artifact.created_at, |
|         }) |
| |
|     def get_latest(self, artifact_type: ArtifactType) -> Optional[Artifact]: |
|         """Get most recent artifact of a type""" |
|         artifact_id = self.last_artifact_by_type.get(artifact_type) |
|         return self.artifacts.get(artifact_id) if artifact_id else None |
| |
|     def get_context_summary(self) -> Dict[str, Any]: |
|         """Summarize available artifacts for orchestrator""" |
|         return { |
|             "available_artifacts": [ |
|                 { |
|                     "id": a.id, |
|                     "type": a.type, |
|                     "created_by": a.created_by, |
|                     "summary": a.metadata.get("summary", "No summary"), |
|                 } |
|                 for a in self.artifacts.values() |
|             ], |
|             # Loop names avoid shadowing the built-ins `type` and `id` |
|             "last_artifacts": { |
|                 artifact_type.value: artifact_id |
|                 for artifact_type, artifact_id in self.last_artifact_by_type.items() |
|             }, |
|         } |
| ``` |
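
To illustrate the lookup behaviour the model is meant to provide, here is a minimal stand-in that uses only the standard library (dataclasses in place of pydantic). `MiniArtifact` and `MiniContext` are hypothetical names for this sketch, not part of the plan; the point is only that the newest artifact of a type becomes "latest" while older versions remain retrievable by ID.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class MiniArtifact:
    """Stand-in for Artifact: just enough fields to show the lookup."""
    type: str
    content: Dict[str, Any]
    created_by: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


@dataclass
class MiniContext:
    """Stand-in for ConversationContext."""
    artifacts: Dict[str, MiniArtifact] = field(default_factory=dict)
    last_artifact_by_type: Dict[str, str] = field(default_factory=dict)

    def add_artifact(self, artifact: MiniArtifact) -> None:
        self.artifacts[artifact.id] = artifact
        # The newest artifact of a type replaces the previous "latest" pointer.
        self.last_artifact_by_type[artifact.type] = artifact.id

    def get_latest(self, artifact_type: str) -> Optional[MiniArtifact]:
        artifact_id = self.last_artifact_by_type.get(artifact_type)
        return self.artifacts.get(artifact_id) if artifact_id else None


ctx = MiniContext()
draft = MiniArtifact(type="script", content={"text": "v1"}, created_by="script_agent")
revised = MiniArtifact(type="script", content={"text": "v2"}, created_by="script_agent")
ctx.add_artifact(draft)
ctx.add_artifact(revised)

latest = ctx.get_latest("script")    # the revised script
no_video = ctx.get_latest("video")   # None: no video has been created yet
```

Because only the pointer moves, a request such as "go back to the first version" could still be served from `artifacts` by ID.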
| |
| #### 2. Orchestrator Agent (Replace `triage_agent` in `agents.py`) |
| |
| ```python |
| orchestrator_agent = Agent( |
|     name="orchestrator", |
|     instructions=""" |
|     You are an intelligent orchestrator that plans and executes multi-step workflows. |
| |
|     CAPABILITIES: |
|     You have access to specialized agents as tools. You can: |
|     1. Call multiple agents in sequence |
|     2. Pass outputs from one agent as inputs to another |
|     3. Access previously created artifacts (scripts, videos, search results) |
|     4. Handle modifications to existing content |
| |
|     PLANNING: |
|     For each user request: |
|     1. Understand what the user wants |
|     2. Check if it relates to existing artifacts |
|     3. Plan the sequence of operations needed |
|     4. Execute the plan by calling appropriate tools |
| |
|     CONTEXT AWARENESS: |
|     You receive a context summary with each request showing: |
|     - available_artifacts: List of previously created content |
|     - last_artifacts: Most recent artifact of each type |
| |
|     When users say "the script", "that video", etc., refer to the last_artifacts. |
| |
|     EXAMPLES: |
|     - "Create a video script about today's weather" |
|       → search_web(query="today's weather [location]") |
|       → create_script(topic="weather", context=search_results) |
| |
|     - "Now create a video from it" |
|       → create_video(script_id=last_script_id) |
| |
|     - "Change the temperature to 75 degrees" |
|       → modify_script(script_id=last_script_id, modifications="change temperature to 75 degrees") |
| |
|     Always think step by step and call the appropriate tools. |
|     """, |
|     tools=[ |
|         # Agents exposed as tools - defined in agent_tools.py |
|     ], |
| ) |
| ``` |
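
The instructions above say the orchestrator receives a context summary with each request. One way this could be done is to render the summary, the recent history, and the user message into a single text prompt. The sketch below assumes that approach; `build_orchestrator_input` and its section labels are hypothetical, and the sample summary only mirrors the shape returned by `get_context_summary()`.

```python
import json
from typing import Any, Dict, List


def build_orchestrator_input(
    message: str,
    context_summary: Dict[str, Any],
    history: List[Dict[str, Any]],
    max_turns: int = 5,
) -> str:
    """Render the request and its context as one text prompt."""
    recent = history[-max_turns:]  # keep only the most recent exchanges
    return "\n\n".join([
        # default=str keeps non-JSON values such as datetimes from raising
        "CONTEXT SUMMARY:\n" + json.dumps(context_summary, indent=2, default=str),
        "RECENT HISTORY:\n" + json.dumps(recent, indent=2, default=str),
        "USER MESSAGE:\n" + message,
    ])


summary = {
    "available_artifacts": [
        {"id": "a1", "type": "script", "created_by": "script_agent", "summary": "Weather script"}
    ],
    "last_artifacts": {"script": "a1"},
}
prompt = build_orchestrator_input("Now create a video from it", summary, history=[])
```

With `last_artifacts` visible in the prompt, a reference such as "it" can be resolved to artifact `a1` by the orchestrator.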
| |
| #### 3. Agent Tools Factory (`agent_system/agent_tools.py`) |
|
|
| ```python |
| import json |
| from typing import Any, Awaitable, Callable, Dict |
| |
| from agents import Agent, Runner  # assumes the OpenAI Agents SDK, as the Agent/Runner usage suggests |
| |
| from agent_system.context import Artifact, ArtifactType, ConversationContext |
| |
| |
| def create_agent_tool( |
|     agent: Agent, context: ConversationContext |
| ) -> Callable[..., Awaitable[Dict[str, Any]]]: |
|     """Factory to create context-aware tools from agents. |
| |
|     The factory itself awaits nothing, so it is a plain function. The returned |
|     coroutine still has to be registered as a tool, with an explicit parameter |
|     schema, before it is handed to the orchestrator. |
|     """ |
| |
|     async def tool_wrapper(**kwargs: Any) -> Dict[str, Any]: |
|         # Resolve artifact references into their stored content |
|         if "context_artifacts" in kwargs: |
|             kwargs["artifacts_data"] = { |
|                 artifact_id: context.artifacts[artifact_id].content |
|                 for artifact_id in kwargs["context_artifacts"] |
|                 if artifact_id in context.artifacts |
|             } |
| |
|         # Run the agent. Runner.run takes text (or a list of input items), |
|         # not a dict, so the arguments are serialized first. |
|         result = await Runner.run(agent, json.dumps(kwargs, default=str)) |
| |
|         # Extract and store output as artifact |
|         output = result.final_output |
| |
|         # Infer artifact type from the agent name |
|         artifact_type = ArtifactType.CUSTOM |
|         if "script" in agent.name: |
|             artifact_type = ArtifactType.SCRIPT |
|         elif "video" in agent.name: |
|             artifact_type = ArtifactType.VIDEO |
|         elif "search" in agent.name: |
|             artifact_type = ArtifactType.SEARCH_RESULTS |
| |
|         artifact = Artifact( |
|             type=artifact_type, |
|             content=output if isinstance(output, dict) else {"data": output}, |
|             created_by=agent.name, |
|             metadata={"summary": str(output)[:200]}, |
|         ) |
| |
|         context.add_artifact(artifact) |
| |
|         return { |
|             "artifact_id": artifact.id, |
|             "output": output, |
|         } |
| |
|     return tool_wrapper |
| ``` |
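
The wrapper above skips any artifact ID it cannot find. A possible variation, sketched here under the assumption that the orchestrator should be told about bad references, returns the unknown IDs alongside the resolved content. `resolve_artifacts` is a hypothetical helper, and the store is a plain dict standing in for `context.artifacts`.

```python
from typing import Any, Dict, List, Tuple


def resolve_artifacts(
    artifact_ids: List[str],
    store: Dict[str, Dict[str, Any]],
) -> Tuple[Dict[str, Dict[str, Any]], List[str]]:
    """Split requested IDs into resolved content and unknown IDs."""
    found: Dict[str, Dict[str, Any]] = {}
    missing: List[str] = []
    for artifact_id in artifact_ids:
        if artifact_id in store:
            found[artifact_id] = store[artifact_id]
        else:
            missing.append(artifact_id)
    return found, missing


# Placeholder content for illustration only
store = {"s1": {"text": "example search result"}}
found, missing = resolve_artifacts(["s1", "does-not-exist"], store)
```

A tool could then include `missing` in its return value so the orchestrator can ask the user which artifact was meant instead of proceeding with partial data.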
|
|
| #### 4. Updated Chat Interface (`ui/chat_interface.py`) |
| |
| ```python |
| import json |
| import logging |
| from typing import Any, Dict, List, Optional |
| |
| from agents import Runner  # assumes the OpenAI Agents SDK |
| |
| # create_*_tool are expected to be per-agent factories in agent_tools.py, |
| # e.g. thin wrappers around create_agent_tool |
| from agent_system.agent_tools import create_search_tool, create_script_tool, create_video_tool |
| from agent_system.agents import orchestrator_agent  # module path assumed; adjust to where agents.py lives |
| from agent_system.context import ConversationContext |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class ChatState: |
|     """Maintains conversation state""" |
| |
|     def __init__(self): |
|         self.context = ConversationContext() |
|         self.orchestrator = None |
|         self._setup_orchestrator() |
| |
|     def _setup_orchestrator(self): |
|         """Initialize orchestrator with context-aware tools""" |
|         # Create tools that have access to context |
|         search_tool = create_search_tool(self.context) |
|         script_tool = create_script_tool(self.context) |
|         video_tool = create_video_tool(self.context) |
| |
|         # Clone orchestrator and add tools |
|         self.orchestrator = orchestrator_agent.clone( |
|             tools=[search_tool, script_tool, video_tool] |
|         ) |
| |
| |
| # Modify the chat_with_agents function |
| async def chat_with_agents( |
|     message: str, |
|     history: List[Dict[str, Any]], |
|     state: Optional[ChatState] = None, |
| ): |
|     """ |
|     Process user messages through the orchestrated agent system. |
| |
|     Args: |
|         message: Current user message |
|         history: Conversation history |
|         state: Conversation state (created per session) |
|     """ |
|     # Initialize state if not provided (first message) |
|     if state is None: |
|         state = ChatState() |
| |
|     # Get authentication state (keep existing auth flow) |
|     auth_state = get_auth_state_from_history(history) |
| |
|     # Handle authentication first (keep existing logic) |
|     if not auth_state.is_authenticated: |
|         # ... existing auth handling ... |
|         return  # unauthenticated requests must not fall through to the orchestrator |
| |
|     # For authenticated users, process through orchestrator |
|     try: |
|         # Prepare orchestrator input with context. The runner takes text |
|         # (or a list of input items), so the payload is serialized. |
|         orchestrator_input = json.dumps( |
|             { |
|                 "user_message": message, |
|                 "context_summary": state.context.get_context_summary(), |
|                 "conversation_history": history[-5:],  # Last 5 messages |
|             }, |
|             default=str, |
|         ) |
| |
|         # Run orchestrator. run_streamed returns immediately and is not awaited. |
|         result = Runner.run_streamed(state.orchestrator, orchestrator_input) |
| |
|         # Stream responses |
|         async for event in process_orchestrator_response(result, state): |
|             yield event |
| |
|     except Exception as e: |
|         logger.exception("Orchestration error") |
|         error_response = f"I encountered an error: {e}" |
|         history.append({"role": "assistant", "content": error_response}) |
|         yield history |
| ``` |
| |
| #### 5. Gradio Interface Updates |
|
|
| ```python |
| import gradio as gr |
| |
| |
| def create_chat_interface(): |
|     """Create the Gradio chat interface with state management""" |
| |
|     with gr.Blocks(title="AI Agent Chat") as demo: |
|         # Add state component. Gradio copies this initial value for each |
|         # session, so ChatState must be deep-copyable. |
|         chat_state = gr.State(ChatState()) |
| |
|         with gr.Row(): |
|             with gr.Column(scale=3): |
|                 # History is a list of role/content dicts, so the Chatbot |
|                 # must be configured for the messages format |
|                 chatbot = gr.Chatbot( |
|                     value=[], |
|                     elem_id="chatbot", |
|                     height=600 |
|                 ) |
|                 msg = gr.Textbox( |
|                     label="Message", |
|                     placeholder="Type your message here...", |
|                     lines=2 |
|                 ) |
|                 submit = gr.Button("Send", variant="primary") |
|                 clear = gr.Button("Clear Chat") |
| |
|             with gr.Column(scale=1): |
|                 # Add context viewer |
|                 gr.Markdown("### Current Context") |
|                 context_display = gr.JSON( |
|                     value={}, |
|                     label="Active Artifacts" |
|                 ) |
| |
|         # Update handlers to pass state |
|         async def respond(message, chat_history, state): |
|             if not message: |
|                 # An async generator cannot return a value, so yield once and stop |
|                 yield chat_history, "", state.context.get_context_summary() |
|                 return |
| |
|             chat_history.append({"role": "user", "content": message}) |
| |
|             async for updated_history in chat_with_agents(message, chat_history, state): |
|                 # Every yield must supply all three outputs, including the context display |
|                 yield updated_history, "", state.context.get_context_summary() |
| |
|         # Wire up events |
|         msg.submit(respond, [msg, chatbot, chat_state], [chatbot, msg, context_display]) |
|         submit.click(respond, [msg, chatbot, chat_state], [chatbot, msg, context_display]) |
| |
|         # Clear preserves state but clears history |
|         clear.click(lambda s: ([], s), [chat_state], [chatbot, chat_state]) |
| |
|     return demo |
| ``` |
|
|
| ## Implementation Steps |
|
|
| ### Step 1: Create Context System |
| 1. Create `agent_system/context.py` with the models above |
| 2. Add necessary imports to `__init__.py` |
|
|
| ### Step 2: Create Agent Tools |
| 1. Create `agent_system/agent_tools.py` |
| 2. Implement tool wrappers for each agent type |
| 3. Ensure tools can access and store artifacts |
|
|
| ### Step 3: Replace Triage with Orchestrator |
| 1. In `agents.py`, comment out `triage_agent` |
| 2. Add `orchestrator_agent` definition |
| 3. Update imports and dependencies |
|
|
| ### Step 4: Update Chat Interface |
| 1. Add `ChatState` class to maintain conversation state |
| 2. Modify `chat_with_agents` to use orchestrator |
| 3. Update Gradio interface to show context |
|
|
| ### Step 5: Update Agent Instructions |
| 1. Modify existing agents to accept context artifacts |
| 2. Ensure script agent can handle modifications |
| 3. Ensure video agent can accept script artifacts |
|
|
| ## Example Workflows |
|
|
| ### Example 1: Data-Driven Script Creation |
| **User**: "Create a video script about today's mortgage rates" |
|
|
| 1. Orchestrator recognizes need for current data |
| 2. Calls: `search_web("today's mortgage rates")` |
| 3. Stores search results as artifact |
| 4. Calls: `create_script(topic="mortgage rates", context_artifacts=[search_artifact_id])` |
| 5. Script agent receives real data and creates accurate script |
| 6. Script stored as artifact |
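
The data flow in this example can be sketched with stub tools. Everything here is an assumption made for illustration: the in-memory `store`, the `save` helper, and the stub bodies of `search_web` and `create_script` stand in for the real agents, and the search hit is a placeholder rather than real data.

```python
import uuid
from typing import Any, Dict, List

store: Dict[str, Dict[str, Any]] = {}  # artifact_id -> artifact record


def save(artifact_type: str, content: Dict[str, Any], created_by: str) -> str:
    """Store a record and return its generated ID."""
    artifact_id = str(uuid.uuid4())
    store[artifact_id] = {"type": artifact_type, "content": content, "created_by": created_by}
    return artifact_id


def search_web(query: str) -> str:
    # Stub: a real search agent would fetch live data here.
    return save("search_results", {"query": query, "hits": ["placeholder result"]}, "search_agent")


def create_script(topic: str, context_artifacts: List[str]) -> str:
    # Stub: look up the referenced artifacts and build the script from them.
    sources = [store[artifact_id]["content"] for artifact_id in context_artifacts]
    facts = [hit for source in sources for hit in source["hits"]]
    return save(
        "script",
        {"topic": topic, "based_on": context_artifacts, "facts": facts},
        "script_agent",
    )


# Steps 2-6 of the workflow: search, store, then script from the stored result
search_id = search_web("today's mortgage rates")
script_id = create_script(topic="mortgage rates", context_artifacts=[search_id])
script = store[script_id]
```

The script record keeps the IDs it was built from in `based_on`, which makes it possible to trace a claim in the script back to the search that supplied it.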
|
|
| ### Example 2: Script Modification |
| **User**: "Change the rate to 6.7%" |
|
|
| 1. Orchestrator recognizes modification request |
| 2. Identifies last script artifact from context |
| 3. Calls: `create_script(original_script_id=last_script_id, modifications="change rate to 6.7%")` |
| 4. Script agent modifies and returns updated script |
| 5. New version stored as artifact |
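
Since each modification is stored as a new artifact, it may be useful to record which version it revised. The sketch below assumes a `parent_id` link for that purpose; the field, the helper names, and the fixed IDs are hypothetical, and the script text is placeholder content.

```python
from typing import Any, Dict, List, Optional

store: Dict[str, Dict[str, Any]] = {}  # artifact_id -> script record


def save_script(artifact_id: str, text: str, parent_id: Optional[str] = None) -> None:
    """Store a script version; parent_id links to the version it revised."""
    store[artifact_id] = {"text": text, "parent_id": parent_id}


def version_chain(artifact_id: str) -> List[str]:
    """Walk parent links from a version back to the original."""
    chain: List[str] = []
    current: Optional[str] = artifact_id
    while current is not None:
        chain.append(current)
        current = store[current]["parent_id"]
    return chain


# Placeholder script text for illustration only
save_script("script-v1", "The rate is X today.")
save_script("script-v2", "The rate is 6.7% today.", parent_id="script-v1")

history = version_chain("script-v2")
```

Keeping the original untouched means a later "undo that change" request can be answered by pointing back at the parent version.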
|
|
| ### Example 3: Multi-Step Video Creation |
| **User**: "Create a video about Apple's stock performance this week" |
|
|
| 1. Search for Apple stock data |
| 2. Create script with financial data |
| 3. User: "Make it more upbeat" |
| 4. Modify script tone |
| 5. User: "Now create the video" |
| 6. Create video from modified script |
|
|
| ## Key Benefits |
|
|
| 1. **Stateful Conversations** - Context preserved throughout session |
| 2. **Multi-Step Workflows** - Agents can build on each other's outputs |
| 3. **Flexible Modifications** - Any artifact can be referenced and modified |
| 4. **Data-Driven Content** - Real data flows from search to content creation |
| 5. **No Domain Hardcoding** - The orchestrator plans from intent, so new domains and content types need no new routing rules |
|
|
| ## Testing Plan |
|
|
| 1. Test basic workflows (search → script → video) |
| 2. Test modification flows (create → modify → modify again) |
| 3. Test artifact references ("the script", "that video") |
| 4. Test error handling (missing artifacts, failed searches) |
| 5. Test complex workflows (multiple searches, multiple scripts) |
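
As an illustration of item 4, a failed search could be tested with a stub tool that raises. This is a sketch under assumptions: `run_tool` is a hypothetical wrapper that stores output only on success, and `failing_search` stands in for a real search agent.

```python
import unittest
from typing import Any, Callable, Dict


def run_tool(
    tool: Callable[[], Dict[str, Any]],
    store: Dict[str, Dict[str, Any]],
    artifact_id: str,
) -> Dict[str, Any]:
    """Run a tool and store its output only if it succeeds."""
    try:
        output = tool()
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
    store[artifact_id] = output
    return {"ok": True, "artifact_id": artifact_id}


def failing_search() -> Dict[str, Any]:
    # Stub that simulates a search backend outage.
    raise RuntimeError("search backend unavailable")


class ErrorHandlingTest(unittest.TestCase):
    def test_failed_search_stores_nothing(self) -> None:
        store: Dict[str, Dict[str, Any]] = {}
        result = run_tool(failing_search, store, "search-1")
        self.assertFalse(result["ok"])
        self.assertEqual(store, {})

    def test_successful_tool_is_stored(self) -> None:
        store: Dict[str, Dict[str, Any]] = {}
        result = run_tool(lambda: {"hits": []}, store, "search-1")
        self.assertTrue(result["ok"])
        self.assertIn("search-1", store)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(ErrorHandlingTest)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

The same pattern extends to the other items: swap the stub for one that returns a script or video and assert on what ends up in the store.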
|
|
| ## Notes |
|
|
| - Authentication flow remains unchanged |
| - Existing agents need minimal modifications |
| - System is extensible - new agents can be added as tools |
| - Context viewer in UI helps users understand available artifacts |