Spaces:
Paused
Paused
| """ | |
| context_manager.py - Conversation Context Management | |
| ===================================================== | |
| Maintains conversation history and context across user exchanges. | |
| Enables proper follow-up handling and context-aware responses. | |
| Author: AI Lab Team | |
| Last Updated: 2025-10-10 | |
| Version: 1.0 | |
| """ | |
import json
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from logging_config import get_logger
| # Import config | |
| try: | |
| import graph_config as cfg | |
| except ImportError: | |
| # Fallback defaults if config not available | |
| class cfg: | |
| MAX_CONVERSATION_HISTORY = 10 | |
| CONTEXT_TOKEN_LIMIT = 4000 | |
| AUTO_SUMMARIZE_LONG_CONVERSATIONS = True | |
| SUMMARIZE_AFTER_EXCHANGES = 5 | |
| FOLLOW_UP_KEYWORDS = ['also', 'now', 'then', 'add'] | |
| REFERENCE_PRONOUNS = ['it', 'that', 'this'] | |
| log = get_logger(__name__) | |
class ConversationContextManager:
    """
    Manages conversation context across user exchanges.

    Features:
    - Tracks conversation history
    - Detects follow-up requests
    - Maintains artifact references
    - Auto-summarizes long conversations
    - Provides context for LLM prompts
    """

    def __init__(self, storage_path: str = "outputs/conversations"):
        """
        Initialize context manager.

        Args:
            storage_path: Directory to store conversation data
        """
        self.storage_path = storage_path
        # session_id -> context dict (exchanges, artifacts, summary, cost)
        self.conversations: Dict[str, Dict[str, Any]] = {}
        # Ensure the storage directory exists before any save attempt
        os.makedirs(storage_path, exist_ok=True)
        log.info("Context Manager initialized")

    def add_exchange(self, session_id: str, user_message: str,
                     assistant_response: str,
                     artifacts: Optional[List[str]] = None,
                     metadata: Optional[Dict[str, Any]] = None) -> None:
        """
        Add a user-assistant exchange to conversation history.

        Args:
            session_id: Unique session identifier
            user_message: User's input message
            assistant_response: Assistant's response
            artifacts: List of artifact filenames created
            metadata: Additional metadata (tier, cost, etc.)
        """
        if session_id not in self.conversations:
            self.conversations[session_id] = {
                "session_id": session_id,
                # Timezone-aware UTC; datetime.utcnow() is naive and
                # deprecated since Python 3.12.
                "started_at": datetime.now(timezone.utc).isoformat(),
                "exchanges": [],
                "artifacts_created": [],
                "summary": None,
                "total_cost": 0.0,
            }
        context = self.conversations[session_id]

        # Record the exchange; truncate long responses to bound memory/disk use
        exchange = {
            "user": user_message,
            "assistant": assistant_response[:1000],
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "artifacts": artifacts or [],
            "metadata": metadata or {},
        }
        context["exchanges"].append(exchange)

        # Track artifacts
        if artifacts:
            context["artifacts_created"].extend(artifacts)

        # Accumulate cost when callers report it in metadata
        if metadata and "cost" in metadata:
            context["total_cost"] += metadata["cost"]

        # Auto-summarize every SUMMARIZE_AFTER_EXCHANGES exchanges
        if (cfg.AUTO_SUMMARIZE_LONG_CONVERSATIONS and
                len(context["exchanges"]) % cfg.SUMMARIZE_AFTER_EXCHANGES == 0):
            context["summary"] = self._generate_summary(context["exchanges"])
            log.info("📝 Auto-summarized conversation: %s", session_id)

        # Persist to disk after every exchange
        self._save_conversation(session_id)
        log.info("💬 Exchange added: %s (%d total)",
                 session_id, len(context["exchanges"]))

    def get_context(self, session_id: str, current_input: str = "") -> Dict[str, Any]:
        """
        Get conversation context for current request.

        Args:
            session_id: Session identifier
            current_input: Current user input (for follow-up detection)

        Returns:
            Dict with context information:
            - is_follow_up: bool
            - context: str (formatted context)
            - artifacts_context: str
            - previous_artifacts: List[str]
            - exchange_count: int
            - total_cost: float
            - session_started: Optional[str]
        """
        if session_id not in self.conversations:
            # Unknown session: return an empty context with the SAME schema
            # as the populated branch so callers don't need to special-case it.
            return {
                "is_follow_up": False,
                "context": "",
                "artifacts_context": "",
                "previous_artifacts": [],
                "exchange_count": 0,
                "total_cost": 0.0,
                "session_started": None,
            }
        context = self.conversations[session_id]

        # Detect follow-up only when there is input to inspect
        is_follow_up = self._is_follow_up(current_input) if current_input else False

        return {
            "is_follow_up": is_follow_up,
            "context": self._build_context_string(context),
            "artifacts_context": self._build_artifacts_context(context),
            "previous_artifacts": context["artifacts_created"],
            "exchange_count": len(context["exchanges"]),
            "total_cost": context.get("total_cost", 0.0),
            "session_started": context.get("started_at"),
        }

    def _is_follow_up(self, user_input: str) -> bool:
        """
        Detect if input is a follow-up to previous conversation.

        Args:
            user_input: User's current input

        Returns:
            True if follow-up detected
        """
        text_lower = user_input.lower()
        words = text_lower.split()
        # Strip trailing punctuation so "add it." still matches "it"
        tokens = {w.strip(".,!?;:") for w in words}

        # Whole-word keyword match: plain substring search produced false
        # positives ('add' in "address", 'now' in "known"). Multi-word
        # phrases from config still fall back to substring search.
        has_follow_up_keyword = any(
            (kw in text_lower) if " " in kw else (kw in tokens)
            for kw in cfg.FOLLOW_UP_KEYWORDS
        )

        # Pronouns referencing previous context ("it", "that", ...)
        has_reference_pronoun = any(p in tokens for p in cfg.REFERENCE_PRONOUNS)

        # Short messages are often follow-ups
        is_short = len(words) < 10

        # Follow-up if has keywords OR (has pronouns AND short)
        return has_follow_up_keyword or (has_reference_pronoun and is_short)

    def _build_context_string(self, context: Dict) -> str:
        """
        Build formatted context string for LLM.

        Args:
            context: Conversation context dict

        Returns:
            Formatted context string
        """
        parts: List[str] = []

        # Lead with the summary for long conversations, when one exists
        if context.get("summary"):
            parts.append(f"=== CONVERSATION SUMMARY ===\n{context['summary']}\n\n")

        # Append the most recent exchanges, capped by config
        recent_exchanges = context["exchanges"][-cfg.MAX_CONVERSATION_HISTORY:]
        if recent_exchanges:
            parts.append("=== RECENT CONVERSATION ===\n")
            for exchange in recent_exchanges:
                parts.append(f"\nUser: {exchange['user']}\n")
                # Truncate assistant response to keep the prompt small
                response_preview = exchange['assistant'][:300]
                if len(exchange['assistant']) > 300:
                    response_preview += "..."
                parts.append(f"Assistant: {response_preview}\n")
        return "".join(parts)

    def _build_artifacts_context(self, context: Dict) -> str:
        """
        Build artifacts reference string.

        Args:
            context: Conversation context dict

        Returns:
            Formatted artifacts context (empty string when none)
        """
        artifacts = context.get("artifacts_created", [])
        if not artifacts:
            return ""
        lines = ["=== ARTIFACTS CREATED IN THIS CONVERSATION ===\n"]
        # Show only the last 5 artifacts
        lines.extend(f"- {artifact}\n" for artifact in artifacts[-5:])
        if len(artifacts) > 5:
            lines.append(f"... and {len(artifacts) - 5} more\n")
        return "".join(lines)

    def _generate_summary(self, exchanges: List[Dict]) -> str:
        """
        Generate conversation summary using LLM.

        Args:
            exchanges: List of conversation exchanges

        Returns:
            Summary string (fallback text if the LLM call fails)
        """
        try:
            # Lazy import to avoid circular dependency at module load
            from langchain_openai import ChatOpenAI
            llm = ChatOpenAI(model="gpt-4o", temperature=0.3)

            # Build summary prompt from the last 10 exchanges
            recent_exchanges = exchanges[-10:]
            exchanges_text = ""
            for ex in recent_exchanges:
                exchanges_text += f"User: {ex['user']}\n"
                exchanges_text += f"Assistant: {ex['assistant'][:200]}...\n\n"
            prompt = f"""Summarize this conversation in 3-4 sentences.
Focus on: what the user wanted, what was created, and current state.
CONVERSATION:
{exchanges_text}
SUMMARY (3-4 sentences):"""
            response = llm.invoke(prompt)
            # Cap summary length so persisted context stays small
            return getattr(response, "content", "")[:500]
        except Exception as e:
            # Best-effort: summarization must never break the exchange flow
            log.warning(f"Summary generation failed: {e}")
            return "Previous conversation context available."

    def _save_conversation(self, session_id: str) -> None:
        """
        Persist conversation to disk as <storage_path>/<session_id>.json.

        Args:
            session_id: Session identifier
        """
        if session_id not in self.conversations:
            return
        filepath = os.path.join(self.storage_path, f"{session_id}.json")
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                # ensure_ascii=False keeps non-ASCII text readable on disk
                json.dump(self.conversations[session_id], f, indent=2,
                          ensure_ascii=False)
        except Exception as e:
            # Best-effort persistence: log and continue
            log.error(f"Failed to save conversation {session_id}: {e}")

    def load_conversation(self, session_id: str) -> bool:
        """
        Load conversation from disk into memory.

        Args:
            session_id: Session identifier

        Returns:
            True if loaded successfully
        """
        filepath = os.path.join(self.storage_path, f"{session_id}.json")
        if not os.path.exists(filepath):
            return False
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                self.conversations[session_id] = json.load(f)
            log.info(f"📂 Conversation loaded: {session_id}")
            return True
        except Exception as e:
            log.error(f"Failed to load conversation {session_id}: {e}")
            return False

    def clear_session(self, session_id: str) -> None:
        """
        Clear conversation history for session, in memory and on disk.

        Args:
            session_id: Session identifier
        """
        if session_id in self.conversations:
            del self.conversations[session_id]
        # Also delete the persisted copy
        filepath = os.path.join(self.storage_path, f"{session_id}.json")
        if os.path.exists(filepath):
            os.remove(filepath)
        log.info(f"🗑️ Context cleared: {session_id}")

    def get_all_sessions(self) -> List[str]:
        """
        Get list of all session IDs, in memory or persisted on disk.

        Returns:
            Sorted list of unique session IDs
        """
        sessions = set(self.conversations)
        # Merge sessions found on disk (files named <session_id>.json)
        if os.path.exists(self.storage_path):
            for filename in os.listdir(self.storage_path):
                if filename.endswith('.json'):
                    sessions.add(filename[:-5])  # strip ".json"
        return sorted(sessions)

    def get_session_summary(self, session_id: str) -> Dict[str, Any]:
        """
        Get summary information for a session.

        Args:
            session_id: Session identifier

        Returns:
            Dict with session summary; empty dict if session is unknown
        """
        if session_id not in self.conversations:
            # Lazily pull from disk if not already in memory
            if not self.load_conversation(session_id):
                return {}
        context = self.conversations[session_id]
        return {
            "session_id": session_id,
            "started_at": context.get("started_at"),
            "exchange_count": len(context.get("exchanges", [])),
            "artifacts_count": len(context.get("artifacts_created", [])),
            "total_cost": context.get("total_cost", 0.0),
            "has_summary": bool(context.get("summary")),
        }
# Module-level singleton shared by all importers of this module.
# NOTE(review): instantiating here creates the storage directory as an
# import-time side effect — confirm that is intended.
context_manager = ConversationContextManager()

# ============================================================================
# EXPORTS
# ============================================================================
__all__ = [
    "ConversationContextManager",
    "context_manager",
]