codeBOKER commited on
Commit
96d2da4
·
verified ·
1 Parent(s): f0d2c6c

update database.py

Browse files
Files changed (1) hide show
  1. database.py +41 -147
database.py CHANGED
@@ -1,25 +1,28 @@
1
  import os
2
  from datetime import datetime
3
  from typing import List, Dict, Optional
4
- from supabase import create_client, Client
5
  import logging
6
  from config import SUPABASE_URL, SUPABASE_KEY
7
 
8
-
9
  class DatabaseManager:
10
  def __init__(self, supabase_url: str = SUPABASE_URL, supabase_key: str = SUPABASE_KEY):
11
  if not supabase_url or not supabase_key:
12
- raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables")
13
-
14
- self.supabase: Client = create_client(supabase_url, supabase_key)
 
15
  self.logger = logging.getLogger(__name__)
16
-
17
- def create_or_update_user(self, telegram_id: int, username: str = None,
18
- first_name: str = None, last_name: str = None):
19
- """Create or update user information"""
 
 
 
 
20
  try:
21
- # Check if user exists
22
- existing_user = self.supabase.table("users").select("id").eq("telegram_id", telegram_id).execute()
23
 
24
  user_data = {
25
  "telegram_id": telegram_id,
@@ -30,26 +33,20 @@ class DatabaseManager:
30
  }
31
 
32
  if existing_user.data:
33
- # Update existing user
34
- result = self.supabase.table("users").update(user_data).eq("telegram_id", telegram_id).execute()
35
  else:
36
- # Create new user
37
  user_data["created_at"] = datetime.utcnow().isoformat()
38
- result = self.supabase.table("users").insert(user_data).execute()
39
 
40
  return result.data[0] if result.data else None
41
-
42
  except Exception as e:
43
  self.logger.error(f"Error creating/updating user: {e}")
44
  return None
45
-
46
- def save_message(self, telegram_id: int, message_text: str, message_type: str):
47
- """Save a message to the database"""
48
  try:
49
- # Ensure user exists
50
- self.create_or_update_user(telegram_id)
51
 
52
- # Save message
53
  message_data = {
54
  "telegram_id": telegram_id,
55
  "message_text": message_text,
@@ -57,146 +54,43 @@ class DatabaseManager:
57
  "created_at": datetime.utcnow().isoformat()
58
  }
59
 
60
- result = self.supabase.table("messages").insert(message_data).execute()
61
-
62
- # Ensure active session exists
63
- self._ensure_active_session(telegram_id)
64
 
65
  return result.data[0] if result.data else None
66
-
67
  except Exception as e:
68
  self.logger.error(f"Error saving message: {e}")
69
  return None
70
-
71
- def get_conversation_history(self, telegram_id: int, limit: int = 10) -> List[Dict]:
72
- """Get conversation history for a user"""
73
  try:
74
- result = (self.supabase.table("messages")
75
- .select("message_text, message_type, created_at")
76
- .eq("telegram_id", telegram_id)
77
- .order("created_at", desc=True)
78
- .limit(limit)
79
- .execute())
80
-
81
  return result.data if result.data else []
82
-
83
  except Exception as e:
84
- self.logger.error(f"Error getting conversation history: {e}")
85
  return []
86
-
87
- def get_formatted_history(self, telegram_id: int, limit: int = 10) -> str:
88
- """Get formatted conversation history for Groq"""
89
- history = self.get_conversation_history(telegram_id, limit)
90
-
91
- if not history:
92
- return ""
93
-
94
- # Reverse to get chronological order
95
- history.reverse()
96
-
97
- formatted_history = "Previous conversation:\n"
98
- for msg in history:
99
- role = "User" if msg['message_type'] == 'user' else "Assistant"
100
- formatted_history += f"{role}: {msg['message_text']}\n"
101
-
102
- return formatted_history
103
-
104
- def _ensure_active_session(self, telegram_id: int):
105
- """Ensure an active session exists for the user"""
106
  try:
107
- # Check for active session
108
- active_session = (self.supabase.table("conversation_sessions")
109
- .select("id")
110
- .eq("telegram_id", telegram_id)
111
- .is_("session_end", "null")
112
- .execute())
113
 
114
- if not active_session.data:
115
- # Create new session
116
  session_data = {
117
  "telegram_id": telegram_id,
118
  "session_start": datetime.utcnow().isoformat(),
119
  "created_at": datetime.utcnow().isoformat()
120
  }
121
- self.supabase.table("conversation_sessions").insert(session_data).execute()
122
-
123
  except Exception as e:
