Spaces:
Running
Running
File size: 25,621 Bytes
dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a dc3879e a57a50a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 |
"""Chat API endpoint for AI-powered task management.
[Task]: T015, T071
[From]: specs/004-ai-chatbot/tasks.md
This endpoint provides a conversational interface for task management.
Users can create, list, update, complete, and delete tasks through natural language.
Also includes the ChatKit endpoint, which delivers responses as Server-Sent Events (SSE).
"""
import uuid
import logging
import asyncio
from datetime import datetime
from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException, status, Depends, BackgroundTasks, Request
from pydantic import BaseModel, Field, field_validator, ValidationError
from sqlmodel import Session
from sqlalchemy.exc import SQLAlchemyError
from core.database import get_db
from core.validators import validate_message_length
from core.security import decode_access_token
from models.message import Message, MessageRole
from services.security import sanitize_message
from models.conversation import Conversation
from ai_agent import run_agent, is_gemini_configured
from services.conversation import (
get_or_create_conversation,
load_conversation_history,
update_conversation_timestamp
)
from services.rate_limiter import check_rate_limit
# Configure error logger
# Module-wide logger named "api.errors". Its level is pinned to ERROR, so
# records emitted through it at a lower level (e.g. error_logger.info(...))
# are filtered out by the logger itself.
error_logger = logging.getLogger("api.errors")
error_logger.setLevel(logging.ERROR)
# Request/Response models
class ChatRequest(BaseModel):
    """Incoming payload for the chat endpoint.

    [From]: specs/004-ai-chatbot/plan.md - API Contract
    """

    # Text sent by the user; the field itself bounds the length to 1..10000.
    message: str = Field(
        default=...,
        min_length=1,
        max_length=10000,  # FR-042
        description="User message content",
    )
    # None means the client did not name an existing conversation.
    conversation_id: Optional[str] = Field(
        default=None,
        description="Optional conversation ID to continue existing conversation",
    )

    @field_validator('message')
    @classmethod
    def validate_message(cls, v: str) -> str:
        """Reject blank or over-long messages and return the trimmed text.

        Raises:
            ValueError: if the message is empty/whitespace-only, or longer
                than 10,000 characters.
        """
        trimmed = v.strip()
        if not trimmed:
            raise ValueError("Message content cannot be empty")
        # The length limit is applied to the raw value, before trimming.
        if len(v) > 10000:
            raise ValueError("Message content exceeds maximum length of 10,000 characters")
        return trimmed
class TaskReference(BaseModel):
    """Summary of a task that the AI created or modified."""

    # Required identifying fields.
    id: str
    title: str
    # Optional details; each defaults to None when not supplied.
    description: Optional[str] = Field(default=None)
    due_date: Optional[str] = Field(default=None)
    priority: Optional[str] = Field(default=None)
    # Completion flag; defaults to False.
    completed: bool = Field(default=False)
class ChatResponse(BaseModel):
    """Payload returned by the chat endpoint.

    [From]: specs/004-ai-chatbot/plan.md - API Contract
    """

    # Both text fields are required.
    response: str = Field(default=..., description="AI assistant's text response")
    conversation_id: str = Field(default=..., description="Conversation ID (new or existing)")
    # Defaults to a fresh empty list for every response instance.
    tasks: list[TaskReference] = Field(
        description="List of tasks created or modified in this interaction",
        default_factory=list,
    )
