File size: 4,131 Bytes
9e5b4c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
ConversationBuffer - In-memory conversation buffer for active sessions
"""
from typing import List, Dict, Any, Optional
from collections import deque
from datetime import datetime
from app.utils.logging import get_logger

logger = get_logger("conversation_buffer")


class ConversationBuffer:
    """
    In-memory buffer for active conversation sessions.
    Provides O(1) access to recent messages.
    """
    
    _buffers: Dict[str, deque] = {}  # session_id -> deque of messages
    _max_size: int = 100
    
    @classmethod
    def add_message(
        cls,
        session_id: str,
        role: str,
        content: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Add a message to the buffer.
        
        Args:
            session_id: Unique session identifier
            role: "user" or "assistant"
            content: Message content
            metadata: Optional metadata (emotion, timestamp, etc.)
        """
        if session_id not in cls._buffers:
            cls._buffers[session_id] = deque(maxlen=cls._max_size)
        
        message = {
            "role": role,
            "content": content,
            "timestamp": metadata.get("timestamp") if metadata else datetime.now().isoformat(),
            "metadata": metadata or {}
        }
        
        cls._buffers[session_id].append(message)
        logger.debug(f"Added message to buffer", session_id=session_id, role=role)
    
    @classmethod
    def get_recent(
        cls,
        session_id: str,
        n: int = 20
    ) -> List[Dict[str, Any]]:
        """
        Get n most recent messages from buffer.
        
        Args:
            session_id: Session identifier
            n: Number of messages to retrieve
            
        Returns:
            List of recent messages (oldest first)
        """
        if session_id not in cls._buffers:
            return []
        
        buffer = cls._buffers[session_id]
        # Get last n messages, reversed to get oldest first
        recent = list(buffer)[-n:] if len(buffer) > n else list(buffer)
        return recent
    
    @classmethod
    def get_all(cls, session_id: str) -> List[Dict[str, Any]]:
        """Get all messages in buffer"""
        if session_id not in cls._buffers:
            return []
        return list(cls._buffers[session_id])
    
    @classmethod
    def clear(cls, session_id: str) -> None:
        """Clear buffer for a session"""
        if session_id in cls._buffers:
            del cls._buffers[session_id]
            logger.info(f"Cleared buffer for session {session_id}")
    
    @classmethod
    def get_buffer_size(cls, session_id: str) -> int:
        """Get number of messages in buffer"""
        if session_id not in cls._buffers:
            return 0
        return len(cls._buffers[session_id])
    
    @classmethod
    def get_formatted_history(
        cls,
        session_id: str,
        n: int = 20,
        format_type: str = "langchain"
    ) -> Any:
        """
        Get formatted conversation history.
        
        Args:
            session_id: Session identifier
            n: Number of messages
            format_type: "langchain", "openai", or "raw"
            
        Returns:
            Formatted history in requested format
        """
        messages = cls.get_recent(session_id, n)
        
        if format_type == "raw":
            return messages
        
        elif format_type == "langchain":
            from langchain.schema import HumanMessage, AIMessage
            formatted = []
            for msg in messages:
                if msg["role"] == "user":
                    formatted.append(HumanMessage(content=msg["content"]))
                else:
                    formatted.append(AIMessage(content=msg["content"]))
            return formatted
        
        elif format_type == "openai":
            return [
                {
                    "role": msg["role"],
                    "content": msg["content"]
                }
                for msg in messages
            ]
        
        return messages