"""
context_manager.py - Conversation Context Management
=====================================================
Maintains conversation history and context across user exchanges.
Enables proper follow-up handling and context-aware responses.

Author: AI Lab Team
Last Updated: 2025-10-10
Version: 1.0
"""

import json
import logging
import os
import re
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional

try:
    from logging_config import get_logger
except ImportError:
    # Same policy as the config import below: fall back to a sane default
    # instead of making the whole module unimportable.
    get_logger = logging.getLogger

# Import config
try:
    import graph_config as cfg
except ImportError:
    # Fallback defaults if config not available
    class cfg:
        MAX_CONVERSATION_HISTORY = 10
        CONTEXT_TOKEN_LIMIT = 4000
        AUTO_SUMMARIZE_LONG_CONVERSATIONS = True
        SUMMARIZE_AFTER_EXCHANGES = 5
        FOLLOW_UP_KEYWORDS = ['also', 'now', 'then', 'add']
        REFERENCE_PRONOUNS = ['it', 'that', 'this']

log = get_logger(__name__)

# Splits text into word tokens so keywords match whole words only
# ("now" must not match "know", and "it?" must match "it").
_WORD_RE = re.compile(r"\w+")


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Produces the same format as the deprecated ``datetime.utcnow().isoformat()``
    so previously persisted timestamps stay comparable.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


