# utils/supabase_client.py """ Supabase client for TaijiChat Handles user management, quota tracking, and usage logging """ import os from datetime import datetime from typing import Optional, Dict, List, Tuple from supabase import create_client, Client import json class SupabaseClient: """Client for interacting with Supabase database""" def __init__(self, supabase_url: Optional[str] = None, supabase_key: Optional[str] = None): """ Initialize Supabase client Args: supabase_url: Supabase project URL (defaults to SUPABASE_URL env var) supabase_key: Supabase service role key (defaults to SUPABASE_KEY env var) """ self.supabase_url = supabase_url or os.getenv('SUPABASE_URL') self.supabase_key = supabase_key or os.getenv('SUPABASE_KEY') # Debug logging print(f"SupabaseClient: SUPABASE_URL from env: {self.supabase_url[:50] if self.supabase_url else 'None'}") print(f"SupabaseClient: SUPABASE_KEY from env: {'[SET]' if self.supabase_key else 'None'}") if not self.supabase_url or not self.supabase_key: print("WARNING: Supabase credentials not configured. Logging will be disabled.") self.client = None else: try: self.client: Client = create_client(self.supabase_url, self.supabase_key) print("SupabaseClient: Successfully initialized") except Exception as e: print(f"SupabaseClient: Failed to initialize - {e}") self.client = None def is_enabled(self) -> bool: """Check if Supabase client is properly configured""" return self.client is not None def get_or_create_user(self, hf_user_id: str, hf_username: str, email: Optional[str] = None) -> Optional[Dict]: """ Get existing user or create new user Args: hf_user_id: Hugging Face user ID hf_username: Hugging Face username email: User email (optional) Returns: User record dict or None if error """ if not self.is_enabled(): return None try: # Check if user exists response = self.client.table('users').select('*').eq('hf_user_id', hf_user_id).execute() if response.data and len(response.data) > 0: # User exists, update last_login user = response.data[0] self.client.table('users').update({ 'last_login': datetime.utcnow().isoformat() }).eq('id', user['id']).execute() print(f"SupabaseClient: User {hf_username} logged in") return user else: # Create new user new_user = { 'hf_user_id': hf_user_id, 'hf_username': hf_username, 'email': email, 'token_quota': 100000, # Default quota 'tokens_used': 0, 'last_login': datetime.utcnow().isoformat(), 'is_active': True } response = self.client.table('users').insert(new_user).execute() if response.data: print(f"SupabaseClient: Created new user {hf_username}") return response.data[0] else: print(f"SupabaseClient: Failed to create user - no data returned") return None except Exception as e: print(f"SupabaseClient: Error in get_or_create_user - {e}") return None def check_quota(self, hf_user_id: str) -> Tuple[bool, int, int]: """ Check if user has tokens remaining in quota Args: hf_user_id: Hugging Face user ID Returns: Tuple of (has_quota: bool, tokens_remaining: int, tokens_used: int) """ if not self.is_enabled(): return (True, 999999, 0) # Allow unlimited if Supabase disabled try: response = self.client.table('users').select('token_quota, tokens_used').eq('hf_user_id', hf_user_id).execute() if response.data and len(response.data) > 0: user = response.data[0] quota = user.get('token_quota', 100000) used = user.get('tokens_used', 0) remaining = quota - used has_quota = remaining > 0 return (has_quota, remaining, used) else: print(f"SupabaseClient: User not found for quota check") return (False, 0, 0) except Exception as e: print(f"SupabaseClient: Error checking quota - {e}") return (True, 999999, 0) # Fail open to allow usage if DB error def update_token_usage(self, hf_user_id: str, tokens_to_add: int) -> bool: """ Increment user's token usage Args: hf_user_id: Hugging Face user ID tokens_to_add: Number of tokens to add to usage Returns: True if successful, False otherwise """ if not self.is_enabled(): return True try: # Get current usage response = self.client.table('users').select('id, tokens_used').eq('hf_user_id', hf_user_id).execute() if response.data and len(response.data) > 0: user = response.data[0] new_usage = user.get('tokens_used', 0) + tokens_to_add # Update usage self.client.table('users').update({ 'tokens_used': new_usage }).eq('id', user['id']).execute() print(f"SupabaseClient: Updated token usage for user {hf_user_id} - added {tokens_to_add} tokens") return True else: print(f"SupabaseClient: User not found for token update") return False except Exception as e: print(f"SupabaseClient: Error updating token usage - {e}") return False def log_usage(self, hf_user_id: str, query_text: str, user_id: Optional[str] = None, prompt_tokens: int = 0, completion_tokens: int = 0, total_tokens: int = 0, model: Optional[str] = None, response_text: Optional[str] = None, error_message: Optional[str] = None, conversation_history: Optional[List[Dict]] = None, is_image_response: bool = False, image_path: Optional[str] = None) -> bool: """ Log a query to usage_logs table This is called IMMEDIATELY after getting a response from the agent or when an error occurs. Args: hf_user_id: Hugging Face user ID (required) query_text: User's query text (required) user_id: UUID of user from users table (optional) prompt_tokens: Number of prompt tokens used completion_tokens: Number of completion tokens used total_tokens: Total tokens used model: Model name (e.g., "gpt-4o") response_text: Assistant's response error_message: Error message if query failed conversation_history: Full conversation history as list of dicts is_image_response: Whether response included an image image_path: Path to image if applicable Returns: True if logged successfully, False otherwise """ if not self.is_enabled(): print(f"SupabaseClient: Logging disabled, skipping log for query: {query_text[:50]}...") return True try: log_entry = { 'hf_user_id': hf_user_id, 'user_id': user_id, 'query_text': query_text, 'prompt_tokens': prompt_tokens, 'completion_tokens': completion_tokens, 'total_tokens': total_tokens, 'model': model, 'response_text': response_text, 'error_message': error_message, 'conversation_history': json.dumps(conversation_history) if conversation_history else None, 'is_image_response': is_image_response, 'image_path': image_path } response = self.client.table('usage_logs').insert(log_entry).execute() if response.data: print(f"SupabaseClient: Logged usage - tokens: {total_tokens}, error: {error_message is not None}") return True else: print(f"SupabaseClient: Failed to log usage - no data returned") return False except Exception as e: print(f"SupabaseClient: Error logging usage - {e}") return False def get_user_stats(self, hf_user_id: str) -> Optional[Dict]: """ Get user statistics from user_stats view Args: hf_user_id: Hugging Face user ID Returns: Dict with user stats or None if error """ if not self.is_enabled(): return None try: response = self.client.table('user_stats').select('*').eq('hf_user_id', hf_user_id).execute() if response.data and len(response.data) > 0: return response.data[0] else: return None except Exception as e: print(f"SupabaseClient: Error getting user stats - {e}") return None # Singleton instance for easy import _supabase_client_instance = None def get_supabase_client() -> SupabaseClient: """Get singleton Supabase client instance""" global _supabase_client_instance if _supabase_client_instance is None: _supabase_client_instance = SupabaseClient() return _supabase_client_instance