# Orchestration Implementation Plan
## Overview
Replace the current single-hop triage agent with an intelligent orchestrator that maintains conversation-level state and enables multi-step workflows with data passing between agents.
## Current Architecture Limitations
1. **Stateless agent calls** - No context preservation between interactions
2. **Single-hop routing** - Triage agent can only route to one destination
3. **No data flow** - Agents cannot pass structured data to each other
4. **Limited understanding** - Routing relies on simple keyword matching rather than understanding user intent
## Proposed Architecture: Conversation-Level State Orchestration
### Core Components
#### 1. Conversation Context Model (`agent_system/context.py`)
```python
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
import uuid

from pydantic import BaseModel, Field


class ArtifactType(str, Enum):
    SCRIPT = "script"
    VIDEO = "video"
    SEARCH_RESULTS = "search_results"
    CUSTOM = "custom"


class Artifact(BaseModel):
    """Any output from an agent that can be referenced later"""
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    type: ArtifactType
    content: Dict[str, Any]
    created_at: datetime = Field(default_factory=datetime.now)
    created_by: str  # agent name
    metadata: Dict[str, Any] = Field(default_factory=dict)


class ConversationContext(BaseModel):
    """Maintains state throughout a conversation"""
    artifacts: Dict[str, Artifact] = Field(default_factory=dict)  # id -> artifact
    last_artifact_by_type: Dict[ArtifactType, str] = Field(default_factory=dict)  # type -> artifact_id
    workflow_history: List[Dict[str, Any]] = Field(default_factory=list)

    def add_artifact(self, artifact: Artifact) -> None:
        """Store artifact and update type mapping"""
        self.artifacts[artifact.id] = artifact
        self.last_artifact_by_type[artifact.type] = artifact.id
        self.workflow_history.append({
            "action": "created_artifact",
            "artifact_id": artifact.id,
            "type": artifact.type,
            "timestamp": artifact.created_at,
        })

    def get_latest(self, artifact_type: ArtifactType) -> Optional[Artifact]:
        """Get most recent artifact of a type"""
        artifact_id = self.last_artifact_by_type.get(artifact_type)
        return self.artifacts.get(artifact_id) if artifact_id else None

    def get_context_summary(self) -> Dict[str, Any]:
        """Summarize available artifacts for orchestrator"""
        return {
            "available_artifacts": [
                {
                    "id": a.id,
                    "type": a.type.value,
                    "created_by": a.created_by,
                    "summary": a.metadata.get("summary", "No summary"),
                }
                for a in self.artifacts.values()
            ],
            # Avoid shadowing the builtins `type` and `id`
            "last_artifacts": {
                artifact_type.value: artifact_id
                for artifact_type, artifact_id in self.last_artifact_by_type.items()
            },
        }
```
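The key behavior of `ConversationContext` is that every artifact is kept while `last_artifact_by_type` only tracks the newest one per type. The sketch below demonstrates this with standard-library dataclasses standing in for the pydantic models (an assumption made only so it runs without pydantic; field names match the model above, and `last_artifacts()` is a hypothetical helper mirroring the `last_artifacts` entry of the summary).

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional
import uuid


class ArtifactType(str, Enum):
    SCRIPT = "script"
    VIDEO = "video"
    SEARCH_RESULTS = "search_results"
    CUSTOM = "custom"


@dataclass
class Artifact:
    # Dataclass stand-in for the pydantic Artifact model (same field names)
    type: ArtifactType
    content: Dict[str, Any]
    created_by: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


@dataclass
class ConversationContext:
    artifacts: Dict[str, Artifact] = field(default_factory=dict)
    last_artifact_by_type: Dict[ArtifactType, str] = field(default_factory=dict)

    def add_artifact(self, artifact: Artifact) -> None:
        # Every artifact is kept; only the "latest" pointer for its type moves
        self.artifacts[artifact.id] = artifact
        self.last_artifact_by_type[artifact.type] = artifact.id

    def get_latest(self, artifact_type: ArtifactType) -> Optional[Artifact]:
        artifact_id = self.last_artifact_by_type.get(artifact_type)
        return self.artifacts.get(artifact_id) if artifact_id else None

    def last_artifacts(self) -> Dict[str, str]:
        # Hypothetical helper: same shape as "last_artifacts" in get_context_summary()
        return {t.value: i for t, i in self.last_artifact_by_type.items()}


ctx = ConversationContext()
search = Artifact(ArtifactType.SEARCH_RESULTS, {"rate": "6.9%"}, "search_agent")
draft = Artifact(ArtifactType.SCRIPT, {"text": "Rates are 6.9%"}, "script_agent")
revised = Artifact(ArtifactType.SCRIPT, {"text": "Rates are 6.7%"}, "script_agent")
for artifact in (search, draft, revised):
    ctx.add_artifact(artifact)

print(ctx.get_latest(ArtifactType.SCRIPT).content["text"])  # Rates are 6.7%
print(len(ctx.artifacts))  # 3: the draft is still retrievable by its ID
print(ctx.get_latest(ArtifactType.VIDEO))  # None
```

"The script" therefore always means the newest script, while older versions stay addressable by ID.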
#### 2. Orchestrator Agent (Replace `triage_agent` in `agents.py`)
```python
orchestrator_agent = Agent(
    name="orchestrator",
    instructions="""
You are an intelligent orchestrator that plans and executes multi-step workflows.

CAPABILITIES:
You have access to specialized agents as tools. You can:
1. Call multiple agents in sequence
2. Pass outputs from one agent as inputs to another
3. Access previously created artifacts (scripts, videos, search results)
4. Handle modifications to existing content

PLANNING:
For each user request:
1. Understand what the user wants
2. Check if it relates to existing artifacts
3. Plan the sequence of operations needed
4. Execute the plan by calling appropriate tools

CONTEXT AWARENESS:
You receive a context summary with each request showing:
- available_artifacts: List of previously created content
- last_artifacts: Most recent artifact of each type

When users say "the script", "that video", etc., refer to the last_artifacts.

EXAMPLES:
- "Create a video script about today's weather"
  → search_web(query="today's weather [location]")
  → create_script(topic="weather", context_artifacts=[search_artifact_id])
- "Now create a video from it"
  → create_video(script_id=last_script_id)
- "Change the temperature to 75 degrees"
  → create_script(original_script_id=last_script_id, modifications="change temperature to 75 degrees")

Always think step by step and call the appropriate tools.
""",
    tools=[
        # Agents exposed as tools - built per session in agent_tools.py
        # and attached with clone() (see ChatState)
    ],
)
```
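The orchestrator prompt leaves reference resolution ("the script", "that video") to the model. For logging or regression tests, a deterministic cross-check can still be useful. The hypothetical `resolve_references` helper below maps definite noun phrases onto the `last_artifacts` entry of the context summary; it is a narrow keyword sketch, not a replacement for the model.

```python
import re
from typing import Dict, List

# Hypothetical mapping from nouns users say to ArtifactType values in last_artifacts
NOUN_TO_TYPE = {
    "script": "script",
    "video": "video",
    "search": "search_results",
    "results": "search_results",
}

# Definite references such as "the script", "that video", "those results"
REFERENCE_PATTERN = re.compile(
    r"\b(?:the|that|this|those)\s+(script|video|search|results)\b", re.IGNORECASE
)


def resolve_references(message: str, last_artifacts: Dict[str, str]) -> List[str]:
    """Return referenced artifact IDs in order of appearance, skipping types with no artifact yet."""
    ids: List[str] = []
    for match in REFERENCE_PATTERN.finditer(message):
        artifact_type = NOUN_TO_TYPE[match.group(1).lower()]
        artifact_id = last_artifacts.get(artifact_type)
        if artifact_id and artifact_id not in ids:
            ids.append(artifact_id)
    return ids


last_artifacts = {"script": "s-2", "search_results": "r-1"}
print(resolve_references("Make the script more upbeat", last_artifacts))  # ['s-2']
print(resolve_references("Use those results in that video", last_artifacts))  # ['r-1']
print(resolve_references("Now create a video from it", last_artifacts))  # []
```

The helper deliberately returns nothing for pronouns such as "it"; those are exactly the cases the plan delegates to the orchestrator model.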
#### 3. Agent Tools Factory (`agent_system/agent_tools.py`)
```python
import json
from typing import Any, Callable, Dict

from agent_system.context import Artifact, ArtifactType, ConversationContext

# Agent and Runner: same imports as agents.py (the existing agent SDK)


def create_agent_tool(agent: Agent, context: ConversationContext) -> Callable[..., Any]:
    """Factory to create context-aware tools from agents.

    Plain (not async): it only builds and returns the wrapper, so callers
    such as ChatState can use it without awaiting.
    """

    async def tool_wrapper(**kwargs) -> Dict[str, Any]:
        # Resolve artifact references into their stored content
        if "context_artifacts" in kwargs:
            kwargs["artifacts_data"] = {
                artifact_id: context.artifacts[artifact_id].content
                for artifact_id in kwargs.get("context_artifacts") or []
                if artifact_id in context.artifacts
            }

        # Run the agent; Runner.run takes text (or a list of input items), not a dict
        result = await Runner.run(agent, json.dumps(kwargs, default=str))
        output = result.final_output

        # Infer the artifact type from the agent's name
        artifact_type = ArtifactType.CUSTOM
        if "script" in agent.name:
            artifact_type = ArtifactType.SCRIPT
        elif "video" in agent.name:
            artifact_type = ArtifactType.VIDEO
        elif "search" in agent.name:
            artifact_type = ArtifactType.SEARCH_RESULTS

        # Store the output as an artifact so later tools can reference it
        artifact = Artifact(
            type=artifact_type,
            content=output if isinstance(output, dict) else {"data": output},
            created_by=agent.name,
            metadata={"summary": str(output)[:200]},
        )
        context.add_artifact(artifact)

        return {"artifact_id": artifact.id, "output": output}

    # The SDK needs a parameter schema and a distinct name per tool, so wrap
    # this in a function with explicit arguments before Agent(tools=...)
    return tool_wrapper
```
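The factory relies on a closure: each wrapper captures one `ConversationContext`, so an artifact created by one tool is visible to the next. The sketch below exercises that pattern without an agent SDK. `FakeAgent`, `Context`, `infer_type` and the two fake agent functions are hypothetical stand-ins, and artifact IDs are simple counters rather than UUIDs so the output is predictable.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict


@dataclass
class FakeAgent:
    """Hypothetical stand-in for an SDK agent: a name plus an async run function."""
    name: str
    run: Callable[[Dict[str, Any]], Awaitable[Any]]


@dataclass
class Context:
    artifacts: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    last_by_type: Dict[str, str] = field(default_factory=dict)


def infer_type(agent_name: str) -> str:
    # Same name-based heuristic as the plan, checked in the same order
    for keyword, artifact_type in (("script", "script"), ("video", "video"), ("search", "search_results")):
        if keyword in agent_name:
            return artifact_type
    return "custom"


def create_agent_tool(agent: FakeAgent, context: Context) -> Callable[..., Awaitable[Dict[str, Any]]]:
    async def tool_wrapper(**kwargs: Any) -> Dict[str, Any]:
        if "context_artifacts" in kwargs:
            # Swap artifact IDs for stored content; unknown IDs are skipped
            kwargs["artifacts_data"] = {
                i: context.artifacts[i]["content"]
                for i in kwargs["context_artifacts"]
                if i in context.artifacts
            }
        output = await agent.run(kwargs)
        artifact_type = infer_type(agent.name)
        artifact_id = f"{agent.name}-{len(context.artifacts) + 1}"
        context.artifacts[artifact_id] = {"type": artifact_type, "content": output}
        context.last_by_type[artifact_type] = artifact_id
        return {"artifact_id": artifact_id, "output": output}

    return tool_wrapper


async def fake_search(inputs: Dict[str, Any]) -> Dict[str, Any]:
    return {"rate": "6.9%"}


async def fake_script(inputs: Dict[str, Any]) -> Dict[str, Any]:
    # Reads the rate from whichever search artifact was passed in
    data = next(iter(inputs["artifacts_data"].values()))
    return {"text": f"Today's rate is {data['rate']}."}


async def main(ctx: Context) -> Dict[str, Any]:
    search_tool = create_agent_tool(FakeAgent("search_agent", fake_search), ctx)
    script_tool = create_agent_tool(FakeAgent("script_agent", fake_script), ctx)
    found = await search_tool(query="today's mortgage rates")
    return await script_tool(topic="mortgage rates", context_artifacts=[found["artifact_id"]])


ctx = Context()
script = asyncio.run(main(ctx))
print(script["output"]["text"])  # Today's rate is 6.9%.
print(ctx.last_by_type)  # {'search_results': 'search_agent-1', 'script': 'script_agent-2'}
```

Substring matching on agent names is brittle: an agent named `transcript_agent` would be classified as a script because "transcript" contains "script". Passing the artifact type to the factory explicitly would avoid that.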
#### 4. Updated Chat Interface (`ui/chat_interface.py`)
```python
import json
from typing import Any, Dict, List, Optional

from agent_system.agent_tools import create_search_tool, create_script_tool, create_video_tool
from agent_system.context import ConversationContext

# orchestrator_agent, Runner, logger, get_auth_state_from_history and
# process_orchestrator_response come from the existing modules


class ChatState:
    """Maintains conversation state"""

    def __init__(self):
        self.context = ConversationContext()
        self.orchestrator = None
        self._setup_orchestrator()

    def _setup_orchestrator(self):
        """Initialize orchestrator with context-aware tools"""
        # Create tools bound to this session's context
        search_tool = create_search_tool(self.context)
        script_tool = create_script_tool(self.context)
        video_tool = create_video_tool(self.context)

        # Clone orchestrator and add tools
        self.orchestrator = orchestrator_agent.clone(
            tools=[search_tool, script_tool, video_tool]
        )


# Modify the chat_with_agents function
async def chat_with_agents(message: str, history: List[Dict[str, Any]], state: Optional[ChatState] = None):
    """
    Process user messages through the orchestrated agent system.

    Args:
        message: Current user message
        history: Conversation history
        state: Conversation state (created per session)
    """
    # Initialize state if not provided (first message). Callers must keep and
    # pass back the same ChatState, or artifacts are lost between turns.
    if state is None:
        state = ChatState()

    # Get authentication state (keep existing auth flow)
    auth_state = get_auth_state_from_history(history)

    # Handle authentication first (keep existing logic)
    if not auth_state.is_authenticated:
        # ... existing auth handling (yields its own responses) ...
        return

    # For authenticated users, process through orchestrator
    try:
        # Prepare orchestrator input with context
        orchestrator_input = {
            "user_message": message,
            "context_summary": state.context.get_context_summary(),
            "conversation_history": history[-5:],  # last 5 messages, not exchanges
        }

        # run_streamed is not awaited: it returns a streaming result immediately.
        # The input is serialized because the runner accepts text, not a dict.
        result = Runner.run_streamed(
            state.orchestrator,
            json.dumps(orchestrator_input, default=str),
        )

        # Stream responses
        async for event in process_orchestrator_response(result, state):
            yield event

    except Exception as e:
        logger.error(f"Orchestration error: {e}")
        error_response = f"I encountered an error: {str(e)}"
        history.append({"role": "assistant", "content": error_response})
        yield history
```
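Because the orchestrator input combines the user message, the context summary and recent history, it helps to build it in one place. The sketch below assumes, as with the OpenAI Agents SDK's `Runner.run` and `Runner.run_streamed`, that the runner accepts a string (or a list of input items) rather than a dict. `build_orchestrator_input` is a hypothetical helper, and `max_messages` counts individual messages, not user/assistant exchanges.

```python
import json
from typing import Any, Dict, List


def build_orchestrator_input(
    message: str,
    context_summary: Dict[str, Any],
    history: List[Dict[str, Any]],
    max_messages: int = 5,
) -> str:
    """Serialize the orchestrator payload into a single JSON string."""
    payload = {
        "user_message": message,
        "context_summary": context_summary,
        # Keep only role/content of the most recent messages (UI metadata is dropped)
        "conversation_history": [
            {"role": m.get("role"), "content": m.get("content")}
            for m in history[-max_messages:]
        ],
    }
    # default=str turns datetimes and other non-JSON values into strings instead of raising
    return json.dumps(payload, default=str)


history = [{"role": "user", "content": f"msg {i}"} for i in range(8)]
text = build_orchestrator_input("Now create the video", {"last_artifacts": {"script": "s-2"}}, history)
decoded = json.loads(text)
print(len(decoded["conversation_history"]))  # 5
print(decoded["conversation_history"][0]["content"])  # msg 3
```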
#### 5. Gradio Interface Updates
```python
import gradio as gr


def create_chat_interface():
    """Create the Gradio chat interface with state management"""
    with gr.Blocks(title="AI Agent Chat") as demo:
        # Per-session state. Start empty and build ChatState lazily: Gradio
        # deep-copies a State's initial value, and the copy's tool closures
        # would still point at the original (shared) context.
        chat_state = gr.State(None)

        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    value=[],
                    type="messages",  # history is a list of {"role", "content"} dicts
                    elem_id="chatbot",
                    height=600,
                )
                msg = gr.Textbox(
                    label="Message",
                    placeholder="Type your message here...",
                    lines=2,
                )
                submit = gr.Button("Send", variant="primary")
                clear = gr.Button("Clear Chat")

            with gr.Column(scale=1):
                # Add context viewer
                gr.Markdown("### Current Context")
                context_display = gr.JSON(
                    value={},
                    label="Active Artifacts",
                )

        # Each yield supplies one value per output: chatbot, textbox, context viewer, state
        async def respond(message, chat_history, state):
            if state is None:
                state = ChatState()
            if not message:
                # Async generators cannot `return` a value, so yield and stop
                yield chat_history, "", state.context.get_context_summary(), state
                return

            chat_history.append({"role": "user", "content": message})
            updated_history = chat_history
            async for updated_history in chat_with_agents(message, chat_history, state):
                yield updated_history, "", state.context.get_context_summary(), state

            # Final update once all artifacts are stored
            yield updated_history, "", state.context.get_context_summary(), state

        # Wire up events
        outputs = [chatbot, msg, context_display, chat_state]
        msg.submit(respond, [msg, chatbot, chat_state], outputs)
        submit.click(respond, [msg, chatbot, chat_state], outputs)

        # Clear preserves state but clears history
        clear.click(lambda s: ([], s), [chat_state], [chatbot, chat_state])

    return demo
```
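Gradio deep-copies the initial value given to `gr.State` for each session, and `copy.deepcopy` copies plain functions by reference. A `ChatState` built once at interface creation would therefore give every session a copied context while its tool closures still write to the original one. The standard-library sketch below reproduces the effect; `Context` and `SessionState` are hypothetical stand-ins for `ConversationContext` and `ChatState`.

```python
import copy
from typing import List


class Context:
    def __init__(self) -> None:
        self.items: List[str] = []


class SessionState:
    """Mimics ChatState: owns a context plus a tool closure bound to it."""

    def __init__(self) -> None:
        self.context = Context()
        context = self.context

        def tool(item: str) -> None:
            # Closure bound to the Context created in this __init__ call
            context.items.append(item)

        self.tool = tool


template = SessionState()
session = copy.deepcopy(template)  # what a deep-copied initial State value gives a session

session.tool("script-1")
print(session.context.items)   # [] - the session's own context was not updated
print(template.context.items)  # ['script-1'] - the shared template was

# Building a fresh object per session keeps the closure and its context together
fresh = SessionState()
fresh.tool("script-2")
print(fresh.context.items)  # ['script-2']
```

Creating the state object lazily inside the event handler, once per session, keeps each closure bound to that session's context.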
## Implementation Steps
### Step 1: Create Context System
1. Create `agent_system/context.py` with the models above
2. Add necessary imports to `__init__.py`
### Step 2: Create Agent Tools
1. Create `agent_system/agent_tools.py`
2. Implement tool wrappers for each agent type
3. Ensure tools can access and store artifacts
### Step 3: Replace Triage with Orchestrator
1. In `agents.py`, comment out `triage_agent`
2. Add `orchestrator_agent` definition
3. Update imports and dependencies
### Step 4: Update Chat Interface
1. Add `ChatState` class to maintain conversation state
2. Modify `chat_with_agents` to use orchestrator
3. Update Gradio interface to show context
### Step 5: Update Agent Instructions
1. Modify existing agents to accept context artifacts
2. Ensure script agent can handle modifications
3. Ensure video agent can accept script artifacts
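For step 5, the script agent must handle two kinds of request: a new script grounded in upstream artifacts, and a revision of an existing script. One option is to render the tool arguments into the agent's input with a small helper. `build_script_request` and its section labels are hypothetical; the argument names follow the tool calls in the example workflows, and the caller is assumed to have already looked up the content for `original_script_id`.

```python
import json
from typing import Any, Dict, Optional


def build_script_request(
    topic: Optional[str] = None,
    artifacts_data: Optional[Dict[str, Any]] = None,
    original_script: Optional[str] = None,
    modifications: Optional[str] = None,
) -> str:
    """Render script-tool arguments into the text the script agent receives."""
    sections = []
    if original_script is not None and modifications:
        # Modification mode: edit the existing script rather than writing a new one
        sections.append(f"Revise the script below. Apply only these changes: {modifications}")
        sections.append(f"ORIGINAL SCRIPT:\n{original_script}")
    elif topic:
        sections.append(f"Write a video script about: {topic}")
    else:
        raise ValueError("Provide a topic, or an original script plus modifications")
    if artifacts_data:
        # Ground the script in upstream artifacts such as search results
        sections.append("REFERENCE DATA:\n" + json.dumps(artifacts_data, indent=2, default=str))
    return "\n\n".join(sections)


new_request = build_script_request(topic="mortgage rates", artifacts_data={"r-1": {"rate": "6.9%"}})
revision = build_script_request(original_script="Rates are 6.9%.", modifications="change rate to 6.7%")
print(new_request)
print(revision)
```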
## Example Workflows
### Example 1: Data-Driven Script Creation
**User**: "Create a video script about today's mortgage rates"
1. Orchestrator recognizes need for current data
2. Calls: `search_web("today's mortgage rates")`
3. Stores search results as artifact
4. Calls: `create_script(topic="mortgage rates", context_artifacts=[search_artifact_id])`
5. Script agent receives real data and creates accurate script
6. Script stored as artifact
### Example 2: Script Modification
**User**: "Change the rate to 6.7%"
1. Orchestrator recognizes modification request
2. Identifies last script artifact from context
3. Calls: `create_script(original_script_id=last_script_id, modifications="change rate to 6.7%")`
4. Script agent modifies and returns updated script
5. New version stored as artifact
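Storing each modification as a new artifact keeps earlier versions available, and recording which version a revision came from makes "create → modify → modify again" traceable. The sketch below adds a hypothetical `parent_id` metadata key (not part of the `Artifact` model in the plan) and walks the chain back to the original; plain dicts stand in for the pydantic models.

```python
import uuid
from typing import Any, Dict, List, Optional

artifacts: Dict[str, Dict[str, Any]] = {}
last_by_type: Dict[str, str] = {}


def store(artifact_type: str, content: Dict[str, Any], parent_id: Optional[str] = None) -> str:
    """Save a new artifact version; parent_id links it to the version it revises."""
    artifact_id = str(uuid.uuid4())
    artifacts[artifact_id] = {
        "type": artifact_type,
        "content": content,
        "metadata": {"parent_id": parent_id},
    }
    last_by_type[artifact_type] = artifact_id
    return artifact_id


def version_chain(artifact_id: str) -> List[str]:
    """Follow parent links from a version back to the original."""
    chain: List[str] = []
    current: Optional[str] = artifact_id
    while current is not None:
        chain.append(current)
        current = artifacts[current]["metadata"]["parent_id"]
    return chain


# create → modify → modify again, each revision pointing at the latest script
v1 = store("script", {"text": "Rates are 6.9%"})
v2 = store("script", {"text": "Rates are 6.7%"}, parent_id=last_by_type["script"])
v3 = store("script", {"text": "Rates are 6.7%!"}, parent_id=last_by_type["script"])

print([artifacts[i]["content"]["text"] for i in version_chain(v3)])
# ['Rates are 6.7%!', 'Rates are 6.7%', 'Rates are 6.9%']
```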
### Example 3: Multi-Step Video Creation
**User**: "Create a video about Apple's stock performance this week"
1. Search for Apple stock data
2. Create script with financial data
3. User: "Make it more upbeat"
4. Modify script tone
5. User: "Now create the video"
6. Create video from modified script
## Key Benefits
1. **Stateful Conversations** - Context preserved throughout session
2. **Multi-Step Workflows** - Agents can build on each other's outputs
3. **Flexible Modifications** - Any artifact can be referenced and modified
4. **Data-Driven Content** - Real data flows from search to content creation
5. **No Hardcoded Routing** - The orchestrator plans each workflow from the request, so new domains or content types do not require new routing rules
## Testing Plan
1. Test basic workflows (search → script → video)
2. Test modification flows (create → modify → modify again)
3. Test artifact references ("the script", "that video")
4. Test error handling (missing artifacts, failed searches)
5. Test complex workflows (multiple searches, multiple scripts)
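For the error-handling case, the tool wrapper in the plan silently skips artifact IDs it cannot find, so the orchestrator never learns that a reference was stale. Below is a sketch of a `unittest` suite for a hypothetical `resolve_artifacts` helper that reports missing IDs instead; returning those IDs to the orchestrator is a suggested design choice, not something the plan specifies.

```python
import unittest
from typing import Any, Dict, List, Tuple


def resolve_artifacts(
    ids: List[str], store: Dict[str, Dict[str, Any]]
) -> Tuple[Dict[str, Any], List[str]]:
    """Split requested IDs into resolved content and IDs that do not exist."""
    found = {i: store[i] for i in ids if i in store}
    missing = [i for i in ids if i not in store]
    return found, missing


class MissingArtifactTests(unittest.TestCase):
    def setUp(self) -> None:
        self.store = {"s-1": {"text": "draft"}}

    def test_known_id_resolves(self) -> None:
        found, missing = resolve_artifacts(["s-1"], self.store)
        self.assertEqual(found, {"s-1": {"text": "draft"}})
        self.assertEqual(missing, [])

    def test_unknown_id_is_reported(self) -> None:
        found, missing = resolve_artifacts(["s-1", "nope"], self.store)
        self.assertIn("s-1", found)
        self.assertEqual(missing, ["nope"])

    def test_empty_request(self) -> None:
        self.assertEqual(resolve_artifacts([], self.store), ({}, []))


if __name__ == "__main__":
    # exit=False lets the interpreter continue after the test run
    unittest.main(argv=["tests"], exit=False)
```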
## Notes
- Authentication flow remains unchanged
- Existing agents need minimal modifications
- System is extensible - new agents can be added as tools
- Context viewer in UI helps users understand available artifacts