class ConversationContextManager:
    """
    Manages conversation context across user exchanges.

    Features:
    - Tracks conversation history
    - Detects follow-up requests
    - Maintains artifact references
    - Auto-summarizes long conversations
    - Provides context for LLM prompts

    Sessions are kept in memory and mirrored to ``<storage_path>/<id>.json``.
    A session that exists only on disk is loaded lazily the first time it is
    referenced, so history survives process restarts.
    """

    def __init__(self, storage_path: str = "outputs/conversations"):
        """
        Initialize context manager.

        Args:
            storage_path: Directory to store conversation data
        """
        self.storage_path = storage_path
        self.conversations: Dict[str, Dict[str, Any]] = {}  # session_id -> context

        # Create storage directory
        os.makedirs(storage_path, exist_ok=True)
        log.info("Context Manager initialized")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _session_filepath(self, session_id: str) -> str:
        """
        Build the on-disk path for a session.

        Args:
            session_id: Session identifier

        Returns:
            Path of the session's JSON file inside the storage directory

        Raises:
            ValueError: If the id would resolve to a file that is not located
                directly inside the storage directory (e.g. ``../x``).
        """
        filename = f"{session_id}.json"
        root = os.path.abspath(self.storage_path)
        candidate = os.path.abspath(os.path.join(root, filename))
        # Both checks are needed: "a/../b" resolves inside root but is not
        # the file name the caller asked for.
        if (os.path.dirname(candidate) != root
                or os.path.basename(candidate) != filename):
            raise ValueError(f"invalid session id: {session_id!r}")
        return os.path.join(self.storage_path, filename)

    @staticmethod
    def _normalize_context(session_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Fill in missing/invalid fields of a loaded context in place."""
        data.setdefault("session_id", session_id)
        data.setdefault("started_at", None)
        data.setdefault("summary", None)
        for key in ("exchanges", "artifacts_created"):
            if not isinstance(data.get(key), list):
                data[key] = []
        if not isinstance(data.get("total_cost"), (int, float)):
            data["total_cost"] = 0.0
        return data

    def _ensure_loaded(self, session_id: str) -> bool:
        """Return True if the session is in memory, loading it from disk if needed."""
        return session_id in self.conversations or self.load_conversation(session_id)

    def _get_or_create_context(self, session_id: str) -> Dict[str, Any]:
        """Return the session context, creating a fresh one only if none is persisted."""
        if not self._ensure_loaded(session_id):
            self.conversations[session_id] = {
                "session_id": session_id,
                "started_at": _utc_now_iso(),
                "exchanges": [],
                "artifacts_created": [],
                "summary": None,
                "total_cost": 0.0
            }
        return self.conversations[session_id]

    @staticmethod
    def _contains_phrase(normalized_text: str, phrase: str) -> bool:
        """Check whether ``phrase`` occurs as whole word(s) in space-padded token text."""
        tokens = _WORD_RE.findall(str(phrase).lower())
        return bool(tokens) and f" {' '.join(tokens)} " in normalized_text

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def add_exchange(self, session_id: str, user_message: str,
                     assistant_response: str,
                     artifacts: Optional[List[str]] = None,
                     metadata: Optional[Dict[str, Any]] = None):
        """
        Add a user-assistant exchange to conversation history.

        If the session is not in memory but exists on disk, it is loaded first
        so the persisted history is extended rather than overwritten.

        Args:
            session_id: Unique session identifier
            user_message: User's input message
            assistant_response: Assistant's response (None is treated as "")
            artifacts: List of artifact filenames created
            metadata: Additional metadata (tier, cost, etc.)
        """
        context = self._get_or_create_context(session_id)

        # Copy caller-owned containers so later mutation by the caller
        # cannot silently rewrite stored history.
        artifact_list = list(artifacts) if artifacts else []
        meta = dict(metadata) if metadata else {}

        # Add exchange
        context["exchanges"].append({
            "user": user_message,
            "assistant": (assistant_response or "")[:1000],  # Truncate long responses
            "timestamp": _utc_now_iso(),
            "artifacts": artifact_list,
            "metadata": meta
        })

        # Track artifacts
        context["artifacts_created"].extend(artifact_list)

        # Track cost; a malformed value must not abort the exchange halfway
        if "cost" in meta:
            try:
                context["total_cost"] += float(meta["cost"])
            except (TypeError, ValueError):
                log.warning(f"Ignoring non-numeric cost for {session_id}: {meta['cost']!r}")

        # Auto-summarize if needed (guard against a zero/invalid interval)
        interval = cfg.SUMMARIZE_AFTER_EXCHANGES
        if (cfg.AUTO_SUMMARIZE_LONG_CONVERSATIONS
                and isinstance(interval, int) and interval > 0
                and len(context["exchanges"]) % interval == 0):
            context["summary"] = self._generate_summary(context["exchanges"])
            log.info(f"📝 Auto-summarized conversation: {session_id}")

        # Persist to disk
        self._save_conversation(session_id)
        log.info(f"💬 Exchange added: {session_id} ({len(context['exchanges'])} total)")

    def get_context(self, session_id: str, current_input: str = "") -> Dict[str, Any]:
        """
        Get conversation context for current request.

        Args:
            session_id: Session identifier
            current_input: Current user input (for follow-up detection)

        Returns:
            Dict with context information (same keys for known and unknown
            sessions):
            - is_follow_up: bool
            - context: str (formatted context)
            - artifacts_context: str
            - previous_artifacts: List[str] (a copy; safe to mutate)
            - exchange_count: int
            - total_cost: float
            - session_started: Optional[str]
        """
        if not self._ensure_loaded(session_id):
            return {
                "is_follow_up": False,
                "context": "",
                "artifacts_context": "",
                "previous_artifacts": [],
                "exchange_count": 0,
                "total_cost": 0.0,
                "session_started": None
            }

        context = self.conversations[session_id]

        return {
            "is_follow_up": self._is_follow_up(current_input) if current_input else False,
            "context": self._build_context_string(context),
            "artifacts_context": self._build_artifacts_context(context),
            "previous_artifacts": list(context.get("artifacts_created", [])),
            "exchange_count": len(context.get("exchanges", [])),
            "total_cost": context.get("total_cost", 0.0),
            "session_started": context.get("started_at")
        }

    def _is_follow_up(self, user_input: str) -> bool:
        """
        Detect if input is a follow-up to previous conversation.

        Keywords and pronouns are matched case-insensitively against whole
        words, ignoring punctuation.

        Args:
            user_input: User's current input

        Returns:
            True if follow-up detected
        """
        text_lower = user_input.lower()
        # Space-padded token string lets single- and multi-word phrases be
        # matched on word boundaries with a plain substring test.
        normalized = f" {' '.join(_WORD_RE.findall(text_lower))} "

        # Check for follow-up keywords
        has_follow_up_keyword = any(
            self._contains_phrase(normalized, kw) for kw in cfg.FOLLOW_UP_KEYWORDS
        )

        # Check for pronouns referencing previous context
        has_reference_pronoun = any(
            self._contains_phrase(normalized, word) for word in cfg.REFERENCE_PRONOUNS
        )

        # Short messages are often follow-ups
        is_short = len(text_lower.split()) < 10

        # Follow-up if has keywords OR (has pronouns AND short)
        return has_follow_up_keyword or (has_reference_pronoun and is_short)

    def _build_context_string(self, context: Dict) -> str:
        """
        Build formatted context string for LLM.

        Args:
            context: Conversation context dict

        Returns:
            Formatted context string
        """
        parts = []

        # Use summary for long conversations
        if context.get("summary"):
            parts.append(f"=== CONVERSATION SUMMARY ===\n{context['summary']}\n\n")

        # Add recent exchanges. A limit of 0 must mean "none": a plain
        # [-0:] slice would return the entire history instead.
        limit = cfg.MAX_CONVERSATION_HISTORY
        exchanges = context.get("exchanges", [])
        recent_exchanges = exchanges[-limit:] if limit > 0 else []

        if recent_exchanges:
            parts.append("=== RECENT CONVERSATION ===\n")
            for exchange in recent_exchanges:
                parts.append(f"\nUser: {exchange.get('user', '')}\n")

                # Truncate assistant response
                assistant_text = exchange.get('assistant') or ""
                response_preview = assistant_text[:300]
                if len(assistant_text) > 300:
                    response_preview += "..."
                parts.append(f"Assistant: {response_preview}\n")

        return "".join(parts)

    def _build_artifacts_context(self, context: Dict) -> str:
        """
        Build artifacts reference string.

        Args:
            context: Conversation context dict

        Returns:
            Formatted artifacts context ("" when no artifacts exist)
        """
        artifacts = context.get("artifacts_created", [])
        if not artifacts:
            return ""

        parts = ["=== ARTIFACTS CREATED IN THIS CONVERSATION ===\n"]

        # Show last 5 artifacts
        parts.extend(f"- {artifact}\n" for artifact in artifacts[-5:])

        if len(artifacts) > 5:
            parts.append(f"... and {len(artifacts) - 5} more\n")

        return "".join(parts)

    def _generate_summary(self, exchanges: List[Dict]) -> str:
        """
        Generate conversation summary using LLM.

        Best-effort: any failure (missing dependency, API error) is logged
        and a generic placeholder is returned instead.

        Args:
            exchanges: List of conversation exchanges

        Returns:
            Summary string (at most 500 characters)
        """
        try:
            # Import LLM (lazy import to avoid circular dependency)
            from langchain_openai import ChatOpenAI

            llm = ChatOpenAI(model="gpt-4o", temperature=0.3)

            # Build summary prompt from the last 10 exchanges
            exchanges_text = "".join(
                f"User: {ex.get('user', '')}\n"
                f"Assistant: {(ex.get('assistant') or '')[:200]}...\n\n"
                for ex in exchanges[-10:]
            )

            prompt = f"""Summarize this conversation in 3-4 sentences.
Focus on: what the user wanted, what was created, and current state.

CONVERSATION:
{exchanges_text}

SUMMARY (3-4 sentences):"""

            response = llm.invoke(prompt)
            content = getattr(response, "content", "") or ""
            if not isinstance(content, str):
                # Some providers return structured content; keep the stored
                # summary a plain string.
                content = str(content)
            return content[:500]

        except Exception as e:
            log.warning(f"Summary generation failed: {e}")
            return "Previous conversation context available."

    def _save_conversation(self, session_id: str):
        """
        Persist conversation to disk.

        The JSON is serialized in memory and written via a temporary file that
        is atomically renamed, so a failed save never corrupts the previously
        persisted file. Failures are logged, not raised.

        Args:
            session_id: Session identifier
        """
        if session_id not in self.conversations:
            return

        tmp_path = None
        try:
            filepath = self._session_filepath(session_id)
            # Serialize first: an unserializable value fails here, before
            # any file is touched.
            payload = json.dumps(self.conversations[session_id], indent=2)
            os.makedirs(self.storage_path, exist_ok=True)
            tmp_path = f"{filepath}.tmp"
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(tmp_path, filepath)
        except Exception as e:
            # Deliberate best-effort boundary: persistence problems must not
            # break the conversation flow.
            log.error(f"Failed to save conversation {session_id}: {e}")
            if tmp_path is not None:
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass

    def load_conversation(self, session_id: str) -> bool:
        """
        Load conversation from disk.

        Args:
            session_id: Session identifier

        Returns:
            True if loaded successfully; False if the file is missing,
            unreadable, malformed, or the session id is invalid
        """
        try:
            filepath = self._session_filepath(session_id)
        except ValueError as e:
            log.error(f"Failed to load conversation {session_id}: {e}")
            return False

        if not os.path.exists(filepath):
            return False

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, dict):
                raise ValueError("conversation file does not contain a JSON object")
            self.conversations[session_id] = self._normalize_context(session_id, data)
            log.info(f"📂 Conversation loaded: {session_id}")
            return True
        except Exception as e:
            log.error(f"Failed to load conversation {session_id}: {e}")
            return False

    def clear_session(self, session_id: str):
        """
        Clear conversation history for session.

        Removes the session from memory and deletes its file from disk.
        Clearing an unknown session is a no-op.

        Args:
            session_id: Session identifier
        """
        self.conversations.pop(session_id, None)

        # Also delete from disk
        try:
            filepath = self._session_filepath(session_id)
        except ValueError as e:
            log.warning(f"Skipping disk cleanup for {session_id}: {e}")
        else:
            try:
                os.remove(filepath)
            except FileNotFoundError:
                pass

        log.info(f"🗑️ Context cleared: {session_id}")

    def get_all_sessions(self) -> List[str]:
        """
        Get list of all session IDs.

        Returns:
            Sorted, de-duplicated list of session IDs known in memory or on disk
        """
        session_ids = set(self.conversations)

        if os.path.isdir(self.storage_path):
            session_ids.update(
                filename[:-5]  # Remove .json
                for filename in os.listdir(self.storage_path)
                if filename.endswith('.json')
            )

        return sorted(session_ids)

    def get_session_summary(self, session_id: str) -> Dict[str, Any]:
        """
        Get summary information for a session.

        Args:
            session_id: Session identifier

        Returns:
            Dict with session summary, or {} if the session does not exist
        """
        if not self._ensure_loaded(session_id):
            return {}

        context = self.conversations[session_id]

        return {
            "session_id": session_id,
            "started_at": context.get("started_at"),
            "exchange_count": len(context.get("exchanges", [])),
            "artifacts_count": len(context.get("artifacts_created", [])),
            "total_cost": context.get("total_cost", 0.0),
            "has_summary": bool(context.get("summary"))
        }


# Global instance
context_manager = ConversationContextManager()

# ============================================================================
# EXPORTS
# ============================================================================

__all__ = [
    'ConversationContextManager',
    'context_manager'
]