# Pipecat Multi-Threading Integration
## Overview
This document explains how the multi-threaded telco agent is integrated with the Pipecat voice pipeline using WebRTC.
## Architecture
```
┌─────────────────────────────────────────────────────────┐
│                    Browser (WebRTC)                     │
└────────────────────┬────────────────────────────────────┘
                     │
                     │ Audio Stream
┌────────────────────▼────────────────────────────────────┐
│             pipeline.py (FastAPI + Pipecat)             │
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │  Pipeline:                                        │  │
│  │  WebRTC → ASR → LangGraphLLMService → TTS →       │  │
│  └───────────────────────────────────────────────────┘  │
│                    ↓                                    │
│         langgraph_llm_service.py                        │
│                    ↓                                    │
└────────────────────┬────────────────────────────────────┘
                     │
                     │ HTTP/WebSocket
┌────────────────────▼────────────────────────────────────┐
│            LangGraph Server (langgraph dev)             │
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │  react_agent.py (Multi-threaded)                  │  │
│  │                                                   │  │
│  │  Main Thread: Handles long operations             │  │
│  │  Secondary Thread: Handles interim queries        │  │
│  │                                                   │  │
│  │  Store: Coordinates between threads               │  │
│  └───────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
```
## How It Works
### 1. **LangGraphLLMService** (`langgraph_llm_service.py`)
This service acts as a bridge between Pipecat's frame-based processing and LangGraph's agent.
#### Key Changes:
**a) Dual Thread Management:**
```python
self._thread_id_main: Optional[str] = None # For long operations
self._thread_id_secondary: Optional[str] = None # For interim queries
```
**b) Operation Status Checking:**
```python
async def _check_long_operation_running(self) -> bool:
    """Check if a long operation is currently running via the store."""
    # Queries LangGraph store for "running" status
    # Returns True if a long operation is in progress
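The snippet above only outlines the check. A minimal sketch of what it might look like follows, assuming the status item has the shape `{"status": "running", "progress": 10}` shown in the flow example later in this document. The `StoreReader` protocol, the `InMemoryStore` class, and the `get_item` signature are hypothetical stand-ins for illustration, not the LangGraph SDK API.

```python
import asyncio
from typing import Any, Optional, Protocol


class StoreReader(Protocol):
    """Hypothetical minimal store interface (not the LangGraph SDK API)."""

    async def get_item(
        self, namespace: tuple[str, ...], key: str
    ) -> Optional[dict[str, Any]]: ...


class InMemoryStore:
    """Dict-backed stand-in for the store, used only for illustration."""

    def __init__(self) -> None:
        self._items: dict[tuple[tuple[str, ...], str], dict[str, Any]] = {}

    def put(self, namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None:
        self._items[(tuple(namespace), key)] = value

    async def get_item(
        self, namespace: tuple[str, ...], key: str
    ) -> Optional[dict[str, Any]]:
        return self._items.get((tuple(namespace), key))


async def check_long_operation_running(store: StoreReader, user_email: str) -> bool:
    """Return True if the status item reports a running operation."""
    namespace = (user_email, "tools_updates")
    item = await store.get_item(namespace, "working-tool-status-update")
    # Assumed item shape: {"status": "running", "progress": 10}
    return bool(item) and item.get("status") == "running"
```

A missing or empty item is treated as "not running", so a fresh session routes to the main thread by default.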
**c) Automatic Routing:**
```python
# Before each message, check if long operation is running
long_operation_running = await self._check_long_operation_running()
if long_operation_running:
thread_type = "secondary" # Route to secondary thread
else:
thread_type = "main" # Route to main thread
```
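The routing decision and the two thread IDs can be kept together in one small object. The sketch below is illustrative only: `ThreadRouter` and its `new_thread_id` factory are hypothetical names, and creating each thread ID lazily on first use is an assumption suggested by the `Optional[str] = None` defaults above. A real service would presumably obtain thread IDs from the LangGraph server rather than from `uuid`.

```python
import uuid
from typing import Callable, Optional


class ThreadRouter:
    """Holds one thread ID per thread type and picks the target for each message."""

    def __init__(
        self, new_thread_id: Callable[[], str] = lambda: str(uuid.uuid4())
    ) -> None:
        # Both IDs start unset, mirroring the Optional[str] = None fields above.
        self.thread_ids: dict[str, Optional[str]] = {"main": None, "secondary": None}
        # Hypothetical factory; stands in for a thread-creation call to the server.
        self._new_thread_id = new_thread_id

    def route(self, long_operation_running: bool) -> tuple[str, str]:
        """Return (thread_type, thread_id), creating the thread ID on first use."""
        thread_type = "secondary" if long_operation_running else "main"
        if self.thread_ids[thread_type] is None:
            self.thread_ids[thread_type] = self._new_thread_id()
        return thread_type, self.thread_ids[thread_type]
```

Because each ID is cached after creation, repeated messages of the same type land on the same thread and keep its conversation history.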
**d) Input Format:**
```python
# New multi-threaded format
input_payload = {
    "messages": [{"type": "human", "content": text}],
    "thread_type": thread_type,  # "main" or "secondary"
    "interim_messages_reset": interim_messages_reset,  # bool
}
# Config includes namespace for coordination
config = {
    "configurable": {
        "user_email": self.user_email,
        "thread_id": thread_id,
        "namespace_for_memory": ["user@example.com", "tools_updates"],
    }
}
```
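As a rough illustration, the payload and config can be assembled by one helper that also validates `thread_type`. The function name `build_run_input` is hypothetical, and deriving the namespace from the user's email is an assumption based on the `("user_email", "tools_updates")` namespace mentioned under Troubleshooting.

```python
from typing import Any


def build_run_input(
    text: str,
    thread_type: str,
    thread_id: str,
    user_email: str,
    interim_messages_reset: bool,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Build the (input_payload, config) pair for one multi-threaded run."""
    if thread_type not in ("main", "secondary"):
        raise ValueError(f"unknown thread_type: {thread_type!r}")
    input_payload = {
        "messages": [{"type": "human", "content": text}],
        "thread_type": thread_type,
        "interim_messages_reset": interim_messages_reset,
    }
    config = {
        "configurable": {
            "user_email": user_email,
            "thread_id": thread_id,
            # Assumption: the namespace is keyed by the user's email.
            "namespace_for_memory": [user_email, "tools_updates"],
        }
    }
    return input_payload, config
```

Rejecting an unknown `thread_type` early keeps a typo from silently reaching the agent as an unroutable message.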
### 2. **Pipeline Configuration** (`pipeline.py`)
```python
# Enable multi-threading for specific assistants
enable_multi_threading = selected_assistant in ["telco-agent", "wire-transfer-agent"]
llm = LangGraphLLMService(
    base_url=os.getenv("LANGGRAPH_BASE_URL", "http://127.0.0.1:2024"),
    assistant=selected_assistant,
    enable_multi_threading=enable_multi_threading,  # NEW
)
```
### 3. **React Agent** (`react_agent.py`)
The agent already handles the multi-threaded input format (see `MULTI_THREAD_README.md`).
## Flow Example
### User says: "Close my contract"
```
1. Browser (WebRTC) → Pipecat Pipeline
2. ASR converts to text: "Close my contract"
3. LangGraphLLMService receives text
4. Service checks store: No long operation running
5. Service sends to main thread:
   {
     "messages": [{"type": "human", "content": "Close my contract"}],
     "thread_type": "main",
     "interim_messages_reset": True
   }
6. Agent starts 50-second contract closure
7. Agent writes status to store: {"status": "running", "progress": 10}
8. TTS speaks: "Processing your contract closure..."
```
### User says (5 seconds later): "What's the status?"
```
1. Browser (WebRTC) → Pipecat Pipeline
2. ASR converts to text: "What's the status?"
3. LangGraphLLMService receives text
4. Service checks store: Long operation IS running ✓
5. Service sends to secondary thread:
   {
     "messages": [{"type": "human", "content": "What's the status?"}],
     "thread_type": "secondary",
     "interim_messages_reset": False
   }
6. Secondary thread checks status tool
7. Agent responds: "Your request is 20% complete"
8. TTS speaks response
9. Main thread continues running in background
```
### Main operation completes (50 seconds later)
```
1. Main thread finishes contract closure
2. Agent synthesizes: result + interim conversation
3. Agent sets completion flag in store
4. TTS speaks: "Your contract has been closed..."
5. Service detects completion on next message
6. Routes future messages to main thread
```
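The three stages above can be replayed against a plain dictionary standing in for the store. This is a sketch under assumptions: the document does not specify how the completion flag is stored, so treating `main_operation_complete` as a truthy value that overrides a stale "running" status is a guess, and `route_message` is a hypothetical name.

```python
from typing import Any

STATUS_KEY = "working-tool-status-update"
COMPLETE_KEY = "main_operation_complete"


def route_message(store: dict[str, Any]) -> str:
    """Pick the target thread from the store state (a dict stands in for the store)."""
    status = store.get(STATUS_KEY) or {}
    # Assumption: a set completion flag overrides a stale "running" status.
    running = status.get("status") == "running" and not store.get(COMPLETE_KEY)
    return "secondary" if running else "main"


# Replay the flow: each call represents one user utterance reaching the service.
store: dict[str, Any] = {}
first = route_message(store)  # "Close my contract": nothing running yet
store[STATUS_KEY] = {"status": "running", "progress": 10}  # long-running tool reports progress
second = route_message(store)  # "What's the status?": operation in progress
store[COMPLETE_KEY] = True  # main thread finishes and sets the flag
third = route_message(store)  # next message after completion
```

Under these assumptions the three calls return `"main"`, `"secondary"`, and `"main"`, matching the routing described in the flow.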
## Configuration
### Environment Variables
```bash
# LangGraph Server
LANGGRAPH_BASE_URL=http://127.0.0.1:2024
LANGGRAPH_ASSISTANT=telco-agent
# User identification (for namespace)
USER_EMAIL=test@example.com
# Enable debug logging
LANGGRAPH_DEBUG_STREAM=true
```
### Enable/Disable Multi-Threading
**For specific agents:**
```python
# In pipeline.py
enable_multi_threading = selected_assistant in ["telco-agent", "wire-transfer-agent"]
```
**Via environment variable (optional):**
```python
enable_multi_threading = os.getenv("ENABLE_MULTI_THREADING", "true").lower() == "true"
```
**Disable for an agent:**
```python
llm = LangGraphLLMService(
    assistant="some-other-agent",
    enable_multi_threading=False,  # Use simple single-threaded mode
)
```
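The per-agent list and the environment variable can also be combined so that the variable acts as a global switch. The following is one possible arrangement, not the pipeline's actual logic: `env_flag` and `multi_threading_enabled` are hypothetical helpers, and accepting `1`, `yes`, and `on` in addition to `true` is an added convenience.

```python
import os

# Assistants that support the multi-threaded input format (from pipeline.py above).
MULTI_THREADED_ASSISTANTS = {"telco-agent", "wire-transfer-agent"}
_TRUE_VALUES = {"1", "true", "yes", "on"}


def env_flag(name: str, default: bool) -> bool:
    """Read a boolean environment variable, falling back to `default` when unset."""
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in _TRUE_VALUES


def multi_threading_enabled(selected_assistant: str) -> bool:
    """Enable only for listed assistants, and only if the global switch is on."""
    return (
        env_flag("ENABLE_MULTI_THREADING", True)
        and selected_assistant in MULTI_THREADED_ASSISTANTS
    )
```

With this arrangement, setting `ENABLE_MULTI_THREADING=false` disables the feature for every assistant, while leaving it unset keeps the per-agent default.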
## Store Keys Used
The service queries these store keys for coordination:
| Key | Purpose | Set By |
|-----|---------|--------|
| `working-tool-status-update` | Current tool progress | Agent's long-running tools |
| `main_operation_complete` | Completion signal | Agent's main thread |
| `secondary_interim_messages` | Interim conversation | Agent's secondary thread |
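
Since both the service and the agent must agree on these strings, one option is to define them once and import them on both sides. The sketch below is a suggestion rather than existing code: `StoreKey` and `coordination_namespace` are hypothetical names, and the namespace layout follows the `("user_email", "tools_updates")` form mentioned under Troubleshooting.

```python
from enum import Enum


class StoreKey(str, Enum):
    """Store keys shared by the service and the agent."""

    TOOL_STATUS = "working-tool-status-update"
    MAIN_COMPLETE = "main_operation_complete"
    INTERIM_MESSAGES = "secondary_interim_messages"


def coordination_namespace(user_email: str) -> tuple[str, str]:
    """Namespace under which the coordination keys are stored for one user."""
    return (user_email, "tools_updates")
```

Using `.value` when passing a key to the store keeps the wire format identical to the plain strings in the table.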
## Backward Compatibility
When `enable_multi_threading=False`:
- Uses single thread
- Sends simple message format: `[HumanMessage(content=text)]`
- No store coordination
- Works with non-multi-threaded agents
## Benefits
1. **Non-Blocking Voice UX**: User can continue talking during long operations
2. **Transparent**: User doesn't need to know about threading
3. **Automatic Routing**: Service handles main/secondary routing automatically
4. **Store-Based**: No client-side coordination needed
5. **Backward Compatible**: Existing agents work without changes
## Testing
### With Web UI
1. Start LangGraph server: `langgraph dev`
2. Start pipeline: `python pipeline.py`
3. Open browser to `http://localhost:7860`
4. Select "Telco Agent"
5. Say: "Close my contract" → Confirm with "yes"
6. While processing, say: "What's the status?"
7. Agent should respond with progress while operation continues
### With Client Script
```bash
# Terminal 1: Start LangGraph
cd examples/voice_agent_multi_thread/agents
langgraph dev
# Terminal 2: Test with client
cd examples/voice_agent_multi_thread/agents
python telco_client.py --interactive
```
## Troubleshooting
### Messages always go to main thread
- Check that `enable_multi_threading=True`
- Verify long-running tools are writing status to store
- Check that the service and the tools use the same namespace: `("user_email", "tools_updates")`
### Secondary thread not responding
- Ensure secondary thread has limited tool set
- Check `SECONDARY_SYSTEM_PROMPT` in `react_agent.py`
- Verify `check_status` tool is included
### Synthesis not working
- Check `secondary_interim_messages` in store
- Verify meaningful messages filter in agent
- Check synthesis prompt in agent
## Performance
- **Store queries**: ~10-20ms per check
- **Thread switching**: Negligible (routing decision)
- **Memory overhead**: Two threads vs one
- **Latency impact**: Minimal (<50ms added per request)
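
These figures depend on the deployment, so it may be worth measuring them locally. A minimal timing helper could look like the sketch below, where `timed` is a hypothetical name and `fake_store_check` is a placeholder that sleeps instead of querying a real store.

```python
import asyncio
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def timed(call: Callable[[], Awaitable[T]]) -> tuple[T, float]:
    """Await `call()` and return its result with the elapsed time in milliseconds."""
    start = time.perf_counter()
    result = await call()
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return result, elapsed_ms


async def fake_store_check() -> bool:
    """Placeholder for a real store query; sleeps to simulate network latency."""
    await asyncio.sleep(0.01)
    return True
```

Wrapping the real status check in `timed` and logging `elapsed_ms` per request gives a direct reading of the store-query cost in a given environment.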
## Future Enhancements
1. **Session persistence**: Store thread IDs in Redis
2. **Multiple long operations**: Queue system
3. **Progress streaming**: Real-time progress updates
4. **Cancellation**: User can cancel long operations
5. **Thread pooling**: Reuse secondary threads
## Related Files
- `langgraph_llm_service.py` - Service implementation
- `pipeline.py` - Pipeline configuration
- `react_agent.py` - Multi-threaded agent
- `tools.py` - Long-running tools with progress reporting
- `helper_functions.py` - Store coordination utilities
- `telco_client.py` - CLI test client
## Credits
Implementation: Option 1 (Tool-Level Designation)
Date: September 30, 2025