Orchestration Implementation Plan

Overview

Replace the current single-hop triage agent with an intelligent orchestrator that maintains conversation-level state and enables multi-step workflows with data passing between agents.

Current Architecture Limitations

  1. Stateless agent calls - No context preservation between interactions
  2. Single-hop routing - Triage agent can only route to one destination
  3. No data flow - Agents cannot pass structured data to each other
  4. Limited understanding - Simple keyword matching vs. intent comprehension
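
To make limitations 2 and 4 concrete, here is a minimal sketch of keyword-based, single-hop routing. The keyword table, the agent names, and the `triage` function are hypothetical; this plan does not show the actual triage rules.

```python
# Hypothetical keyword table; the real triage agent's rules are not shown in this plan.
KEYWORD_ROUTES = {
    "search": "search_agent",
    "script": "script_agent",
    "video": "video_agent",
}


def triage(message: str) -> str:
    """Return the first agent whose keyword appears in the message (single hop)."""
    lowered = message.lower()
    for keyword, agent_name in KEYWORD_ROUTES.items():
        if keyword in lowered:
            return agent_name
    return "fallback_agent"


# This request needs a search AND a script, but only one destination is returned.
request = "Search today's mortgage rates and write a script about them"
chosen = triage(request)

# This follow-up refers to an earlier script but contains no routing keyword.
follow_up = "Change the rate to 6.7%"
follow_up_route = triage(follow_up)
```

The first request is routed to a single agent even though it needs two, and the follow-up falls through to the fallback because nothing in it names a script.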

Proposed Architecture: Conversation-Level State Orchestration

Core Components

1. Conversation Context Model (agent_system/context.py)

from pydantic import BaseModel, Field
from typing import Dict, Any, List, Optional
from datetime import datetime
from enum import Enum
import uuid

class ArtifactType(str, Enum):
    SCRIPT = "script"
    VIDEO = "video"
    SEARCH_RESULTS = "search_results"
    CUSTOM = "custom"

class Artifact(BaseModel):
    """Any output from an agent that can be referenced later"""
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    type: ArtifactType
    content: Dict[str, Any]
    created_at: datetime = Field(default_factory=datetime.now)
    created_by: str  # agent name
    metadata: Dict[str, Any] = {}

class ConversationContext(BaseModel):
    """Maintains state throughout a conversation"""
    artifacts: Dict[str, Artifact] = {}  # id -> artifact
    last_artifact_by_type: Dict[ArtifactType, str] = {}  # type -> artifact_id
    workflow_history: List[Dict[str, Any]] = []
    
    def add_artifact(self, artifact: Artifact) -> None:
        """Store artifact and update type mapping"""
        self.artifacts[artifact.id] = artifact
        self.last_artifact_by_type[artifact.type] = artifact.id
        self.workflow_history.append({
            "action": "created_artifact",
            "artifact_id": artifact.id,
            "type": artifact.type,
            "timestamp": artifact.created_at
        })
    
    def get_latest(self, artifact_type: ArtifactType) -> Optional[Artifact]:
        """Get most recent artifact of a type"""
        artifact_id = self.last_artifact_by_type.get(artifact_type)
        return self.artifacts.get(artifact_id) if artifact_id else None
    
    def get_context_summary(self) -> Dict[str, Any]:
        """Summarize available artifacts for orchestrator"""
        return {
            "available_artifacts": [
                {
                    "id": a.id,
                    "type": a.type,
                    "created_by": a.created_by,
                    "summary": a.metadata.get("summary", "No summary")
                }
                for a in self.artifacts.values()
            ],
            "last_artifacts": {
                artifact_type.value: artifact_id
                for artifact_type, artifact_id in self.last_artifact_by_type.items()
            }
        }
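
A brief usage sketch of the model above. To stay runnable without third-party packages it uses simplified dataclass stand-ins rather than the pydantic classes; `MiniArtifact` and `MiniContext` are hypothetical names that exist only for this illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
import uuid


@dataclass
class MiniArtifact:
    """Simplified stand-in for Artifact (illustration only)."""
    type: str
    content: Dict[str, Any]
    created_by: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


@dataclass
class MiniContext:
    """Simplified stand-in for ConversationContext (illustration only)."""
    artifacts: Dict[str, MiniArtifact] = field(default_factory=dict)
    last_artifact_by_type: Dict[str, str] = field(default_factory=dict)

    def add_artifact(self, artifact: MiniArtifact) -> None:
        self.artifacts[artifact.id] = artifact
        self.last_artifact_by_type[artifact.type] = artifact.id

    def get_latest(self, artifact_type: str) -> Optional[MiniArtifact]:
        artifact_id = self.last_artifact_by_type.get(artifact_type)
        return self.artifacts.get(artifact_id) if artifact_id else None


ctx = MiniContext()
draft = MiniArtifact(type="script", content={"text": "first draft"}, created_by="script_agent")
revised = MiniArtifact(type="script", content={"text": "second draft"}, created_by="script_agent")
ctx.add_artifact(draft)
ctx.add_artifact(revised)

latest = ctx.get_latest("script")      # the most recently added script
missing = ctx.get_latest("video")      # None: no video has been created yet
still_there = ctx.artifacts[draft.id]  # earlier versions remain addressable by id
```

Because the per-type pointer only tracks the newest artifact, a phrase like "the script" resolves to the latest version, while older versions stay reachable through their IDs.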

2. Orchestrator Agent (Replace triage_agent in agents.py)

orchestrator_agent = Agent(
    name="orchestrator",
    instructions="""
    You are an intelligent orchestrator that plans and executes multi-step workflows.
    
    CAPABILITIES:
    You have access to specialized agents as tools. You can:
    1. Call multiple agents in sequence
    2. Pass outputs from one agent as inputs to another
    3. Access previously created artifacts (scripts, videos, search results)
    4. Handle modifications to existing content
    
    PLANNING:
    For each user request:
    1. Understand what the user wants
    2. Check if it relates to existing artifacts
    3. Plan the sequence of operations needed
    4. Execute the plan by calling appropriate tools
    
    CONTEXT AWARENESS:
    You receive a context summary with each request showing:
    - available_artifacts: List of previously created content
    - last_artifacts: Most recent artifact of each type
    
    When users say "the script", "that video", etc., refer to the last_artifacts.
    
    EXAMPLES:
    - "Create a video script about today's weather"
      → search_web(query="today's weather [location]")
      → create_script(topic="weather", context=search_results)
    
    - "Now create a video from it"
      → create_video(script_id=last_script_id)
    
    - "Change the temperature to 75 degrees"
      → modify_script(script_id=last_script_id, modifications="change temperature to 75 degrees")
    
    Always think step by step and call the appropriate tools.
    """,
    tools=[
        # Agents exposed as tools - defined in agent_tools.py
    ]
)

3. Agent Tools Factory (agent_system/agent_tools.py)

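import json
from typing import Any, Dict

from agent_system.context import Artifact, ArtifactType, ConversationContext
# Agent and Runner come from the agent framework already used in agents.py.

def create_agent_tool(agent: Agent, context: ConversationContext):
    """Factory to create context-aware tools from agents"""
    # The factory itself is synchronous; only the returned wrapper is awaited.
    
    async def tool_wrapper(**kwargs) -> Dict[str, Any]:
        # Add context information to agent input
        if "context_artifacts" in kwargs:
            # Resolve artifact references; unknown IDs are skipped silently
            artifacts_data = {}
            for artifact_id in kwargs.get("context_artifacts", []):
                if artifact_id in context.artifacts:
                    artifacts_data[artifact_id] = context.artifacts[artifact_id].content
            kwargs["artifacts_data"] = artifacts_data
        
        # Run the agent. The input is serialized to text on the assumption that
        # Runner.run accepts a string (or message list) rather than a raw dict.
        result = await Runner.run(agent, json.dumps(kwargs, default=str))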
        
        # Extract and store output as artifact
        output = result.final_output
        
        # Create artifact based on agent type
        artifact_type = ArtifactType.CUSTOM
        if "script" in agent.name:
            artifact_type = ArtifactType.SCRIPT
        elif "video" in agent.name:
            artifact_type = ArtifactType.VIDEO
        elif "search" in agent.name:
            artifact_type = ArtifactType.SEARCH_RESULTS
        
        artifact = Artifact(
            type=artifact_type,
            content=output if isinstance(output, dict) else {"data": output},
            created_by=agent.name,
            metadata={"summary": str(output)[:200]}
        )
        
        context.add_artifact(artifact)
        
        return {
            "artifact_id": artifact.id,
            "output": output
        }
    
    return tool_wrapper
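
The substring checks in `tool_wrapper` are order-sensitive: an agent named, say, `video_script_agent` (a hypothetical name) would be classified as a script because `"script"` is tested first. One possible alternative, sketched here on the assumption that agent names are known up front, is an explicit lookup table with `CUSTOM` as the fallback. The agent names in the table are illustrative and not taken from agents.py.

```python
from enum import Enum


class ArtifactType(str, Enum):
    SCRIPT = "script"
    VIDEO = "video"
    SEARCH_RESULTS = "search_results"
    CUSTOM = "custom"


def infer_by_substring(agent_name: str) -> ArtifactType:
    """Mirror of the substring checks used in tool_wrapper."""
    if "script" in agent_name:
        return ArtifactType.SCRIPT
    if "video" in agent_name:
        return ArtifactType.VIDEO
    if "search" in agent_name:
        return ArtifactType.SEARCH_RESULTS
    return ArtifactType.CUSTOM


# Hypothetical agent names; adjust to whatever agents.py actually defines.
ARTIFACT_TYPE_BY_AGENT = {
    "search_agent": ArtifactType.SEARCH_RESULTS,
    "script_agent": ArtifactType.SCRIPT,
    "video_agent": ArtifactType.VIDEO,
    "video_script_agent": ArtifactType.VIDEO,
}


def infer_by_table(agent_name: str) -> ArtifactType:
    """Look the type up explicitly; unknown agents fall back to CUSTOM."""
    return ARTIFACT_TYPE_BY_AGENT.get(agent_name, ArtifactType.CUSTOM)


ambiguous = "video_script_agent"
substring_result = infer_by_substring(ambiguous)  # "script" matches before "video"
table_result = infer_by_table(ambiguous)          # whatever the table declares
```

The table trades a little maintenance for predictability: adding an agent means adding one entry instead of relying on how its name happens to be spelled.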

4. Updated Chat Interface (ui/chat_interface.py)

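import logging
from typing import Any, Dict, List

from agent_system.context import ConversationContext, Artifact
# create_search_tool, create_script_tool, and create_video_tool are assumed to be
# per-agent wrappers around create_agent_tool, each bound to one agent.
from agent_system.agent_tools import create_search_tool, create_script_tool, create_video_tool

logger = logging.getLogger(__name__)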

class ChatState:
    """Maintains conversation state"""
    def __init__(self):
        self.context = ConversationContext()
        self.orchestrator = None
        self._setup_orchestrator()
    
    def _setup_orchestrator(self):
        """Initialize orchestrator with context-aware tools"""
        # Create tools that have access to context
        search_tool = create_search_tool(self.context)
        script_tool = create_script_tool(self.context)
        video_tool = create_video_tool(self.context)
        
        # Clone orchestrator and add tools
        self.orchestrator = orchestrator_agent.clone(
            tools=[search_tool, script_tool, video_tool]
        )

# Modify the chat_with_agents function
async def chat_with_agents(message: str, history: List[Dict[str, Any]], state: ChatState = None):
    """
    Process user messages through the orchestrated agent system.
    
    Args:
        message: Current user message
        history: Conversation history
        state: Conversation state (created per session)
    """
    # Initialize state if not provided (first message)
    if state is None:
        state = ChatState()
    
    # Get authentication state (keep existing auth flow)
    auth_state = get_auth_state_from_history(history)
    
    # Handle authentication first (keep existing logic)
    if not auth_state.is_authenticated:
        # ... existing auth handling ...
        pass
    
    # For authenticated users, process through orchestrator
    try:
        # Prepare orchestrator input with context
        orchestrator_input = {
            "user_message": message,
            "context_summary": state.context.get_context_summary(),
            "conversation_history": history[-5:]  # Last 5 exchanges
        }
        
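        # Run orchestrator. If Runner is the OpenAI Agents SDK runner (an
        # assumption), run_streamed returns a streaming result synchronously and
        # must not be awaited. Its input should also be a string or a list of
        # messages, so orchestrator_input would need serializing first.
        result = Runner.run_streamed(
            state.orchestrator,
            orchestrator_input
        )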
        
        # Stream responses
        async for event in process_orchestrator_response(result, state):
            yield event
            
    except Exception as e:
        logger.error(f"Orchestration error: {e}")
        error_response = f"I encountered an error: {str(e)}"
        history.append({"role": "assistant", "content": error_response})
        yield history
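
If the runner expects text rather than a dictionary, the orchestrator input can be flattened into a single prompt string. The following is a minimal sketch under that assumption; `build_orchestrator_input` and its section labels are hypothetical and not part of the existing code.

```python
import json
from typing import Any, Dict, List


def build_orchestrator_input(
    message: str,
    context_summary: Dict[str, Any],
    history: List[Dict[str, Any]],
    max_history: int = 5,
) -> str:
    """Render the message, context summary, and recent history as one prompt string."""
    recent = history[-max_history:]
    history_lines = [f"{turn['role']}: {turn['content']}" for turn in recent]
    return "\n".join([
        "CONTEXT SUMMARY (JSON):",
        # default=str keeps non-JSON values such as datetimes from raising
        json.dumps(context_summary, indent=2, default=str),
        "",
        "RECENT CONVERSATION:",
        *history_lines,
        "",
        f"USER MESSAGE: {message}",
    ])


prompt = build_orchestrator_input(
    message="Now create a video from it",
    context_summary={"available_artifacts": [], "last_artifacts": {"script": "abc-123"}},
    history=[{"role": "user", "content": "Create a video script about today's weather"}],
)
```

Keeping the context summary as JSON inside the prompt lets the orchestrator read artifact IDs verbatim when it fills in tool arguments.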

5. Gradio Interface Updates

def create_chat_interface():
    """Create the Gradio chat interface with state management"""
    
    with gr.Blocks(title="AI Agent Chat") as demo:
        # Add state component
        chat_state = gr.State(ChatState())
        
        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    value=[],
                    elem_id="chatbot",
                    height=600
                )
                msg = gr.Textbox(
                    label="Message",
                    placeholder="Type your message here...",
                    lines=2
                )
                submit = gr.Button("Send", variant="primary")
                clear = gr.Button("Clear Chat")
            
            with gr.Column(scale=1):
                # Add context viewer
                gr.Markdown("### Current Context")
                context_display = gr.JSON(
                    value={},
                    label="Active Artifacts"
                )
        
        # Update handlers to pass state
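        async def respond(message, chat_history, state):
            # respond is an async generator, so every exit path must yield one
            # value per output component: (chatbot, msg, context_display).
            if not message:
                yield chat_history, "", state.context.get_context_summary()
                return
            
            chat_history.append({"role": "user", "content": message})
            
            async for updated_history in chat_with_agents(message, chat_history, state):
                # Update chat, clear the textbox, and refresh the context display
                yield updated_history, "", state.context.get_context_summary()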
        
        # Wire up events
        msg.submit(respond, [msg, chatbot, chat_state], [chatbot, msg, context_display])
        submit.click(respond, [msg, chatbot, chat_state], [chatbot, msg, context_display])
        
        # Clear preserves state but clears history
        clear.click(lambda s: ([], s), [chat_state], [chatbot, chat_state])
    
    return demo

Implementation Steps

Step 1: Create Context System

  1. Create agent_system/context.py with the models above
  2. Add necessary imports to __init__.py

Step 2: Create Agent Tools

  1. Create agent_system/agent_tools.py
  2. Implement tool wrappers for each agent type
  3. Ensure tools can access and store artifacts

Step 3: Replace Triage with Orchestrator

  1. In agents.py, comment out triage_agent
  2. Add orchestrator_agent definition
  3. Update imports and dependencies

Step 4: Update Chat Interface

  1. Add ChatState class to maintain conversation state
  2. Modify chat_with_agents to use orchestrator
  3. Update Gradio interface to show context

Step 5: Update Agent Instructions

  1. Modify existing agents to accept context artifacts
  2. Ensure script agent can handle modifications
  3. Ensure video agent can accept script artifacts

Example Workflows

Example 1: Data-Driven Script Creation

User: "Create a video script about today's mortgage rates"

  1. Orchestrator recognizes need for current data
  2. Calls: search_web("today's mortgage rates")
  3. Stores search results as artifact
  4. Calls: create_script(topic="mortgage rates", context_artifacts=[search_artifact_id])
  5. Script agent receives real data and creates accurate script
  6. Script stored as artifact
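
The data flow in this example can be simulated end to end with stub agents. This is a sketch only: the stubs return canned values instead of calling real agents, the in-memory `store` replaces `ConversationContext`, and the rate shown is a placeholder, not real data.

```python
import asyncio
import uuid
from typing import Any, Dict, List

# In-memory artifact store: artifact_id -> {"type", "content", "created_by"}.
store: Dict[str, Dict[str, Any]] = {}


def save_artifact(artifact_type: str, content: Dict[str, Any], created_by: str) -> str:
    """Store an artifact and return its generated ID."""
    artifact_id = str(uuid.uuid4())
    store[artifact_id] = {"type": artifact_type, "content": content, "created_by": created_by}
    return artifact_id


async def search_web(query: str) -> str:
    """Stub search agent: returns canned data instead of calling a search API."""
    results = {"query": query, "rate_30yr_fixed": "6.9%"}  # placeholder, not real data
    return save_artifact("search_results", results, "search_agent")


async def create_script(topic: str, context_artifacts: List[str]) -> str:
    """Stub script agent: builds a script from the referenced search artifacts."""
    facts = [store[a]["content"] for a in context_artifacts if a in store]
    rate = facts[0]["rate_30yr_fixed"] if facts else "unknown"
    script = {"text": f"Today's 30-year fixed {topic} sit at {rate}."}
    return save_artifact("script", script, "script_agent")


async def run_workflow() -> str:
    # Steps 2-3: search and store the results; steps 4-6: pass the ID to the script agent.
    search_id = await search_web("today's mortgage rates")
    return await create_script("mortgage rates", context_artifacts=[search_id])


script_id = asyncio.run(run_workflow())
script_text = store[script_id]["content"]["text"]
```

Only the artifact ID travels between the two calls; the script stub looks the search data up itself, which mirrors how `context_artifacts` is resolved in the tool wrapper.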

Example 2: Script Modification

User: "Change the rate to 6.7%"

  1. Orchestrator recognizes modification request
  2. Identifies last script artifact from context
  3. Calls: create_script(original_script_id=last_script_id, modifications="change rate to 6.7%")
  4. Script agent modifies and returns updated script
  5. New version stored as artifact

Example 3: Multi-Step Video Creation

User: "Create a video about Apple's stock performance this week"

  1. Search for Apple stock data
  2. Create script with financial data
  3. User: "Make it more upbeat"
  4. Modify script tone
  5. User: "Now create the video"
  6. Create video from modified script

Key Benefits

  1. Stateful Conversations - Context preserved throughout session
  2. Multi-Step Workflows - Agents can build on each other's outputs
  3. Flexible Modifications - Any artifact can be referenced and modified
  4. Data-Driven Content - Real data flows from search to content creation
  5. No Hardcoding - System adapts to any domain or content type

Testing Plan

  1. Test basic workflows (search → script → video)
  2. Test modification flows (create → modify → modify again)
  3. Test artifact references ("the script", "that video")
  4. Test error handling (missing artifacts, failed searches)
  5. Test complex workflows (multiple searches, multiple scripts)
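
As a starting point for item 4, the sketch below tests the missing-artifact case in isolation. `resolve_artifacts` is a hypothetical helper that mirrors the lookup loop in `tool_wrapper`; the tests are written as pytest-style functions with plain asserts, so they also run without pytest.

```python
from typing import Any, Dict, List


def resolve_artifacts(
    store: Dict[str, Dict[str, Any]], artifact_ids: List[str]
) -> Dict[str, Dict[str, Any]]:
    """Return content for known IDs only, mirroring the lookup in tool_wrapper."""
    return {a_id: store[a_id] for a_id in artifact_ids if a_id in store}


def test_known_artifact_is_resolved() -> None:
    store = {"s1": {"text": "draft script"}}
    assert resolve_artifacts(store, ["s1"]) == {"s1": {"text": "draft script"}}


def test_missing_artifact_is_skipped() -> None:
    store = {"s1": {"text": "draft script"}}
    resolved = resolve_artifacts(store, ["s1", "does-not-exist"])
    assert resolved == {"s1": {"text": "draft script"}}


def test_empty_reference_list() -> None:
    assert resolve_artifacts({}, []) == {}
```

Whether an unknown ID should be skipped silently, as here, or reported back to the orchestrator as an error is a design decision this plan leaves open.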

Notes

  • Authentication flow remains unchanged
  • Existing agents need minimal modifications
  • System is extensible - new agents can be added as tools
  • Context viewer in UI helps users understand available artifacts