# Create API router
# Routes registered on this router are mounted under the "/api" prefix and
# carry the "chat" tag.
router = APIRouter(prefix="/api", tags=["chat"])
def _resolve_conversation(db: Session, user_uuid: uuid.UUID, raw_conversation_id: Optional[str]):
    """Return the conversation this request should use.

    Args:
        db: Database session passed through to the conversation service.
        user_uuid: Parsed ID of the user the conversation belongs to.
        raw_conversation_id: Conversation ID string from the request, or a
            falsy value to start a new conversation.

    Returns:
        Whatever ``get_or_create_conversation`` returns (its ``id`` attribute
        is what the caller uses).

    Raises:
        HTTPException 400: ``raw_conversation_id`` is not a valid UUID string.
    """
    if not raw_conversation_id:
        return get_or_create_conversation(db=db, user_id=user_uuid)

    try:
        conv_uuid = uuid.UUID(raw_conversation_id)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "Invalid conversation ID",
                "message": f"Conversation ID '{raw_conversation_id}' is not a valid UUID format.",
                "suggestion": "Provide a valid UUID or omit the conversation_id to start a new conversation."
            }
        ) from e

    try:
        return get_or_create_conversation(
            db=db,
            user_id=user_uuid,
            conversation_id=conv_uuid
        )
    except (KeyError, ValueError):
        # Conversation may have been auto-deleted (90-day policy) or otherwise not found.
        # [From]: T035 - Handle auto-deleted conversations gracefully
        # Start a new conversation instead of failing the request.
        return get_or_create_conversation(db=db, user_id=user_uuid)


def _save_chat_exchange(user_message_data: dict, ai_message_data: dict, conversation_id, user_id: str) -> None:
    """Background task: persist the user/assistant message pair.

    Opens its own session (the request-scoped one is not reused here), saves
    both messages in one commit, then updates the conversation timestamp.
    Every failure is logged and swallowed, because the HTTP response has
    already been sent by the time this runs.
    """
    try:
        from core.database import engine

        # The context manager closes the session on every exit path.
        with Session(engine) as bg_db:
            try:
                bg_db.add(Message(**user_message_data))
                bg_db.add(Message(**ai_message_data))
                bg_db.commit()
                # A timestamp failure must not undo the already-committed messages.
                try:
                    update_conversation_timestamp(db=bg_db, conversation_id=conversation_id)
                except SQLAlchemyError as e:
                    error_logger.error(f"Database error updating conversation timestamp for {conversation_id}: {e}")
            except SQLAlchemyError as e:
                error_logger.error(f"Background task: Database error saving messages for user {user_id}: {e}")
                bg_db.rollback()
    except Exception as e:
        error_logger.error(f"Background task: Unexpected error saving messages for user {user_id}: {e}")


