# BakoAI/app/services/supabase_client.py
# Okidi Norbert
# Fix: automatically create Supabase buckets before upload
# e9b83cd
"""
Supabase client service for database and storage operations.
"""
import os
from typing import Optional

import anyio

from app.config import get_settings

# Try to import supabase, provide fallback for local development
try:
    from supabase import create_client, Client
    SUPABASE_AVAILABLE = True
except ImportError:
    SUPABASE_AVAILABLE = False
    Client = None
class SupabaseService:
    """
    Supabase client wrapper providing auth, database and storage operations.

    When the Supabase library is not installed, credentials are not
    configured, or client creation fails, ``client`` is ``None`` and every
    operation falls back to a local mock result (or to local disk for file
    storage).
    """

    # Lower-cased substrings of an exception message that are treated as a
    # dropped/stale connection and trigger one reconnect-and-retry.
    _DISCONNECT_MARKERS = ("disconnected", "protocolerror", "connection closed")

    def __init__(self):
        self._client: Optional["Client"] = None
        self._settings = get_settings()
        self._initialized = False
        # Allowlist of tables this service is permitted to touch. This reduces
        # the blast radius of accidental or malicious misuse.
        self._allowed_tables: set[str] = {
            "users",
            "videos",
            "organizations",
            "players",
            "teams",
            "analysis_results",
            "detections",
            "analytics",
            "notifications",
            "activities",
            "matches",
            "schedules",
            "announcements",
            "personal_analyses",
            "match_stat_uploads",
            "match_player_stats",
            "match_team_stats",
            "organizations_staff",
            "clips",
        }

    def _initialize(self) -> None:
        """
        Initialize the Supabase client if not already done.

        Never raises: on any failure a warning is printed and the service is
        marked initialized so that it keeps running in local mode.
        """
        if self._initialized:
            return
        if not SUPABASE_AVAILABLE:
            print("Warning: Supabase library not installed. Running in local mode.")
            self._initialized = True
            return
        if not self._settings.supabase_url or not self._settings.supabase_key:
            print("Warning: Supabase credentials not configured. Running in local mode.")
            self._initialized = True
            return
        try:
            # Prefer service key if available for administrative backend operations
            key = self._settings.supabase_service_key or self._settings.supabase_key
            self._client = create_client(self._settings.supabase_url, key)
        except Exception as e:
            print(f"Warning: Failed to initialize Supabase client: {e}")
        # Set in both outcomes so a failed attempt is not retried on every access.
        self._initialized = True

    @property
    def client(self) -> Optional["Client"]:
        """Get the Supabase client instance, initializing it on first access."""
        if not self._initialized:
            self._initialize()
        return self._client

    @property
    def is_connected(self) -> bool:
        """Check if Supabase is connected and available."""
        return self.client is not None

    def _require_allowed_table(self, table: str, operation: str) -> None:
        """Raise ValueError unless `table` is on the allowlist for `operation`."""
        if table not in self._allowed_tables:
            raise ValueError(f"Table '{table}' is not allowed for {operation} operations")

    @staticmethod
    def _apply_query_options(query, filters, order_by, ascending, limit):
        """Apply equality filters, ordering and a row limit to a query builder."""
        for key, value in (filters or {}).items():
            query = query.eq(key, value)
        if order_by:
            query = query.order(order_by, desc=not ascending)
        if limit:
            query = query.limit(limit)
        return query

    @staticmethod
    def _extract_signed_url(response) -> str:
        """
        Pull the signed URL out of a ``create_signed_url`` response.

        The key name has varied between SDK versions, so the known spellings
        are tried in turn. Returns "" when the response is empty or has none.
        """
        if not response:
            return ""
        for key in ("signedURL", "signed_url", "signedUrl"):
            url = response.get(key)
            if url:
                return url
        return ""

    async def _run_sync(self, fn):
        """
        Run a blocking function in a thread to avoid blocking the event loop.

        Includes a single-retry mechanism for 'Server disconnected' style
        errors: the client is re-created and `fn` is called once more. For the
        retry to use the new client, `fn` must look up ``self.client`` when it
        is called rather than capture an object built from the old client.

        NOTE(review): the retry re-runs `fn` as-is, so a write whose response
        was lost after the server applied it could be applied twice.
        """
        try:
            return await anyio.to_thread.run_sync(fn)
        except Exception as e:
            # Check for common network/protocol errors that suggest a stale connection
            err_str = str(e).lower()
            if not any(marker in err_str for marker in self._DISCONNECT_MARKERS):
                raise
            print(f"⚠️ Supabase connection lost ({e}). Attempting to reconnect...")
            self._initialized = False  # Force re-initialization
            self._initialize()
            # Retry once
            try:
                return await anyio.to_thread.run_sync(fn)
            except Exception as retry_e:
                print(f"❌ Supabase reconnection failed: {retry_e}")
                raise

    # ==================== Auth Operations ====================

    async def sign_up(self, email: str, password: str, metadata: Optional[dict] = None) -> dict:
        """
        Register a new user with Supabase Auth.

        `metadata` is stored as the user's metadata. In local mode a fixed
        mock user is returned instead.
        """
        if not self.is_connected:
            # Local mock for development
            return {
                "user": {
                    "id": "mock-user-id",
                    "email": email,
                    "user_metadata": metadata or {},
                }
            }
        credentials = {
            "email": email,
            "password": password,
            "options": {"data": metadata or {}},
        }
        return await self._run_sync(lambda: self.client.auth.sign_up(credentials))

    async def sign_in(self, email: str, password: str) -> dict:
        """Sign in a user with email/password (mock session in local mode)."""
        if not self.is_connected:
            return {
                "user": {"id": "mock-user-id", "email": email},
                "session": {"access_token": "mock-token"},
            }
        credentials = {"email": email, "password": password}
        return await self._run_sync(
            lambda: self.client.auth.sign_in_with_password(credentials)
        )

    async def get_user(self, token: str) -> Optional[dict]:
        """Get user from JWT token; None when the auth call returns nothing."""
        if not self.is_connected:
            return {"id": "mock-user-id", "email": "mock@example.com"}
        response = await self._run_sync(lambda: self.client.auth.get_user(token))
        return response.user if response else None

    async def delete_user_auth(self, user_id: str) -> bool:
        """
        Delete a user from Supabase Auth.

        Requires high-privilege service role or admin access. Returns False
        (after printing the error) instead of raising when deletion fails.
        """
        if not self.is_connected:
            return True
        try:
            # We use the admin API to delete the user
            await self._run_sync(lambda: self.client.auth.admin.delete_user(user_id))
            return True
        except Exception as e:
            print(f"Error deleting user from auth: {e}")
            return False

    # ==================== Database Operations ====================

    async def insert(self, table: str, data: dict) -> dict:
        """
        Insert a record into a table and return the stored row.

        Returns {} when the response carries no rows.
        Raises ValueError if `table` is not on the allowlist.
        """
        if not self.is_connected:
            return {"id": "mock-id", **data}
        self._require_allowed_table(table, "insert")
        response = await self._run_sync(
            lambda: self.client.table(table).insert(data).execute()
        )
        return response.data[0] if response.data else {}

    async def insert_many(self, table: str, rows: list[dict], chunk_size: int = 500) -> int:
        """
        Insert many records (chunked). Returns inserted row count (best-effort).

        Raises ValueError if `table` is not on the allowlist or `chunk_size`
        is not positive (a non-positive size would otherwise insert nothing
        or fail obscurely inside ``range``).
        """
        if not rows:
            return 0
        if not self.is_connected:
            return len(rows)
        self._require_allowed_table(table, "bulk insert")
        if chunk_size < 1:
            raise ValueError("chunk_size must be a positive integer")
        inserted = 0
        for start in range(0, len(rows), chunk_size):
            chunk = rows[start:start + chunk_size]
            # Bind the chunk as a default so the callable never sees a later one.
            response = await self._run_sync(
                lambda chunk=chunk: self.client.table(table).insert(chunk).execute()
            )
            inserted += len(response.data or [])
        return inserted

    async def select(
        self,
        table: str,
        columns: str = "*",
        filters: Optional[dict] = None,
        limit: Optional[int] = None,
        order_by: Optional[str] = None,
        ascending: bool = True,
    ) -> list:
        """
        Select records from a table.

        `filters` are combined as equality conditions. A falsy `limit`
        (None or 0) means no limit. Returns [] in local mode.
        Raises ValueError if `table` is not on the allowlist.
        """
        if not self.is_connected:
            return []
        self._require_allowed_table(table, "select")

        def _execute():
            # Built here, not outside, so a retry after reconnect targets the
            # freshly created client instead of the stale one.
            query = self.client.table(table).select(columns)
            query = self._apply_query_options(query, filters, order_by, ascending, limit)
            return query.execute()

        response = await self._run_sync(_execute)
        return response.data or []

    async def select_in(
        self,
        table: str,
        column: str,
        values: list,
        columns: str = "*",
        filters: Optional[dict] = None,
        limit: Optional[int] = None,
        order_by: Optional[str] = None,
        ascending: bool = True,
    ) -> list:
        """
        Select records where column is in values.

        Returns [] without querying when `values` is empty or in local mode.
        Raises ValueError if `table` is not on the allowlist.
        """
        if not self.is_connected:
            return []
        if not values:
            return []
        self._require_allowed_table(table, "select")

        def _execute():
            # Built per call so the retry path uses the current client.
            query = self.client.table(table).select(columns).in_(column, values)
            query = self._apply_query_options(query, filters, order_by, ascending, limit)
            return query.execute()

        response = await self._run_sync(_execute)
        return response.data or []

    async def select_one(self, table: str, id: str, columns: str = "*") -> Optional[dict]:
        """
        Select a single record by ID.

        Returns None in local mode.
        Raises ValueError if `table` is not on the allowlist.
        """
        if not self.is_connected:
            return None
        self._require_allowed_table(table, "select")
        response = await self._run_sync(
            lambda: self.client.table(table).select(columns).eq("id", id).single().execute()
        )
        return response.data

    async def update(self, table: str, id: str, data: dict) -> dict:
        """
        Update a record by ID and return the updated row.

        Returns {} when the response carries no rows.
        Raises ValueError if `table` is not on the allowlist.
        """
        if not self.is_connected:
            return {"id": id, **data}
        self._require_allowed_table(table, "update")
        response = await self._run_sync(
            lambda: self.client.table(table).update(data).eq("id", id).execute()
        )
        return response.data[0] if response.data else {}

    async def delete(self, table: str, id: str) -> bool:
        """
        Delete a record by ID. Returns True when the call completes.

        Raises ValueError if `table` is not on the allowlist.
        """
        if not self.is_connected:
            return True
        self._require_allowed_table(table, "delete")
        await self._run_sync(
            lambda: self.client.table(table).delete().eq("id", id).execute()
        )
        return True

    async def delete_where(self, table: str, filters: dict) -> bool:
        """
        Delete records matching equality filters.

        Raises ValueError if `filters` is empty (which would otherwise target
        every row) or `table` is not on the allowlist.
        """
        if not self.is_connected:
            return True
        if not filters:
            raise ValueError("delete_where requires filters")
        self._require_allowed_table(table, "delete")

        def _execute():
            # Built per call so the retry path uses the current client.
            query = self.client.table(table).delete()
            for key, value in filters.items():
                query = query.eq(key, value)
            return query.execute()

        await self._run_sync(_execute)
        return True

    # ==================== Bucket Operations ====================

    async def create_bucket(self, name: str, public: bool = True) -> bool:
        """
        Create a storage bucket.

        Returns True on success or if the bucket already exists, False (after
        printing the error) on any other failure.
        """
        if not self.is_connected:
            return True
        try:
            # The storage API expects 'id' and optionally 'name' in the body
            # Latest supabase-py uses create_bucket(id, options)
            await self._run_sync(lambda: self.client.storage.create_bucket(
                id=name,
                options={"public": public},
            ))
            return True
        except Exception as e:
            # If it already exists, that's fine
            if "already exists" in str(e).lower():
                return True
            print(f"Error creating bucket {name}: {e}")
            return False

    async def ensure_bucket(self, name: str, public: bool = True) -> bool:
        """
        Ensure a bucket exists, create if not.

        Returns False (after printing the error) if listing buckets fails.
        """
        if not self.is_connected:
            return True

        def _field(bucket, attr):
            # Buckets can be objects or dicts depending on library version
            return getattr(bucket, attr) if hasattr(bucket, attr) else bucket.get(attr)

        try:
            buckets = await self._run_sync(lambda: self.client.storage.list_buckets())
            for bucket in buckets:
                if _field(bucket, "name") == name or _field(bucket, "id") == name:
                    return True
            return await self.create_bucket(name, public)
        except Exception as e:
            print(f"Error ensuring bucket {name}: {e}")
            return False

    # ==================== Storage Operations ====================

    async def upload_file(
        self,
        bucket: str,
        path: str,
        file_data: bytes,
        content_type: str = "video/mp4",
    ) -> str:
        """
        Upload a file to Supabase Storage.

        Returns the public object URL for `bucket`/`path`. In local mode the
        bytes are written under the configured upload directory and the local
        file path is returned instead.
        """
        if not self.is_connected:
            # Local file storage fallback
            settings = get_settings()
            local_path = os.path.join(settings.upload_dir, path)
            parent = os.path.dirname(local_path)
            if parent:  # makedirs("") raises, so skip it for bare filenames
                os.makedirs(parent, exist_ok=True)
            with open(local_path, "wb") as f:
                f.write(file_data)
            return local_path
        await self._run_sync(lambda: self.client.storage.from_(bucket).upload(
            path,
            file_data,
            {"content-type": content_type},
        ))
        return f"{self._settings.supabase_url}/storage/v1/object/public/{bucket}/{path}"

    async def get_file_url(self, bucket: str, path: str, expires_in: int = 3600) -> str:
        """
        Get a signed URL for a file, valid for `expires_in` seconds.

        Returns "/uploads/<path>" in local mode and "" if the response holds
        no URL.
        """
        if not self.is_connected:
            return f"/uploads/{path}"
        response = await self._run_sync(
            lambda: self.client.storage.from_(bucket).create_signed_url(path, expires_in)
        )
        return self._extract_signed_url(response)

    async def delete_file(self, bucket: str, path: str) -> bool:
        """
        Delete a file from storage.

        In local mode the file is removed from the upload directory if it
        exists. Returns True when the call completes.
        """
        if not self.is_connected:
            settings = get_settings()
            local_path = os.path.join(settings.upload_dir, path)
            if os.path.exists(local_path):
                os.remove(local_path)
            return True
        await self._run_sync(lambda: self.client.storage.from_(bucket).remove([path]))
        return True

    async def upload_file_from_path(
        self,
        bucket: str,
        storage_path: str,
        local_path: str,
        content_type: str = "video/mp4",
    ) -> str:
        """
        Upload a file from disk to Supabase Storage, overwriting any existing
        object at `storage_path` (upsert).

        The file is read and uploaded in a worker thread so neither step
        blocks the event loop. The whole file is still loaded into memory for
        the upload.

        Returns the storage path on success.
        Falls back to returning `local_path` if Supabase is not connected.
        """
        if not self.is_connected:
            return local_path

        def _upload():
            # Read inside the worker so the disk read is off the event loop.
            with open(local_path, "rb") as f:
                file_bytes = f.read()
            return self.client.storage.from_(bucket).upload(
                storage_path,
                file_bytes,
                {"content-type": content_type, "upsert": "true"},
            )

        await self._run_sync(_upload)
        return storage_path

    async def get_long_lived_url(
        self,
        bucket: str,
        storage_path: str,
        expires_in: int = 60 * 60 * 24 * 7,  # 7 days
    ) -> str:
        """
        Generate a signed URL valid for `expires_in` seconds (default 7 days).

        Falls back to the local /personal-output path (file name only) when
        not connected. Returns "" if the response holds no URL.
        """
        if not self.is_connected:
            filename = storage_path.split("/")[-1]
            return f"/personal-output/{filename}"
        response = await self._run_sync(
            lambda: self.client.storage.from_(bucket).create_signed_url(
                storage_path, expires_in
            )
        )
        return self._extract_signed_url(response)
# Module-wide service instance; stays None until the first request for it.
_supabase_service: Optional[SupabaseService] = None


def get_supabase_service() -> SupabaseService:
    """Return the shared SupabaseService, constructing it on first use."""
    global _supabase_service
    service = _supabase_service
    if service is None:
        service = SupabaseService()
        _supabase_service = service
    return service