# ChatbotRAG / agent_chat_stream.py
# Uploaded by minhvtt ("Upload 16 files", commit 7caa85c, 3.39 kB).
"""
Agent Chat Streaming Endpoint
SSE-based real-time streaming for Sales & Feedback agents
"""
import asyncio
from datetime import datetime, timezone
from typing import AsyncGenerator, Iterator

from stream_utils import format_sse, EVENT_STATUS, EVENT_TOKEN, EVENT_DONE, EVENT_ERROR, EVENT_METADATA
async def agent_chat_stream(
request,
agent_service,
conversation_service
) -> AsyncGenerator[str, None]:
"""
Stream agent responses in real-time (SSE format)
Args:
request: ChatRequest with message, session_id, mode, user_id
agent_service: AgentService instance
conversation_service: ConversationService instance
Yields SSE events:
- status: Processing updates
- token: Text chunks
- metadata: Session info
- done: Completion signal
- error: Error messages
"""
try:
# === SESSION MANAGEMENT ===
session_id = request.session_id
if not session_id:
session_id = conversation_service.create_session(
metadata={"user_agent": "api", "created_via": "agent_stream"},
user_id=request.user_id
)
yield format_sse(EVENT_METADATA, {"session_id": session_id})
# Get conversation history
history = conversation_service.get_history(session_id)
# Convert to messages format
messages = []
for h in history:
messages.append({"role": h["role"], "content": h["content"]})
# Determine mode
mode = getattr(request, 'mode', 'sales') # Default to sales
# === STATUS UPDATE ===
if mode == 'feedback':
yield format_sse(EVENT_STATUS, "Đang kiểm tra lịch sử sự kiện của bạn...")
else:
yield format_sse(EVENT_STATUS, "Đang tư vấn...")
# === CALL AGENT ===
result = await agent_service.chat(
user_message=request.message,
conversation_history=messages,
mode=mode,
user_id=request.user_id
)
agent_response = result["message"]
# === STREAM RESPONSE TOKEN BY TOKEN ===
# Simple character-by-character streaming
chunk_size = 5 # Characters per chunk
for i in range(0, len(agent_response), chunk_size):
chunk = agent_response[i:i+chunk_size]
yield format_sse(EVENT_TOKEN, chunk)
# Small delay for smoother streaming
import asyncio
await asyncio.sleep(0.02)
# === SAVE HISTORY ===
conversation_service.add_message(
session_id=session_id,
role="user",
content=request.message
)
conversation_service.add_message(
session_id=session_id,
role="assistant",
content=agent_response
)
# === DONE ===
yield format_sse(EVENT_DONE, {
"session_id": session_id,
"timestamp": datetime.utcnow().isoformat(),
"mode": mode,
"tool_calls": len(result.get("tool_calls", []))
})
except Exception as e:
print(f"⚠️ Agent Stream Error: {e}")
yield format_sse(EVENT_ERROR, str(e))