@router.post("/{user_id}/chat", response_model=ChatResponse, status_code=status.HTTP_200_OK)
async def chat(
    user_id: str,
    request: ChatRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db)
):
    """Process user message through AI agent and return response.

    [From]: specs/004-ai-chatbot/spec.md - US1

    This endpoint:
    1. Checks AI configuration, validates and sanitizes the input, and
       enforces the rate limit
    2. Gets or creates the conversation and loads its history
    3. Runs the AI agent (non-streaming)
    4. Returns the AI response immediately
    5. Saves both messages to the DB in a background task (non-blocking)

    Args:
        user_id: User ID (UUID string from path)
        request: Chat request with message and optional conversation_id
        background_tasks: FastAPI background tasks for non-blocking DB saves
        db: Database session

    Returns:
        ChatResponse with AI response, conversation_id, and task references
        (the task list is currently always empty).

    Raises:
        HTTPException 400: Invalid user ID, conversation ID or message content
        HTTPException 429: Daily rate limit exceeded
        HTTPException 503: AI service not configured, unreachable or failing
        HTTPException 504: AI service timed out
    """
    # NOTE(review): nothing in this handler verifies that the caller owns
    # `user_id` - confirm authentication/authorization is enforced elsewhere.

    # Check if Gemini API is configured
    # [From]: specs/004-ai-chatbot/tasks.md - T022
    # [From]: T060 - Add comprehensive error messages for edge cases
    if not is_gemini_configured():
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={
                "error": "AI service unavailable",
                "message": "The AI service is currently not configured. Please ensure GEMINI_API_KEY is set in the environment.",
                "suggestion": "Contact your administrator or check your API key configuration."
            }
        )

    # Validate user_id format
    # [From]: T060 - Add comprehensive error messages for edge cases
    try:
        user_uuid = uuid.UUID(user_id)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "Invalid user ID",
                "message": f"User ID '{user_id}' is not a valid UUID format.",
                "expected_format": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
                "suggestion": "Ensure you are using a valid UUID for the user_id path parameter."
            }
        ) from e

    # Validate message content
    # [From]: T060 - Add comprehensive error messages for edge cases
    try:
        validated_message = validate_message_length(request.message)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "Message validation failed",
                "message": str(e),
                "max_length": 10000,
                "suggestion": "Keep your message under 10,000 characters and ensure it contains meaningful content."
            }
        ) from e

    # Sanitize message to prevent prompt injection
    # [From]: T057 - Implement prompt injection sanitization
    try:
        sanitized_message = sanitize_message(validated_message)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "Message content blocked",
                "message": str(e),
                "suggestion": "Please rephrase your message without attempting to manipulate system instructions."
            }
        ) from e

    # Check rate limit
    # [From]: specs/004-ai-chatbot/spec.md - NFR-011
    # [From]: T021 - Implement daily message limit enforcement (100/day)
    # Only the limiter call is guarded: an unexpected limiter failure is
    # logged and the request continues (deliberate fail-open).
    try:
        allowed, _remaining, reset_time = check_rate_limit(db, user_uuid)
    except HTTPException:
        # Re-raise HTTP exceptions coming out of the limiter unchanged
        raise
    except Exception as e:
        error_logger.error(f"Rate limit check failed for user {user_id}: {e}")
        allowed, reset_time = True, None
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail={
                "error": "Rate limit exceeded",
                "message": "You have reached the daily message limit. Please try again later.",
                "limit": 100,
                "resets_at": reset_time.isoformat() if reset_time else None,
                "suggestion": "Free tier accounts are limited to 100 messages per day. Upgrade for unlimited access."
            }
        )

    # Get or create conversation
    # [From]: T016 - Implement conversation history loading
    # [From]: T035 - Handle auto-deleted conversations gracefully
    conversation = _resolve_conversation(db, user_uuid, request.conversation_id)
    conversation_id = conversation.id

    # Load conversation history using service
    # [From]: T016 - Implement conversation history loading
    try:
        conversation_history = load_conversation_history(
            db=db,
            conversation_id=conversation_id
        )
    except SQLAlchemyError as e:
        error_logger.error(f"Database error loading conversation history for {conversation_id}: {e}")
        # Continue with empty history if load fails
        conversation_history = []

    # Prepare user message for background save.
    # NOTE(review): datetime.utcnow() yields a naive timestamp and is
    # deprecated since Python 3.12; switching to an aware value depends on
    # how the Message column is declared - confirm before changing.
    user_message_data = {
        "id": uuid.uuid4(),
        "conversation_id": conversation_id,
        "user_id": user_uuid,
        "role": MessageRole.USER,
        "content": sanitized_message,
        "created_at": datetime.utcnow()
    }

    # The agent needs the user's current message in context, so it is
    # appended to the stored history for this call.
    messages_for_agent = conversation_history + [
        {"role": "user", "content": sanitized_message}
    ]

    # Run AI agent (non-streaming for legacy endpoint)
    # [From]: T014 - Initialize OpenAI Agents SDK with Gemini
    # NOTE: Streaming is now handled by ChatKit SSE endpoint
    # NOTE(review): the 503 bodies below include raw exception text
    # (str(e)); confirm that exposing it to clients is intended.
    try:
        ai_response_text = await run_agent(
            messages=messages_for_agent,
            user_id=user_id
        )
    except ValueError as e:
        # Configuration errors (missing API key, invalid model)
        # [From]: T022 - Add error handling for Gemini API unavailability
        error_logger.error(f"AI configuration error for user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={
                "error": "AI service configuration error",
                "message": str(e),
                "suggestion": "Verify GEMINI_API_KEY and GEMINI_MODEL are correctly configured."
            }
        ) from e
    except ConnectionError as e:
        # Network/connection issues
        error_logger.error(f"AI connection error for user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={
                "error": "AI service unreachable",
                "message": "Could not connect to the AI service. Please check your network connection.",
                "suggestion": "If the problem persists, the AI service may be temporarily down."
            }
        ) from e
    except TimeoutError as e:
        # Timeout errors
        error_logger.error(f"AI timeout error for user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_504_GATEWAY_TIMEOUT,
            detail={
                "error": "AI service timeout",
                "message": "The AI service took too long to respond. Please try again.",
                "suggestion": "Your message may be too complex. Try breaking it into smaller requests."
            }
        ) from e
    except Exception as e:
        # Other errors (rate limits, authentication, context, etc.)
        error_logger.error(f"Unexpected AI error for user {user_id}: {type(e).__name__}: {e}")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={
                "error": "AI service error",
                "message": f"An unexpected error occurred: {str(e)}",
                "suggestion": "Please try again later or contact support if the problem persists."
            }
        ) from e

    # Prepare AI response for background save
    ai_message_data = {
        "id": uuid.uuid4(),
        "conversation_id": conversation_id,
        "user_id": user_uuid,
        "role": MessageRole.ASSISTANT,
        "content": ai_response_text,
        "created_at": datetime.utcnow()
    }

    # Save messages to DB in background (non-blocking) so the response is
    # not delayed by the writes.
    background_tasks.add_task(
        _save_chat_exchange,
        user_message_data,
        ai_message_data,
        conversation_id,
        user_id,
    )

    # TODO: Parse AI response for task references
    # This will be enhanced in future tasks to extract task IDs from AI responses
    task_references: list[TaskReference] = []
    return ChatResponse(
        response=ai_response_text,
        conversation_id=str(conversation_id),
        tasks=task_references
    )
