tillu-daemon / app /memory /conversation_buffer.py
tillu-AI's picture
upload app/memory/conversation_buffer.py
9e5b4c1 verified
"""
ConversationBuffer - In-memory conversation buffer for active sessions
"""
from typing import List, Dict, Any, Optional
from collections import deque
from datetime import datetime
from app.utils.logging import get_logger
logger = get_logger("conversation_buffer")
class ConversationBuffer:
"""
In-memory buffer for active conversation sessions.
Provides O(1) access to recent messages.
"""
_buffers: Dict[str, deque] = {} # session_id -> deque of messages
_max_size: int = 100
@classmethod
def add_message(
cls,
session_id: str,
role: str,
content: str,
metadata: Optional[Dict[str, Any]] = None
) -> None:
"""
Add a message to the buffer.
Args:
session_id: Unique session identifier
role: "user" or "assistant"
content: Message content
metadata: Optional metadata (emotion, timestamp, etc.)
"""
if session_id not in cls._buffers:
cls._buffers[session_id] = deque(maxlen=cls._max_size)
message = {
"role": role,
"content": content,
"timestamp": metadata.get("timestamp") if metadata else datetime.now().isoformat(),
"metadata": metadata or {}
}
cls._buffers[session_id].append(message)
logger.debug(f"Added message to buffer", session_id=session_id, role=role)
@classmethod
def get_recent(
cls,
session_id: str,
n: int = 20
) -> List[Dict[str, Any]]:
"""
Get n most recent messages from buffer.
Args:
session_id: Session identifier
n: Number of messages to retrieve
Returns:
List of recent messages (oldest first)
"""
if session_id not in cls._buffers:
return []
buffer = cls._buffers[session_id]
# Get last n messages, reversed to get oldest first
recent = list(buffer)[-n:] if len(buffer) > n else list(buffer)
return recent
@classmethod
def get_all(cls, session_id: str) -> List[Dict[str, Any]]:
"""Get all messages in buffer"""
if session_id not in cls._buffers:
return []
return list(cls._buffers[session_id])
@classmethod
def clear(cls, session_id: str) -> None:
"""Clear buffer for a session"""
if session_id in cls._buffers:
del cls._buffers[session_id]
logger.info(f"Cleared buffer for session {session_id}")
@classmethod
def get_buffer_size(cls, session_id: str) -> int:
"""Get number of messages in buffer"""
if session_id not in cls._buffers:
return 0
return len(cls._buffers[session_id])
@classmethod
def get_formatted_history(
cls,
session_id: str,
n: int = 20,
format_type: str = "langchain"
) -> Any:
"""
Get formatted conversation history.
Args:
session_id: Session identifier
n: Number of messages
format_type: "langchain", "openai", or "raw"
Returns:
Formatted history in requested format
"""
messages = cls.get_recent(session_id, n)
if format_type == "raw":
return messages
elif format_type == "langchain":
from langchain.schema import HumanMessage, AIMessage
formatted = []
for msg in messages:
if msg["role"] == "user":
formatted.append(HumanMessage(content=msg["content"]))
else:
formatted.append(AIMessage(content=msg["content"]))
return formatted
elif format_type == "openai":
return [
{
"role": msg["role"],
"content": msg["content"]
}
for msg in messages
]
return messages