File size: 9,909 Bytes
91fe002
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75033ed
91fe002
 
 
 
 
 
 
 
 
75033ed
91fe002
 
 
75033ed
 
 
 
91fe002
 
 
 
 
 
 
75033ed
91fe002
75033ed
91fe002
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75033ed
91fe002
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75033ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
"""

Conversation Service for Multi-turn Chat

Server-side session management

"""
from typing import List, Dict, Optional
from datetime import datetime
from pymongo.collection import Collection
import uuid


class ConversationService:
    """Server-side store for multi-turn conversation sessions.

    Every session is a single MongoDB document keyed by ``session_id`` that
    holds the message list, an optional scenario state, caller-supplied
    metadata and ``created_at`` / ``updated_at`` timestamps.
    """

    # Idle sessions are removed by the TTL index on ``updated_at`` (7 days).
    SESSION_TTL_SECONDS = 604800

    def __init__(self, mongo_collection: Collection, max_history: int = 10):
        """Bind the service to a collection and make sure its indexes exist.

        Args:
            mongo_collection: MongoDB collection used to store conversations.
            max_history: Maximum number of messages kept per session
                (sliding window applied on every ``add_message``).
        """
        self.collection = mongo_collection
        self.max_history = max_history

        self._ensure_indexes()

    def _ensure_indexes(self) -> None:
        """Create the indexes used by this service (best effort).

        Failures are reported on stdout and otherwise ignored so that an
        index problem does not prevent the service from being constructed.
        """
        try:
            self.collection.create_index("session_id", unique=True)
            self.collection.create_index("user_id")  # supports filtering by user
            # TTL index: sessions untouched for SESSION_TTL_SECONDS are deleted.
            self.collection.create_index(
                "updated_at",
                expireAfterSeconds=self.SESSION_TTL_SECONDS,
            )
            print("✓ Conversation indexes created")
        except Exception as e:
            print(f"Conversation indexes already exist or error: {e}")

    def create_session(self, metadata: Optional[Dict] = None, user_id: Optional[str] = None) -> str:
        """Create a new, empty conversation session.

        Args:
            metadata: Additional metadata stored with the session.
            user_id: User identifier (optional).

        Returns:
            The new session id (UUID4 string).
        """
        session_id = str(uuid.uuid4())
        # One timestamp for both fields so a fresh session has
        # created_at == updated_at.
        now = datetime.utcnow()

        self.collection.insert_one({
            "session_id": session_id,
            "user_id": user_id,
            "messages": [],
            "scenario_state": None,
            "metadata": metadata or {},
            "created_at": now,
            "updated_at": now,
        })

        return session_id

    def add_message(
        self,
        session_id: str,
        role: str,
        content: str,
        metadata: Optional[Dict] = None
    ) -> None:
        """Append a message to a session, creating the session if needed.

        Args:
            session_id: Session identifier.
            role: "user" or "assistant".
            content: Message text.
            metadata: Additional info (rag_stats, tool_calls, etc.).
        """
        now = datetime.utcnow()
        message = {
            "role": role,
            "content": content,
            "timestamp": now.isoformat(),
            "metadata": metadata or {},
        }

        self.collection.update_one(
            {"session_id": session_id},
            {
                "$push": {
                    "messages": {
                        "$each": [message],
                        # Sliding window: keep only the last N messages.
                        "$slice": -self.max_history,
                    }
                },
                "$set": {"updated_at": now},
                # When the upsert creates the document, give it the same
                # fields create_session() writes so readers such as
                # list_sessions() never meet a half-initialised session.
                "$setOnInsert": {
                    "user_id": None,
                    "scenario_state": None,
                    "metadata": {},
                    "created_at": now,
                },
            },
            upsert=True,
        )

    def get_conversation_history(
        self,
        session_id: str,
        limit: Optional[int] = None,
        include_metadata: bool = False
    ) -> List[Dict]:
        """Return the most recent messages of a session, oldest first.

        Args:
            session_id: Session identifier.
            limit: Number of trailing messages to return instead of
                ``max_history``. ``None`` or a non-positive value falls back
                to ``max_history``.
            include_metadata: When true, return the stored message dicts
                unchanged (including ``timestamp`` and ``metadata``).

        Returns:
            List of messages, by default in the form
            ``[{"role": "user", "content": "..."}, ...]``; an empty list if
            the session does not exist.
        """
        session = self.collection.find_one(
            {"session_id": session_id},
            {"_id": 0, "messages": 1},
        )
        if not session:
            return []

        messages = session.get("messages") or []

        # A negative limit would turn messages[-limit:] into "drop the first
        # |limit| messages", so only positive limits override the default.
        window = limit if limit is not None and limit > 0 else self.max_history
        if window > 0:
            messages = messages[-window:]

        if include_metadata:
            return messages
        return [{"role": msg["role"], "content": msg["content"]} for msg in messages]

    def get_session_info(self, session_id: str) -> Optional[Dict]:
        """Return a session's summary fields without its messages.

        Returns:
            Dict with session_id, user_id, created_at, updated_at and
            metadata, or None if the session does not exist.
        """
        return self.collection.find_one(
            {"session_id": session_id},
            {"_id": 0, "session_id": 1, "user_id": 1, "created_at": 1, "updated_at": 1, "metadata": 1}
        )

    def clear_session(self, session_id: str) -> bool:
        """Delete a session document, including its whole history.

        Returns:
            True if a session was deleted, False if none matched.
        """
        result = self.collection.delete_one({"session_id": session_id})
        return result.deleted_count > 0

    def session_exists(self, session_id: str) -> bool:
        """Return True if a session with this id is stored."""
        # limit=1 lets the server stop at the first match.
        return self.collection.count_documents({"session_id": session_id}, limit=1) > 0

    def get_last_user_message(self, session_id: str) -> Optional[str]:
        """Return the content of the latest message whose role is "user".

        Returns:
            The message text, or None if the session does not exist or has
            no user message.
        """
        session = self.collection.find_one(
            {"session_id": session_id},
            {"_id": 0, "messages": 1},
        )
        if not session:
            return None

        # Walk backwards so the first hit is the most recent user message.
        for msg in reversed(session.get("messages") or []):
            if msg.get("role") == "user":
                return msg.get("content")

        return None

    def list_sessions(
        self,
        limit: int = 50,
        skip: int = 0,
        sort_by: str = "updated_at",
        descending: bool = True,
        user_id: Optional[str] = None
    ) -> List[Dict]:
        """List session summaries, optionally restricted to one user.

        Args:
            limit: Maximum number of sessions to return.
            skip: Number of sessions to skip (for pagination).
            sort_by: Field to sort by (created_at, updated_at).
            descending: Sort in descending order.
            user_id: Only return sessions of this user (optional; a falsy
                value means no filter).

        Returns:
            List of dicts with session_id, user_id, created_at, updated_at,
            message_count and metadata.
        """
        sort_order = -1 if descending else 1

        query = {}
        if user_id:
            query["user_id"] = user_id

        # "messages" is projected so the count comes from the same document
        # instead of one extra find_one() round trip per session.
        cursor = (
            self.collection.find(
                query,
                {
                    "_id": 0,
                    "session_id": 1,
                    "user_id": 1,
                    "created_at": 1,
                    "updated_at": 1,
                    "metadata": 1,
                    "messages": 1,
                },
            )
            .sort(sort_by, sort_order)
            .skip(skip)
            .limit(limit)
        )

        # .get() keeps older upsert-created documents (which may lack
        # created_at or metadata) listable instead of raising KeyError.
        return [
            {
                "session_id": doc["session_id"],
                "user_id": doc.get("user_id"),
                "created_at": doc.get("created_at"),
                "updated_at": doc.get("updated_at"),
                "message_count": len(doc.get("messages") or []),
                "metadata": doc.get("metadata", {}),
            }
            for doc in cursor
        ]

    def count_sessions(self, user_id: Optional[str] = None) -> int:
        """Return the number of stored sessions.

        Args:
            user_id: Only count sessions of this user (optional; a falsy
                value means no filter).
        """
        query = {}
        if user_id:
            query["user_id"] = user_id
        return self.collection.count_documents(query)

    # ===== Scenario State Management =====

    def get_scenario_state(self, session_id: str) -> Optional[Dict]:
        """Return the stored scenario state of a session.

        Returns:
            The value stored under ``scenario_state``, e.g.::

                {
                    "active_scenario": "price_inquiry",
                    "scenario_step": 3,
                    "scenario_data": {...},
                    "last_activity": "..."
                }

            or None if the session does not exist or has no active scenario.
        """
        session = self.collection.find_one(
            {"session_id": session_id},
            {"_id": 0, "scenario_state": 1},
        )
        if not session:
            return None
        return session.get("scenario_state")

    def set_scenario_state(self, session_id: str, state: Dict) -> None:
        """Store the scenario state, creating the session if needed.

        Args:
            session_id: Session ID.
            state: Scenario state dict.
        """
        now = datetime.utcnow()
        self.collection.update_one(
            {"session_id": session_id},
            {
                "$set": {
                    "scenario_state": state,
                    "updated_at": now,
                },
                # Same defaults as create_session() when the upsert inserts.
                # scenario_state is left out because it is already in $set.
                "$setOnInsert": {
                    "user_id": None,
                    "messages": [],
                    "metadata": {},
                    "created_at": now,
                },
            },
            upsert=True,
        )

    def clear_scenario(self, session_id: str) -> None:
        """Reset the scenario state to None (ends the scenario).

        Does nothing if the session does not exist (no upsert).
        """
        self.collection.update_one(
            {"session_id": session_id},
            {
                "$set": {
                    "scenario_state": None,
                    "updated_at": datetime.utcnow(),
                }
            },
        )