File size: 8,109 Bytes
511ba56
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import os
from datetime import datetime
from typing import List, Dict, Optional
from supabase import create_client, Client
import logging
from config import SUPABASE_URL, SUPABASE_KEY


class DatabaseManager:
    def __init__(self, supabase_url: str = SUPABASE_URL, supabase_key: str = SUPABASE_KEY):
        if not supabase_url or not supabase_key:
            raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables")
        
        self.supabase: Client = create_client(supabase_url, supabase_key)
        self.logger = logging.getLogger(__name__)
    
    def create_or_update_user(self, telegram_id: int, username: str = None, 
                            first_name: str = None, last_name: str = None):
        """Create or update user information"""
        try:
            # Check if user exists
            existing_user = self.supabase.table("users").select("id").eq("telegram_id", telegram_id).execute()
            
            user_data = {
                "telegram_id": telegram_id,
                "username": username,
                "first_name": first_name,
                "last_name": last_name,
                "updated_at": datetime.utcnow().isoformat()
            }
            
            if existing_user.data:
                # Update existing user
                result = self.supabase.table("users").update(user_data).eq("telegram_id", telegram_id).execute()
            else:
                # Create new user
                user_data["created_at"] = datetime.utcnow().isoformat()
                result = self.supabase.table("users").insert(user_data).execute()
            
            return result.data[0] if result.data else None
            
        except Exception as e:
            self.logger.error(f"Error creating/updating user: {e}")
            return None
    
    def save_message(self, telegram_id: int, message_text: str, message_type: str):
        """Save a message to the database"""
        try:
            # Ensure user exists
            self.create_or_update_user(telegram_id)
            
            # Save message
            message_data = {
                "telegram_id": telegram_id,
                "message_text": message_text,
                "message_type": message_type,
                "created_at": datetime.utcnow().isoformat()
            }
            
            result = self.supabase.table("messages").insert(message_data).execute()
            
            # Ensure active session exists
            self._ensure_active_session(telegram_id)
            
            return result.data[0] if result.data else None
            
        except Exception as e:
            self.logger.error(f"Error saving message: {e}")
            return None
    
    def get_conversation_history(self, telegram_id: int, limit: int = 10) -> List[Dict]:
        """Get conversation history for a user"""
        try:
            result = (self.supabase.table("messages")
                     .select("message_text, message_type, created_at")
                     .eq("telegram_id", telegram_id)
                     .order("created_at", desc=True)
                     .limit(limit)
                     .execute())
            
            return result.data if result.data else []
            
        except Exception as e:
            self.logger.error(f"Error getting conversation history: {e}")
            return []
    
    def get_formatted_history(self, telegram_id: int, limit: int = 10) -> str:
        """Get formatted conversation history for Groq"""
        history = self.get_conversation_history(telegram_id, limit)
        
        if not history:
            return ""
        
        # Reverse to get chronological order
        history.reverse()
        
        formatted_history = "Previous conversation:\n"
        for msg in history:
            role = "User" if msg['message_type'] == 'user' else "Assistant"
            formatted_history += f"{role}: {msg['message_text']}\n"
        
        return formatted_history
    
    def _ensure_active_session(self, telegram_id: int):
        """Ensure an active session exists for the user"""
        try:
            # Check for active session
            active_session = (self.supabase.table("conversation_sessions")
                             .select("id")
                             .eq("telegram_id", telegram_id)
                             .is_("session_end", "null")
                             .execute())
            
            if not active_session.data:
                # Create new session
                session_data = {
                    "telegram_id": telegram_id,
                    "session_start": datetime.utcnow().isoformat(),
                    "created_at": datetime.utcnow().isoformat()
                }
                self.supabase.table("conversation_sessions").insert(session_data).execute()
                
        except Exception as e:
            self.logger.error(f"Error ensuring active session: {e}")
    
    def start_new_session(self, telegram_id: int):
        """Start a new conversation session"""
        try:
            # End previous sessions
            self.supabase.table("conversation_sessions").update({
                "session_end": datetime.utcnow().isoformat()
            }).eq("telegram_id", telegram_id).is_("session_end", "null").execute()
            
            # Start new session
            session_data = {
                "telegram_id": telegram_id,
                "session_start": datetime.utcnow().isoformat(),
                "created_at": datetime.utcnow().isoformat()
            }
            result = self.supabase.table("conversation_sessions").insert(session_data).execute()
            
            return result.data[0] if result.data else None
            
        except Exception as e:
            self.logger.error(f"Error starting new session: {e}")
            return None
    
    def get_user_stats(self, telegram_id: int) -> Dict:
        """Get user conversation statistics"""
        try:
            # Get message counts
            message_stats = (self.supabase.table("messages")
                           .select("message_type")
                           .eq("telegram_id", telegram_id)
                           .execute())
            
            if not message_stats.data:
                return {
                    "total_messages": 0,
                    "user_messages": 0,
                    "assistant_messages": 0,
                    "first_message": None,
                    "last_message": None
                }
            
            total_messages = len(message_stats.data)
            user_messages = len([m for m in message_stats.data if m['message_type'] == 'user'])
            assistant_messages = len([m for m in message_stats.data if m['message_type'] == 'assistant'])
            
            # Get first and last message timestamps
            timestamps = [m['created_at'] for m in message_stats.data]
            first_message = min(timestamps) if timestamps else None
            last_message = max(timestamps) if timestamps else None
            
            return {
                "total_messages": total_messages,
                "user_messages": user_messages,
                "assistant_messages": assistant_messages,
                "first_message": first_message,
                "last_message": last_message
            }
            
        except Exception as e:
            self.logger.error(f"Error getting user stats: {e}")
            return {
                "total_messages": 0,
                "user_messages": 0,
                "assistant_messages": 0,
                "first_message": None,
                "last_message": None
            }

# Global database instance
try:
    db_manager = DatabaseManager()
except ValueError as e:
    print(f"Database initialization failed: {e}")
    print("Please set SUPABASE_URL and SUPABASE_KEY environment variables")
    db_manager = None
except Exception as e:
    print(f"Unexpected database error: {e}")
    db_manager = None