Spaces:
Sleeping
Sleeping
| # ============================================================================ | |
| # backend/app/services/conversation_service.py | |
| # ============================================================================ | |
| """ | |
| Conversation Service - Business Logic Layer (UPDATED) | |
| OPTIMIZED: | |
| - Better error handling | |
| - Smart title generation with fallbacks | |
| - Async LLM title generation (optional) | |
| - User verification in all operations | |
| """ | |
| import re | |
| from typing import Optional, Dict, Any | |
| from datetime import datetime | |
| from app.db.repositories.conversation_repository import conversation_repository | |
| from app.models.conversation import ( | |
| Conversation, | |
| Message, | |
| ConversationListResult, | |
| CreateConversationRequest, | |
| UpdateConversationRequest | |
| ) | |
| # ============================================================================ | |
| # CONVERSATION SERVICE | |
| # ============================================================================ | |
| class ConversationService: | |
| """ | |
| Business logic for conversation management. | |
| Handles validation, auto-titles, and business rules. | |
| """ | |
| def __init__(self): | |
| """Initialize service""" | |
| self.repository = conversation_repository | |
| print("✅ ConversationService initialized") | |
| # ======================================================================== | |
| # AUTO-TITLE GENERATION | |
| # ======================================================================== | |
| def generate_title_from_message( | |
| self, | |
| message: str, | |
| max_length: int = 50 | |
| ) -> str: | |
| """ | |
| Generate conversation title from first user message. | |
| Optimized with better truncation logic. | |
| Args: | |
| message: First user message | |
| max_length: Maximum title length | |
| Returns: | |
| str: Generated title | |
| """ | |
| message = message.strip() | |
| if not message: | |
| return "New Conversation" | |
| # Remove extra whitespace | |
| message = re.sub(r'\s+', ' ', message) | |
| # Try first sentence | |
| sentences = re.split(r'[.!?]+', message) | |
| first_sentence = sentences[0].strip() | |
| # Use more if first sentence too short | |
| if len(first_sentence) < 15 and len(sentences) > 1: | |
| first_sentence = f"{first_sentence}. {sentences[1].strip()}" | |
| # Truncate smartly | |
| if len(first_sentence) > max_length: | |
| title = first_sentence[:max_length].strip() | |
| # Break at word boundary | |
| last_space = title.rfind(' ') | |
| if last_space > max_length * 0.6: | |
| title = title[:last_space] | |
| title += "..." | |
| else: | |
| title = first_sentence | |
| # Capitalize | |
| if title: | |
| title = title[0].upper() + title[1:] | |
| # Remove trailing punctuation before ellipsis | |
| title = re.sub(r'[,;:]\.\.\.$', '...', title) | |
| return title if title else "New Conversation" | |
| async def generate_smart_title( | |
| self, | |
| first_message: str, | |
| llm_manager = None | |
| ) -> str: | |
| """ | |
| Generate smart title using LLM (optional). | |
| Falls back gracefully if LLM unavailable. | |
| Args: | |
| first_message: First user message | |
| llm_manager: Optional LLM manager | |
| Returns: | |
| str: Generated title | |
| """ | |
| # Try LLM generation if available | |
| if llm_manager: | |
| try: | |
| prompt = f"""Generate a concise title (max 50 chars) for this banking query: | |
| "{first_message}" | |
| Requirements: | |
| - Clear and descriptive | |
| - Banking/finance focused | |
| - No quotes or formatting | |
| - Maximum 50 characters | |
| Title:""" | |
| # Use simple generation (not full chat) | |
| title = await llm_manager.generate_simple_response( | |
| prompt=prompt, | |
| max_tokens=15, | |
| temperature=0.3 | |
| ) | |
| # Clean and validate | |
| title = title.strip().strip('"\'`') | |
| title = re.sub(r'\s+', ' ', title) | |
| if 5 < len(title) <= 60: | |
| return title | |
| except Exception as e: | |
| print(f"⚠️ Smart title generation failed: {e}") | |
| # Fallback to simple generation | |
| return self.generate_title_from_message(first_message) | |
| # ======================================================================== | |
| # CREATE | |
| # ======================================================================== | |
| async def create_conversation( | |
| self, | |
| user_id: str, | |
| request: CreateConversationRequest = None, | |
| llm_manager = None | |
| ) -> Conversation: | |
| """ | |
| Create a new conversation with optional first message. | |
| OPTIMIZED: Better title generation + error handling | |
| Args: | |
| user_id: User ID | |
| request: Optional create request | |
| llm_manager: Optional LLM manager | |
| Returns: | |
| Conversation: Created conversation (full object) | |
| """ | |
| if request is None: | |
| request = CreateConversationRequest() | |
| # Determine title | |
| if request.title: | |
| title = request.title | |
| elif request.first_message: | |
| # Auto-generate from message | |
| title = await self.generate_smart_title( | |
| request.first_message, | |
| llm_manager | |
| ) | |
| else: | |
| title = f"New Chat - {datetime.now().strftime('%b %d, %H:%M')}" | |
| # Create conversation (returns ID string) | |
| conversation_id = await self.repository.create_conversation( | |
| user_id=user_id, | |
| title=title, | |
| first_message=request.first_message | |
| ) | |
| # Fetch full conversation | |
| conversation = await self.repository.get_conversation_by_id( | |
| conversation_id, | |
| user_id | |
| ) | |
| if not conversation: | |
| raise ValueError("Failed to create conversation") | |
| return conversation | |
| # ======================================================================== | |
| # READ | |
| # ======================================================================== | |
| async def get_conversation( | |
| self, | |
| conversation_id: str, | |
| user_id: str | |
| ) -> Optional[Conversation]: | |
| """ | |
| Get conversation with user verification. | |
| Args: | |
| conversation_id: Conversation ID | |
| user_id: User ID (must match owner) | |
| Returns: | |
| Conversation or None | |
| """ | |
| return await self.repository.get_conversation_by_id( | |
| conversation_id, | |
| user_id | |
| ) | |
| async def list_conversations( | |
| self, | |
| user_id: str, | |
| page: int = 1, | |
| page_size: int = 20, | |
| include_archived: bool = False | |
| ) -> ConversationListResult: | |
| """ | |
| List conversations for user with pagination. | |
| Args: | |
| user_id: User ID | |
| page: Page number (1-indexed) | |
| page_size: Items per page | |
| include_archived: Include archived? | |
| Returns: | |
| ConversationListResult: Paginated list | |
| """ | |
| # Validate pagination | |
| page = max(1, page) | |
| page_size = min(max(1, page_size), 100) # Cap at 100 | |
| return await self.repository.list_conversations( | |
| user_id=user_id, | |
| page=page, | |
| page_size=page_size, | |
| include_archived=include_archived | |
| ) | |
| async def search_conversations( | |
| self, | |
| user_id: str, | |
| query: str, | |
| page: int = 1, | |
| page_size: int = 20 | |
| ) -> ConversationListResult: | |
| """ | |
| Search conversations by title/content. | |
| Args: | |
| user_id: User ID | |
| query: Search query | |
| page: Page number | |
| page_size: Items per page | |
| Returns: | |
| ConversationListResult: Search results | |
| """ | |
| # Validate query | |
| if not query or len(query.strip()) < 2: | |
| # Return empty results for invalid query | |
| return ConversationListResult( | |
| conversations=[], | |
| total=0, | |
| page=page, | |
| page_size=page_size, | |
| has_more=False | |
| ) | |
| return await self.repository.search_conversations( | |
| user_id=user_id, | |
| query=query.strip(), | |
| page=page, | |
| page_size=page_size | |
| ) | |
| # ======================================================================== | |
| # UPDATE | |
| # ======================================================================== | |
| async def add_message_to_conversation( | |
| self, | |
| conversation_id: str, | |
| user_id: str, | |
| role: str, | |
| content: str, | |
| metadata: Optional[Dict[str, Any]] = None | |
| ) -> Optional[Conversation]: | |
| """ | |
| Add message to conversation with validation. | |
| Args: | |
| conversation_id: Conversation ID | |
| user_id: User ID (must match owner) | |
| role: 'user' or 'assistant' | |
| content: Message content | |
| metadata: Optional metadata | |
| Returns: | |
| Updated Conversation or None | |
| """ | |
| # Validate role | |
| if role not in ['user', 'assistant']: | |
| raise ValueError(f"Invalid role: {role}") | |
| # Validate content | |
| if not content or not content.strip(): | |
| raise ValueError("Message content cannot be empty") | |
| # Verify ownership | |
| conversation = await self.get_conversation(conversation_id, user_id) | |
| if not conversation: | |
| return None | |
| # Create message | |
| message = Message( | |
| role=role, | |
| content=content.strip(), | |
| timestamp=datetime.utcnow(), | |
| metadata=metadata | |
| ) | |
| # Add to repository | |
| success = await self.repository.add_message( | |
| conversation_id, | |
| message.dict() | |
| ) | |
| if success: | |
| return await self.get_conversation(conversation_id, user_id) | |
| return None | |
| async def update_conversation( | |
| self, | |
| conversation_id: str, | |
| user_id: str, | |
| request: UpdateConversationRequest | |
| ) -> Optional[Conversation]: | |
| """ | |
| Update conversation properties. | |
| Args: | |
| conversation_id: Conversation ID | |
| user_id: User ID (must match owner) | |
| request: Update request | |
| Returns: | |
| Updated Conversation or None | |
| """ | |
| update_data = {} | |
| if request.title is not None: | |
| # Validate title | |
| title = request.title.strip() | |
| if not title: | |
| raise ValueError("Title cannot be empty") | |
| if len(title) > 100: | |
| raise ValueError("Title too long (max 100 chars)") | |
| update_data["title"] = title | |
| if request.is_archived is not None: | |
| update_data["is_archived"] = request.is_archived | |
| if not update_data: | |
| # Nothing to update | |
| return await self.get_conversation(conversation_id, user_id) | |
| return await self.repository.update_conversation( | |
| conversation_id, | |
| user_id, | |
| update_data | |
| ) | |
| async def rename_conversation( | |
| self, | |
| conversation_id: str, | |
| user_id: str, | |
| new_title: str | |
| ) -> Optional[Conversation]: | |
| """ | |
| Rename conversation with validation. | |
| Args: | |
| conversation_id: Conversation ID | |
| user_id: User ID | |
| new_title: New title | |
| Returns: | |
| Updated Conversation or None | |
| """ | |
| # Validate title | |
| new_title = new_title.strip() | |
| if not new_title: | |
| raise ValueError("Title cannot be empty") | |
| if len(new_title) > 100: | |
| raise ValueError("Title too long (max 100 chars)") | |
| return await self.repository.rename_conversation( | |
| conversation_id, | |
| user_id, | |
| new_title | |
| ) | |
| async def archive_conversation( | |
| self, | |
| conversation_id: str, | |
| user_id: str | |
| ) -> Optional[Conversation]: | |
| """Archive a conversation.""" | |
| return await self.repository.archive_conversation( | |
| conversation_id, | |
| user_id, | |
| archived=True | |
| ) | |
| async def unarchive_conversation( | |
| self, | |
| conversation_id: str, | |
| user_id: str | |
| ) -> Optional[Conversation]: | |
| """Unarchive a conversation.""" | |
| return await self.repository.archive_conversation( | |
| conversation_id, | |
| user_id, | |
| archived=False | |
| ) | |
| # ======================================================================== | |
| # DELETE | |
| # ======================================================================== | |
| async def delete_conversation( | |
| self, | |
| conversation_id: str, | |
| user_id: str, | |
| permanent: bool = False | |
| ) -> bool: | |
| """ | |
| Delete conversation (with ownership verification). | |
| Args: | |
| conversation_id: Conversation ID | |
| user_id: User ID (must match owner) | |
| permanent: Hard delete if True | |
| Returns: | |
| bool: True if deleted | |
| """ | |
| # Verify ownership first | |
| conversation = await self.get_conversation(conversation_id, user_id) | |
| if not conversation: | |
| return False | |
| return await self.repository.delete_conversation( | |
| conversation_id, | |
| soft_delete=not permanent | |
| ) | |
| # ======================================================================== | |
| # UTILITY | |
| # ======================================================================== | |
| async def get_conversation_stats( | |
| self, | |
| user_id: str | |
| ) -> Dict[str, int]: | |
| """ | |
| Get conversation statistics. | |
| Args: | |
| user_id: User ID | |
| Returns: | |
| dict: Stats with total, active, archived | |
| """ | |
| return await self.repository.get_conversation_count(user_id) | |
| # ============================================================================ | |
| # GLOBAL SERVICE INSTANCE | |
| # ============================================================================ | |
| conversation_service = ConversationService() |