import os from datetime import datetime from typing import List, Dict, Optional from supabase import create_client, Client import logging from config import SUPABASE_URL, SUPABASE_KEY class DatabaseManager: def __init__(self, supabase_url: str = SUPABASE_URL, supabase_key: str = SUPABASE_KEY): if not supabase_url or not supabase_key: raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables") self.supabase: Client = create_client(supabase_url, supabase_key) self.logger = logging.getLogger(__name__) def create_or_update_user(self, telegram_id: int, username: str = None, first_name: str = None, last_name: str = None): """Create or update user information""" try: # Check if user exists existing_user = self.supabase.table("users").select("id").eq("telegram_id", telegram_id).execute() user_data = { "telegram_id": telegram_id, "username": username, "first_name": first_name, "last_name": last_name, "updated_at": datetime.utcnow().isoformat() } if existing_user.data: # Update existing user result = self.supabase.table("users").update(user_data).eq("telegram_id", telegram_id).execute() else: # Create new user user_data["created_at"] = datetime.utcnow().isoformat() result = self.supabase.table("users").insert(user_data).execute() return result.data[0] if result.data else None except Exception as e: self.logger.error(f"Error creating/updating user: {e}") return None def save_message(self, telegram_id: int, message_text: str, message_type: str): """Save a message to the database""" try: # Ensure user exists self.create_or_update_user(telegram_id) # Save message message_data = { "telegram_id": telegram_id, "message_text": message_text, "message_type": message_type, "created_at": datetime.utcnow().isoformat() } result = self.supabase.table("messages").insert(message_data).execute() # Ensure active session exists self._ensure_active_session(telegram_id) return result.data[0] if result.data else None except Exception as e: self.logger.error(f"Error saving message: {e}") return None def get_conversation_history(self, telegram_id: int, limit: int = 10) -> List[Dict]: """Get conversation history for a user""" try: result = (self.supabase.table("messages") .select("message_text, message_type, created_at") .eq("telegram_id", telegram_id) .order("created_at", desc=True) .limit(limit) .execute()) return result.data if result.data else [] except Exception as e: self.logger.error(f"Error getting conversation history: {e}") return [] def get_formatted_history(self, telegram_id: int, limit: int = 10) -> str: """Get formatted conversation history for Groq""" history = self.get_conversation_history(telegram_id, limit) if not history: return "" # Reverse to get chronological order history.reverse() formatted_history = "Previous conversation:\n" for msg in history: role = "User" if msg['message_type'] == 'user' else "Assistant" formatted_history += f"{role}: {msg['message_text']}\n" return formatted_history def _ensure_active_session(self, telegram_id: int): """Ensure an active session exists for the user""" try: # Check for active session active_session = (self.supabase.table("conversation_sessions") .select("id") .eq("telegram_id", telegram_id) .is_("session_end", "null") .execute()) if not active_session.data: # Create new session session_data = { "telegram_id": telegram_id, "session_start": datetime.utcnow().isoformat(), "created_at": datetime.utcnow().isoformat() } self.supabase.table("conversation_sessions").insert(session_data).execute() except Exception as e: self.logger.error(f"Error ensuring active session: {e}") def start_new_session(self, telegram_id: int): """Start a new conversation session""" try: # End previous sessions self.supabase.table("conversation_sessions").update({ "session_end": datetime.utcnow().isoformat() }).eq("telegram_id", telegram_id).is_("session_end", "null").execute() # Start new session session_data = { "telegram_id": telegram_id, "session_start": datetime.utcnow().isoformat(), "created_at": datetime.utcnow().isoformat() } result = self.supabase.table("conversation_sessions").insert(session_data).execute() return result.data[0] if result.data else None except Exception as e: self.logger.error(f"Error starting new session: {e}") return None def get_user_stats(self, telegram_id: int) -> Dict: """Get user conversation statistics""" try: # Get message counts message_stats = (self.supabase.table("messages") .select("message_type") .eq("telegram_id", telegram_id) .execute()) if not message_stats.data: return { "total_messages": 0, "user_messages": 0, "assistant_messages": 0, "first_message": None, "last_message": None } total_messages = len(message_stats.data) user_messages = len([m for m in message_stats.data if m['message_type'] == 'user']) assistant_messages = len([m for m in message_stats.data if m['message_type'] == 'assistant']) # Get first and last message timestamps timestamps = [m['created_at'] for m in message_stats.data] first_message = min(timestamps) if timestamps else None last_message = max(timestamps) if timestamps else None return { "total_messages": total_messages, "user_messages": user_messages, "assistant_messages": assistant_messages, "first_message": first_message, "last_message": last_message } except Exception as e: self.logger.error(f"Error getting user stats: {e}") return { "total_messages": 0, "user_messages": 0, "assistant_messages": 0, "first_message": None, "last_message": None } # Global database instance try: db_manager = DatabaseManager() except ValueError as e: print(f"Database initialization failed: {e}") print("Please set SUPABASE_URL and SUPABASE_KEY environment variables") db_manager = None except Exception as e: print(f"Unexpected database error: {e}") db_manager = None