# ============================================================================
# ChatKit SSE Endpoint (Phase 010-chatkit-migration)
# ============================================================================
def _sse_error_response(payload: dict, status_code: int):
    """Build a StreamingResponse that carries a single SSE ``error`` event.

    Args:
        payload: JSON-serializable dict sent on the ``data:`` line.
        status_code: HTTP status code of the response.
    """
    import json
    from fastapi.responses import StreamingResponse

    # Serialize up front so a bad payload fails here, not mid-stream.
    data_line = f"data: {json.dumps(payload)}\n\n"

    async def error_stream():
        yield "event: error\n"
        yield data_line

    return StreamingResponse(
        error_stream(),
        media_type="text/event-stream",
        status_code=status_code
    )


def _extract_text_content(item) -> str:
    """Return the text of the first ``"text"`` content block of a ChatKit item.

    Returns an empty string when ``item`` is not a dict, has no list of
    content blocks, has no text block, or the text is not a string.
    """
    if not isinstance(item, dict):
        return ""
    blocks = item.get("content", [])
    if not isinstance(blocks, list):
        return ""
    for block in blocks:
        if isinstance(block, dict) and block.get("type") == "text":
            text = block.get("text", "")
            return text if isinstance(text, str) else ""
    return ""


@router.post("/chatkit")
async def chatkit_endpoint(
    request: Request,  # Starlette Request object for raw body access
    background_tasks: BackgroundTasks,
):
    """ChatKit SSE endpoint for chat with Gemini LLM.

    [Task]: T011
    [From]: specs/010-chatkit-migration/contracts/backend.md - ChatKit SSE Endpoint

    This endpoint implements the ChatKit protocol using Server-Sent Events (SSE).
    It replaces the WebSocket-based streaming with a simpler HTTP-based approach.

    Endpoint: POST /api/chatkit
    Response: Server-Sent Events (text/event-stream)
    Authentication: JWT via httpOnly cookie (auth_token)

    Request Body (ChatKit protocol):
        {
            "event": "conversation_item_created",
            "conversation_id": "<thread_uuid>",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{"type": "text", "text": "Your message here"}]
            }
        }

    SSE events emitted by this handler:
        - message_delta: the assistant text (sent as one event)
        - message_done: message fully sent
        - error: error occurred

    Failures before the agent runs are returned as a single ``error`` event
    with status 401 (authentication), 429 (rate limit) or 400 (bad body,
    bad conversation ID, missing or rejected message). Failures while the
    agent runs are reported as an ``error`` event inside the 200 stream.

    Args:
        request: Raw request; the JSON body and the auth cookie are read from it.
        background_tasks: Accepted for the route signature; not used here.

    [From]: specs/010-chatkit-migration/research.md - Section 4
    """
    import json
    from uuid import UUID
    from fastapi.responses import StreamingResponse
    from core.database import engine
    from core.security import get_current_user_id_from_cookie

    # Get authenticated user ID from JWT cookie
    # [From]: specs/010-chatkit-migration/contracts/backend.md - Authentication
    try:
        user_id = await get_current_user_id_from_cookie(request)
    except Exception as e:
        error_logger.error(f"Auth error in ChatKit endpoint: {e}")
        return _sse_error_response({"detail": "Authentication failed"}, 401)
    if not user_id:
        return _sse_error_response({"detail": "Invalid authentication"}, 401)

    # Parse the user ID once; an ID that is not a UUID cannot be stored, so
    # it is treated as an authentication failure rather than a server error.
    try:
        user_uuid = UUID(str(user_id))
    except ValueError:
        error_logger.error("Auth error in ChatKit endpoint: user ID is not a valid UUID")
        return _sse_error_response({"detail": "Invalid authentication"}, 401)

    # Check rate limit before processing
    # [From]: specs/010-chatkit-migration/tasks.md - T020
    # [From]: specs/010-chatkit-migration/spec.md - FR-015
    try:
        # Synchronous session just for the rate limit check
        with Session(engine) as db:
            allowed, _remaining, reset_time = check_rate_limit(db, user_uuid)
    except HTTPException:
        # Re-raise HTTP exceptions coming out of the limiter unchanged
        raise
    except Exception as e:
        # Log unexpected errors but don't block the request (fail open)
        error_logger.error(f"Rate limit check failed for ChatKit endpoint: {e}")
        allowed, reset_time = True, None
    if not allowed:
        return _sse_error_response(
            {
                "detail": "Daily message limit reached",
                "limit": 100,
                "resets_at": reset_time.isoformat() if reset_time else None,
            },
            429,
        )

    # Read and parse the ChatKit protocol request
    try:
        body = await request.body()
    except Exception as e:
        error_logger.error(f"Failed to read ChatKit request body: {e}")
        return _sse_error_response({"detail": "Invalid request format"}, 400)
    try:
        request_data = json.loads(body.decode('utf-8'))
    except Exception as e:
        error_logger.error(f"Failed to parse ChatKit request: {e}")
        return _sse_error_response({"detail": "Invalid JSON format"}, 400)
    if not isinstance(request_data, dict):
        # Valid JSON, but not the object the protocol requires.
        error_logger.error("Failed to parse ChatKit request: body is not a JSON object")
        return _sse_error_response({"detail": "Invalid request format"}, 400)

    # Extract thread ID and message content
    conversation_id = request_data.get("conversation_id")
    user_message = _extract_text_content(request_data.get("item", {}))
    if not user_message:
        return _sse_error_response({"detail": "No message content provided"}, 400)

    # Apply the same length validation and prompt-injection sanitization the
    # /{user_id}/chat endpoint applies before text reaches the agent or DB.
    try:
        validated_message = validate_message_length(user_message)
    except ValueError as e:
        return _sse_error_response(
            {"detail": "Message validation failed", "message": str(e)}, 400
        )
    try:
        user_message = sanitize_message(validated_message)
    except ValueError as e:
        return _sse_error_response(
            {"detail": "Message content blocked", "message": str(e)}, 400
        )

    # Get or create thread
    if conversation_id:
        # NOTE(review): nothing here checks that this thread belongs to the
        # authenticated user - confirm ownership is enforced elsewhere.
        thread_id = conversation_id
        try:
            thread_uuid = UUID(str(conversation_id))
        except ValueError:
            return _sse_error_response({"detail": "Invalid conversation ID"}, 400)
    else:
        # Create new thread for first message
        from models.thread import Thread
        with Session(engine) as db:
            new_thread = Thread(
                user_id=user_uuid,
                title=None,
                thread_metadata={}
            )
            db.add(new_thread)
            db.commit()
            db.refresh(new_thread)
            thread_id = str(new_thread.id)
        thread_uuid = UUID(thread_id)
        # error_logger is set to ERROR level in this module, so this info
        # record is filtered unless that configuration changes.
        error_logger.info(f"Created new thread: {thread_id}")

    # Save user message to database
    with Session(engine) as db:
        db.add(
            Message(
                thread_id=thread_uuid,
                user_id=user_uuid,
                role=MessageRole.USER,
                content=user_message,
            )
        )
        db.commit()

    # Run AI agent and stream response
    # [Task]: T033 - Add SSE error handling for connection drops
    async def stream_chat_response():
        """Run the agent and yield ChatKit SSE events, with timeout protection.

        [Task]: T034 - Timeout handling for long-running tool executions
        """
        try:
            # Only the current message is sent to the agent; earlier thread
            # messages are not loaded here.
            async with asyncio.timeout(120):
                ai_response = await run_agent(
                    messages=[{"role": "user", "content": user_message}],
                    user_id=user_id
                )

            # Save assistant message to database
            with Session(engine) as db:
                db.add(
                    Message(
                        thread_id=thread_uuid,
                        user_id=user_uuid,
                        role=MessageRole.ASSISTANT,
                        content=ai_response,
                    )
                )
                db.commit()

            # Send the whole response as one delta, then the done marker
            yield "event: message_delta\n"
            yield f'data: {json.dumps({"type": "text", "text": ai_response})}\n\n'
            # NOTE(review): message_id carries the thread ID, not the stored
            # assistant message's ID - confirm what the client expects.
            yield "event: message_done\n"
            yield f'data: {json.dumps({"message_id": thread_id, "role": "assistant", "thread_id": thread_id})}\n\n'
        except TimeoutError:
            error_logger.error(f"Agent execution timeout for thread {thread_id}")
            yield "event: error\n"
            yield f'data: {json.dumps({"detail": "Request timed out", "message": "The AI assistant took too long to respond. Please try again."})}\n\n'
        except Exception as e:
            error_logger.error(f"Agent execution error: {e}", exc_info=True)
            yield "event: error\n"
            yield f'data: {json.dumps({"detail": "Processing error", "message": str(e)})}\n\n'

    # [Task]: T033 - Wrap with connection-aware streaming
    async def connection_aware_stream():
        """Relay the chat stream, logging connection drops instead of raising.

        [Task]: T033 - SSE error handling for connection drops
        """
        try:
            async for chunk in stream_chat_response():
                yield chunk
        except (ConnectionError, OSError) as e:
            # Client disconnected during streaming
            error_logger.info(f"Client disconnected during ChatKit streaming: {e}")
        except Exception as e:
            # Unexpected streaming error
            error_logger.error(f"Unexpected error during ChatKit streaming: {e}", exc_info=True)
            yield "event: error\n"
            # json.dumps keeps the data line valid JSON and inserts the real
            # exception text (escaped braces previously emitted the literal
            # characters "str(e)").
            yield f'data: {json.dumps({"detail": "Streaming error", "message": str(e)})}\n\n'

    return StreamingResponse(
        connection_aware_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
|