NitinBot001 commited on
Commit
66fd13c
·
verified ·
1 Parent(s): 2ca9f67

Update conversation_manager.py

Browse files
Files changed (1) hide show
  1. conversation_manager.py +381 -70
conversation_manager.py CHANGED
@@ -1,70 +1,381 @@
1
- # conversation_manager.py
2
-
3
- import os
4
- from supabase import create_client, Client
5
- from typing import List, Dict, Any, Optional
6
- import logging
7
-
8
- logger = logging.getLogger(__name__)
9
-
10
- class ConversationManager:
11
- def __init__(self):
12
- """Initializes the Supabase client."""
13
- supabase_url = os.getenv("SUPABASE_URL")
14
- supabase_key = os.getenv("SUPABASE_KEY")
15
-
16
- if not supabase_url or not supabase_key:
17
- raise ValueError("Supabase URL and Key must be set in environment variables.")
18
-
19
- self.supabase: Client = create_client(supabase_url, supabase_key)
20
- self.table_name = "conversations"
21
- logger.info("ConversationManager initialized with Supabase client.")
22
-
23
- def get_history(self, session_id: str) -> List[Dict[str, Any]]:
24
- """
25
- Retrieves conversation history for a given session_id.
26
- Returns an empty list if no history is found.
27
- """
28
- try:
29
- response = self.supabase.table(self.table_name)\
30
- .select("history")\
31
- .eq("session_id", session_id)\
32
- .limit(1)\
33
- .execute()
34
-
35
- if response.data:
36
- return response.data[0].get("history", [])
37
- return []
38
- except Exception as e:
39
- logger.error(f"Error fetching history for session {session_id}: {e}")
40
- return []
41
-
42
- def save_history(self, session_id: str, history: List[Dict[str, Any]]) -> None:
43
- """
44
- Saves or updates the conversation history for a session_id.
45
- Uses 'upsert' to create a new record or update an existing one.
46
- """
47
- try:
48
- self.supabase.table(self.table_name).upsert({
49
- "session_id": session_id,
50
- "history": history
51
- }).execute()
52
- logger.info(f"History for session {session_id} saved successfully.")
53
- except Exception as e:
54
- logger.error(f"Error saving history for session {session_id}: {e}")
55
-
56
- def delete_history(self, session_id: str) -> bool:
57
- """
58
- Deletes the conversation history for a given session_id.
59
- Returns True on success, False on failure.
60
- """
61
- try:
62
- self.supabase.table(self.table_name)\
63
- .delete()\
64
- .eq("session_id", session_id)\
65
- .execute()
66
- logger.info(f"History for session {session_id} deleted successfully.")
67
- return True
68
- except Exception as e:
69
- logger.error(f"Error deleting history for session {session_id}: {e}")
70
- return False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # conversation_manager.py
2
+
3
+ import os
4
+ import uuid
5
+ import logging
6
+ from typing import Dict, List, Any, Optional
7
+ from datetime import datetime
8
+ from supabase import create_client, Client
9
+ from dotenv import load_dotenv
10
+
11
+ # Configure logging
12
+ logging.basicConfig(level=logging.INFO)
13
+ logger = logging.getLogger(__name__)
14
+
15
+ load_dotenv()
16
+
17
+
18
+ class ConversationManager:
19
+ """Enhanced ConversationManager with unique message IDs and flexible chat ID management"""
20
+
21
+ def __init__(self):
22
+ """Initialize the conversation manager with Supabase client"""
23
+ try:
24
+ # Initialize Supabase client
25
+ supabase_url = os.getenv("SUPABASE_URL")
26
+ supabase_key = os.getenv("SUPABASE_ANON_KEY")
27
+
28
+ if not supabase_url or not supabase_key:
29
+ raise ValueError("Missing Supabase credentials in environment variables")
30
+
31
+ self.supabase: Client = create_client(supabase_url, supabase_key)
32
+ logger.info("ConversationManager initialized successfully with Supabase")
33
+
34
+ except Exception as e:
35
+ logger.error(f"Failed to initialize ConversationManager: {e}")
36
+ raise
37
+
38
+ def generate_chat_id(self) -> str:
39
+ """Generate a unique chat ID"""
40
+ return f"chat_{uuid.uuid4().hex[:12]}"
41
+
42
+ def generate_message_id(self, chat_id: str) -> int:
43
+ """Generate the next message ID for a specific chat"""
44
+ try:
45
+ # Get current conversation to find the highest message ID
46
+ history = self.get_history(chat_id)
47
+
48
+ if not history:
49
+ return 1
50
+
51
+ # Find the highest message_id
52
+ max_id = 0
53
+ for message in history:
54
+ msg_id = message.get('message_id', 0)
55
+ if isinstance(msg_id, int) and msg_id > max_id:
56
+ max_id = msg_id
57
+
58
+ return max_id + 1
59
+
60
+ except Exception as e:
61
+ logger.error(f"Error generating message ID for chat {chat_id}: {e}")
62
+ return 1
63
+
64
+ def get_history(self, chat_id: str) -> List[Dict[str, Any]]:
65
+ """
66
+ Retrieve conversation history for a specific chat ID
67
+
68
+ Args:
69
+ chat_id: The chat ID to retrieve history for
70
+
71
+ Returns:
72
+ List of message dictionaries, empty list if chat doesn't exist
73
+ """
74
+ if not chat_id:
75
+ return []
76
+
77
+ try:
78
+ response = self.supabase.table('conversations').select('history').eq('session_id', chat_id).execute()
79
+
80
+ if response.data and len(response.data) > 0:
81
+ history = response.data[0].get('history', [])
82
+ if isinstance(history, list):
83
+ return history
84
+ else:
85
+ logger.warning(f"Invalid history format for chat {chat_id}")
86
+ return []
87
+ else:
88
+ logger.info(f"No history found for chat {chat_id}")
89
+ return []
90
+
91
+ except Exception as e:
92
+ logger.error(f"Error retrieving history for chat {chat_id}: {e}")
93
+ return []
94
+
95
+ def save_history(self, chat_id: str, history: List[Dict[str, Any]]) -> bool:
96
+ """
97
+ Save conversation history for a specific chat ID
98
+
99
+ Args:
100
+ chat_id: The chat ID to save history for
101
+ history: List of message dictionaries
102
+
103
+ Returns:
104
+ True if successful, False otherwise
105
+ """
106
+ if not chat_id:
107
+ logger.error("Cannot save history: chat_id is empty")
108
+ return False
109
+
110
+ if not isinstance(history, list):
111
+ logger.error("Cannot save history: history must be a list")
112
+ return False
113
+
114
+ try:
115
+ # Check if conversation exists
116
+ existing = self.supabase.table('conversations').select('session_id').eq('session_id', chat_id).execute()
117
+
118
+ if existing.data and len(existing.data) > 0:
119
+ # Update existing conversation
120
+ response = self.supabase.table('conversations').update({
121
+ 'history': history,
122
+ 'updated_at': datetime.utcnow().isoformat()
123
+ }).eq('session_id', chat_id).execute()
124
+
125
+ logger.info(f"Updated history for chat {chat_id}")
126
+ else:
127
+ # Create new conversation
128
+ response = self.supabase.table('conversations').insert({
129
+ 'session_id': chat_id,
130
+ 'history': history,
131
+ 'created_at': datetime.utcnow().isoformat(),
132
+ 'updated_at': datetime.utcnow().isoformat()
133
+ }).execute()
134
+
135
+ logger.info(f"Created new conversation for chat {chat_id}")
136
+
137
+ return True
138
+
139
+ except Exception as e:
140
+ logger.error(f"Error saving history for chat {chat_id}: {e}")
141
+ return False
142
+
143
+ def add_message(self, chat_id: str, role: str, content: str, image_url: Optional[str] = None) -> Dict[str, Any]:
144
+ """
145
+ Add a single message to a chat and return the message with its ID
146
+
147
+ Args:
148
+ chat_id: The chat ID to add the message to
149
+ role: Message role ('user' or 'assistant')
150
+ content: Message content
151
+ image_url: Optional image URL
152
+
153
+ Returns:
154
+ Dictionary containing the message with its assigned ID
155
+ """
156
+ if not chat_id:
157
+ logger.error("Cannot add message: chat_id is empty")
158
+ return {}
159
+
160
+ try:
161
+ # Generate message ID
162
+ message_id = self.generate_message_id(chat_id)
163
+
164
+ # Create message object
165
+ message = {
166
+ "message_id": message_id,
167
+ "role": role,
168
+ "content": content,
169
+ "timestamp": datetime.utcnow().isoformat()
170
+ }
171
+
172
+ # Add image URL if provided
173
+ if image_url:
174
+ message["imageUrl"] = image_url
175
+
176
+ # Get current history and add the new message
177
+ current_history = self.get_history(chat_id)
178
+ current_history.append(message)
179
+
180
+ # Save updated history
181
+ if self.save_history(chat_id, current_history):
182
+ logger.info(f"Added message {message_id} to chat {chat_id}")
183
+ return message
184
+ else:
185
+ logger.error(f"Failed to save message to chat {chat_id}")
186
+ return {}
187
+
188
+ except Exception as e:
189
+ logger.error(f"Error adding message to chat {chat_id}: {e}")
190
+ return {}
191
+
192
+ def delete_history(self, chat_id: str) -> bool:
193
+ """
194
+ Delete conversation history for a specific chat ID
195
+
196
+ Args:
197
+ chat_id: The chat ID to delete
198
+
199
+ Returns:
200
+ True if successful, False otherwise
201
+ """
202
+ if not chat_id:
203
+ logger.error("Cannot delete history: chat_id is empty")
204
+ return False
205
+
206
+ try:
207
+ response = self.supabase.table('conversations').delete().eq('session_id', chat_id).execute()
208
+
209
+ # Check if any rows were affected
210
+ if hasattr(response, 'data') and response.data is not None:
211
+ logger.info(f"Deleted conversation history for chat {chat_id}")
212
+ return True
213
+ else:
214
+ logger.warning(f"No conversation found to delete for chat {chat_id}")
215
+ return True # Consider it successful if nothing to delete
216
+
217
+ except Exception as e:
218
+ logger.error(f"Error deleting history for chat {chat_id}: {e}")
219
+ return False
220
+
221
+ def get_all_chat_sessions(self) -> List[Dict[str, Any]]:
222
+ """
223
+ Get all chat sessions with basic information
224
+
225
+ Returns:
226
+ List of dictionaries with chat session information
227
+ """
228
+ try:
229
+ response = self.supabase.table('conversations').select('session_id, history, created_at, updated_at').execute()
230
+
231
+ sessions = []
232
+ for conv in response.data:
233
+ session_id = conv.get('session_id')
234
+ history = conv.get('history', [])
235
+ created_at = conv.get('created_at')
236
+ updated_at = conv.get('updated_at')
237
+
238
+ # Generate title from first user message
239
+ title = "New Chat"
240
+ if history and len(history) > 0:
241
+ first_user_msg = None
242
+ for msg in history:
243
+ if msg.get('role') == 'user':
244
+ first_user_msg = msg
245
+ break
246
+
247
+ if first_user_msg:
248
+ content = first_user_msg.get('content', '')
249
+ if content:
250
+ title = content[:35] + '...' if len(content) > 35 else content
251
+ elif first_user_msg.get('imageUrl'):
252
+ title = "Image Query"
253
+
254
+ sessions.append({
255
+ "session_id": session_id,
256
+ "title": title,
257
+ "message_count": len(history),
258
+ "created_at": created_at,
259
+ "updated_at": updated_at
260
+ })
261
+
262
+ # Sort by updated_at (most recent first)
263
+ sessions.sort(key=lambda x: x.get('updated_at', ''), reverse=True)
264
+
265
+ return sessions
266
+
267
+ except Exception as e:
268
+ logger.error(f"Error fetching chat sessions: {e}")
269
+ return []
270
+
271
+ def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
272
+ """
273
+ Get information about a specific chat
274
+
275
+ Args:
276
+ chat_id: The chat ID to get information for
277
+
278
+ Returns:
279
+ Dictionary with chat information
280
+ """
281
+ if not chat_id:
282
+ return {}
283
+
284
+ try:
285
+ response = self.supabase.table('conversations').select('*').eq('session_id', chat_id).execute()
286
+
287
+ if response.data and len(response.data) > 0:
288
+ conv = response.data[0]
289
+ history = conv.get('history', [])
290
+
291
+ return {
292
+ "session_id": conv.get('session_id'),
293
+ "message_count": len(history),
294
+ "created_at": conv.get('created_at'),
295
+ "updated_at": conv.get('updated_at'),
296
+ "exists": True
297
+ }
298
+ else:
299
+ return {
300
+ "session_id": chat_id,
301
+ "message_count": 0,
302
+ "exists": False
303
+ }
304
+
305
+ except Exception as e:
306
+ logger.error(f"Error getting chat info for {chat_id}: {e}")
307
+ return {
308
+ "session_id": chat_id,
309
+ "error": str(e),
310
+ "exists": False
311
+ }
312
+
313
+ def chat_exists(self, chat_id: str) -> bool:
314
+ """
315
+ Check if a chat ID exists in the database
316
+
317
+ Args:
318
+ chat_id: The chat ID to check
319
+
320
+ Returns:
321
+ True if chat exists, False otherwise
322
+ """
323
+ if not chat_id:
324
+ return False
325
+
326
+ try:
327
+ response = self.supabase.table('conversations').select('session_id').eq('session_id', chat_id).execute()
328
+ return bool(response.data and len(response.data) > 0)
329
+
330
+ except Exception as e:
331
+ logger.error(f"Error checking if chat {chat_id} exists: {e}")
332
+ return False
333
+
334
+
335
+ # Example usage and testing
336
+ if __name__ == "__main__":
337
+ print("=== ConversationManager Test ===")
338
+
339
+ try:
340
+ # Initialize manager
341
+ manager = ConversationManager()
342
+
343
+ # Test 1: Create a new chat
344
+ test_chat_id = "test_chat_123"
345
+ print(f"\n1. Testing with chat ID: {test_chat_id}")
346
+
347
+ # Add some messages
348
+ msg1 = manager.add_message(test_chat_id, "user", "Hello, I need help with farming")
349
+ print(f"Added user message: {msg1}")
350
+
351
+ msg2 = manager.add_message(test_chat_id, "assistant", "Hello! I'd be happy to help you with farming. What specific questions do you have?")
352
+ print(f"Added assistant message: {msg2}")
353
+
354
+ msg3 = manager.add_message(test_chat_id, "user", "What crops should I grow?", "https://example.com/image.jpg")
355
+ print(f"Added user message with image: {msg3}")
356
+
357
+ # Test 2: Retrieve history
358
+ print(f"\n2. Retrieved history for {test_chat_id}:")
359
+ history = manager.get_history(test_chat_id)
360
+ for msg in history:
361
+ print(f" Message {msg.get('message_id')}: {msg.get('role')} - {msg.get('content')[:50]}...")
362
+
363
+ # Test 3: Get chat info
364
+ print(f"\n3. Chat info:")
365
+ info = manager.get_chat_info(test_chat_id)
366
+ print(f" Exists: {info.get('exists')}, Messages: {info.get('message_count')}")
367
+
368
+ # Test 4: Auto-generated chat ID
369
+ auto_chat_id = manager.generate_chat_id()
370
+ print(f"\n4. Auto-generated chat ID: {auto_chat_id}")
371
+
372
+ # Test 5: Get all sessions
373
+ print(f"\n5. All chat sessions:")
374
+ sessions = manager.get_all_chat_sessions()
375
+ for session in sessions[:3]: # Show first 3
376
+ print(f" {session.get('session_id')}: {session.get('title')} ({session.get('message_count')} messages)")
377
+
378
+ print("\n✅ All tests completed successfully!")
379
+
380
+ except Exception as e:
381
+ print(f"❌ Test failed: {e}")