| | """ |
| | Chat service for conversation and message management. |
| | |
| | Per @specs/001-chatbot-mcp/plan.md and @specs/001-chatbot-mcp/data-model.md |
| | """ |
| | from sqlmodel import Session, select |
| | from typing import List, Optional |
| | from datetime import datetime |
| | from uuid import UUID, uuid4 |
| | import re |
| | import logging |
| |
|
| | from models.conversation import ConversationTable |
| | from models.message import MessageTable, MessageRole |
| | from config import engine |
| |
|
logger = logging.getLogger(__name__)


class ChatService:
    """
    Service for chat business logic.

    Handles conversation creation, message storage, history retrieval,
    and input sanitization per @specs/001-chatbot-mcp/plan.md.

    Every method is a ``@staticmethod`` that receives the database session
    explicitly; the class itself holds no state.
    """

    # Default cap on the number of characters of user input kept by
    # sanitize_user_input().
    MAX_INPUT_LENGTH = 5000

    # Matches any run of whitespace; used to collapse it to a single space.
    _WHITESPACE_RUN = re.compile(r"\s+")

    # Phrases / delimiters commonly seen in prompt-injection attempts.
    # Compiled once at class creation instead of on every call.
    _INJECTION_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r"ignore\s+(all\s+)?(previous|above)",
            r"disregard\s+(all\s+)?(previous|above)",
            r"forget\s+(everything|all\s+instructions)",
            r"<\|.*?\|>",
            r"<<.*?>>",
        )
    )

    @staticmethod
    def create_conversation(session: Session, user_id: UUID, title: Optional[str] = None) -> ConversationTable:
        """
        Create a new conversation for a user.

        Args:
            session: Database session
            user_id: Owner's user ID (from JWT)
            title: Optional conversation title

        Returns:
            Created conversation with user_id set
        """
        # One timestamp for both fields so a brand-new conversation has
        # created_at == updated_at exactly.
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        # moving to timezone-aware values needs the column types confirmed.
        now = datetime.utcnow()
        conversation = ConversationTable(
            id=uuid4(),
            user_id=user_id,
            title=title,
            created_at=now,
            updated_at=now,
        )
        session.add(conversation)
        session.commit()
        # Reload the row so the returned object reflects what was persisted.
        session.refresh(conversation)

        logger.info("Created conversation %s for user %s", conversation.id, user_id)
        return conversation

    @staticmethod
    def get_conversation(session: Session, conversation_id: UUID, user_id: UUID) -> Optional[ConversationTable]:
        """
        Retrieve a conversation if it belongs to the user.

        Critical for security - enforces user ownership check.
        Per @specs/001-chatbot-mcp/data-model.md data isolation requirements.

        Args:
            session: Database session
            conversation_id: Conversation ID to retrieve
            user_id: User requesting the conversation (for ownership verification)

        Returns:
            Conversation if found and owned by user, None otherwise
        """
        # Both conditions are applied in the query itself, so a conversation
        # owned by someone else is indistinguishable from a missing one.
        statement = select(ConversationTable).where(
            ConversationTable.id == conversation_id,
            ConversationTable.user_id == user_id,
        )
        return session.exec(statement).first()

    @staticmethod
    def store_message(
        session: Session,
        conversation_id: UUID,
        role: MessageRole,
        content: str,
        metadata: Optional[dict] = None
    ) -> MessageTable:
        """
        Store a message in the conversation.

        The parent conversation's ``updated_at`` is bumped in the same
        transaction when the conversation row can be loaded. No ownership
        check is performed here; callers are expected to have verified
        access (e.g. via ``get_conversation``) beforehand.

        Args:
            session: Database session
            conversation_id: ID of the conversation
            role: Message role (user or assistant)
            content: Message content
            metadata: Optional metadata for tool calls, token usage, etc.

        Returns:
            Created message
        """
        now = datetime.utcnow()
        message = MessageTable(
            id=uuid4(),
            conversation_id=conversation_id,
            role=role,
            content=content,
            created_at=now,
            tool_metadata=metadata,
        )
        session.add(message)

        # Touch the parent so get_user_conversations(), which sorts by
        # updated_at, lists the most recently active conversation first.
        conversation = session.get(ConversationTable, conversation_id)
        if conversation:
            conversation.updated_at = now

        session.commit()
        session.refresh(message)

        logger.debug("Stored %s message in conversation %s", role.value, conversation_id)
        return message

    @staticmethod
    def get_conversation_history(
        session: Session,
        conversation_id: UUID,
        user_id: UUID,
        limit: int = 100
    ) -> List[MessageTable]:
        """
        Get message history for a conversation.

        Enforces user ownership check before returning messages.
        Per @specs/001-chatbot-mcp/plan.md stateless architecture requirements.

        When the conversation holds more than ``limit`` messages, the most
        recent ``limit`` messages are returned (still oldest-to-newest), so
        newly stored messages are never dropped from the history.

        Args:
            session: Database session
            conversation_id: ID of the conversation
            user_id: User requesting the history (for ownership verification)
            limit: Maximum number of messages to return (default: 100)

        Returns:
            List of messages in chronological order; empty list if the
            conversation does not exist or is not owned by ``user_id``
            (a warning is logged, no exception is raised)
        """
        conversation = ChatService.get_conversation(session, conversation_id, user_id)
        if not conversation:
            logger.warning("User %s attempted to access conversation %s", user_id, conversation_id)
            return []

        # Select newest-first so LIMIT keeps the tail of the conversation,
        # then flip the window back into chronological order.
        statement = (
            select(MessageTable)
            .where(MessageTable.conversation_id == conversation_id)
            .order_by(MessageTable.created_at.desc())
            .limit(limit)
        )
        messages = list(session.exec(statement).all())
        messages.reverse()

        logger.info("Retrieved %d messages for conversation %s", len(messages), conversation_id)
        return messages

    @staticmethod
    def get_user_conversations(
        session: Session,
        user_id: UUID,
        limit: int = 20
    ) -> List[ConversationTable]:
        """
        Get conversations for a user, ordered by updated_at desc.

        Args:
            session: Database session
            user_id: User's ID
            limit: Maximum number of conversations to return

        Returns:
            List of at most ``limit`` conversations, most recently updated first
        """
        statement = (
            select(ConversationTable)
            .where(ConversationTable.user_id == user_id)
            .order_by(ConversationTable.updated_at.desc())
            .limit(limit)
        )
        return list(session.exec(statement).all())

    @staticmethod
    def sanitize_user_input(user_input: str, max_length: int = MAX_INPUT_LENGTH) -> str:
        """
        Normalize user input before it is used in an AI prompt.

        Per @specs/001-chatbot-mcp/plan.md security requirements.

        Steps, in order: drop NUL characters, truncate to ``max_length``
        characters, collapse every whitespace run to a single space and
        trim the ends. Known prompt-injection phrases are then *detected
        and logged only* - the text is returned unchanged, nothing is
        removed or escaped.

        Args:
            user_input: Raw user input (``None`` or empty yields ``""``)
            max_length: Maximum number of characters kept before whitespace
                normalization (default: ``MAX_INPUT_LENGTH``)

        Returns:
            Normalized input string
        """
        if not user_input:
            return ""

        sanitized = user_input.replace("\x00", "")

        if len(sanitized) > max_length:
            sanitized = sanitized[:max_length]
            logger.warning("Truncated user input from %d to %d chars", len(user_input), max_length)

        sanitized = ChatService._WHITESPACE_RUN.sub(" ", sanitized).strip()

        # Detection is advisory: each matching pattern is logged so it can be
        # monitored, but the message itself is passed through untouched.
        for compiled in ChatService._INJECTION_PATTERNS:
            if compiled.search(sanitized):
                logger.warning("Detected potential prompt injection: %s", compiled.pattern)

        return sanitized

    @staticmethod
    def format_messages_for_openai(messages: List[MessageTable]) -> List[dict]:
        """
        Format database messages for OpenAI API.

        Converts MessageTable objects to OpenAI message format.

        Args:
            messages: List of MessageTable objects

        Returns:
            List of ``{"role": ..., "content": ...}`` dicts, in input order
        """
        return [
            {"role": msg.role.value, "content": msg.content}
            for msg in messages
        ]
| |
|
| |
|
| | |
# Shared module-level instance for importers. ChatService defines only
# @staticmethod methods and no __init__, so this object carries no state.
| | chat_service = ChatService() |
| |
|