Spaces:
Running
Running
| """ | |
| Chat Memory - Short-term and long-term memory management. | |
| Supports MySQL, PostgreSQL, and SQLite with dialect-specific DDL. | |
| """ | |
| import logging | |
| import json | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime | |
| from dataclasses import dataclass | |
| logger = logging.getLogger(__name__) | |
| class ChatMessage: | |
| role: str # "user" or "assistant" | |
| content: str | |
| timestamp: datetime = None | |
| metadata: Dict[str, Any] = None | |
| def __post_init__(self): | |
| if self.timestamp is None: | |
| self.timestamp = datetime.now() | |
| if self.metadata is None: | |
| self.metadata = {} | |
| def to_dict(self) -> Dict[str, str]: | |
| return {"role": self.role, "content": self.content} | |
| def get_memory_table_ddl(db_type: str) -> str: | |
| """Get the DDL for chat memory table based on database type.""" | |
| if db_type == "postgresql": | |
| return """ | |
| CREATE TABLE IF NOT EXISTS _chatbot_memory ( | |
| id SERIAL PRIMARY KEY, | |
| session_id VARCHAR(255) NOT NULL, | |
| user_id VARCHAR(255) NOT NULL DEFAULT 'default', | |
| role VARCHAR(50) NOT NULL, | |
| content TEXT NOT NULL, | |
| metadata JSONB, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """ | |
| elif db_type == "sqlite": | |
| return """ | |
| CREATE TABLE IF NOT EXISTS _chatbot_memory ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| session_id TEXT NOT NULL, | |
| user_id TEXT NOT NULL DEFAULT 'default', | |
| role TEXT NOT NULL, | |
| content TEXT NOT NULL, | |
| metadata TEXT, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """ | |
| else: # MySQL | |
| return """ | |
| CREATE TABLE IF NOT EXISTS _chatbot_memory ( | |
| id INT AUTO_INCREMENT PRIMARY KEY, | |
| session_id VARCHAR(255) NOT NULL, | |
| user_id VARCHAR(255) NOT NULL DEFAULT 'default', | |
| role VARCHAR(50) NOT NULL, | |
| content TEXT NOT NULL, | |
| metadata JSON, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| INDEX idx_session (session_id), | |
| INDEX idx_user (user_id), | |
| INDEX idx_created (created_at) | |
| ) | |
| """ | |
| def get_permanent_memory_ddl(db_type: str) -> str: | |
| """Get the DDL for permanent memory table based on database type.""" | |
| if db_type == "postgresql": | |
| return """ | |
| CREATE TABLE IF NOT EXISTS _chatbot_permanent_memory_v2 ( | |
| id SERIAL PRIMARY KEY, | |
| user_id VARCHAR(255) NOT NULL DEFAULT 'default', | |
| content TEXT NOT NULL, | |
| tags VARCHAR(255), | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """ | |
| elif db_type == "sqlite": | |
| return """ | |
| CREATE TABLE IF NOT EXISTS _chatbot_permanent_memory_v2 ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id TEXT NOT NULL DEFAULT 'default', | |
| content TEXT NOT NULL, | |
| tags TEXT, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """ | |
| else: # MySQL | |
| return """ | |
| CREATE TABLE IF NOT EXISTS _chatbot_permanent_memory_v2 ( | |
| id INT AUTO_INCREMENT PRIMARY KEY, | |
| user_id VARCHAR(255) NOT NULL DEFAULT 'default', | |
| content TEXT NOT NULL, | |
| tags VARCHAR(255), | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| INDEX idx_user (user_id) | |
| ) | |
| """ | |
| def get_summary_table_ddl(db_type: str) -> str: | |
| """Get the DDL for summary table based on database type.""" | |
| if db_type == "postgresql": | |
| return """ | |
| CREATE TABLE IF NOT EXISTS _chatbot_user_summaries ( | |
| id SERIAL PRIMARY KEY, | |
| user_id VARCHAR(255) NOT NULL UNIQUE, | |
| summary TEXT NOT NULL, | |
| message_count INT DEFAULT 0, | |
| last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """ | |
| elif db_type == "sqlite": | |
| return """ | |
| CREATE TABLE IF NOT EXISTS _chatbot_user_summaries ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id TEXT NOT NULL UNIQUE, | |
| summary TEXT NOT NULL, | |
| message_count INTEGER DEFAULT 0, | |
| last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """ | |
| else: # MySQL | |
| return """ | |
| CREATE TABLE IF NOT EXISTS _chatbot_user_summaries ( | |
| id INT AUTO_INCREMENT PRIMARY KEY, | |
| user_id VARCHAR(255) NOT NULL, | |
| summary TEXT NOT NULL, | |
| message_count INT DEFAULT 0, | |
| last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, | |
| UNIQUE KEY idx_user (user_id) | |
| ) | |
| """ | |
| def get_upsert_summary_query(db_type: str) -> str: | |
| """Get the upsert query for summary based on database type.""" | |
| if db_type == "postgresql": | |
| return """ | |
| INSERT INTO _chatbot_user_summaries | |
| (user_id, summary, message_count, last_updated) | |
| VALUES (:user_id, :summary, :message_count, CURRENT_TIMESTAMP) | |
| ON CONFLICT (user_id) | |
| DO UPDATE SET | |
| summary = EXCLUDED.summary, | |
| message_count = EXCLUDED.message_count, | |
| last_updated = CURRENT_TIMESTAMP | |
| """ | |
| elif db_type == "sqlite": | |
| return """ | |
| INSERT INTO _chatbot_user_summaries | |
| (user_id, summary, message_count, last_updated) | |
| VALUES (:user_id, :summary, :message_count, CURRENT_TIMESTAMP) | |
| ON CONFLICT(user_id) | |
| DO UPDATE SET | |
| summary = excluded.summary, | |
| message_count = excluded.message_count, | |
| last_updated = CURRENT_TIMESTAMP | |
| """ | |
| else: # MySQL | |
| return """ | |
| INSERT INTO _chatbot_user_summaries | |
| (user_id, summary, message_count) | |
| VALUES (:user_id, :summary, :message_count) | |
| ON DUPLICATE KEY UPDATE | |
| summary = :summary, | |
| message_count = :message_count, | |
| last_updated = CURRENT_TIMESTAMP | |
| """ | |
| class ChatMemory: | |
| """Manages chat history with short-term and long-term storage.""" | |
| def __init__(self, session_id: str, user_id: str = "default", max_messages: int = 20, db_connection=None): | |
| self.session_id = session_id | |
| self.user_id = user_id | |
| self.max_messages = max_messages | |
| self.db = db_connection | |
| self.messages: List[ChatMessage] = [] | |
| self._db_type = None | |
| if self.db: | |
| self._db_type = self.db.db_type.value | |
| self._ensure_tables() | |
| def _ensure_tables(self): | |
| """Create memory tables if they don't exist.""" | |
| try: | |
| memory_ddl = get_memory_table_ddl(self._db_type) | |
| permanent_ddl = get_permanent_memory_ddl(self._db_type) | |
| self.db.execute_write(memory_ddl) | |
| self.db.execute_write(permanent_ddl) | |
| # Create indexes for SQLite and PostgreSQL (MySQL creates them inline) | |
| if self._db_type in ("sqlite", "postgresql"): | |
| self._create_indexes() | |
| # Migration: Ensure user_id column exists (MySQL only for legacy support) | |
| if self._db_type == "mysql": | |
| self._migrate_mysql_user_id() | |
| except Exception as e: | |
| logger.warning(f"Failed to create memory tables: {e}") | |
| def _create_indexes(self): | |
| """Create indexes for SQLite and PostgreSQL.""" | |
| try: | |
| if self._db_type == "sqlite": | |
| self.db.execute_write("CREATE INDEX IF NOT EXISTS idx_memory_session ON _chatbot_memory(session_id)") | |
| self.db.execute_write("CREATE INDEX IF NOT EXISTS idx_memory_user ON _chatbot_memory(user_id)") | |
| self.db.execute_write("CREATE INDEX IF NOT EXISTS idx_memory_created ON _chatbot_memory(created_at)") | |
| self.db.execute_write("CREATE INDEX IF NOT EXISTS idx_permanent_user ON _chatbot_permanent_memory_v2(user_id)") | |
| elif self._db_type == "postgresql": | |
| self.db.execute_write("CREATE INDEX IF NOT EXISTS idx_memory_session ON _chatbot_memory(session_id)") | |
| self.db.execute_write("CREATE INDEX IF NOT EXISTS idx_memory_user ON _chatbot_memory(user_id)") | |
| self.db.execute_write("CREATE INDEX IF NOT EXISTS idx_memory_created ON _chatbot_memory(created_at)") | |
| self.db.execute_write("CREATE INDEX IF NOT EXISTS idx_permanent_user ON _chatbot_permanent_memory_v2(user_id)") | |
| except Exception as e: | |
| logger.debug(f"Index creation (may already exist): {e}") | |
| def _migrate_mysql_user_id(self): | |
| """Migrate MySQL table to include user_id column if missing.""" | |
| try: | |
| check_query = """ | |
| SELECT COLUMN_NAME | |
| FROM INFORMATION_SCHEMA.COLUMNS | |
| WHERE TABLE_SCHEMA = :db_name | |
| AND TABLE_NAME = '_chatbot_memory' | |
| AND COLUMN_NAME = 'user_id' | |
| """ | |
| db_name = self.db.config.database | |
| result = self.db.execute_query(check_query, {"db_name": db_name}) | |
| if not result: | |
| self.db.execute_write("ALTER TABLE _chatbot_memory ADD COLUMN user_id VARCHAR(255) NOT NULL DEFAULT 'default' AFTER session_id") | |
| self.db.execute_write("CREATE INDEX idx_user ON _chatbot_memory(user_id)") | |
| logger.info("Migrated _chatbot_memory to include user_id") | |
| except Exception as e: | |
| logger.debug(f"Migration check failed: {e}") | |
| def add_message(self, role: str, content: str, metadata: Dict = None): | |
| """Add a message to memory and optionally persist it.""" | |
| msg = ChatMessage(role=role, content=content, metadata=metadata) | |
| self.messages.append(msg) | |
| # Trim if exceeds max (short-term) | |
| if len(self.messages) > self.max_messages: | |
| self.messages = self.messages[-self.max_messages:] | |
| # Persist to DB (session history) | |
| if self.db: | |
| try: | |
| query = """ | |
| INSERT INTO _chatbot_memory (session_id, user_id, role, content, metadata) | |
| VALUES (:session_id, :user_id, :role, :content, :metadata) | |
| """ | |
| self.db.execute_write(query, { | |
| "session_id": self.session_id, | |
| "user_id": self.user_id, | |
| "role": role, | |
| "content": content, | |
| "metadata": json.dumps(metadata) if metadata else None | |
| }) | |
| except Exception as e: | |
| logger.warning(f"Failed to persist message: {e}") | |
| def save_permanent_context(self, content: str, tags: str = "user_saved"): | |
| """Save specific context explicitly to permanent memory for this user.""" | |
| if not self.db: | |
| return False, "No database connection" | |
| try: | |
| query = """ | |
| INSERT INTO _chatbot_permanent_memory_v2 (user_id, content, tags) | |
| VALUES (:user_id, :content, :tags) | |
| """ | |
| self.db.execute_write(query, { | |
| "user_id": self.user_id, | |
| "content": content, | |
| "tags": tags | |
| }) | |
| return True, "Context saved to permanent memory" | |
| except Exception as e: | |
| logger.error(f"Failed to save permanent context: {e}") | |
| return False, str(e) | |
| def get_permanent_context(self, limit: int = 5) -> List[str]: | |
| """Retrieve recent permanent context for this user only.""" | |
| if not self.db: | |
| return [] | |
| try: | |
| # Use database-agnostic LIMIT syntax | |
| query = """ | |
| SELECT content FROM _chatbot_permanent_memory_v2 | |
| WHERE user_id = :user_id | |
| ORDER BY created_at DESC LIMIT :limit | |
| """ | |
| rows = self.db.execute_query(query, { | |
| "user_id": self.user_id, | |
| "limit": limit | |
| }) | |
| return [row['content'] for row in rows] | |
| except Exception as e: | |
| logger.warning(f"Failed to load permanent context: {e}") | |
| return [] | |
| def get_messages(self, limit: Optional[int] = None) -> List[Dict[str, str]]: | |
| """Get messages for LLM context.""" | |
| msgs = self.messages if limit is None else self.messages[-limit:] | |
| return [m.to_dict() for m in msgs] | |
| def get_context_messages(self, count: int = 5) -> List[Dict[str, str]]: | |
| """Get recent messages plus permanent context for injection.""" | |
| # Get short-term session messages | |
| context = self.get_messages(limit=count) | |
| # Inject permanent memory if available | |
| perm_docs = self.get_permanent_context(limit=3) | |
| if perm_docs: | |
| perm_context = f"IMPORTANT CONTEXT FOR USER '{self.user_id}':\n" + "\n".join(perm_docs) | |
| # Add as a system note at the start | |
| context.insert(0, {"role": "system", "content": perm_context}) | |
| return context | |
| def clear(self): | |
| """Clear current session memory and remove from DB (temporary history).""" | |
| self.messages = [] | |
| if self.db: | |
| try: | |
| # Delete temporary messages for this session | |
| query = "DELETE FROM _chatbot_memory WHERE session_id = :session_id" | |
| self.db.execute_write(query, {"session_id": self.session_id}) | |
| logger.info(f"Cleared session memory for {self.session_id}") | |
| except Exception as e: | |
| logger.warning(f"Failed to clear memory from DB: {e}") | |
| def clear_user_history(self): | |
| """Clear ALL temporary history for this user (across all sessions).""" | |
| self.messages = [] | |
| if self.db: | |
| try: | |
| query = "DELETE FROM _chatbot_memory WHERE user_id = :user_id" | |
| self.db.execute_write(query, {"user_id": self.user_id}) | |
| logger.info(f"Cleared all temporary history for user: {self.user_id}") | |
| except Exception as e: | |
| logger.warning(f"Failed to clear user history from DB: {e}") | |
| class ConversationSummaryMemory: | |
| """ | |
| Per-user conversation summary memory using LLM for summarization. | |
| This class maintains a running summary of the conversation, updating it | |
| periodically (when message count exceeds threshold). This dramatically | |
| reduces token usage while preserving context for long conversations. | |
| Features: | |
| - Automatic summarization when threshold is reached | |
| - Per-user summary storage in database | |
| - Combines summary + recent messages for optimal context | |
| - Lazy summarization (only when needed) | |
| """ | |
| SUMMARIZATION_PROMPT = """You are a conversation summarizer. Create a concise summary of the conversation below that captures: | |
| 1. Key topics discussed | |
| 2. Important facts or preferences mentioned by the user | |
| 3. Any decisions or conclusions reached | |
| 4. Context needed for follow-up questions | |
| Keep the summary under 300 words but include all important details. | |
| CONVERSATION: | |
| {conversation} | |
| SUMMARY:""" | |
| INCREMENTAL_SUMMARY_PROMPT = """You are a conversation summarizer. Update the existing summary to incorporate new messages. | |
| EXISTING SUMMARY: | |
| {existing_summary} | |
| NEW MESSAGES: | |
| {new_messages} | |
| Create an updated, comprehensive summary that: | |
| 1. Incorporates new information from the recent messages | |
| 2. Retains important context from the existing summary | |
| 3. Removes redundant or outdated information | |
| 4. Stays under 300 words | |
| UPDATED SUMMARY:""" | |
| def __init__( | |
| self, | |
| user_id: str, | |
| session_id: str, | |
| db_connection=None, | |
| llm_client=None, | |
| summary_threshold: int = 10, # Summarize every N messages | |
| recent_messages_count: int = 5 # Keep this many recent messages verbatim | |
| ): | |
| self.user_id = user_id | |
| self.session_id = session_id | |
| self.db = db_connection | |
| self.llm = llm_client | |
| self.summary_threshold = summary_threshold | |
| self.recent_messages_count = recent_messages_count | |
| self._db_type = None | |
| self._cached_summary: Optional[str] = None | |
| self._messages_since_summary: int = 0 | |
| if self.db: | |
| self._db_type = self.db.db_type.value | |
| self._ensure_tables() | |
| self._load_state() | |
| def _ensure_tables(self): | |
| """Create summary table if it doesn't exist.""" | |
| try: | |
| ddl = get_summary_table_ddl(self._db_type) | |
| self.db.execute_write(ddl) | |
| except Exception as e: | |
| logger.warning(f"Failed to create summary table: {e}") | |
| def _load_state(self): | |
| """Load existing summary state from database (per-user, not per-session).""" | |
| try: | |
| query = """ | |
| SELECT summary, message_count FROM _chatbot_user_summaries | |
| WHERE user_id = :user_id | |
| """ | |
| rows = self.db.execute_query(query, { | |
| "user_id": self.user_id | |
| }) | |
| if rows: | |
| self._cached_summary = rows[0].get('summary') | |
| self._messages_since_summary = 0 # Reset since we loaded | |
| logger.debug(f"Loaded summary for user {self.user_id}") | |
| except Exception as e: | |
| logger.warning(f"Failed to load summary state: {e}") | |
| def set_llm_client(self, llm_client): | |
| """Set the LLM client for summarization.""" | |
| self.llm = llm_client | |
| def on_message_added(self, message_count: int): | |
| """ | |
| Called after a message is added to track when to summarize. | |
| Args: | |
| message_count: Current total number of messages in the conversation | |
| """ | |
| self._messages_since_summary += 1 | |
| # Check if we should summarize | |
| if self._messages_since_summary >= self.summary_threshold: | |
| self._trigger_summarization() | |
| def _trigger_summarization(self): | |
| """Trigger summarization of the conversation.""" | |
| if not self.llm: | |
| logger.warning("Cannot summarize: No LLM client configured") | |
| return | |
| if not self.db: | |
| logger.warning("Cannot summarize: No database connection") | |
| return | |
| try: | |
| # Get messages that need to be summarized | |
| query = """ | |
| SELECT role, content FROM _chatbot_memory | |
| WHERE user_id = :user_id AND session_id = :session_id | |
| ORDER BY created_at ASC | |
| """ | |
| rows = self.db.execute_query(query, { | |
| "user_id": self.user_id, | |
| "session_id": self.session_id | |
| }) | |
| if not rows: | |
| return | |
| # Format conversation for summarization | |
| conversation_text = self._format_messages_for_summary(rows) | |
| # Generate summary | |
| if self._cached_summary: | |
| # Incremental update | |
| prompt = self.INCREMENTAL_SUMMARY_PROMPT.format( | |
| existing_summary=self._cached_summary, | |
| new_messages=conversation_text | |
| ) | |
| else: | |
| # Fresh summary | |
| prompt = self.SUMMARIZATION_PROMPT.format(conversation=conversation_text) | |
| messages = [ | |
| {"role": "system", "content": "You are a helpful assistant that creates concise conversation summaries."}, | |
| {"role": "user", "content": prompt} | |
| ] | |
| summary = self.llm.chat(messages) | |
| # Save to database | |
| self._save_summary(summary, len(rows)) | |
| self._cached_summary = summary | |
| self._messages_since_summary = 0 | |
| logger.info(f"Generated summary for user {self.user_id}") | |
| except Exception as e: | |
| logger.error(f"Summarization failed: {e}") | |
| def _format_messages_for_summary(self, messages: List[Dict]) -> str: | |
| """Format messages as text for summarization.""" | |
| lines = [] | |
| for msg in messages: | |
| role = msg.get('role', 'unknown').upper() | |
| content = msg.get('content', '') | |
| lines.append(f"{role}: {content}") | |
| return "\n\n".join(lines) | |
| def _save_summary(self, summary: str, message_count: int): | |
| """Save or update summary in database (per-user).""" | |
| try: | |
| query = get_upsert_summary_query(self._db_type) | |
| self.db.execute_write(query, { | |
| "user_id": self.user_id, | |
| "summary": summary, | |
| "message_count": message_count | |
| }) | |
| except Exception as e: | |
| logger.error(f"Failed to save summary: {e}") | |
| def get_summary(self) -> Optional[str]: | |
| """Get the current conversation summary.""" | |
| return self._cached_summary | |
| def get_context_for_llm(self, recent_messages: List[Dict[str, str]]) -> List[Dict[str, str]]: | |
| """ | |
| Get optimized context for LLM calls. | |
| Combines the summary (if available) with recent messages for optimal | |
| token usage while maintaining context. | |
| Args: | |
| recent_messages: List of recent messages to include verbatim | |
| Returns: | |
| List of messages with summary prepended as system context | |
| """ | |
| context_messages = [] | |
| # Add summary as system context if available | |
| if self._cached_summary: | |
| summary_context = f"""CONVERSATION SUMMARY (previous context): | |
| {self._cached_summary} | |
| Use this summary to understand the conversation history and context for follow-up questions.""" | |
| context_messages.append({ | |
| "role": "system", | |
| "content": summary_context | |
| }) | |
| # Add recent messages verbatim | |
| context_messages.extend(recent_messages[-self.recent_messages_count:]) | |
| return context_messages | |
| def force_summarize(self): | |
| """Force immediate summarization regardless of threshold.""" | |
| self._trigger_summarization() | |
| def clear_summary(self): | |
| """Clear the summary for this user.""" | |
| self._cached_summary = None | |
| self._messages_since_summary = 0 | |
| if self.db: | |
| try: | |
| query = "DELETE FROM _chatbot_user_summaries WHERE user_id = :user_id" | |
| self.db.execute_write(query, { | |
| "user_id": self.user_id | |
| }) | |
| logger.info(f"Cleared summary for user: {self.user_id}") | |
| except Exception as e: | |
| logger.warning(f"Failed to clear summary: {e}") | |
| def clear_all_user_summaries(self): | |
| """Clear all summaries for this user (alias for clear_summary since it's now per-user).""" | |
| self.clear_summary() | |
| class EnhancedChatMemory(ChatMemory): | |
| """ | |
| Enhanced ChatMemory with integrated conversation summarization. | |
| Combines the standard ChatMemory functionality with ConversationSummaryMemory | |
| for automatic summarization and optimized context retrieval. | |
| """ | |
| def __init__( | |
| self, | |
| session_id: str, | |
| user_id: str = "default", | |
| max_messages: int = 20, | |
| db_connection=None, | |
| llm_client=None, | |
| enable_summarization: bool = True, | |
| summary_threshold: int = 10 | |
| ): | |
| super().__init__(session_id, user_id, max_messages, db_connection) | |
| self.enable_summarization = enable_summarization | |
| self.summary_memory: Optional[ConversationSummaryMemory] = None | |
| if enable_summarization: | |
| self.summary_memory = ConversationSummaryMemory( | |
| user_id=user_id, | |
| session_id=session_id, | |
| db_connection=db_connection, | |
| llm_client=llm_client, | |
| summary_threshold=summary_threshold | |
| ) | |
| def set_llm_client(self, llm_client): | |
| """Set the LLM client for summarization.""" | |
| if self.summary_memory: | |
| self.summary_memory.set_llm_client(llm_client) | |
| def add_message(self, role: str, content: str, metadata: Dict = None): | |
| """Add a message and trigger summarization check.""" | |
| super().add_message(role, content, metadata) | |
| # Notify summary memory of new message | |
| if self.summary_memory: | |
| self.summary_memory.on_message_added(len(self.messages)) | |
| def get_context_messages(self, count: int = 5) -> List[Dict[str, str]]: | |
| """ | |
| Get context messages with summary integration. | |
| If summarization is enabled and a summary exists, it will be | |
| prepended to provide historical context while keeping recent | |
| messages verbatim. | |
| """ | |
| # Get base context from parent | |
| base_context = super().get_context_messages(count) | |
| # If summarization is enabled, use summary-enhanced context | |
| if self.summary_memory and self.summary_memory.get_summary(): | |
| # Filter out system messages from base context (we'll add summary separately) | |
| filtered = [m for m in base_context if m.get("role") != "system"] | |
| # Get summary-enhanced context | |
| enhanced = self.summary_memory.get_context_for_llm(filtered) | |
| # Re-add permanent memory context if it was present | |
| for msg in base_context: | |
| if msg.get("role") == "system" and "IMPORTANT CONTEXT" in msg.get("content", ""): | |
| enhanced.insert(0, msg) | |
| return enhanced | |
| return base_context | |
| def get_summary(self) -> Optional[str]: | |
| """Get the current conversation summary.""" | |
| if self.summary_memory: | |
| return self.summary_memory.get_summary() | |
| return None | |
| def force_summarize(self): | |
| """Force immediate summarization.""" | |
| if self.summary_memory: | |
| self.summary_memory.force_summarize() | |
| def clear(self): | |
| """Clear session memory but KEEP the summary (long-term memory).""" | |
| super().clear() | |
| # NOTE: Summary is intentionally NOT cleared here | |
| # Summary acts as long-term memory that persists across chat sessions | |
| def clear_with_summary(self): | |
| """Clear session memory AND the summary (full reset).""" | |
| super().clear() | |
| if self.summary_memory: | |
| self.summary_memory.clear_summary() | |
| def clear_user_history(self): | |
| """Clear all user temp history but KEEP summaries.""" | |
| super().clear_user_history() | |
| # NOTE: Summaries are intentionally NOT cleared | |
| # They persist as long-term memory for the user | |
| def clear_all_including_summaries(self): | |
| """Clear ALL user data including summaries (complete wipe).""" | |
| super().clear_user_history() | |
| if self.summary_memory: | |
| self.summary_memory.clear_all_user_summaries() | |
| def create_memory(session_id: str, user_id: str = "default", max_messages: int = 20) -> ChatMemory: | |
| """Create a standard ChatMemory instance.""" | |
| from database import get_db | |
| db = get_db() | |
| return ChatMemory(session_id=session_id, user_id=user_id, max_messages=max_messages, db_connection=db) | |
| def create_enhanced_memory( | |
| session_id: str, | |
| user_id: str = "default", | |
| max_messages: int = 20, | |
| llm_client=None, | |
| enable_summarization: bool = True, | |
| summary_threshold: int = 10 | |
| ) -> EnhancedChatMemory: | |
| """ | |
| Create an EnhancedChatMemory with summarization support. | |
| Args: | |
| session_id: Unique session identifier | |
| user_id: User identifier for per-user memory isolation | |
| max_messages: Maximum messages to keep in short-term memory | |
| llm_client: LLM client for summarization (can be set later) | |
| enable_summarization: Whether to enable automatic summarization | |
| summary_threshold: Summarize after this many messages | |
| Returns: | |
| EnhancedChatMemory instance with summarization capabilities | |
| """ | |
| from database import get_db | |
| db = get_db() | |
| return EnhancedChatMemory( | |
| session_id=session_id, | |
| user_id=user_id, | |
| max_messages=max_messages, | |
| db_connection=db, | |
| llm_client=llm_client, | |
| enable_summarization=enable_summarization, | |
| summary_threshold=summary_threshold | |
| ) | |