| """ |
| Supabase client service for database and storage operations. |
| """ |
| from typing import Optional |
| import anyio |
|
|
| from app.config import get_settings |
|
|
| |
| try: |
| from supabase import create_client, Client |
| SUPABASE_AVAILABLE = True |
| except ImportError: |
| SUPABASE_AVAILABLE = False |
| Client = None |
|
|
|
|
class SupabaseService:
    """
    Supabase client wrapper providing auth, database and storage operations.

    The underlying client is created lazily on first access. When the
    Supabase library is not installed, credentials are not configured, or
    client creation fails, the service runs in "local mode": operations
    return mock values (or use the local upload directory) instead of
    contacting Supabase.
    """

    # Lower-cased substrings of an error message that are treated as a
    # dropped connection and trigger one reconnect-and-retry in `_run_sync`.
    _DISCONNECT_MARKERS = ("disconnected", "protocolerror", "connection closed")

    def __init__(self):
        self._client: Optional["Client"] = None
        self._settings = get_settings()
        self._initialized = False

        # Whitelist of tables the generic CRUD helpers are allowed to touch.
        self._allowed_tables: set[str] = {
            "users",
            "videos",
            "organizations",
            "players",
            "teams",
            "analysis_results",
            "detections",
            "analytics",
            "notifications",
            "activities",
            "matches",
            "schedules",
            "announcements",
            "personal_analyses",
            "match_stat_uploads",
            "match_player_stats",
            "match_team_stats",
            "organizations_staff",
            "clips",
        }

    # ------------------------------------------------------------------
    # Client lifecycle
    # ------------------------------------------------------------------

    def _initialize(self) -> None:
        """
        Initialize the Supabase client if not already done.

        Never raises for ordinary failures: on any of them a warning is
        printed, `_client` is left unchanged and the service stays in local
        mode. `_initialized` is set on every exit path so the attempt is not
        repeated on each access.
        """
        if self._initialized:
            return

        if not SUPABASE_AVAILABLE:
            print("Warning: Supabase library not installed. Running in local mode.")
            self._initialized = True
            return

        if not self._settings.supabase_url or not self._settings.supabase_key:
            print("Warning: Supabase credentials not configured. Running in local mode.")
            self._initialized = True
            return

        try:
            # Prefer the service key when one is configured; fall back to the
            # regular key otherwise.
            key = self._settings.supabase_service_key or self._settings.supabase_key
            self._client = create_client(self._settings.supabase_url, key)
            self._initialized = True
        except Exception as e:
            print(f"Warning: Failed to initialize Supabase client: {e}")
            self._initialized = True

    @property
    def client(self) -> Optional["Client"]:
        """Get the Supabase client instance, initializing it on first access."""
        if not self._initialized:
            self._initialize()
        return self._client

    @property
    def is_connected(self) -> bool:
        """Check if Supabase is connected and available."""
        return self.client is not None

    async def _run_sync(self, fn):
        """
        Run a blocking function in a thread to avoid blocking the event loop.

        If the call fails with an error whose message looks like a dropped
        connection (see `_DISCONNECT_MARKERS`), the client is re-initialized
        and `fn` is retried exactly once. `fn` should therefore read
        `self.client` when it is called rather than capture a client or query
        object created earlier, so the retry uses the fresh client.

        Raises:
            Whatever `fn` raises, either on the first attempt (for errors
            that are not disconnects) or on the retry.
        """
        try:
            return await anyio.to_thread.run_sync(fn)
        except Exception as e:
            err_str = str(e).lower()
            if not any(marker in err_str for marker in self._DISCONNECT_MARKERS):
                raise

            print(f"⚠️ Supabase connection lost ({e}). Attempting to reconnect...")
            self._initialized = False
            self._initialize()

            try:
                return await anyio.to_thread.run_sync(fn)
            except Exception as retry_e:
                print(f"❌ Supabase reconnection failed: {retry_e}")
                raise

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _check_table(self, table: str, operation: str) -> None:
        """Raise ValueError unless `table` is in the allowed-table whitelist."""
        if table not in self._allowed_tables:
            raise ValueError(f"Table '{table}' is not allowed for {operation} operations")

    @staticmethod
    def _apply_query_options(query, filters, order_by, ascending, limit):
        """Apply equality filters, ordering and a row limit to a query builder."""
        for key, value in (filters or {}).items():
            query = query.eq(key, value)
        if order_by:
            query = query.order(order_by, desc=not ascending)
        # Truthiness check kept on purpose: limit=None and limit=0 both mean
        # "no limit is applied".
        if limit:
            query = query.limit(limit)
        return query

    @staticmethod
    def _bucket_attr(bucket, field: str):
        """Read `field` from a bucket given either as an object or as a dict."""
        return getattr(bucket, field) if hasattr(bucket, field) else bucket.get(field)

    async def _create_signed_url(self, bucket: str, path: str, expires_in: int) -> str:
        """
        Request a signed URL and extract it from the response.

        Both the "signedURL" and "signed_url" keys are checked; an empty
        string is returned when neither holds a value.
        NOTE(review): presumably the key name differs between client
        versions - confirm against the installed storage client.
        """
        response = await self._run_sync(
            lambda: self.client.storage.from_(bucket).create_signed_url(path, expires_in)
        )
        return response.get("signedURL") or response.get("signed_url") or ""

    # ------------------------------------------------------------------
    # Auth
    # ------------------------------------------------------------------

    async def sign_up(self, email: str, password: str, metadata: Optional[dict] = None) -> dict:
        """
        Register a new user with Supabase Auth.

        In local mode a mock user dict is returned. Otherwise the response of
        the client's `auth.sign_up` call is returned as-is.
        """
        if not self.is_connected:
            return {
                "user": {
                    "id": "mock-user-id",
                    "email": email,
                    "user_metadata": metadata or {},
                }
            }

        credentials = {
            "email": email,
            "password": password,
            "options": {"data": metadata or {}},
        }
        return await self._run_sync(lambda: self.client.auth.sign_up(credentials))

    async def sign_in(self, email: str, password: str) -> dict:
        """
        Sign in a user with email/password.

        In local mode a mock user/session dict is returned. Otherwise the
        response of `auth.sign_in_with_password` is returned as-is.
        """
        if not self.is_connected:
            return {
                "user": {"id": "mock-user-id", "email": email},
                "session": {"access_token": "mock-token"},
            }

        credentials = {"email": email, "password": password}
        return await self._run_sync(
            lambda: self.client.auth.sign_in_with_password(credentials)
        )

    async def get_user(self, token: str) -> Optional[dict]:
        """
        Get user from JWT token.

        Returns a mock user in local mode, the `user` attribute of the auth
        response otherwise, or None when the response is falsy.
        """
        if not self.is_connected:
            return {"id": "mock-user-id", "email": "mock@example.com"}

        response = await self._run_sync(lambda: self.client.auth.get_user(token))
        return response.user if response else None

    async def delete_user_auth(self, user_id: str) -> bool:
        """
        Delete a user from Supabase Auth.
        Requires high-privilege service role or admin access.

        Returns True on success (and always in local mode). Any failure is
        printed and reported as False instead of being raised.
        """
        if not self.is_connected:
            return True

        try:
            await self._run_sync(lambda: self.client.auth.admin.delete_user(user_id))
            return True
        except Exception as e:
            print(f"Error deleting user from auth: {e}")
            return False

    # ------------------------------------------------------------------
    # Database
    # ------------------------------------------------------------------
    # Note: in every method below the local-mode short-circuit runs before
    # the table whitelist check, so local mode accepts any table name.

    async def insert(self, table: str, data: dict) -> dict:
        """
        Insert a record into a table.

        Returns the first returned row, or {} when the response has no data.
        In local mode returns `data` with a mock "id".

        Raises:
            ValueError: if `table` is not whitelisted (connected mode only).
        """
        if not self.is_connected:
            return {"id": "mock-id", **data}
        self._check_table(table, "insert")

        response = await self._run_sync(
            lambda: self.client.table(table).insert(data).execute()
        )
        return response.data[0] if response.data else {}

    async def insert_many(self, table: str, rows: list[dict], chunk_size: int = 500) -> int:
        """
        Insert many records (chunked). Returns inserted row count (best-effort).

        The count is the sum of the rows reported back by each chunk's
        response. In local mode `len(rows)` is returned.

        Raises:
            ValueError: if `chunk_size` is smaller than 1, or if `table` is
                not whitelisted (connected mode only).
        """
        if not rows:
            return 0
        # A non-positive chunk size would otherwise either crash inside
        # range() or silently insert nothing while reporting success.
        if chunk_size < 1:
            raise ValueError("chunk_size must be a positive integer")
        if not self.is_connected:
            return len(rows)
        self._check_table(table, "bulk insert")

        inserted = 0
        for start in range(0, len(rows), chunk_size):
            chunk = rows[start : start + chunk_size]
            # Bind the chunk as a default so the callable never observes a
            # later iteration's value.
            response = await self._run_sync(
                lambda chunk=chunk: self.client.table(table).insert(chunk).execute()
            )
            inserted += len(response.data or [])
        return inserted

    async def select(
        self,
        table: str,
        columns: str = "*",
        filters: Optional[dict] = None,
        limit: Optional[int] = None,
        order_by: Optional[str] = None,
        ascending: bool = True,
    ) -> list:
        """
        Select records from a table.

        Args:
            table: Whitelisted table name.
            columns: Column selection string passed to `select`.
            filters: Optional mapping of column -> value, applied as
                equality filters.
            limit: Optional maximum number of rows (falsy means no limit).
            order_by: Optional column to sort by.
            ascending: Sort direction used with `order_by`.

        Returns:
            The response rows, or [] when there are none or in local mode.

        Raises:
            ValueError: if `table` is not whitelisted (connected mode only).
        """
        if not self.is_connected:
            return []
        self._check_table(table, "select")

        def _execute():
            # Built inside the callable so a reconnect retry in `_run_sync`
            # goes through the re-created client instead of a stale builder.
            query = self.client.table(table).select(columns)
            query = self._apply_query_options(query, filters, order_by, ascending, limit)
            return query.execute()

        response = await self._run_sync(_execute)
        return response.data or []

    async def select_in(
        self,
        table: str,
        column: str,
        values: list,
        columns: str = "*",
        filters: Optional[dict] = None,
        limit: Optional[int] = None,
        order_by: Optional[str] = None,
        ascending: bool = True,
    ) -> list:
        """
        Select records where column is in values.

        Accepts the same optional `filters`, `limit`, `order_by` and
        `ascending` arguments as `select`. Returns [] without querying when
        `values` is empty or in local mode.

        Raises:
            ValueError: if `table` is not whitelisted (connected mode with a
                non-empty `values` only).
        """
        if not self.is_connected:
            return []
        if not values:
            return []
        self._check_table(table, "select")

        def _execute():
            # Built inside the callable so a reconnect retry in `_run_sync`
            # goes through the re-created client instead of a stale builder.
            query = self.client.table(table).select(columns).in_(column, values)
            query = self._apply_query_options(query, filters, order_by, ascending, limit)
            return query.execute()

        response = await self._run_sync(_execute)
        return response.data or []

    async def select_one(self, table: str, id: str, columns: str = "*") -> Optional[dict]:
        """
        Select a single record by ID.

        Returns the response's `data`, or None in local mode.
        NOTE(review): the query uses `.single()`, which is expected to raise
        rather than return None when no row matches - confirm callers handle
        that case.

        Raises:
            ValueError: if `table` is not whitelisted (connected mode only).
        """
        if not self.is_connected:
            return None
        self._check_table(table, "select")

        response = await self._run_sync(
            lambda: self.client.table(table).select(columns).eq("id", id).single().execute()
        )
        return response.data

    async def update(self, table: str, id: str, data: dict) -> dict:
        """
        Update a record by ID.

        Returns the first returned row, or {} when the response has no data.
        In local mode returns `data` merged with the given "id".

        Raises:
            ValueError: if `table` is not whitelisted (connected mode only).
        """
        if not self.is_connected:
            return {"id": id, **data}
        self._check_table(table, "update")

        response = await self._run_sync(
            lambda: self.client.table(table).update(data).eq("id", id).execute()
        )
        return response.data[0] if response.data else {}

    async def delete(self, table: str, id: str) -> bool:
        """
        Delete a record by ID.

        Returns True once the request completes (and always in local mode);
        it does not report whether a row was actually removed.

        Raises:
            ValueError: if `table` is not whitelisted (connected mode only).
        """
        if not self.is_connected:
            return True
        self._check_table(table, "delete")

        await self._run_sync(
            lambda: self.client.table(table).delete().eq("id", id).execute()
        )
        return True

    async def delete_where(self, table: str, filters: dict) -> bool:
        """
        Delete records matching equality filters.

        Empty filters are rejected so a call can never turn into an
        unfiltered delete of the whole table.

        Raises:
            ValueError: if `filters` is empty or `table` is not whitelisted
                (connected mode only).
        """
        if not self.is_connected:
            return True
        if not filters:
            raise ValueError("delete_where requires filters")
        self._check_table(table, "delete")

        def _execute():
            # Built inside the callable so a reconnect retry in `_run_sync`
            # goes through the re-created client instead of a stale builder.
            query = self.client.table(table).delete()
            for key, value in filters.items():
                query = query.eq(key, value)
            return query.execute()

        await self._run_sync(_execute)
        return True

    # ------------------------------------------------------------------
    # Storage
    # ------------------------------------------------------------------

    async def create_bucket(self, name: str, public: bool = True) -> bool:
        """
        Create a storage bucket.

        Returns True on success, in local mode, or when the error message
        says the bucket already exists. Other errors are printed and
        reported as False.
        """
        if not self.is_connected:
            return True
        try:
            await self._run_sync(
                lambda: self.client.storage.create_bucket(
                    id=name,
                    options={"public": public},
                )
            )
            return True
        except Exception as e:
            # An existing bucket is the desired end state, not a failure.
            if "already exists" in str(e).lower():
                return True
            print(f"Error creating bucket {name}: {e}")
            return False

    async def ensure_bucket(self, name: str, public: bool = True) -> bool:
        """
        Ensure a bucket exists, create if not.

        A bucket matches when either its name or its id equals `name`.
        Errors while listing buckets are printed and reported as False.
        """
        if not self.is_connected:
            return True
        try:
            buckets = await self._run_sync(lambda: self.client.storage.list_buckets())
            exists = any(
                self._bucket_attr(b, "name") == name or self._bucket_attr(b, "id") == name
                for b in buckets
            )
            if exists:
                return True
            return await self.create_bucket(name, public)
        except Exception as e:
            print(f"Error ensuring bucket {name}: {e}")
            return False

    async def upload_file(
        self,
        bucket: str,
        path: str,
        file_data: bytes,
        content_type: str = "video/mp4",
    ) -> str:
        """
        Upload a file to Supabase Storage.

        Returns:
            A URL of the form
            `<supabase_url>/storage/v1/object/public/<bucket>/<path>`.
            In local mode the bytes are written under the configured
            `upload_dir` and the resulting local file path is returned.
        """
        if not self.is_connected:
            import os

            settings = get_settings()
            local_path = os.path.join(settings.upload_dir, path)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            with open(local_path, "wb") as f:
                f.write(file_data)
            return local_path

        await self._run_sync(
            lambda: self.client.storage.from_(bucket).upload(
                path,
                file_data,
                {"content-type": content_type},
            )
        )
        return f"{self._settings.supabase_url}/storage/v1/object/public/{bucket}/{path}"

    async def get_file_url(self, bucket: str, path: str, expires_in: int = 3600) -> str:
        """
        Get a signed URL for a file.

        Returns `/uploads/<path>` in local mode, otherwise the signed URL
        valid for `expires_in` seconds ("" if the response carries none).
        """
        if not self.is_connected:
            return f"/uploads/{path}"

        return await self._create_signed_url(bucket, path, expires_in)

    async def delete_file(self, bucket: str, path: str) -> bool:
        """
        Delete a file from storage.

        In local mode the file under the configured `upload_dir` is removed
        if it exists. Always returns True once the operation completes.
        """
        if not self.is_connected:
            import os

            settings = get_settings()
            local_path = os.path.join(settings.upload_dir, path)
            if os.path.exists(local_path):
                os.remove(local_path)
            return True

        await self._run_sync(lambda: self.client.storage.from_(bucket).remove([path]))
        return True

    async def upload_file_from_path(
        self,
        bucket: str,
        storage_path: str,
        local_path: str,
        content_type: str = "video/mp4",
    ) -> str:
        """
        Upload a file from disk to Supabase Storage, overwriting any
        existing object at `storage_path` (upsert).

        The file is read and uploaded inside a worker thread so neither the
        disk read nor the network call blocks the event loop. Note that the
        whole file is still loaded into memory for the upload; it is not
        streamed.

        Returns the storage path on success.
        Falls back to returning `local_path` if Supabase is not connected.
        """
        if not self.is_connected:
            return local_path

        def _upload():
            # Reading here (not on the event loop) also means a reconnect
            # retry simply re-reads the file.
            with open(local_path, "rb") as fh:
                payload = fh.read()
            return self.client.storage.from_(bucket).upload(
                storage_path,
                payload,
                {"content-type": content_type, "upsert": "true"},
            )

        await self._run_sync(_upload)
        return storage_path

    async def get_long_lived_url(
        self,
        bucket: str,
        storage_path: str,
        expires_in: int = 60 * 60 * 24 * 7,
    ) -> str:
        """
        Generate a signed URL valid for `expires_in` seconds (default 7 days).
        Falls back to the local /personal-output path when not connected,
        using only the final path component of `storage_path`.
        """
        if not self.is_connected:
            filename = storage_path.split("/")[-1]
            return f"/personal-output/{filename}"

        return await self._create_signed_url(bucket, storage_path, expires_in)
|
|
|
|
| |
# Process-wide service instance; created on the first call to
# get_supabase_service() and reused afterwards.
_supabase_service: Optional[SupabaseService] = None


def get_supabase_service() -> SupabaseService:
    """Return the shared SupabaseService, constructing it on first use."""
    global _supabase_service
    service = _supabase_service
    if service is None:
        service = SupabaseService()
        _supabase_service = service
    return service
|
|