redhairedshanks1 commited on
Commit
4a61db5
·
1 Parent(s): 6960794

Update services/session_manager.py

Browse files
Files changed (1) hide show
  1. services/session_manager.py +412 -412
services/session_manager.py CHANGED
@@ -1,412 +1,412 @@
1
- # services/session_manager.py
2
- """
3
- MongoDB-based user session management with pipeline tracking
4
- """
5
- import os
6
- import uuid
7
- from datetime import datetime, timedelta
8
- from typing import Optional, Dict, Any, List
9
- from pymongo import MongoClient
10
- from pymongo.errors import DuplicateKeyError, ConnectionFailure
11
-
12
-
13
- class SessionManager:
14
- """
15
- Manages user sessions in MongoDB with pipeline execution tracking
16
- """
17
-
18
- def __init__(self):
19
- """Initialize MongoDB connection for sessions"""
20
- self.mongo_uri = os.getenv("MONGODB_URI")
21
- self.db_name = os.getenv("MONGODB_DB", "point9")
22
- self.collection_name = "user-sessions" # New collection for sessions
23
- self.pipelines_collection_name = "pipeline-executions" # Track pipeline runs
24
-
25
- self.client = None
26
- self.db = None
27
- self.sessions_col = None
28
- self.pipelines_col = None
29
-
30
- self._connect()
31
-
32
- def _connect(self):
33
- """Establish MongoDB connection"""
34
- if not self.mongo_uri:
35
- print("⚠️ MongoDB URI not configured - session persistence disabled")
36
- return
37
-
38
- try:
39
- self.client = MongoClient(self.mongo_uri, serverSelectionTimeoutMS=5000)
40
- self.client.admin.command("ping") # Test connection
41
-
42
- self.db = self.client[self.db_name]
43
- self.sessions_col = self.db[self.collection_name]
44
- self.pipelines_col = self.db[self.pipelines_collection_name]
45
-
46
- # Create indexes
47
- self.sessions_col.create_index("session_id", unique=True)
48
- self.sessions_col.create_index("created_at")
49
- self.sessions_col.create_index("last_activity")
50
-
51
- self.pipelines_col.create_index("session_id")
52
- self.pipelines_col.create_index("executed_at")
53
- self.pipelines_col.create_index("pipeline_name")
54
-
55
- print(f"✅ MongoDB session manager connected: {self.db_name}.{self.collection_name}")
56
-
57
- except ConnectionFailure as e:
58
- print(f"❌ MongoDB connection failed: {e}")
59
- self.client = None
60
-
61
- def create_session(
62
- self,
63
- user_id: Optional[str] = None,
64
- metadata: Optional[Dict[str, Any]] = None
65
- ) -> str:
66
- """
67
- Create a new user session
68
-
69
- Args:
70
- user_id: Optional user identifier
71
- metadata: Additional session metadata
72
-
73
- Returns:
74
- session_id: Unique session identifier
75
- """
76
- session_id = str(uuid.uuid4())
77
-
78
- session_data = {
79
- "session_id": session_id,
80
- "user_id": user_id,
81
- "created_at": datetime.now(),
82
- "last_activity": datetime.now(),
83
- "current_file": None,
84
- "proposed_pipeline": None,
85
- "state": "initial", # initial, pipeline_proposed, executing, completed
86
- "conversation_history": [],
87
- "pipeline_executions": [],
88
- "metadata": metadata or {},
89
- "stats": {
90
- "total_messages": 0,
91
- "total_pipelines_executed": 0,
92
- "total_tokens_used": 0
93
- }
94
- }
95
-
96
- if self.sessions_col is not None:
97
- try:
98
- self.sessions_col.insert_one(session_data)
99
- print(f"✅ Session created in MongoDB: {session_id}")
100
- except Exception as e:
101
- print(f"⚠️ Failed to save session to MongoDB: {e}")
102
-
103
- return session_id
104
-
105
- def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
106
- """
107
- Retrieve session by ID
108
-
109
- Args:
110
- session_id: Session identifier
111
-
112
- Returns:
113
- Session data or None if not found
114
- """
115
- if self.sessions_col is None:
116
- return None
117
-
118
- try:
119
- session = self.sessions_col.find_one({"session_id": session_id})
120
- if session:
121
- # Update last activity
122
- self.sessions_col.update_one(
123
- {"session_id": session_id},
124
- {"$set": {"last_activity": datetime.now()}}
125
- )
126
- # Remove MongoDB _id field
127
- session.pop("_id", None)
128
- return session
129
- return None
130
- except Exception as e:
131
- print(f"⚠️ Error retrieving session: {e}")
132
- return None
133
-
134
- def update_session(
135
- self,
136
- session_id: str,
137
- updates: Dict[str, Any]
138
- ) -> bool:
139
- """
140
- Update session data
141
-
142
- Args:
143
- session_id: Session identifier
144
- updates: Dictionary of fields to update
145
-
146
- Returns:
147
- True if successful, False otherwise
148
- """
149
- if self.sessions_col is None:
150
- return False
151
-
152
- try:
153
- updates["last_activity"] = datetime.now()
154
-
155
- result = self.sessions_col.update_one(
156
- {"session_id": session_id},
157
- {"$set": updates}
158
- )
159
-
160
- return result.modified_count > 0
161
- except Exception as e:
162
- print(f"⚠️ Error updating session: {e}")
163
- return False
164
-
165
- def add_message(
166
- self,
167
- session_id: str,
168
- role: str,
169
- content: str,
170
- metadata: Optional[Dict[str, Any]] = None
171
- ) -> bool:
172
- """
173
- Add a message to conversation history
174
-
175
- Args:
176
- session_id: Session identifier
177
- role: Message role (user, assistant, system)
178
- content: Message content
179
- metadata: Additional message metadata
180
-
181
- Returns:
182
- True if successful
183
- """
184
- if self.sessions_col is None:
185
- return False
186
-
187
- try:
188
- message = {
189
- "role": role,
190
- "content": content,
191
- "timestamp": datetime.now(),
192
- "metadata": metadata or {}
193
- }
194
-
195
- self.sessions_col.update_one(
196
- {"session_id": session_id},
197
- {
198
- "$push": {"conversation_history": message},
199
- "$inc": {"stats.total_messages": 1},
200
- "$set": {"last_activity": datetime.now()}
201
- }
202
- )
203
-
204
- return True
205
- except Exception as e:
206
- print(f"⚠️ Error adding message: {e}")
207
- return False
208
-
209
- def save_pipeline_execution(
210
- self,
211
- session_id: str,
212
- pipeline: Dict[str, Any],
213
- result: Dict[str, Any],
214
- file_path: Optional[str] = None,
215
- executor: str = "unknown"
216
- ) -> bool:
217
- """
218
- Save pipeline execution to dedicated collection
219
-
220
- Args:
221
- session_id: Session identifier
222
- pipeline: Pipeline configuration
223
- result: Execution result
224
- file_path: File that was processed
225
- executor: Which executor was used (bedrock, crewai, gemini)
226
-
227
- Returns:
228
- True if successful
229
- """
230
- if self.pipelines_col is None:
231
- return False
232
-
233
- try:
234
- execution_data = {
235
- "execution_id": str(uuid.uuid4()),
236
- "session_id": session_id,
237
- "pipeline_name": pipeline.get("pipeline_name"),
238
- "pipeline_config": pipeline,
239
- "result": result,
240
- "file_path": file_path,
241
- "executor": executor,
242
- "executed_at": datetime.now(),
243
- "duration_seconds": result.get("summary", {}).get("total_duration_seconds"),
244
- "status": result.get("status", "unknown"),
245
- "components_executed": len(pipeline.get("components", []))
246
- }
247
-
248
- self.pipelines_col.insert_one(execution_data)
249
-
250
- # Update session stats
251
- self.sessions_col.update_one(
252
- {"session_id": session_id},
253
- {
254
- "$inc": {"stats.total_pipelines_executed": 1},
255
- "$push": {"pipeline_executions": execution_data["execution_id"]}
256
- }
257
- )
258
-
259
- print(f"✅ Pipeline execution saved: {execution_data['execution_id']}")
260
- return True
261
-
262
- except Exception as e:
263
- print(f"⚠️ Error saving pipeline execution: {e}")
264
- return False
265
-
266
- def get_session_history(
267
- self,
268
- session_id: str,
269
- limit: int = 50
270
- ) -> List[Dict[str, Any]]:
271
- """
272
- Get conversation history for a session
273
-
274
- Args:
275
- session_id: Session identifier
276
- limit: Maximum number of messages to return
277
-
278
- Returns:
279
- List of messages
280
- """
281
- session = self.get_session(session_id)
282
- if not session:
283
- return []
284
-
285
- history = session.get("conversation_history", [])
286
- return history[-limit:] if len(history) > limit else history
287
-
288
- def get_pipeline_executions(
289
- self,
290
- session_id: Optional[str] = None,
291
- limit: int = 10
292
- ) -> List[Dict[str, Any]]:
293
- """
294
- Get pipeline execution history
295
-
296
- Args:
297
- session_id: Optional session filter
298
- limit: Maximum number of executions to return
299
-
300
- Returns:
301
- List of pipeline executions
302
- """
303
- if self.pipelines_col is None:
304
- return []
305
-
306
- try:
307
- query = {"session_id": session_id} if session_id else {}
308
-
309
- executions = self.pipelines_col.find(query).sort("executed_at", -1).limit(limit)
310
-
311
- result = []
312
- for exec_doc in executions:
313
- exec_doc.pop("_id", None)
314
- # Convert datetime to ISO string
315
- if "executed_at" in exec_doc and isinstance(exec_doc["executed_at"], datetime):
316
- exec_doc["executed_at"] = exec_doc["executed_at"].isoformat()
317
- result.append(exec_doc)
318
-
319
- return result
320
-
321
- except Exception as e:
322
- print(f"⚠️ Error retrieving pipeline executions: {e}")
323
- return []
324
-
325
- def cleanup_old_sessions(self, max_age_hours: int = 24) -> int:
326
- """
327
- Remove sessions older than max_age_hours
328
-
329
- Args:
330
- max_age_hours: Maximum session age in hours
331
-
332
- Returns:
333
- Number of sessions removed
334
- """
335
- if self.sessions_col is None:
336
- return 0
337
-
338
- try:
339
- cutoff = datetime.now() - timedelta(hours=max_age_hours)
340
-
341
- result = self.sessions_col.delete_many({
342
- "last_activity": {"$lt": cutoff}
343
- })
344
-
345
- count = result.deleted_count
346
- if count > 0:
347
- print(f"🧹 Cleaned up {count} old sessions")
348
-
349
- return count
350
-
351
- except Exception as e:
352
- print(f"⚠️ Error cleaning up sessions: {e}")
353
- return 0
354
-
355
- def get_session_stats(self, session_id: str) -> Dict[str, Any]:
356
- """
357
- Get statistics for a session
358
-
359
- Args:
360
- session_id: Session identifier
361
-
362
- Returns:
363
- Session statistics
364
- """
365
- session = self.get_session(session_id)
366
- if not session:
367
- return {}
368
-
369
- return {
370
- "session_id": session_id,
371
- "created_at": session.get("created_at"),
372
- "last_activity": session.get("last_activity"),
373
- "total_messages": session.get("stats", {}).get("total_messages", 0),
374
- "total_pipelines_executed": session.get("stats", {}).get("total_pipelines_executed", 0),
375
- "conversation_length": len(session.get("conversation_history", [])),
376
- "state": session.get("state", "unknown")
377
- }
378
-
379
- def close(self):
380
- """Close MongoDB connection"""
381
- if self.client:
382
- self.client.close()
383
- print("🔒 MongoDB connection closed")
384
-
385
-
386
- # Global session manager instance
387
- session_manager = SessionManager()
388
-
389
-
390
- if __name__ == "__main__":
391
- # Test session manager
392
- print("Testing Session Manager...")
393
-
394
- # Create session
395
- sid = session_manager.create_session(user_id="test_user")
396
- print(f"Created session: {sid}")
397
-
398
- # Add messages
399
- session_manager.add_message(sid, "user", "Hello!")
400
- session_manager.add_message(sid, "assistant", "Hi! How can I help?")
401
-
402
- # Get session
403
- session = session_manager.get_session(sid)
404
- print(f"Session data: {session}")
405
-
406
- # Get history
407
- history = session_manager.get_session_history(sid)
408
- print(f"History: {history}")
409
-
410
- # Get stats
411
- stats = session_manager.get_session_stats(sid)
412
- print(f"Stats: {stats}")
 
