# todoappapi/ws_manager/events.py
# GrowWithTalha's picture
# feat: sync backend changes from main repo
# dc3879e
"""WebSocket event types and broadcasting utilities.
[Task]: T069, T070
[From]: specs/004-ai-chatbot/tasks.md
This module defines the event types for real-time progress streaming
and provides helper functions for broadcasting events to WebSocket clients.
"""
import json
import logging
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
from ws_manager.manager import manager
logger = logging.getLogger("websockets.events")
class EventType(str, Enum):
    """Names of the events streamed to WebSocket clients for live progress.

    [From]: specs/004-ai-chatbot/plan.md - WebSocket Event Types table

    Every member is also a ``str``; its value is the event name.

    Order in which events flow during AI agent processing:
        1. connection_established - WebSocket connection confirmed
        2. agent_thinking - AI agent is processing the request
        3. tool_starting - A tool is about to be executed
        4. tool_progress - Tool execution progress (e.g., "found 3 tasks")
        5. tool_complete - Tool finished successfully
        6. tool_error - Tool execution failed
        7. agent_done - AI agent finished, final response ready
    """

    # WebSocket connection successfully established.
    CONNECTION_ESTABLISHED = "connection_established"

    # AI agent is processing the user's request.
    AGENT_THINKING = "agent_thinking"

    # A tool is about to be executed (e.g., "Searching your tasks...").
    TOOL_STARTING = "tool_starting"

    # Tool execution intermediate progress (e.g., "Found 3 tasks").
    TOOL_PROGRESS = "tool_progress"

    # Tool finished successfully with result.
    TOOL_COMPLETE = "tool_complete"

    # Tool execution failed with error.
    TOOL_ERROR = "tool_error"

    # AI agent finished processing, final response is ready.
    AGENT_DONE = "agent_done"
class ToolProgressEvent(BaseModel):
    """Payload model for a single progress event.

    [From]: specs/004-ai-chatbot/research.md - Section 4

    Instances are dumped to a dict and handed to the WebSocket manager for
    transmission. Only ``event_type`` and ``message`` are required; every
    other field defaults to ``None`` so the same model can describe any
    event type.

    Attributes:
        event_type: The type of event (from EventType enum)
        tool: Name of the tool being executed (if applicable)
        task_id: ID of a task being operated on (if applicable)
        count: Numeric count for progress (e.g., tasks found)
        message: Human-readable progress message
        result: Tool execution result (for tool_complete events)
        error: Error message (for tool_error events)
    """

    # Required: which kind of event this is.
    event_type: EventType = Field(default=..., description="Type of progress event")

    # Optional context about the tool / task involved.
    tool: str | None = Field(
        default=None,
        description="Name of the tool being executed (e.g., 'list_tasks')",
    )
    task_id: str | None = Field(default=None, description="ID of a task being operated on")
    count: int | None = Field(
        default=None,
        description="Numeric count for progress (e.g., number of tasks found)",
    )

    # Required: text intended for display to the user.
    message: str = Field(default=..., description="Human-readable progress message")

    # Optional outcome details.
    result: dict[str, Any] | None = Field(
        default=None,
        description="Tool execution result (for tool_complete events)",
    )
    error: str | None = Field(default=None, description="Error message (for tool_error events)")
# User-friendly message templates for tool events
# [From]: specs/004-ai-chatbot/research.md - Section 6

# Message shown when a tool starts, keyed by tool name.
TOOL_STARTING_MESSAGES = {
    "list_tasks": "Searching your tasks...",
    "add_task": "Creating a new task...",
    "update_task": "Updating your task...",
    "complete_task": "Updating task status...",
    "delete_task": "Deleting your task...",
    "complete_all_tasks": "Marking tasks as complete...",
    "delete_all_tasks": "Deleting tasks...",
}


