# Aoun-Ai — app/api/chat.py
# Author: MuhammadMahmoud · revision 468ea61 ("enhance rag")
"""
Chat API Routes — Standard and streaming endpoints for the Awn AI chatbot.
"""
from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import StreamingResponse
import json
import logging
from app.schemas import ChatRequest, ChatResponse, ChatMode, ToolConfirmationRequest
from app.services.chat.chat_engine import chat_engine
from app.services.chat.api.llm_router import llm_router
from app.services.chat.confirmation_manager import get_confirmation_manager, ConfirmationStatus
from app.core.rate_limiter import chat_rate_limiter, global_rate_limiter, ip_rate_limiter
# Module-level logger, named after this module so log records can be filtered per file.
logger = logging.getLogger(__name__)
# Route collection for the chat endpoints defined below (presumably included into the
# FastAPI app elsewhere — confirm in the application factory).
router = APIRouter()
async def _enforce_rate_limits(payload: ChatRequest, request: Request) -> None:
    """
    Apply the IP, per-user and global rate limits shared by both chat endpoints.

    Args:
        payload: The incoming chat request; its ``session_id`` keys the per-user limit.
        request: The raw request, used to obtain the client IP.

    Raises:
        HTTPException: 429 when the IP or per-user quota is exhausted, 503 when the
            global quota is exhausted. Every error carries a ``Retry-After`` header.
    """
    client_ip = request.client.host if request.client else "unknown_ip"
    # Previously every request without a session_id shared one "anonymous" bucket, so a
    # single client could exhaust the per-user quota for all anonymous users. Scope the
    # fallback key to the caller's IP instead.
    user_key = payload.session_id or f"anonymous:{client_ip}"

    # Narrowest limits first: traffic that is rejected for one IP/user should not also
    # burn the shared global quota (assumes is_allowed() records a hit when it admits a
    # request — confirm against app.core.rate_limiter).
    ip_allowed, ip_retry = await ip_rate_limiter.is_allowed(client_ip)
    if not ip_allowed:
        raise HTTPException(
            status_code=429,
            detail="Too many requests from this IP.",
            headers={"Retry-After": str(ip_retry)},
        )

    allowed, retry_after = await chat_rate_limiter.is_allowed(user_key)
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded. Try again in {retry_after} seconds.",
            headers={"Retry-After": str(retry_after)},
        )

    global_allowed, global_retry = await global_rate_limiter.is_allowed("global_system")
    if not global_allowed:
        raise HTTPException(
            status_code=503,
            detail="System under heavy load. Please try again later.",
            headers={"Retry-After": str(global_retry)},
        )


