Spaces:
Running
Running
File size: 9,388 Bytes
dc3879e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 |
"""WebSocket event types and broadcasting utilities.
[Task]: T069, T070
[From]: specs/004-ai-chatbot/tasks.md
This module defines the event types for real-time progress streaming
and provides helper functions for broadcasting events to WebSocket clients.
"""
import json
import logging
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
from ws_manager.manager import manager
logger = logging.getLogger("websockets.events")
class EventType(str, Enum):
"""WebSocket event types for real-time progress updates.
[From]: specs/004-ai-chatbot/plan.md - WebSocket Event Types table
Events flow in this order during AI agent processing:
1. connection_established - WebSocket connection confirmed
2. agent_thinking - AI agent is processing the request
3. tool_starting - A tool is about to be executed
4. tool_progress - Tool execution progress (e.g., "found 3 tasks")
5. tool_complete - Tool finished successfully
6. tool_error - Tool execution failed
7. agent_done - AI agent finished, final response ready
"""
CONNECTION_ESTABLISHED = "connection_established"
"""WebSocket connection successfully established."""
AGENT_THINKING = "agent_thinking"
"""AI agent is processing the user's request."""
TOOL_STARTING = "tool_starting"
"""A tool is about to be executed (e.g., "Searching your tasks...")."""
TOOL_PROGRESS = "tool_progress"
"""Tool execution intermediate progress (e.g., "Found 3 tasks")."""
TOOL_COMPLETE = "tool_complete"
"""Tool finished successfully with result."""
TOOL_ERROR = "tool_error"
"""Tool execution failed with error."""
AGENT_DONE = "agent_done"
"""AI agent finished processing, final response is ready."""
class ToolProgressEvent(BaseModel):
"""Structured event for tool execution progress.
[From]: specs/004-ai-chatbot/research.md - Section 4
This model is used to serialize progress events to JSON for WebSocket transmission.
All fields are optional except event_type and message to support different event types.
Attributes:
event_type: The type of event (from EventType enum)
tool: Name of the tool being executed (if applicable)
task_id: ID of a task being operated on (if applicable)
count: Numeric count for progress (e.g., tasks found)
message: Human-readable progress message
result: Tool execution result (for tool_complete events)
error: Error message (for tool_error events)
"""
event_type: EventType = Field(
...,
description="Type of progress event"
)
tool: str | None = Field(
None,
description="Name of the tool being executed (e.g., 'list_tasks')"
)
task_id: str | None = Field(
None,
description="ID of a task being operated on"
)
count: int | None = Field(
None,
description="Numeric count for progress (e.g., number of tasks found)"
)
message: str = Field(
...,
description="Human-readable progress message"
)
result: dict[str, Any] | None = Field(
None,
description="Tool execution result (for tool_complete events)"
)
error: str | None = Field(
None,
description="Error message (for tool_error events)"
)
# User-friendly message templates for tool events
# [From]: specs/004-ai-chatbot/research.md - Section 6
TOOL_STARTING_MESSAGES = {
"list_tasks": "Searching your tasks...",
"add_task": "Creating a new task...",
"update_task": "Updating your task...",
"complete_task": "Updating task status...",
"delete_task": "Deleting your task...",
"complete_all_tasks": "Marking tasks as complete...",
"delete_all_tasks": "Deleting tasks...",
}
TOOL_COMPLETE_MESSAGES = {
"list_tasks": lambda count: f"Found {count} task{'s' if count != 1 else ''}",
"add_task": lambda task: f"Created: {task.get('title', 'Task')}",
"update_task": lambda task: "Task updated",
"complete_task": lambda _: "Task status updated",
"delete_task": lambda _: "Task deleted",
"complete_all_tasks": lambda count: f"Marked {count} task{'s' if count != 1 else ''} as complete",
"delete_all_tasks": lambda count: f"Deleted {count} task{'s' if count != 1 else ''}",
}
def format_tool_starting_message(tool: str, params: dict[str, Any] | None = None) -> str:
"""Generate user-friendly message for tool starting event.
[From]: specs/004-ai-chatbot/research.md - Section 6
Args:
tool: The tool name being executed
params: Optional tool parameters for context
Returns:
User-friendly message describing what's happening
"""
return TOOL_STARTING_MESSAGES.get(tool, f"Running {tool}...")
def format_tool_complete_message(tool: str, result: dict[str, Any]) -> str:
"""Generate user-friendly message for tool complete event.
[From]: specs/004-ai-chatbot/research.md - Section 6
Args:
tool: The tool name that completed
result: The tool execution result
Returns:
User-friendly message describing the result
"""
message_func = TOOL_COMPLETE_MESSAGES.get(tool)
if message_func:
try:
return message_func(result)
except (KeyError, TypeError):
return f"Completed {tool}"
return f"Completed {tool}"
async def broadcast_progress(user_id: str, event: ToolProgressEvent) -> None:
"""Send progress event to all WebSocket connections for a user.
[From]: specs/004-ai-chatbot/research.md - Section 4
[Task]: T070
This is the primary function called by the AI agent to broadcast
progress events during tool execution. It's non-blocking - if
WebSocket fails, the AI processing continues.
Args:
user_id: The user's unique identifier (UUID string)
event: The ToolProgressEvent to broadcast
Example:
await broadcast_progress(user_id, ToolProgressEvent(
event_type=EventType.TOOL_COMPLETE,
tool="list_tasks",
message="Found 3 tasks",
count=3
))
"""
try:
await manager.broadcast(user_id, event.model_dump())
logger.debug(f"Broadcasted {event.event_type} event for user {user_id}")
except Exception as e:
# Log but don't raise - WebSocket failures shouldn't block AI processing
logger.warning(f"Failed to broadcast progress event for user {user_id}: {e}")
async def broadcast_agent_thinking(user_id: str) -> None:
"""Broadcast that AI agent is thinking.
Helper function for common event type.
Args:
user_id: The user's unique identifier
"""
await broadcast_progress(user_id, ToolProgressEvent(
event_type=EventType.AGENT_THINKING,
message="Processing your request..."
))
async def broadcast_tool_starting(user_id: str, tool: str, params: dict[str, Any] | None = None) -> None:
"""Broadcast that a tool is starting execution.
Helper function for common event type.
Args:
user_id: The user's unique identifier
tool: The tool name
params: Optional tool parameters
"""
await broadcast_progress(user_id, ToolProgressEvent(
event_type=EventType.TOOL_STARTING,
tool=tool,
message=format_tool_starting_message(tool, params)
))
async def broadcast_tool_progress(user_id: str, tool: str, message: str, count: int | None = None) -> None:
"""Broadcast tool execution progress.
Helper function for common event type.
Args:
user_id: The user's unique identifier
tool: The tool name
message: Progress message
count: Optional count for progress
"""
await broadcast_progress(user_id, ToolProgressEvent(
event_type=EventType.TOOL_PROGRESS,
tool=tool,
message=message,
count=count
))
async def broadcast_tool_complete(user_id: str, tool: str, result: dict[str, Any]) -> None:
"""Broadcast that a tool completed successfully.
Helper function for common event type.
Args:
user_id: The user's unique identifier
tool: The tool name
result: Tool execution result
"""
await broadcast_progress(user_id, ToolProgressEvent(
event_type=EventType.TOOL_COMPLETE,
tool=tool,
message=format_tool_complete_message(tool, result),
result=result
))
async def broadcast_tool_error(user_id: str, tool: str, error: str) -> None:
"""Broadcast that a tool execution failed.
Helper function for common event type.
Args:
user_id: The user's unique identifier
tool: The tool name
error: Error message
"""
await broadcast_progress(user_id, ToolProgressEvent(
event_type=EventType.TOOL_ERROR,
tool=tool,
message=f"Error in {tool}: {error}",
error=error
))
async def broadcast_agent_done(user_id: str, response: str) -> None:
"""Broadcast that AI agent finished processing.
Helper function for common event type.
Args:
user_id: The user's unique identifier
response: The final AI response
"""
await broadcast_progress(user_id, ToolProgressEvent(
event_type=EventType.AGENT_DONE,
message="Done!",
result={"response": response}
))
|