def _result_count(result: object) -> int:
    """Pull a task count out of a tool result.

    The templates below are invoked with the whole tool result, not with a
    bare number, so the count has to be extracted before it is formatted.

    Accepted shapes:
        * a plain ``int`` (kept for callers that pass the count directly)
        * a dict with an integer ``"count"`` entry
        * a dict with a ``"tasks"`` list/tuple (its length is used)

    NOTE(review): the ``"count"`` / ``"tasks"`` key names are assumptions
    about the tool result schema - confirm against the tool implementations.

    Raises:
        TypeError: If no count can be determined, so that a caller catching
            TypeError can fall back to a generic message instead of showing
            a raw dict to the user.
    """
    # bool is a subclass of int; True/False is never a meaningful count.
    if isinstance(result, int) and not isinstance(result, bool):
        return result
    if isinstance(result, dict):
        count = result.get("count")
        if isinstance(count, int) and not isinstance(count, bool):
            return count
        tasks = result.get("tasks")
        if isinstance(tasks, (list, tuple)):
            return len(tasks)
    raise TypeError(f"cannot determine task count from {type(result).__name__}")


def _task_count_phrase(result: object) -> str:
    """Return ``"<n> task"`` or ``"<n> tasks"`` for the count found in *result*."""
    count = _result_count(result)
    return f"{count} task{'s' if count != 1 else ''}"


# Message shown when a tool completes, keyed by tool name. Each value is a
# callable that receives the tool result and returns the display text.
TOOL_COMPLETE_MESSAGES = {
    "list_tasks": lambda result: f"Found {_task_count_phrase(result)}",
    "add_task": lambda task: f"Created: {task.get('title', 'Task')}",
    "update_task": lambda task: "Task updated",
    "complete_task": lambda _: "Task status updated",
    "delete_task": lambda _: "Task deleted",
    "complete_all_tasks": lambda result: f"Marked {_task_count_phrase(result)} as complete",
    "delete_all_tasks": lambda result: f"Deleted {_task_count_phrase(result)}",
}
def format_tool_starting_message(tool: str, params: dict[str, Any] | None = None) -> str:
    """Look up the user-facing "tool is starting" text for a tool.

    [From]: specs/004-ai-chatbot/research.md - Section 6

    Args:
        tool: The tool name being executed
        params: Optional tool parameters; accepted for context but not
            currently used when building the message

    Returns:
        The template registered for ``tool``, or a generic
        "Running <tool>..." message when the tool has no template
    """
    try:
        return TOOL_STARTING_MESSAGES[tool]
    except KeyError:
        # No dedicated template for this tool - use the generic wording.
        return f"Running {tool}..."
def format_tool_complete_message(tool: str, result: dict[str, Any]) -> str:
    """Generate user-friendly message for tool complete event.

    [From]: specs/004-ai-chatbot/research.md - Section 6

    Looks up the formatter registered for ``tool`` in TOOL_COMPLETE_MESSAGES
    and calls it with ``result``. Formatting never raises for a malformed
    result: any lookup/type problem inside the formatter degrades to a
    generic message.

    Args:
        tool: The tool name that completed
        result: The tool execution result

    Returns:
        User-friendly message describing the result, or "Completed <tool>"
        when the tool has no formatter or the formatter cannot handle
        ``result``
    """
    fallback = f"Completed {tool}"

    message_func = TOOL_COMPLETE_MESSAGES.get(tool)
    if message_func is None:
        return fallback

    try:
        return message_func(result)
    except (AttributeError, KeyError, TypeError):
        # AttributeError covers formatters that call ``.get`` on a result
        # that is not a dict (e.g. None); without it the error would escape
        # to the caller instead of producing the generic message.
        return fallback
async def broadcast_progress(user_id: str, event: ToolProgressEvent) -> None:
    """Send a progress event to the WebSocket manager for a user.

    [From]: specs/004-ai-chatbot/research.md - Section 4
    [Task]: T070

    The event is dumped to a JSON-compatible dict and passed to
    ``manager.broadcast``. Any exception raised while dumping or
    broadcasting is logged as a warning and swallowed, so a WebSocket
    failure does not interrupt the caller (the AI processing continues).

    Args:
        user_id: The user's unique identifier (UUID string)
        event: The ToolProgressEvent to broadcast

    Example:
        await broadcast_progress(user_id, ToolProgressEvent(
            event_type=EventType.TOOL_COMPLETE,
            tool="list_tasks",
            message="Found 3 tasks",
            count=3
        ))
    """
    try:
        # mode="json" converts enums, datetimes, UUIDs, etc. (including those
        # nested inside ``result``) to JSON-safe primitives, so the payload
        # can be JSON-encoded without a serialization error.
        payload = event.model_dump(mode="json")
        await manager.broadcast(user_id, payload)
        logger.debug("Broadcasted %s event for user %s", payload["event_type"], user_id)
    except Exception as e:
        # Log but don't raise - WebSocket failures shouldn't block AI processing
        logger.warning("Failed to broadcast progress event for user %s: %s", user_id, e)
