| |
|
| | """
|
| | Supabase client for TaijiChat
|
| | Handles user management, quota tracking, and usage logging
|
| | """
|
| |
|
import json
import os
from datetime import datetime, timezone
from typing import Optional, Dict, List, Tuple

from supabase import create_client, Client
|
| |
|
| |
|
class SupabaseClient:
    """Client for interacting with Supabase database.

    Reads and writes the ``users`` and ``usage_logs`` tables and reads the
    ``user_stats`` view. When credentials are missing or the underlying
    client cannot be created, ``self.client`` is ``None`` and every method
    degrades to a no-op that returns a permissive result instead of raising.
    """

    # Quota assigned to newly created users, and assumed when the
    # ``token_quota`` column is absent or NULL.
    DEFAULT_TOKEN_QUOTA = 100000

    # "Tokens remaining" figure reported when the quota cannot be enforced
    # (client disabled or lookup failed). Quota checks deliberately fail open.
    UNLIMITED_TOKENS = 999999

    def __init__(self, supabase_url: Optional[str] = None, supabase_key: Optional[str] = None):
        """
        Initialize Supabase client

        Args:
            supabase_url: Supabase project URL (defaults to SUPABASE_URL env var)
            supabase_key: Supabase service role key (defaults to SUPABASE_KEY env var)
        """
        self.supabase_url = supabase_url or os.getenv('SUPABASE_URL')
        self.supabase_key = supabase_key or os.getenv('SUPABASE_KEY')
        # Stays None unless create_client() succeeds below.
        self.client: Optional[Client] = None

        # Only a URL prefix and a presence flag are printed; the key itself
        # is never echoed.
        print(f"SupabaseClient: SUPABASE_URL from env: {self.supabase_url[:50] if self.supabase_url else 'None'}")
        print(f"SupabaseClient: SUPABASE_KEY from env: {'[SET]' if self.supabase_key else 'None'}")

        if not self.supabase_url or not self.supabase_key:
            print("WARNING: Supabase credentials not configured. Logging will be disabled.")
            return

        try:
            self.client = create_client(self.supabase_url, self.supabase_key)
            print("SupabaseClient: Successfully initialized")
        except Exception as e:
            # Best effort: a broken backend must not prevent the app from starting.
            print(f"SupabaseClient: Failed to initialize - {e}")
            self.client = None

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a timezone-aware ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _select_first(self, table: str, columns: str, hf_user_id: str) -> Optional[Dict]:
        """Return the first row of ``table`` whose ``hf_user_id`` matches, or None.

        Callers must have checked ``is_enabled()``; exceptions from the
        underlying client propagate to the caller's handler.
        """
        response = self.client.table(table).select(columns).eq('hf_user_id', hf_user_id).execute()
        return response.data[0] if response.data else None

    def is_enabled(self) -> bool:
        """Check if Supabase client is properly configured"""
        return self.client is not None

    def get_or_create_user(self, hf_user_id: str, hf_username: str, email: Optional[str] = None) -> Optional[Dict]:
        """
        Get existing user or create new user

        For an existing user the ``last_login`` column is refreshed; the
        returned record is the row as it was read, before that refresh.

        Args:
            hf_user_id: Hugging Face user ID
            hf_username: Hugging Face username
            email: User email (optional, only stored when the user is created)

        Returns:
            User record dict or None if error
        """
        if not self.is_enabled():
            return None

        try:
            user = self._select_first('users', '*', hf_user_id)

            if user is not None:
                # Updating last_login is bookkeeping only: if it fails, the
                # user we already fetched is still returned.
                try:
                    self.client.table('users').update({
                        'last_login': self._utc_now_iso()
                    }).eq('id', user['id']).execute()
                except Exception as e:
                    print(f"SupabaseClient: Failed to update last_login for {hf_username} - {e}")
                print(f"SupabaseClient: User {hf_username} logged in")
                return user

            new_user = {
                'hf_user_id': hf_user_id,
                'hf_username': hf_username,
                'email': email,
                'token_quota': self.DEFAULT_TOKEN_QUOTA,
                'tokens_used': 0,
                'last_login': self._utc_now_iso(),
                'is_active': True,
            }
            response = self.client.table('users').insert(new_user).execute()
            if response.data:
                print(f"SupabaseClient: Created new user {hf_username}")
                return response.data[0]

            print("SupabaseClient: Failed to create user - no data returned")
            return None
        except Exception as e:
            print(f"SupabaseClient: Error in get_or_create_user - {e}")
            return None

    def check_quota(self, hf_user_id: str) -> Tuple[bool, int, int]:
        """
        Check if user has tokens remaining in quota

        Fails open: when the client is disabled or the lookup raises, the
        user is reported as having quota. An unknown user is reported as
        having none.

        Args:
            hf_user_id: Hugging Face user ID

        Returns:
            Tuple of (has_quota: bool, tokens_remaining: int, tokens_used: int)
        """
        if not self.is_enabled():
            return (True, self.UNLIMITED_TOKENS, 0)

        try:
            user = self._select_first('users', 'token_quota, tokens_used', hf_user_id)
            if user is None:
                print("SupabaseClient: User not found for quota check")
                return (False, 0, 0)

            # A column that is present but NULL comes back as None, which
            # dict.get's default does not cover. An explicit quota of 0 is
            # respected, so test for None rather than falsiness.
            quota = user.get('token_quota')
            if quota is None:
                quota = self.DEFAULT_TOKEN_QUOTA
            used = user.get('tokens_used') or 0

            remaining = quota - used
            return (remaining > 0, remaining, used)
        except Exception as e:
            print(f"SupabaseClient: Error checking quota - {e}")
            return (True, self.UNLIMITED_TOKENS, 0)

    def update_token_usage(self, hf_user_id: str, tokens_to_add: int) -> bool:
        """
        Increment user's token usage

        Args:
            hf_user_id: Hugging Face user ID
            tokens_to_add: Number of tokens to add to usage

        Returns:
            True if successful (or if the client is disabled), False otherwise
        """
        if not self.is_enabled():
            return True

        try:
            user = self._select_first('users', 'id, tokens_used', hf_user_id)
            if user is None:
                print("SupabaseClient: User not found for token update")
                return False

            # NOTE(review): this is a read-modify-write, so two concurrent
            # calls for the same user can lose an increment. Making it atomic
            # would need a database-side increment.
            # A NULL tokens_used is treated as 0.
            new_usage = (user.get('tokens_used') or 0) + tokens_to_add

            self.client.table('users').update({
                'tokens_used': new_usage
            }).eq('id', user['id']).execute()

            print(f"SupabaseClient: Updated token usage for user {hf_user_id} - added {tokens_to_add} tokens")
            return True
        except Exception as e:
            print(f"SupabaseClient: Error updating token usage - {e}")
            return False

    def log_usage(self,
                  hf_user_id: str,
                  query_text: str,
                  user_id: Optional[str] = None,
                  prompt_tokens: int = 0,
                  completion_tokens: int = 0,
                  total_tokens: int = 0,
                  model: Optional[str] = None,
                  response_text: Optional[str] = None,
                  error_message: Optional[str] = None,
                  conversation_history: Optional[List[Dict]] = None,
                  is_image_response: bool = False,
                  image_path: Optional[str] = None) -> bool:
        """
        Log a query to usage_logs table

        This is called IMMEDIATELY after getting a response from the agent
        or when an error occurs.

        Args:
            hf_user_id: Hugging Face user ID (required)
            query_text: User's query text (required)
            user_id: UUID of user from users table (optional)
            prompt_tokens: Number of prompt tokens used
            completion_tokens: Number of completion tokens used
            total_tokens: Total tokens used
            model: Model name (e.g., "gpt-4o")
            response_text: Assistant's response
            error_message: Error message if query failed
            conversation_history: Full conversation history as list of dicts;
                stored as a JSON string, or NULL when empty or omitted
            is_image_response: Whether response included an image
            image_path: Path to image if applicable

        Returns:
            True if logged successfully (or if logging is disabled), False otherwise
        """
        if not self.is_enabled():
            # Guard the slice so a missing query cannot raise from a code
            # path that is supposed to be a no-op.
            print(f"SupabaseClient: Logging disabled, skipping log for query: {(query_text or '')[:50]}...")
            return True

        try:
            log_entry = {
                'hf_user_id': hf_user_id,
                'user_id': user_id,
                'query_text': query_text,
                'prompt_tokens': prompt_tokens,
                'completion_tokens': completion_tokens,
                'total_tokens': total_tokens,
                'model': model,
                'response_text': response_text,
                'error_message': error_message,
                'conversation_history': json.dumps(conversation_history) if conversation_history else None,
                'is_image_response': is_image_response,
                'image_path': image_path,
            }

            response = self.client.table('usage_logs').insert(log_entry).execute()

            if response.data:
                print(f"SupabaseClient: Logged usage - tokens: {total_tokens}, error: {error_message is not None}")
                return True

            print("SupabaseClient: Failed to log usage - no data returned")
            return False
        except Exception as e:
            # Includes serialization failures from json.dumps above.
            print(f"SupabaseClient: Error logging usage - {e}")
            return False

    def get_user_stats(self, hf_user_id: str) -> Optional[Dict]:
        """
        Get user statistics from user_stats view

        Args:
            hf_user_id: Hugging Face user ID

        Returns:
            Dict with user stats, or None if the client is disabled, the
            user has no row, or an error occurs
        """
        if not self.is_enabled():
            return None

        try:
            return self._select_first('user_stats', '*', hf_user_id)
        except Exception as e:
            print(f"SupabaseClient: Error getting user stats - {e}")
            return None
|
| |
|
| |
|
| |
|
# Module-level cache for the shared client; populated on first use.
_supabase_client_instance = None


def get_supabase_client() -> SupabaseClient:
    """Return the shared SupabaseClient, constructing it on the first call."""
    global _supabase_client_instance
    if _supabase_client_instance is not None:
        return _supabase_client_instance
    _supabase_client_instance = SupabaseClient()
    return _supabase_client_instance
|
| |
|