Spaces:
Sleeping
Sleeping
File size: 4,131 Bytes
9e5b4c1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 | """
ConversationBuffer - In-memory conversation buffer for active sessions
"""
from typing import List, Dict, Any, Optional
from collections import deque
from datetime import datetime
from app.utils.logging import get_logger
logger = get_logger("conversation_buffer")
class ConversationBuffer:
"""
In-memory buffer for active conversation sessions.
Provides O(1) access to recent messages.
"""
_buffers: Dict[str, deque] = {} # session_id -> deque of messages
_max_size: int = 100
@classmethod
def add_message(
cls,
session_id: str,
role: str,
content: str,
metadata: Optional[Dict[str, Any]] = None
) -> None:
"""
Add a message to the buffer.
Args:
session_id: Unique session identifier
role: "user" or "assistant"
content: Message content
metadata: Optional metadata (emotion, timestamp, etc.)
"""
if session_id not in cls._buffers:
cls._buffers[session_id] = deque(maxlen=cls._max_size)
message = {
"role": role,
"content": content,
"timestamp": metadata.get("timestamp") if metadata else datetime.now().isoformat(),
"metadata": metadata or {}
}
cls._buffers[session_id].append(message)
logger.debug(f"Added message to buffer", session_id=session_id, role=role)
@classmethod
def get_recent(
cls,
session_id: str,
n: int = 20
) -> List[Dict[str, Any]]:
"""
Get n most recent messages from buffer.
Args:
session_id: Session identifier
n: Number of messages to retrieve
Returns:
List of recent messages (oldest first)
"""
if session_id not in cls._buffers:
return []
buffer = cls._buffers[session_id]
# Get last n messages, reversed to get oldest first
recent = list(buffer)[-n:] if len(buffer) > n else list(buffer)
return recent
@classmethod
def get_all(cls, session_id: str) -> List[Dict[str, Any]]:
"""Get all messages in buffer"""
if session_id not in cls._buffers:
return []
return list(cls._buffers[session_id])
@classmethod
def clear(cls, session_id: str) -> None:
"""Clear buffer for a session"""
if session_id in cls._buffers:
del cls._buffers[session_id]
logger.info(f"Cleared buffer for session {session_id}")
@classmethod
def get_buffer_size(cls, session_id: str) -> int:
"""Get number of messages in buffer"""
if session_id not in cls._buffers:
return 0
return len(cls._buffers[session_id])
@classmethod
def get_formatted_history(
cls,
session_id: str,
n: int = 20,
format_type: str = "langchain"
) -> Any:
"""
Get formatted conversation history.
Args:
session_id: Session identifier
n: Number of messages
format_type: "langchain", "openai", or "raw"
Returns:
Formatted history in requested format
"""
messages = cls.get_recent(session_id, n)
if format_type == "raw":
return messages
elif format_type == "langchain":
from langchain.schema import HumanMessage, AIMessage
formatted = []
for msg in messages:
if msg["role"] == "user":
formatted.append(HumanMessage(content=msg["content"]))
else:
formatted.append(AIMessage(content=msg["content"]))
return formatted
elif format_type == "openai":
return [
{
"role": msg["role"],
"content": msg["content"]
}
for msg in messages
]
return messages
|