async def broadcast_agent_thinking(user_id: str) -> None:
    """Emit an ``agent_thinking`` event for the given user.

    Convenience wrapper around broadcast_progress with a fixed message.

    Args:
        user_id: The user's unique identifier
    """
    thinking_event = ToolProgressEvent(
        message="Processing your request...",
        event_type=EventType.AGENT_THINKING,
    )
    await broadcast_progress(user_id, thinking_event)
async def broadcast_tool_starting(user_id: str, tool: str, params: dict[str, Any] | None = None) -> None:
    """Emit a ``tool_starting`` event for the given user and tool.

    Convenience wrapper around broadcast_progress; the message text comes
    from format_tool_starting_message.

    Args:
        user_id: The user's unique identifier
        tool: The tool name
        params: Optional tool parameters, forwarded to the message formatter
    """
    starting_text = format_tool_starting_message(tool, params)
    starting_event = ToolProgressEvent(
        message=starting_text,
        tool=tool,
        event_type=EventType.TOOL_STARTING,
    )
    await broadcast_progress(user_id, starting_event)
async def broadcast_tool_progress(user_id: str, tool: str, message: str, count: int | None = None) -> None:
    """Emit a ``tool_progress`` event for the given user and tool.

    Convenience wrapper around broadcast_progress.

    Args:
        user_id: The user's unique identifier
        tool: The tool name
        message: Progress message
        count: Optional count for progress
    """
    progress_event = ToolProgressEvent(
        count=count,
        message=message,
        tool=tool,
        event_type=EventType.TOOL_PROGRESS,
    )
    await broadcast_progress(user_id, progress_event)
async def broadcast_tool_complete(user_id: str, tool: str, result: dict[str, Any]) -> None:
    """Emit a ``tool_complete`` event carrying the tool's result.

    Convenience wrapper around broadcast_progress; the message text comes
    from format_tool_complete_message.

    Args:
        user_id: The user's unique identifier
        tool: The tool name
        result: Tool execution result
    """
    completion_text = format_tool_complete_message(tool, result)
    completion_event = ToolProgressEvent(
        result=result,
        message=completion_text,
        tool=tool,
        event_type=EventType.TOOL_COMPLETE,
    )
    await broadcast_progress(user_id, completion_event)
async def broadcast_tool_error(user_id: str, tool: str, error: str) -> None:
    """Emit a ``tool_error`` event describing a failed tool execution.

    Convenience wrapper around broadcast_progress. The error text is sent
    both in the ``error`` field and embedded in the display message.

    Args:
        user_id: The user's unique identifier
        tool: The tool name
        error: Error message
    """
    failure_text = f"Error in {tool}: {error}"
    failure_event = ToolProgressEvent(
        error=error,
        message=failure_text,
        tool=tool,
        event_type=EventType.TOOL_ERROR,
    )
    await broadcast_progress(user_id, failure_event)
async def broadcast_agent_done(user_id: str, response: str) -> None:
    """Emit an ``agent_done`` event carrying the final AI response.

    Convenience wrapper around broadcast_progress. The response is placed
    in the event's ``result`` under the ``"response"`` key.

    Args:
        user_id: The user's unique identifier
        response: The final AI response
    """
    final_payload = {"response": response}
    done_event = ToolProgressEvent(
        result=final_payload,
        message="Done!",
        event_type=EventType.AGENT_DONE,
    )
    await broadcast_progress(user_id, done_event)