Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
"""
Base Chatbot - Abstract base class for all chatbot implementations.

This module provides shared functionality across all chatbot types:
- Conversation loading/saving
- UI filter extraction
- Conversation context building
- Common utility methods
"""
| import json | |
| import time | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, List, Any, Optional | |
| from abc import ABC, abstractmethod | |
| from langchain_core.messages import HumanMessage, AIMessage | |
| from src.config.paths import CONVERSATIONS_DIR, PROJECT_DIR | |
logger = logging.getLogger(__name__)


class BaseChatbot(ABC):
    """
    Base class for all chatbot implementations.

    Provides shared functionality:
    - Conversation persistence (load/save)
    - UI filter extraction
    - Conversation context building
    """

    # (label as it appears in the FILTER CONTEXT block, key used in the result dict)
    _FILTER_LABELS = (
        ("Sources:", "sources"),
        ("Years:", "years"),
        ("Districts:", "districts"),
        ("Filenames:", "filenames"),
    )

    def __init__(self):
        """
        Initialize base chatbot and make sure a conversations directory exists.

        Tries CONVERSATIONS_DIR first and falls back to a relative
        "conversations" directory when that location cannot be created.

        Raises:
            RuntimeError: If neither directory can be created.
        """
        self.conversations_dir = CONVERSATIONS_DIR
        try:
            self.conversations_dir.mkdir(parents=True, mode=0o777, exist_ok=True)
        except OSError as e:  # PermissionError is a subclass of OSError
            logger.warning(f"Could not create conversations directory at {self.conversations_dir}: {e}")
            # Fall back to a path relative to the current working directory
            self.conversations_dir = Path("conversations")
            try:
                self.conversations_dir.mkdir(parents=True, mode=0o777, exist_ok=True)
            except OSError as e2:
                logger.error(f"Could not create conversations directory at {self.conversations_dir}: {e2}")
                # Chain the original error so the traceback shows the root cause
                raise RuntimeError(f"Failed to create conversations directory: {e2}") from e2

    def chat(self, user_input: str, conversation_id: str = "default") -> Dict[str, Any]:
        """
        Main chat interface - must be implemented by subclasses.

        Args:
            user_input: User's input message
            conversation_id: Unique conversation identifier

        Returns:
            Dictionary with:
            - response: AI response
            - rag_result: RAG results (sources, answer)
            - agent_logs: List of agent logs
            - actual_rag_query: The query used for retrieval
        """
        # NOTE(review): this method is not decorated with @abstractmethod, so
        # overriding it is not enforced; the base implementation returns None.
        pass

    def _load_conversation(self, conversation_file: Path) -> Dict[str, Any]:
        """
        Load conversation from file.

        Messages stored as {"type": "human"|"ai", "content": ...} dicts are
        converted back to LangChain messages; entries of any other type are
        dropped. Keys missing from the stored data are filled with defaults so
        the result can always be passed to _save_conversation. If the file
        does not exist or cannot be read/parsed, a fresh conversation is
        returned.

        Args:
            conversation_file: Path to conversation JSON file

        Returns:
            Dictionary with messages, session_start_time, last_ai_message_time, context
        """
        now = time.time()
        defaults: Dict[str, Any] = {
            "messages": [],
            "session_start_time": now,
            "last_ai_message_time": now,
            "context": {},
        }

        if conversation_file.exists():
            try:
                with open(conversation_file, encoding="utf-8") as f:
                    data = json.load(f)

                # Convert message dicts back to LangChain messages
                messages = []
                for msg_data in data.get("messages", []):
                    if msg_data["type"] == "human":
                        messages.append(HumanMessage(content=msg_data["content"]))
                    elif msg_data["type"] == "ai":
                        messages.append(AIMessage(content=msg_data["content"]))
                data["messages"] = messages

                # Backfill keys an older or partial file may lack
                for key, value in defaults.items():
                    data.setdefault(key, value)
                return data
            except Exception as e:
                # Best effort: an unreadable file yields a fresh conversation
                logger.warning(f"Could not load conversation: {e}")

        return defaults

    def _save_conversation(self, conversation_file: Path, conversation: Dict[str, Any]):
        """
        Save conversation to file.

        Only HumanMessage and AIMessage entries are persisted. Failures are
        logged and not raised. The JSON text is produced in full before the
        file is opened, so a serialization error does not truncate a
        previously saved conversation.

        Args:
            conversation_file: Path to conversation JSON file
            conversation: Conversation dictionary
        """
        try:
            # Ensure the target directory exists
            conversation_file.parent.mkdir(parents=True, mode=0o777, exist_ok=True)

            # Convert messages to serializable format
            messages_data = []
            for msg in conversation["messages"]:
                if isinstance(msg, HumanMessage):
                    messages_data.append({"type": "human", "content": msg.content})
                elif isinstance(msg, AIMessage):
                    messages_data.append({"type": "ai", "content": msg.content})

            conversation_data = {
                "messages": messages_data,
                "session_start_time": conversation["session_start_time"],
                "last_ai_message_time": conversation["last_ai_message_time"],
                "context": conversation.get("context", {}),
            }

            # Serialize first: opening with 'w' truncates the file immediately
            payload = json.dumps(conversation_data, indent=2)
            with open(conversation_file, "w", encoding="utf-8") as f:
                f.write(payload)
        except Exception as e:
            logger.error(f"Could not save conversation: {e}")

    def _extract_ui_filters(self, query: str) -> Dict[str, List[str]]:
        """
        Extract UI filters from query.

        Expected format:
            FILTER CONTEXT:
            Sources: Source1, Source2
            Years: 2020, 2021
            Districts: District1, District2
            Filenames: file1.pdf, file2.pdf
            USER QUERY:
            actual query text

        A label is only recognised at the start of a line (ignoring leading
        whitespace), and only the first such line per label is used. A value
        that is empty or the literal "None" produces no entry, and empty items
        (for example from a trailing comma) are dropped.

        Args:
            query: User query (may contain filter context)

        Returns:
            Dictionary with extracted filters; keys are a subset of
            "sources", "years", "districts", "filenames"
        """
        filters: Dict[str, List[str]] = {}
        if "FILTER CONTEXT:" not in query:
            return filters

        # Filter section runs until USER QUERY: or the end of the query
        filter_section = query.split("FILTER CONTEXT:")[1]
        if "USER QUERY:" in filter_section:
            filter_section = filter_section.split("USER QUERY:")[0]
        section_lines = [line.strip() for line in filter_section.strip().split("\n")]

        for label, key in self._FILTER_LABELS:
            for line in section_lines:
                if not line.startswith(label):
                    continue
                value = line[len(label):].strip()
                if value and value != "None":
                    items = [item.strip() for item in value.split(",")]
                    items = [item for item in items if item]
                    if items:
                        filters[key] = items
                break  # only the first line carrying this label counts

        return filters

    def _build_conversation_context(self, messages: List[Any], num_messages: int = 6) -> str:
        """
        Build conversation context from recent messages.

        Args:
            messages: List of LangChain messages
            num_messages: Number of recent messages to include; zero or a
                negative value includes none

        Returns:
            Formatted conversation context string, or
            "No previous conversation." when there is nothing to show
        """
        # messages[-0:] would be the whole list, so guard non-positive counts
        recent = messages[-num_messages:] if num_messages > 0 else []

        context_lines = []
        for msg in recent:
            if isinstance(msg, HumanMessage):
                context_lines.append(f"User: {msg.content}")
            elif isinstance(msg, AIMessage):
                context_lines.append(f"Assistant: {msg.content}")

        return "\n".join(context_lines) if context_lines else "No previous conversation."

    def _extract_clean_query(self, query: str) -> str:
        """
        Extract the actual query without filter context.

        Everything after the first "USER QUERY:" marker is returned, so user
        text that itself contains the marker is kept intact. This is the same
        marker at which _extract_ui_filters ends the filter section.

        Args:
            query: User query (may contain filter context)

        Returns:
            Clean query without filter context; the input unchanged when no
            marker is present
        """
        marker = "USER QUERY:"
        if marker in query:
            return query.partition(marker)[2].strip()
        return query