1
+ # services/session_manager.py
2
+ """
3
+ MongoDB-based user session management with pipeline tracking
4
+ """
5
+ import os
6
+ import uuid
7
+ from datetime import datetime, timedelta
8
+ from typing import Optional, Dict, Any, List
9
+ from pymongo import MongoClient
10
+ from pymongo.errors import DuplicateKeyError, ConnectionFailure
11
+
12
+
13
+ class SessionManager:
14
+ """
15
+ Manages user sessions in MongoDB with pipeline execution tracking
16
+ """
17
+
18
+ def __init__(self):
19
+ """Initialize MongoDB connection for sessions"""
20
+ self.mongo_uri = os.getenv("MONGODB_URI")
21
+ self.db_name = os.getenv("MONGODB_DB", "point9")
22
+ self.collection_name = "user-sessions" # New collection for sessions
23
+ self.pipelines_collection_name = "pipeline-executions" # Track pipeline runs
24
+
25
+ self.client = None
26
+ self.db = None
27
+ self.sessions_col = None
28
+ self.pipelines_col = None
29
+
30
+ self._connect()
31
+
32
+ def _connect(self):
33
+ """Establish MongoDB connection"""
34
+ if not self.mongo_uri:
35
+ print("⚠️ MongoDB URI not configured - session persistence disabled")
36
+ return
37
+
38
+ try:
39
+ self.client = MongoClient(self.mongo_uri, serverSelectionTimeoutMS=5000)
40
+ self.client.admin.command("ping") # Test connection
41
+
42
+ self.db = self.client[self.db_name]
43
+ self.sessions_col = self.db[self.collection_name]
44
+ self.pipelines_col = self.db[self.pipelines_collection_name]
45
+
46
+ # Create indexes
47
+ self.sessions_col.create_index("session_id", unique=True)
48
+ self.sessions_col.create_index("created_at")
49
+ self.sessions_col.create_index("last_activity")
50
+
51
+ self.pipelines_col.create_index("session_id")
52
+ self.pipelines_col.create_index("executed_at")
53
+ self.pipelines_col.create_index("pipeline_name")
54
+
55
+ print(f"✅ MongoDB session manager connected: {self.db_name}.{self.collection_name}")
56
+
57
+ except ConnectionFailure as e:
58
+ print(f"❌ MongoDB connection failed: {e}")
59
+ self.client = None
60
+
61
+ def create_session(
62
+ self,
63
+ user_id: Optional[str] = None,
64
+ metadata: Optional[Dict[str, Any]] = None
65
+ ) -> str:
66
+ """
67
+ Create a new user session
68
+
69
+ Args:
70
+ user_id: Optional user identifier
71
+ metadata: Additional session metadata
72
+
73
+ Returns:
74
+ session_id: Unique session identifier
75
+ """
76
+ session_id = str(uuid.uuid4())
77
+
78
+ session_data = {
79
+ "session_id": session_id,
80
+ "user_id": user_id,
81
+ "created_at": datetime.now(),
82
+ "last_activity": datetime.now(),
83
+ "current_file": None,
84
+ "proposed_pipeline": None,
85
+ "state": "initial", # initial, pipeline_proposed, executing, completed
86
+ "conversation_history": [],
87
+ "pipeline_executions": [],
88
+ "metadata": metadata or {},
89
+ "stats": {
90
+ "total_messages": 0,
91
+ "total_pipelines_executed": 0,
92
+ "total_tokens_used": 0
93
+ }
94
+ }
95
+
96
+ if self.sessions_col is not None:
97
+ try:
98
+ self.sessions_col.insert_one(session_data)
99
+ print(f"✅ Session created in MongoDB: {session_id}")
100
+ except Exception as e:
101
+ print(f"⚠️ Failed to save session to MongoDB: {e}")
102
+
103
+ return session_id
104
+
105
+ def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
106
+ """
107
+ Retrieve session by ID
108
+
109
+ Args:
110
+ session_id: Session identifier
111
+
112
+ Returns:
113
+ Session data or None if not found
114
+ """
115
+ if self.sessions_col is None:
116
+ return None
117
+
118
+ try:
119
+ session = self.sessions_col.find_one({"session_id": session_id})
120
+ if session:
121
+ # Update last activity
122
+ self.sessions_col.update_one(
123
+ {"session_id": session_id},
124
+ {"$set": {"last_activity": datetime.now()}}
125
+ )
126
+ # Remove MongoDB _id field
127
+ session.pop("_id", None)
128
+ return session
129
+ return None
130
+ except Exception as e:
131
+ print(f"⚠️ Error retrieving session: {e}")
132
+ return None
133
+
134
+ def update_session(
135
+ self,
136
+ session_id: str,
137
+ updates: Dict[str, Any]
138
+ ) -> bool:
139
+ """
140
+ Update session data
141
+
142
+ Args:
143
+ session_id: Session identifier
144
+ updates: Dictionary of fields to update
145
+
146
+ Returns:
147
+ True if successful, False otherwise
148
+ """
149
+ if self.sessions_col is None:
150
+ return False
151
+
152
+ try:
153
+ updates["last_activity"] = datetime.now()
154
+
155
+ result = self.sessions_col.update_one(
156
+ {"session_id": session_id},
157
+ {"$set": updates}
158
+ )
159
+
160
+ return result.modified_count > 0
161
+ except Exception as e:
162
+ print(f"⚠️ Error updating session: {e}")
163
+ return False
164
+
165
+ def add_message(
166
+ self,
167
+ session_id: str,
168
+ role: str,
169
+ content: str,
170
+ metadata: Optional[Dict[str, Any]] = None
171
+ ) -> bool:
172
+ """
173
+ Add a message to conversation history
174
+
175
+ Args:
176
+ session_id: Session identifier
177
+ role: Message role (user, assistant, system)
178
+ content: Message content
179
+ metadata: Additional message metadata
180
+
181
+ Returns:
182
+ True if successful
183
+ """
184
+ if self.sessions_col is None:
185
+ return False
186
+
187
+ try:
188
+ message = {
189
+ "role": role,
190
+ "content": content,
191
+ "timestamp": datetime.now(),
192
+ "metadata": metadata or {}
193
+ }
194
+
195
+ self.sessions_col.update_one(
196
+ {"session_id": session_id},
197
+ {
198
+ "$push": {"conversation_history": message},
199
+ "$inc": {"stats.total_messages": 1},
200
+ "$set": {"last_activity": datetime.now()}
201
+ }
202
+ )
203
+
204
+ return True
205
+ except Exception as e:
206
+ print(f"⚠️ Error adding message: {e}")
207
+ return False
208
+
209
+ def save_pipeline_execution(
210
+ self,
211
+ session_id: str,
212
+ pipeline: Dict[str, Any],
213
+ result: Dict[str, Any],
214
+ file_path: Optional[str] = None,
215
+ executor: str = "unknown"
216
+ ) -> bool:
217
+ """
218
+ Save pipeline execution to dedicated collection
219
+
220
+ Args:
221
+ session_id: Session identifier
222
+ pipeline: Pipeline configuration
223
+ result: Execution result
224
+ file_path: File that was processed
225
+ executor: Which executor was used (bedrock, crewai, gemini)
226
+
227
+ Returns:
228
+ True if successful
229
+ """
230
+ if self.pipelines_col is None:
231
+ return False
232
+
233
+ try:
234
+ execution_data = {
235
+ "execution_id": str(uuid.uuid4()),
236
+ "session_id": session_id,
237
+ "pipeline_name": pipeline.get("pipeline_name"),
238
+ "pipeline_config": pipeline,
239
+ "result": result,
240
+ "file_path": file_path,
241
+ "executor": executor,
242
+ "executed_at": datetime.now(),
243
+ "duration_seconds": result.get("summary", {}).get("total_duration_seconds"),
244
+ "status": result.get("status", "unknown"),
245
+ "components_executed": len(pipeline.get("components", []))
246
+ }
247
+
248
+ self.pipelines_col.insert_one(execution_data)
249
+
250
+ # Update session stats
251
+ self.sessions_col.update_one(
252
+ {"session_id": session_id},
253
+ {
254
+ "$inc": {"stats.total_pipelines_executed": 1},
255
+ "$push": {"pipeline_executions": execution_data["execution_id"]}
256
+ }
257
+ )
258
+
259
+ print(f"✅ Pipeline execution saved: {execution_data['execution_id']}")
260
+ return True
261
+
262
+ except Exception as e:
263
+ print(f"⚠️ Error saving pipeline execution: {e}")
264
+ return False
265
+
266
+ def get_session_history(
267
+ self,
268
+ session_id: str,
269
+ limit: int = 50
270
+ ) -> List[Dict[str, Any]]:
271
+ """
272
+ Get conversation history for a session
273
+
274
+ Args:
275
+ session_id: Session identifier
276
+ limit: Maximum number of messages to return
277
+
278
+ Returns:
279
+ List of messages
280
+ """
281
+ session = self.get_session(session_id)
282
+ if not session:
283
+ return []
284
+
285
+ history = session.get("conversation_history", [])
286
+ return history[-limit:] if len(history) > limit else history
287
+
288
+ def get_pipeline_executions(
289
+ self,
290
+ session_id: Optional[str] = None,
291
+ limit: int = 10
292
+ ) -> List[Dict[str, Any]]:
293
+ """
294
+ Get pipeline execution history
295
+
296
+ Args:
297
+ session_id: Optional session filter
298
+ limit: Maximum number of executions to return
299
+
300
+ Returns:
301
+ List of pipeline executions
302
+ """
303
+ if self.pipelines_col is None:
304
+ return []
305
+
306
+ try:
307
+ query = {"session_id": session_id} if session_id else {}
308
+
309
+ executions = self.pipelines_col.find(query).sort("executed_at", -1).limit(limit)
310
+
311
+ result = []
312
+ for exec_doc in executions:
313
+ exec_doc.pop("_id", None)
314
+ # Convert datetime to ISO string
315
+ if "executed_at" in exec_doc and isinstance(exec_doc["executed_at"], datetime):
316
+ exec_doc["executed_at"] = exec_doc["executed_at"].isoformat()
317
+ result.append(exec_doc)
318
+
319
+ return result
320
+
321
+ except Exception as e:
322
+ print(f"⚠️ Error retrieving pipeline executions: {e}")
323
+ return []
324
+
325
+ def cleanup_old_sessions(self, max_age_hours: int = 24) -> int:
326
+ """
327
+ Remove sessions older than max_age_hours
328
+
329
+ Args:
330
+ max_age_hours: Maximum session age in hours
331
+
332
+ Returns:
333
+ Number of sessions removed
334
+ """
335
+ if self.sessions_col is None:
336
+ return 0
337
+
338
+ try:
339
+ cutoff = datetime.now() - timedelta(hours=max_age_hours)
340
+
341
+ result = self.sessions_col.delete_many({
342
+ "last_activity": {"$lt": cutoff}
343
+ })
344
+
345
+ count = result.deleted_count
346
+ if count > 0:
347
+ print(f"🧹 Cleaned up {count} old sessions")
348
+
349
+ return count
350
+
351
+ except Exception as e:
352
+ print(f"⚠️ Error cleaning up sessions: {e}")
353
+ return 0
354
+
355
+ def get_session_stats(self, session_id: str) -> Dict[str, Any]:
356
+ """
357
+ Get statistics for a session
358
+
359
+ Args:
360
+ session_id: Session identifier
361
+
362
+ Returns:
363
+ Session statistics
364
+ """
365
+ session = self.get_session(session_id)
366
+ if not session:
367
+ return {}
368
+
369
+ return {
370
+ "session_id": session_id,
371
+ "created_at": session.get("created_at"),
372
+ "last_activity": session.get("last_activity"),
373
+ "total_messages": session.get("stats", {}).get("total_messages", 0),
374
+ "total_pipelines_executed": session.get("stats", {}).get("total_pipelines_executed", 0),
375
+ "conversation_length": len(session.get("conversation_history", [])),
376
+ "state": session.get("state", "unknown")
377
+ }
378
+
379
+ def close(self):
380
+ """Close MongoDB connection"""
381
+ if self.client:
382
+ self.client.close()
383
+ print("🔒 MongoDB connection closed")
384
+
385
+
386
+ # Global session manager instance
387
+ session_manager = SessionManager()
388
+
389
+
390
+ if __name__ == "__main__":
391
+ # Test session manager
392
+ print("Testing Session Manager...")
393
+
394
+ # Create session
395
+ sid = session_manager.create_session(user_id="test_user")
396
+ print(f"Created session: {sid}")
397
+
398
+ # Add messages
399
+ session_manager.add_message(sid, "user", "Hello!")
400
+ session_manager.add_message(sid, "assistant", "Hi! How can I help?")
401
+
402
+ # Get session
403
+ session = session_manager.get_session(sid)
404
+ print(f"Session data: {session}")
405
+
406
+ # Get history
407
+ history = session_manager.get_session_history(sid)
408
+ print(f"History: {history}")
409
+
410
+ # Get stats
411
+ stats = session_manager.get_session_stats(sid)
412
+ print(f"Stats: {stats}")