File size: 4,147 Bytes
511ba56
 
 
96d2da4
511ba56
 
 
 
 
 
96d2da4
 
 
 
511ba56
96d2da4
 
 
 
 
 
 
 
511ba56
96d2da4
511ba56
 
 
 
 
 
 
 
 
 
96d2da4
511ba56
 
96d2da4
511ba56
 
 
 
 
96d2da4
 
511ba56
96d2da4
511ba56
 
 
 
 
 
 
 
96d2da4
 
511ba56
 
 
 
 
96d2da4
 
511ba56
96d2da4
 
 
 
 
 
511ba56
 
96d2da4
511ba56
96d2da4
 
511ba56
96d2da4
 
 
 
 
511ba56
96d2da4
511ba56
 
 
 
 
96d2da4
511ba56
96d2da4
511ba56
96d2da4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import os
from datetime import datetime
from typing import List, Dict, Optional
from supabase import create_async_client, AsyncClient
import logging
from config import SUPABASE_URL, SUPABASE_KEY

class DatabaseManager:
    def __init__(self, supabase_url: str = SUPABASE_URL, supabase_key: str = SUPABASE_KEY):
        if not supabase_url or not supabase_key:
            raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set")
        self.supabase_url = supabase_url
        self.supabase_key = supabase_key
        self.supabase: Optional[AsyncClient] = None
        self.logger = logging.getLogger(__name__)

    async def connect(self):
        """Initialize the async client"""
        if not self.supabase:
            self.supabase = await create_async_client(self.supabase_url, self.supabase_key)

    async def create_or_update_user(self, telegram_id: int, username: str = None, 
                                    first_name: str = None, last_name: str = None):
        try:
            existing_user = await self.supabase.table("users").select("id").eq("telegram_id", telegram_id).execute()
            
            user_data = {
                "telegram_id": telegram_id,
                "username": username,
                "first_name": first_name,
                "last_name": last_name,
                "updated_at": datetime.utcnow().isoformat()
            }
            
            if existing_user.data:
                result = await self.supabase.table("users").update(user_data).eq("telegram_id", telegram_id).execute()
            else:
                user_data["created_at"] = datetime.utcnow().isoformat()
                result = await self.supabase.table("users").insert(user_data).execute()
            
            return result.data[0] if result.data else None
        except Exception as e:
            self.logger.error(f"Error creating/updating user: {e}")
            return None

    async def save_message(self, telegram_id: int, message_text: str, message_type: str):
        try:
            await self.create_or_update_user(telegram_id)
            
            message_data = {
                "telegram_id": telegram_id,
                "message_text": message_text,
                "message_type": message_type,
                "created_at": datetime.utcnow().isoformat()
            }
            
            result = await self.supabase.table("messages").insert(message_data).execute()
            await self._ensure_active_session(telegram_id)
            
            return result.data[0] if result.data else None
        except Exception as e:
            self.logger.error(f"Error saving message: {e}")
            return None

    async def get_conversation_history(self, telegram_id: int, limit: int = 10) -> List[Dict]:
        try:
            result = await (self.supabase.table("messages")
                           .select("message_text, message_type, created_at")
                           .eq("telegram_id", telegram_id)
                           .order("created_at", desc=True)
                           .limit(limit)
                           .execute())
            return result.data if result.data else []
        except Exception as e:
            self.logger.error(f"Error getting history: {e}")
            return []

    async def _ensure_active_session(self, telegram_id: int):
        try:
            active = await (self.supabase.table("conversation_sessions")
                            .select("id")
                            .eq("telegram_id", telegram_id)
                            .is_("session_end", "null")
                            .execute())
            
            if not active.data:
                session_data = {
                    "telegram_id": telegram_id,
                    "session_start": datetime.utcnow().isoformat(),
                    "created_at": datetime.utcnow().isoformat()
                }
                await self.supabase.table("conversation_sessions").insert(session_data).execute()
        except Exception as e:
            self.logger.error(f"Error ensuring session: {e}")

db_manager = DatabaseManager()