@router.post("/chat", response_model=ChatResponse)
async def chat_endpoint(payload: ChatRequest, request: Request):
    """
    Process a standard (non-streaming) chat turn and return one complete response.

    Applies the shared rate limits, then delegates to ``chat_engine.get_chat_response``
    with the payload's message, history, session ID, mode, family ID and access token.
    Provider availability, fallback and degradation are handled by the engine/router.

    Returns:
        ChatResponse holding the generated text, the updated history and an optional
        tool confirmation.

    Raises:
        HTTPException: 429/503 from rate limiting; 500 on any unexpected error.
    """
    try:
        await _enforce_rate_limits(payload, request)

        mode = getattr(payload, "mode", ChatMode.AGENT)
        response_text, updated_history, confirmation = await chat_engine.get_chat_response(
            message=payload.message,
            history=payload.history,
            session_id=payload.session_id,
            mode=mode,
            family_id=payload.family_id,
            access_token=payload.access_token,
        )
        return ChatResponse(
            response=response_text,
            history=updated_history,
            confirmation=confirmation,
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error in chat endpoint: %s", e)
        raise HTTPException(status_code=500, detail="Failed to get response from Chatbot.") from e


@router.post(
    "/chat/stream",
    response_class=StreamingResponse,
    responses={
        200: {
            "content": {"text/event-stream": {"schema": {"type": "string"}}},
            "description": "Stream of text chunks (SSE). Each line: `data: <text>`. Ends with `data: [DONE]`.",
        }
    },
    summary="Streaming Chat (SSE)",
)
async def chat_stream(payload: ChatRequest, request: Request):
    """
    Stream a chat turn to the client as Server-Sent Events.

    Accepts the same ChatRequest payload as ``/chat`` and applies the same rate limits.
    It then relays the chunks yielded by ``chat_engine.stream_chat_response``. If the
    engine raises mid-stream, a localized error event followed by ``data: [DONE]`` is
    emitted instead of an HTTP error.

    Raises:
        HTTPException: 429/503 from rate limiting; 500 if the stream cannot be set up.
    """
    try:
        await _enforce_rate_limits(payload, request)
        mode = getattr(payload, "mode", ChatMode.AGENT)

        async def safe_stream():
            """Relay engine chunks; on failure emit an error event and a closing [DONE]."""
            try:
                async for chunk in chat_engine.stream_chat_response(
                    message=payload.message,
                    history=payload.history,
                    session_id=payload.session_id,
                    mode=mode,
                    family_id=payload.family_id,
                    access_token=payload.access_token,
                ):
                    yield chunk
            except Exception as stream_err:
                # Once streaming has begun the status line is already sent, so the error
                # is reported in-band and the stream is terminated cleanly.
                logger.error("Error during streaming chat: %s", stream_err, exc_info=True)
                # "Sorry, a technical error occurred during streaming."
                msg = "عذراً، حدث خطأ فني أثناء البث."
                yield f"data: {json.dumps(msg, ensure_ascii=False)}\n\n"
                yield "data: [DONE]\n\n"

        return StreamingResponse(
            safe_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # Disable Nginx buffering
                "Content-Encoding": "identity",  # Bypass GZip middleware — CRITICAL for streaming
            },
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error in streaming chat: %s", e)
        raise HTTPException(status_code=500, detail="Failed to start streaming chat.") from e
@router.post("/chat/confirm")
async def chat_confirm(
    confirmation_id: str,
    approved: bool = Query(..., description="User decision: true to approve, false to reject"),
):
    """
    Resolve a pending tool confirmation raised in AI Agent mode.

    Args:
        confirmation_id: ID of the pending confirmation (sent as a query parameter).
        approved: True to approve the pending tool call, False to reject it.

    Returns:
        dict with ``status`` ("approved" or "rejected"), ``confirmation_id``, a
        localized ``message`` for the UI and the affected ``tool_name``.

    Raises:
        HTTPException: 404 if the confirmation is unknown or expired; 400 if it is no
            longer pending or the manager refuses the transition; 500 otherwise.
    """
    # NOTE(review): any caller that knows a confirmation_id can resolve it; nothing here
    # ties it to the requesting session — confirm this is acceptable.
    try:
        confirmation_manager = get_confirmation_manager()
        confirmation = confirmation_manager.get_confirmation(confirmation_id)
        if not confirmation:
            raise HTTPException(status_code=404, detail="Confirmation request not found or expired.")
        if confirmation.status != ConfirmationStatus.PENDING:
            raise HTTPException(
                status_code=400,
                detail=f"Confirmation already processed: {confirmation.status.value}",
            )

        if approved:
            status = "approved"
            transition = confirmation_manager.approve
            # "Operation approved. Executing..."
            ui_message = "✅ تمت الموافقة على العملية. جاري التنفيذ..."
            failure_detail = "Failed to approve confirmation."
        else:
            status = "rejected"
            transition = confirmation_manager.reject
            # "Operation rejected."
            ui_message = "❌ تم رفض العملية."
            failure_detail = "Failed to reject confirmation."

        # approve()/reject() report success as a boolean; a False result presumably means
        # the confirmation changed state between the check above and now — TODO confirm.
        if not transition(confirmation_id):
            raise HTTPException(status_code=400, detail=failure_detail)

        logger.info("Confirmation %s %s: %s", confirmation_id, status, confirmation.tool_name)
        return {
            "status": status,
            "confirmation_id": confirmation_id,
            "message": ui_message,
            "tool_name": confirmation.tool_name,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error in confirm endpoint: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to process confirmation.") from e