import os from datetime import datetime from typing import List, Dict, Optional from supabase import create_async_client, AsyncClient import logging from config import SUPABASE_URL, SUPABASE_KEY class DatabaseManager: def __init__(self, supabase_url: str = SUPABASE_URL, supabase_key: str = SUPABASE_KEY): if not supabase_url or not supabase_key: raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set") self.supabase_url = supabase_url self.supabase_key = supabase_key self.supabase: Optional[AsyncClient] = None self.logger = logging.getLogger(__name__) async def connect(self): """Initialize the async client""" if not self.supabase: self.supabase = await create_async_client(self.supabase_url, self.supabase_key) async def create_or_update_user(self, telegram_id: int, username: str = None, first_name: str = None, last_name: str = None): try: existing_user = await self.supabase.table("users").select("id").eq("telegram_id", telegram_id).execute() user_data = { "telegram_id": telegram_id, "username": username, "first_name": first_name, "last_name": last_name, "updated_at": datetime.utcnow().isoformat() } if existing_user.data: result = await self.supabase.table("users").update(user_data).eq("telegram_id", telegram_id).execute() else: user_data["created_at"] = datetime.utcnow().isoformat() result = await self.supabase.table("users").insert(user_data).execute() return result.data[0] if result.data else None except Exception as e: self.logger.error(f"Error creating/updating user: {e}") return None async def save_message(self, telegram_id: int, message_text: str, message_type: str): try: await self.create_or_update_user(telegram_id) message_data = { "telegram_id": telegram_id, "message_text": message_text, "message_type": message_type, "created_at": datetime.utcnow().isoformat() } result = await self.supabase.table("messages").insert(message_data).execute() await self._ensure_active_session(telegram_id) return result.data[0] if result.data else None except Exception as e: self.logger.error(f"Error saving message: {e}") return None async def get_conversation_history(self, telegram_id: int, limit: int = 10) -> List[Dict]: try: result = await (self.supabase.table("messages") .select("message_text, message_type, created_at") .eq("telegram_id", telegram_id) .order("created_at", desc=True) .limit(limit) .execute()) return result.data if result.data else [] except Exception as e: self.logger.error(f"Error getting history: {e}") return [] async def _ensure_active_session(self, telegram_id: int): try: active = await (self.supabase.table("conversation_sessions") .select("id") .eq("telegram_id", telegram_id) .is_("session_end", "null") .execute()) if not active.data: session_data = { "telegram_id": telegram_id, "session_start": datetime.utcnow().isoformat(), "created_at": datetime.utcnow().isoformat() } await self.supabase.table("conversation_sessions").insert(session_data).execute() except Exception as e: self.logger.error(f"Error ensuring session: {e}") db_manager = DatabaseManager()