Spaces:
Sleeping
Sleeping
File size: 8,109 Bytes
511ba56 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 | import os
from datetime import datetime
from typing import List, Dict, Optional
from supabase import create_client, Client
import logging
from config import SUPABASE_URL, SUPABASE_KEY
class DatabaseManager:
def __init__(self, supabase_url: str = SUPABASE_URL, supabase_key: str = SUPABASE_KEY):
if not supabase_url or not supabase_key:
raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables")
self.supabase: Client = create_client(supabase_url, supabase_key)
self.logger = logging.getLogger(__name__)
def create_or_update_user(self, telegram_id: int, username: str = None,
first_name: str = None, last_name: str = None):
"""Create or update user information"""
try:
# Check if user exists
existing_user = self.supabase.table("users").select("id").eq("telegram_id", telegram_id).execute()
user_data = {
"telegram_id": telegram_id,
"username": username,
"first_name": first_name,
"last_name": last_name,
"updated_at": datetime.utcnow().isoformat()
}
if existing_user.data:
# Update existing user
result = self.supabase.table("users").update(user_data).eq("telegram_id", telegram_id).execute()
else:
# Create new user
user_data["created_at"] = datetime.utcnow().isoformat()
result = self.supabase.table("users").insert(user_data).execute()
return result.data[0] if result.data else None
except Exception as e:
self.logger.error(f"Error creating/updating user: {e}")
return None
def save_message(self, telegram_id: int, message_text: str, message_type: str):
"""Save a message to the database"""
try:
# Ensure user exists
self.create_or_update_user(telegram_id)
# Save message
message_data = {
"telegram_id": telegram_id,
"message_text": message_text,
"message_type": message_type,
"created_at": datetime.utcnow().isoformat()
}
result = self.supabase.table("messages").insert(message_data).execute()
# Ensure active session exists
self._ensure_active_session(telegram_id)
return result.data[0] if result.data else None
except Exception as e:
self.logger.error(f"Error saving message: {e}")
return None
def get_conversation_history(self, telegram_id: int, limit: int = 10) -> List[Dict]:
"""Get conversation history for a user"""
try:
result = (self.supabase.table("messages")
.select("message_text, message_type, created_at")
.eq("telegram_id", telegram_id)
.order("created_at", desc=True)
.limit(limit)
.execute())
return result.data if result.data else []
except Exception as e:
self.logger.error(f"Error getting conversation history: {e}")
return []
def get_formatted_history(self, telegram_id: int, limit: int = 10) -> str:
"""Get formatted conversation history for Groq"""
history = self.get_conversation_history(telegram_id, limit)
if not history:
return ""
# Reverse to get chronological order
history.reverse()
formatted_history = "Previous conversation:\n"
for msg in history:
role = "User" if msg['message_type'] == 'user' else "Assistant"
formatted_history += f"{role}: {msg['message_text']}\n"
return formatted_history
def _ensure_active_session(self, telegram_id: int):
"""Ensure an active session exists for the user"""
try:
# Check for active session
active_session = (self.supabase.table("conversation_sessions")
.select("id")
.eq("telegram_id", telegram_id)
.is_("session_end", "null")
.execute())
if not active_session.data:
# Create new session
session_data = {
"telegram_id": telegram_id,
"session_start": datetime.utcnow().isoformat(),
"created_at": datetime.utcnow().isoformat()
}
self.supabase.table("conversation_sessions").insert(session_data).execute()
except Exception as e:
self.logger.error(f"Error ensuring active session: {e}")
def start_new_session(self, telegram_id: int):
"""Start a new conversation session"""
try:
# End previous sessions
self.supabase.table("conversation_sessions").update({
"session_end": datetime.utcnow().isoformat()
}).eq("telegram_id", telegram_id).is_("session_end", "null").execute()
# Start new session
session_data = {
"telegram_id": telegram_id,
"session_start": datetime.utcnow().isoformat(),
"created_at": datetime.utcnow().isoformat()
}
result = self.supabase.table("conversation_sessions").insert(session_data).execute()
return result.data[0] if result.data else None
except Exception as e:
self.logger.error(f"Error starting new session: {e}")
return None
def get_user_stats(self, telegram_id: int) -> Dict:
"""Get user conversation statistics"""
try:
# Get message counts
message_stats = (self.supabase.table("messages")
.select("message_type")
.eq("telegram_id", telegram_id)
.execute())
if not message_stats.data:
return {
"total_messages": 0,
"user_messages": 0,
"assistant_messages": 0,
"first_message": None,
"last_message": None
}
total_messages = len(message_stats.data)
user_messages = len([m for m in message_stats.data if m['message_type'] == 'user'])
assistant_messages = len([m for m in message_stats.data if m['message_type'] == 'assistant'])
# Get first and last message timestamps
timestamps = [m['created_at'] for m in message_stats.data]
first_message = min(timestamps) if timestamps else None
last_message = max(timestamps) if timestamps else None
return {
"total_messages": total_messages,
"user_messages": user_messages,
"assistant_messages": assistant_messages,
"first_message": first_message,
"last_message": last_message
}
except Exception as e:
self.logger.error(f"Error getting user stats: {e}")
return {
"total_messages": 0,
"user_messages": 0,
"assistant_messages": 0,
"first_message": None,
"last_message": None
}
# Global database instance
try:
db_manager = DatabaseManager()
except ValueError as e:
print(f"Database initialization failed: {e}")
print("Please set SUPABASE_URL and SUPABASE_KEY environment variables")
db_manager = None
except Exception as e:
print(f"Unexpected database error: {e}")
db_manager = None
|