# SPOC_V1 / context_manager.py
"""
context_manager.py - Conversation Context Management
=====================================================
Maintains conversation history and context across user exchanges.
Enables proper follow-up handling and context-aware responses.
Author: AI Lab Team
Last Updated: 2025-10-10
Version: 1.0
"""
import json
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from logging_config import get_logger
# Load project configuration; fall back to baked-in defaults so the module
# still works when graph_config is not importable (e.g. standalone use).
try:
    import graph_config as cfg
except ImportError:
    class cfg:
        """Fallback configuration namespace used when graph_config is absent."""
        MAX_CONVERSATION_HISTORY = 10
        CONTEXT_TOKEN_LIMIT = 4000
        AUTO_SUMMARIZE_LONG_CONVERSATIONS = True
        SUMMARIZE_AFTER_EXCHANGES = 5
        FOLLOW_UP_KEYWORDS = ['also', 'now', 'then', 'add']
        REFERENCE_PRONOUNS = ['it', 'that', 'this']
# Module-level logger, configured by the project-wide logging_config helper
log = get_logger(__name__)
class ConversationContextManager:
    """
    Manages conversation context across user exchanges.

    Features:
    - Tracks conversation history
    - Detects follow-up requests
    - Maintains artifact references
    - Auto-summarizes long conversations
    - Provides context for LLM prompts
    """

    def __init__(self, storage_path: str = "outputs/conversations"):
        """
        Initialize context manager.

        Args:
            storage_path: Directory to store conversation data
        """
        self.storage_path = storage_path
        # In-memory cache: session_id -> conversation context dict
        self.conversations: Dict[str, Dict[str, Any]] = {}
        # Ensure the persistence directory exists up front
        os.makedirs(storage_path, exist_ok=True)
        log.info("Context Manager initialized")

    def add_exchange(self, session_id: str, user_message: str,
                     assistant_response: str,
                     artifacts: Optional[List[str]] = None,
                     metadata: Optional[Dict[str, Any]] = None):
        """
        Add a user-assistant exchange to conversation history.

        Args:
            session_id: Unique session identifier
            user_message: User's input message
            assistant_response: Assistant's response (stored truncated to
                1000 chars to bound memory/disk usage)
            artifacts: List of artifact filenames created
            metadata: Additional metadata (tier, cost, etc.)
        """
        if session_id not in self.conversations:
            self.conversations[session_id] = {
                "session_id": session_id,
                # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+)
                "started_at": datetime.now(timezone.utc).isoformat(),
                "exchanges": [],
                "artifacts_created": [],
                "summary": None,
                "total_cost": 0.0,
            }
        context = self.conversations[session_id]

        # Add exchange
        context["exchanges"].append({
            "user": user_message,
            "assistant": assistant_response[:1000],  # Truncate long responses
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "artifacts": artifacts or [],
            "metadata": metadata or {},
        })

        # Track artifacts and accumulated cost
        if artifacts:
            context["artifacts_created"].extend(artifacts)
        if metadata and "cost" in metadata:
            context["total_cost"] += metadata["cost"]

        # Auto-summarize every SUMMARIZE_AFTER_EXCHANGES exchanges
        if (cfg.AUTO_SUMMARIZE_LONG_CONVERSATIONS and
                len(context["exchanges"]) % cfg.SUMMARIZE_AFTER_EXCHANGES == 0):
            context["summary"] = self._generate_summary(context["exchanges"])
            log.info(f"📝 Auto-summarized conversation: {session_id}")

        # Persist to disk
        self._save_conversation(session_id)
        log.info(f"💬 Exchange added: {session_id} ({len(context['exchanges'])} total)")

    def get_context(self, session_id: str, current_input: str = "") -> Dict[str, Any]:
        """
        Get conversation context for current request.

        Args:
            session_id: Session identifier
            current_input: Current user input (for follow-up detection)

        Returns:
            Dict with context information:
            - is_follow_up: bool
            - context: str (formatted context)
            - artifacts_context: str
            - previous_artifacts: List[str]
            - exchange_count: int
            - total_cost / session_started: present when the session exists
        """
        if session_id not in self.conversations:
            # Fall back to disk (mirrors get_session_summary) so persisted
            # sessions survive a process restart.
            if not self.load_conversation(session_id):
                return {
                    "is_follow_up": False,
                    "context": "",
                    "artifacts_context": "",
                    "previous_artifacts": [],
                    "exchange_count": 0
                }
        context = self.conversations[session_id]

        # Detect follow-up (only meaningful when input is non-empty)
        is_follow_up = self._is_follow_up(current_input) if current_input else False

        return {
            "is_follow_up": is_follow_up,
            "context": self._build_context_string(context),
            "artifacts_context": self._build_artifacts_context(context),
            "previous_artifacts": context["artifacts_created"],
            "exchange_count": len(context["exchanges"]),
            "total_cost": context.get("total_cost", 0.0),
            "session_started": context.get("started_at")
        }

    def _is_follow_up(self, user_input: str) -> bool:
        """
        Detect if input is a follow-up to previous conversation.

        Args:
            user_input: User's current input

        Returns:
            True if follow-up detected
        """
        text_lower = user_input.lower()
        words = text_lower.split()
        word_set = set(words)

        # Whole-word match for single-word keywords so e.g. "add" does not
        # fire on "address"; multi-word keywords fall back to substring match.
        has_follow_up_keyword = any(
            (kw in text_lower) if " " in kw else (kw in word_set)
            for kw in cfg.FOLLOW_UP_KEYWORDS
        )

        # Pronouns referencing previous context ("it", "that", ...)
        has_reference_pronoun = any(p in word_set for p in cfg.REFERENCE_PRONOUNS)

        # Short messages are often follow-ups
        is_short = len(words) < 10

        # Follow-up if has keywords OR (has pronouns AND short)
        return has_follow_up_keyword or (has_reference_pronoun and is_short)

    def _build_context_string(self, context: Dict) -> str:
        """
        Build formatted context string for LLM.

        Args:
            context: Conversation context dict

        Returns:
            Formatted context string
        """
        # Use summary for long conversations
        if context.get("summary"):
            context_str = f"=== CONVERSATION SUMMARY ===\n{context['summary']}\n\n"
        else:
            context_str = ""

        # Add recent exchanges (bounded by MAX_CONVERSATION_HISTORY)
        recent_exchanges = context["exchanges"][-cfg.MAX_CONVERSATION_HISTORY:]
        if recent_exchanges:
            context_str += "=== RECENT CONVERSATION ===\n"
            for exchange in recent_exchanges:
                context_str += f"\nUser: {exchange['user']}\n"
                # Truncate assistant response to keep the prompt compact
                response_preview = exchange['assistant'][:300]
                if len(exchange['assistant']) > 300:
                    response_preview += "..."
                context_str += f"Assistant: {response_preview}\n"
        return context_str

    def _build_artifacts_context(self, context: Dict) -> str:
        """
        Build artifacts reference string.

        Args:
            context: Conversation context dict

        Returns:
            Formatted artifacts context ("" when no artifacts exist)
        """
        artifacts = context.get("artifacts_created", [])
        if not artifacts:
            return ""
        artifacts_str = "=== ARTIFACTS CREATED IN THIS CONVERSATION ===\n"
        # Show last 5 artifacts
        for artifact in artifacts[-5:]:
            artifacts_str += f"- {artifact}\n"
        if len(artifacts) > 5:
            artifacts_str += f"... and {len(artifacts) - 5} more\n"
        return artifacts_str

    def _generate_summary(self, exchanges: List[Dict]) -> str:
        """
        Generate conversation summary using LLM.

        Args:
            exchanges: List of conversation exchanges

        Returns:
            Summary string (a generic fallback string on any failure)
        """
        try:
            # Lazy import to avoid circular dependency
            from langchain_openai import ChatOpenAI
            llm = ChatOpenAI(model="gpt-4o", temperature=0.3)

            # Build summary prompt from the last 10 exchanges
            recent_exchanges = exchanges[-10:]
            exchanges_text = ""
            for ex in recent_exchanges:
                exchanges_text += f"User: {ex['user']}\n"
                exchanges_text += f"Assistant: {ex['assistant'][:200]}...\n\n"

            prompt = f"""Summarize this conversation in 3-4 sentences.
Focus on: what the user wanted, what was created, and current state.
CONVERSATION:
{exchanges_text}
SUMMARY (3-4 sentences):"""

            response = llm.invoke(prompt)
            # Bound the stored summary size
            summary = getattr(response, "content", "")[:500]
            return summary
        except Exception as e:
            # Best-effort: summarization is optional, never fatal
            log.warning(f"Summary generation failed: {e}")
            return "Previous conversation context available."

    def _save_conversation(self, session_id: str):
        """
        Persist conversation to disk as <storage_path>/<session_id>.json.

        Args:
            session_id: Session identifier
        """
        if session_id not in self.conversations:
            return
        filepath = os.path.join(self.storage_path, f"{session_id}.json")
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                # ensure_ascii=False keeps emoji/non-ASCII readable on disk
                json.dump(self.conversations[session_id], f, indent=2,
                          ensure_ascii=False)
        except Exception as e:
            # Best-effort persistence: log and continue with in-memory state
            log.error(f"Failed to save conversation {session_id}: {e}")

    def load_conversation(self, session_id: str) -> bool:
        """
        Load conversation from disk into the in-memory cache.

        Args:
            session_id: Session identifier

        Returns:
            True if loaded successfully
        """
        filepath = os.path.join(self.storage_path, f"{session_id}.json")
        if not os.path.exists(filepath):
            return False
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                self.conversations[session_id] = json.load(f)
            log.info(f"📂 Conversation loaded: {session_id}")
            return True
        except Exception as e:
            log.error(f"Failed to load conversation {session_id}: {e}")
            return False

    def clear_session(self, session_id: str):
        """
        Clear conversation history for session (memory and disk).

        Args:
            session_id: Session identifier
        """
        if session_id in self.conversations:
            del self.conversations[session_id]
        # Also delete the persisted copy
        filepath = os.path.join(self.storage_path, f"{session_id}.json")
        if os.path.exists(filepath):
            os.remove(filepath)
        log.info(f"🗑️ Context cleared: {session_id}")

    def get_all_sessions(self) -> List[str]:
        """
        Get list of all session IDs (in-memory plus persisted).

        Returns:
            Sorted, deduplicated list of session IDs
        """
        memory_sessions = list(self.conversations.keys())
        disk_sessions = []
        if os.path.exists(self.storage_path):
            for filename in os.listdir(self.storage_path):
                if filename.endswith('.json'):
                    disk_sessions.append(filename[:-5])  # Remove .json
        # Combine and deduplicate
        return sorted(set(memory_sessions + disk_sessions))

    def get_session_summary(self, session_id: str) -> Dict[str, Any]:
        """
        Get summary information for a session.

        Args:
            session_id: Session identifier

        Returns:
            Dict with session summary, or {} if the session is unknown
        """
        if session_id not in self.conversations:
            if not self.load_conversation(session_id):
                return {}
        context = self.conversations[session_id]
        return {
            "session_id": session_id,
            "started_at": context.get("started_at"),
            "exchange_count": len(context.get("exchanges", [])),
            "artifacts_count": len(context.get("artifacts_created", [])),
            "total_cost": context.get("total_cost", 0.0),
            "has_summary": bool(context.get("summary"))
        }
# Shared module-level singleton. NOTE: importing this module creates the
# default storage directory as a side effect (see __init__'s os.makedirs).
context_manager = ConversationContextManager()
# Public API of this module
__all__ = ["ConversationContextManager", "context_manager"]