Spaces:
Running
Running
| """ | |
| Chat API Routes — Standard and streaming endpoints for the Awn AI chatbot. | |
| """ | |
| from fastapi import APIRouter, HTTPException, Query, Request | |
| from fastapi.responses import StreamingResponse | |
| import json | |
| import logging | |
| from app.schemas import ChatRequest, ChatResponse, ChatMode, ToolConfirmationRequest | |
| from app.services.chat.chat_engine import chat_engine | |
| from app.services.chat.api.llm_router import llm_router | |
| from app.services.chat.confirmation_manager import get_confirmation_manager, ConfirmationStatus | |
| from app.core.rate_limiter import chat_rate_limiter, global_rate_limiter, ip_rate_limiter | |
# Router instance exported for the application to mount. NOTE(review): no
# @router.* decorators are visible on the handlers below — they may have been
# stripped by extraction; confirm the route registrations against the app setup.
router = APIRouter()
# Module-level logger; handlers in this module log with lazy %-style arguments.
logger = logging.getLogger(__name__)
async def chat_endpoint(payload: ChatRequest, request: Request):
    """
    Handle a standard (non-streaming) chat request.

    Runs three rate-limit tiers in order — global system quota, per-IP quota,
    then per-user (session) quota — and only then delegates to the chat engine.
    Provider selection, fallback, and degradation are handled inside the engine.

    Args:
        payload: ChatRequest carrying the message, history, session ID, mode,
            family ID, and access token.
        request: Incoming FastAPI request, used to read the client IP.

    Returns:
        ChatResponse with the generated text, updated history, and an optional
        pending tool confirmation.

    Raises:
        HTTPException: 503 on global overload, 429 on IP/user rate limits,
            500 on any unexpected failure.
    """
    try:
        session_key = payload.session_id or "anonymous"
        ip_address = request.client.host if request.client else "unknown_ip"

        # Tier 1: whole-system quota.
        ok_global, wait_global = await global_rate_limiter.is_allowed("global_system")
        if not ok_global:
            raise HTTPException(
                status_code=503,
                detail="System under heavy load. Please try again later.",
                headers={"Retry-After": str(wait_global)},
            )

        # Tier 1.5: per-client-IP quota.
        ok_ip, wait_ip = await ip_rate_limiter.is_allowed(ip_address)
        if not ok_ip:
            raise HTTPException(
                status_code=429,
                detail="Too many requests from this IP.",
                headers={"Retry-After": str(wait_ip)},
            )

        # Tier 2: per-user (session) quota.
        ok_user, wait_user = await chat_rate_limiter.is_allowed(session_key)
        if not ok_user:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded. Try again in {wait_user} seconds.",
                headers={"Retry-After": str(wait_user)},
            )

        # The engine/router layer owns provider availability and fallback.
        chat_mode = getattr(payload, "mode", ChatMode.AGENT)
        text, new_history, pending_confirmation = await chat_engine.get_chat_response(
            message=payload.message,
            history=payload.history,
            session_id=payload.session_id,
            mode=chat_mode,
            family_id=payload.family_id,
            access_token=payload.access_token,
        )
        return ChatResponse(
            response=text,
            history=new_history,
            confirmation=pending_confirmation,
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error in chat endpoint: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to get response from Chatbot.")
async def chat_stream(payload: ChatRequest, request: Request):
    """
    Handle a streaming chat request over Server-Sent Events (SSE).

    Applies the same three rate-limit tiers as the standard endpoint (global,
    per-IP, per-user) before opening the stream, then forwards chunks from the
    chat engine as they arrive. A mid-stream failure is reported to the client
    as an in-band error message followed by a terminating "[DONE]" event.

    Args:
        payload: ChatRequest carrying the message, history, session ID, mode,
            family ID, and access token.
        request: Incoming FastAPI request, used to read the client IP.

    Returns:
        StreamingResponse with media type ``text/event-stream`` and headers
        that disable proxy buffering and gzip re-encoding.

    Raises:
        HTTPException: 503 on global overload, 429 on IP/user rate limits,
            500 if the stream cannot be started.
    """
    try:
        session_key = payload.session_id or "anonymous"
        ip_address = request.client.host if request.client else "unknown_ip"

        # Tier 1: whole-system quota.
        ok_global, wait_global = await global_rate_limiter.is_allowed("global_system")
        if not ok_global:
            raise HTTPException(
                status_code=503,
                detail="System under heavy load. Please try again later.",
                headers={"Retry-After": str(wait_global)},
            )

        # Tier 1.5: per-client-IP quota.
        ok_ip, wait_ip = await ip_rate_limiter.is_allowed(ip_address)
        if not ok_ip:
            raise HTTPException(
                status_code=429,
                detail="Too many requests from this IP.",
                headers={"Retry-After": str(wait_ip)},
            )

        # Tier 2: per-user (session) quota.
        ok_user, wait_user = await chat_rate_limiter.is_allowed(session_key)
        if not ok_user:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded. Try again in {wait_user} seconds.",
                headers={"Retry-After": str(wait_user)},
            )

        async def guarded_stream():
            """Forward engine chunks; convert mid-stream errors to SSE events."""
            try:
                chat_mode = getattr(payload, "mode", ChatMode.AGENT)
                engine_stream = chat_engine.stream_chat_response(
                    message=payload.message,
                    history=payload.history,
                    session_id=payload.session_id,
                    mode=chat_mode,
                    family_id=payload.family_id,
                    access_token=payload.access_token,
                )
                async for piece in engine_stream:
                    yield piece
            except Exception as stream_err:
                logger.error("Error during streaming chat: %s", stream_err, exc_info=True)
                # Localized (Arabic) "technical error during streaming" notice,
                # delivered in-band so the client UI can surface it.
                msg = "عذراً، حدث خطأ فني أثناء البث."
                yield f"data: {json.dumps(msg, ensure_ascii=False)}\n\n"
                yield "data: [DONE]\n\n"

        return StreamingResponse(
            guarded_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # Disable Nginx buffering
                "Content-Encoding": "identity",  # Bypass GZip middleware — CRITICAL for streaming
            },
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error in streaming chat: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to start streaming chat.")
async def chat_confirm(
    confirmation_id: str,
    approved: bool = Query(..., description="User decision: true to approve, false to reject"),
):
    """
    Resolve a pending tool-execution confirmation raised in AI Agent mode.

    Looks up the confirmation by ID, rejects requests that are missing/expired
    or already resolved, then records the user's decision via the confirmation
    manager so the blocked backend task can proceed (or be cancelled).

    Args:
        confirmation_id: Identifier of the pending confirmation.
        approved: True to approve the tool operation, False to reject it.

    Returns:
        dict with the resolution status, the confirmation ID, a localized
        user-facing message, and the affected tool name.

    Raises:
        HTTPException: 404 if the confirmation is unknown or expired, 400 if it
            was already processed or the state transition fails, 500 on any
            unexpected error.
    """
    try:
        confirmation_manager = get_confirmation_manager()
        confirmation = confirmation_manager.get_confirmation(confirmation_id)
        if not confirmation:
            raise HTTPException(status_code=404, detail="Confirmation request not found or expired.")
        if confirmation.status != ConfirmationStatus.PENDING:
            raise HTTPException(
                status_code=400,
                detail=f"Confirmation already processed: {confirmation.status.value}"
            )

        # Guard-clause flow; logging uses lazy %-style args for consistency
        # with the other handlers in this module.
        if approved:
            if not confirmation_manager.approve(confirmation_id):
                raise HTTPException(status_code=400, detail="Failed to approve confirmation.")
            logger.info("Confirmation %s approved: %s", confirmation_id, confirmation.tool_name)
            return {
                "status": "approved",
                "confirmation_id": confirmation_id,
                "message": "✅ تمت الموافقة على العملية. جاري التنفيذ...",
                "tool_name": confirmation.tool_name,
            }

        if not confirmation_manager.reject(confirmation_id):
            raise HTTPException(status_code=400, detail="Failed to reject confirmation.")
        logger.info("Confirmation %s rejected: %s", confirmation_id, confirmation.tool_name)
        return {
            "status": "rejected",
            "confirmation_id": confirmation_id,
            "message": "❌ تم رفض العملية.",
            "tool_name": confirmation.tool_name,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error in confirm endpoint: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to process confirmation.")