124
- self.logger.error(f"Error ensuring active session: {e}")
125
-
126
- def start_new_session(self, telegram_id: int):
127
- """Start a new conversation session"""
128
- try:
129
- # End previous sessions
130
- self.supabase.table("conversation_sessions").update({
131
- "session_end": datetime.utcnow().isoformat()
132
- }).eq("telegram_id", telegram_id).is_("session_end", "null").execute()
133
-
134
- # Start new session
135
- session_data = {
136
- "telegram_id": telegram_id,
137
- "session_start": datetime.utcnow().isoformat(),
138
- "created_at": datetime.utcnow().isoformat()
139
- }
140
- result = self.supabase.table("conversation_sessions").insert(session_data).execute()
141
-
142
- return result.data[0] if result.data else None
143
-
144
- except Exception as e:
145
- self.logger.error(f"Error starting new session: {e}")
146
- return None
147
-
148
- def get_user_stats(self, telegram_id: int) -> Dict:
149
- """Get user conversation statistics"""
150
- try:
151
- # Get message counts
152
- message_stats = (self.supabase.table("messages")
153
- .select("message_type")
154
- .eq("telegram_id", telegram_id)
155
- .execute())
156
-
157
- if not message_stats.data:
158
- return {
159
- "total_messages": 0,
160
- "user_messages": 0,
161
- "assistant_messages": 0,
162
- "first_message": None,
163
- "last_message": None
164
- }
165
-
166
- total_messages = len(message_stats.data)
167
- user_messages = len([m for m in message_stats.data if m['message_type'] == 'user'])
168
- assistant_messages = len([m for m in message_stats.data if m['message_type'] == 'assistant'])
169
-
170
- # Get first and last message timestamps
171
- timestamps = [m['created_at'] for m in message_stats.data]
172
- first_message = min(timestamps) if timestamps else None
173
- last_message = max(timestamps) if timestamps else None
174
-
175
- return {
176
- "total_messages": total_messages,
177
- "user_messages": user_messages,
178
- "assistant_messages": assistant_messages,
179
- "first_message": first_message,
180
- "last_message": last_message
181
- }
182
-
183
- except Exception as e:
184
- self.logger.error(f"Error getting user stats: {e}")
185
- return {
186
- "total_messages": 0,
187
- "user_messages": 0,
188
- "assistant_messages": 0,
189
- "first_message": None,
190
- "last_message": None
191
- }
192
 
193
- # Global database instance
194
- try:
195
- db_manager = DatabaseManager()
196
- except ValueError as e:
197
- print(f"Database initialization failed: {e}")
198
- print("Please set SUPABASE_URL and SUPABASE_KEY environment variables")
199
- db_manager = None
200
- except Exception as e:
201
- print(f"Unexpected database error: {e}")
202
- db_manager = None
 
1
  import os
2
  from datetime import datetime
3
  from typing import List, Dict, Optional
4
+ from supabase import create_async_client, AsyncClient
5
  import logging
6
  from config import SUPABASE_URL, SUPABASE_KEY
7
 
 
8
  class DatabaseManager:
9
  def __init__(self, supabase_url: str = SUPABASE_URL, supabase_key: str = SUPABASE_KEY):
10
  if not supabase_url or not supabase_key:
11
+ raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set")
12
+ self.supabase_url = supabase_url
13
+ self.supabase_key = supabase_key
14
+ self.supabase: Optional[AsyncClient] = None
15
  self.logger = logging.getLogger(__name__)
16
+
17
+ async def connect(self):
18
+ """Initialize the async client"""
19
+ if not self.supabase:
20
+ self.supabase = await create_async_client(self.supabase_url, self.supabase_key)
21
+
22
+ async def create_or_update_user(self, telegram_id: int, username: str = None,
23
+ first_name: str = None, last_name: str = None):
24
  try:
25
+ existing_user = await self.supabase.table("users").select("id").eq("telegram_id", telegram_id).execute()
 
26
 
27
  user_data = {
28
  "telegram_id": telegram_id,
 
33
  }
34
 
35
  if existing_user.data:
36
+ result = await self.supabase.table("users").update(user_data).eq("telegram_id", telegram_id).execute()
 
37
  else:
 
38
  user_data["created_at"] = datetime.utcnow().isoformat()
39
+ result = await self.supabase.table("users").insert(user_data).execute()
40
 
41
  return result.data[0] if result.data else None
 
42
  except Exception as e:
43
  self.logger.error(f"Error creating/updating user: {e}")
44
  return None
45
+
46
+ async def save_message(self, telegram_id: int, message_text: str, message_type: str):
 
47
  try:
48
+ await self.create_or_update_user(telegram_id)
 
49
 
 
50
  message_data = {
51
  "telegram_id": telegram_id,
52
  "message_text": message_text,
 
54
  "created_at": datetime.utcnow().isoformat()
55
  }
56
 
57
+ result = await self.supabase.table("messages").insert(message_data).execute()
58
+ await self._ensure_active_session(telegram_id)
 
 
59
 
60
  return result.data[0] if result.data else None
 
61
  except Exception as e:
62
  self.logger.error(f"Error saving message: {e}")
63
  return None
64
+
65
+ async def get_conversation_history(self, telegram_id: int, limit: int = 10) -> List[Dict]:
 
66
  try:
67
+ result = await (self.supabase.table("messages")
68
+ .select("message_text, message_type, created_at")
69
+ .eq("telegram_id", telegram_id)
70
+ .order("created_at", desc=True)
71
+ .limit(limit)
72
+ .execute())
 
73
  return result.data if result.data else []
 
74
  except Exception as e:
75
+ self.logger.error(f"Error getting history: {e}")
76
  return []
77
+
78
+ async def _ensure_active_session(self, telegram_id: int):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  try:
80
+ active = await (self.supabase.table("conversation_sessions")
81
+ .select("id")
82
+ .eq("telegram_id", telegram_id)
83
+ .is_("session_end", "null")
84
+ .execute())
 
85
 
86
+ if not active.data:
 
87
  session_data = {
88
  "telegram_id": telegram_id,
89
  "session_start": datetime.utcnow().isoformat(),
90
  "created_at": datetime.utcnow().isoformat()
91
  }
92
+ await self.supabase.table("conversation_sessions").insert(session_data).execute()
 
93
  except Exception as e:
94
+ self.logger.error(f"Error ensuring session: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
95
 
96
+ db_manager = DatabaseManager()