"""
Supabase client service for database and storage operations.
"""
import os
from typing import Optional

import anyio

from app.config import get_settings

# Try to import supabase, provide fallback for local development
try:
    from supabase import create_client, Client
    SUPABASE_AVAILABLE = True
except ImportError:
    SUPABASE_AVAILABLE = False
    Client = None
class SupabaseService:
    """
    Supabase client wrapper providing auth, database and storage operations.

    The client is created lazily on first access. When no client is available
    (library not installed, credentials not configured, or client creation
    failed) every operation returns a local mock result instead of calling
    Supabase.
    """

    # Lower-case substrings of an error message that are treated as a stale
    # connection and trigger a single reconnect-and-retry in `_run_sync`.
    _DISCONNECT_MARKERS = ("disconnected", "protocolerror", "connection closed")

    def __init__(self):
        self._client: Optional["Client"] = None
        self._settings = get_settings()
        self._initialized = False
        # Allowlist of tables this service is permitted to touch. This reduces
        # the blast radius of accidental or malicious misuse.
        self._allowed_tables: set[str] = {
            "users",
            "videos",
            "organizations",
            "players",
            "teams",
            "analysis_results",
            "detections",
            "analytics",
            "notifications",
            "activities",
            "matches",
            "schedules",
            "announcements",
            "personal_analyses",
            "match_stat_uploads",
            "match_player_stats",
            "match_team_stats",
            "organizations_staff",
            "clips",
        }

    def _initialize(self) -> None:
        """
        Initialize the Supabase client if not already done.

        Every outcome (success, missing library, missing credentials, or a
        failure in `create_client`) marks the service as initialized, so a
        failed attempt is not repeated on each access.
        """
        if self._initialized:
            return

        if not SUPABASE_AVAILABLE:
            print("Warning: Supabase library not installed. Running in local mode.")
            self._initialized = True
            return

        if not self._settings.supabase_url or not self._settings.supabase_key:
            print("Warning: Supabase credentials not configured. Running in local mode.")
            self._initialized = True
            return

        try:
            # Prefer service key if available for administrative backend operations
            key = self._settings.supabase_service_key or self._settings.supabase_key
            self._client = create_client(self._settings.supabase_url, key)
            self._initialized = True
        except Exception as e:
            print(f"Warning: Failed to initialize Supabase client: {e}")
            self._initialized = True

    @property
    def client(self) -> Optional["Client"]:
        """Get the Supabase client instance, initializing it on first use."""
        if not self._initialized:
            self._initialize()
        return self._client

    @property
    def is_connected(self) -> bool:
        """Check if Supabase is connected and available."""
        return self.client is not None

    async def _run_sync(self, fn):
        """
        Run a blocking function in a thread to avoid blocking the event loop.

        If the call fails with an error whose message matches one of
        `_DISCONNECT_MARKERS`, the client is re-initialized and `fn` is
        retried exactly once. Any other error, and a failure of the retry,
        is re-raised to the caller.
        """
        try:
            return await anyio.to_thread.run_sync(fn)
        except Exception as e:
            # Check for common network/protocol errors that suggest a stale connection
            err_str = str(e).lower()
            if not any(marker in err_str for marker in self._DISCONNECT_MARKERS):
                raise

            print(f"⚠️ Supabase connection lost ({e}). Attempting to reconnect...")
            self._initialized = False  # Force re-initialization
            self._initialize()
            # Retry once
            try:
                return await anyio.to_thread.run_sync(fn)
            except Exception as retry_e:
                print(f"❌ Supabase reconnection failed: {retry_e}")
                raise

    def _require_allowed_table(self, table: str, operation: str) -> None:
        """Raise ValueError unless `table` is in the allowlist.

        `operation` is only used to build the error message
        (e.g. "insert", "bulk insert", "select", "update", "delete").
        """
        if table not in self._allowed_tables:
            raise ValueError(f"Table '{table}' is not allowed for {operation} operations")

    @staticmethod
    def _apply_query_options(query, filters, order_by, ascending, limit):
        """Apply equality filters, ordering and a row limit to a query builder.

        A falsy `limit` (None or 0) means no limit is applied.
        """
        if filters:
            for key, value in filters.items():
                query = query.eq(key, value)
        if order_by:
            query = query.order(order_by, desc=not ascending)
        if limit:
            query = query.limit(limit)
        return query

    @staticmethod
    def _signed_url_from_response(response) -> str:
        """Extract the signed URL from a `create_signed_url` response dict.

        Accepts either the 'signedURL' or the 'signed_url' key and returns an
        empty string when neither holds a value.
        """
        return response.get("signedURL") or response.get("signed_url") or ""

    # ==================== Auth Operations ====================

    async def sign_up(self, email: str, password: str, metadata: Optional[dict] = None) -> dict:
        """Register a new user with Supabase Auth.

        `metadata` is sent as the user's data options; None is treated as {}.
        Returns a mock user payload when not connected.
        """
        if not self.is_connected:
            # Local mock for development
            return {
                "user": {
                    "id": "mock-user-id",
                    "email": email,
                    "user_metadata": metadata or {},
                }
            }

        response = await self._run_sync(lambda: self.client.auth.sign_up({
            "email": email,
            "password": password,
            "options": {"data": metadata or {}},
        }))
        return response

    async def sign_in(self, email: str, password: str) -> dict:
        """Sign in a user with email/password.

        Returns a mock user/session payload when not connected.
        """
        if not self.is_connected:
            return {
                "user": {"id": "mock-user-id", "email": email},
                "session": {"access_token": "mock-token"},
            }

        response = await self._run_sync(lambda: self.client.auth.sign_in_with_password({
            "email": email,
            "password": password,
        }))
        return response

    async def get_user(self, token: str) -> Optional[dict]:
        """Get user from JWT token.

        Returns the `user` attribute of the auth response, None when the
        response is falsy, or a mock user when not connected.
        """
        if not self.is_connected:
            return {"id": "mock-user-id", "email": "mock@example.com"}

        response = await self._run_sync(lambda: self.client.auth.get_user(token))
        return response.user if response else None

    async def delete_user_auth(self, user_id: str) -> bool:
        """
        Delete a user from Supabase Auth.
        Requires high-privilege service role or admin access.

        Returns True on success (or when not connected) and False if the
        admin call raised; the error is printed, not propagated.
        """
        if not self.is_connected:
            return True

        try:
            # We use the admin API to delete the user
            await self._run_sync(lambda: self.client.auth.admin.delete_user(user_id))
            return True
        except Exception as e:
            print(f"Error deleting user from auth: {e}")
            return False

    # ==================== Database Operations ====================

    async def insert(self, table: str, data: dict) -> dict:
        """Insert a record into a table.

        Returns the first inserted row, or {} if the response has no data.
        Raises ValueError if `table` is not in the allowlist.
        """
        if not self.is_connected:
            return {"id": "mock-id", **data}

        self._require_allowed_table(table, "insert")
        response = await self._run_sync(lambda: self.client.table(table).insert(data).execute())
        return response.data[0] if response.data else {}

    async def insert_many(self, table: str, rows: list[dict], chunk_size: int = 500) -> int:
        """Insert many records (chunked). Returns inserted row count (best-effort).

        Raises ValueError if `table` is not in the allowlist or if
        `chunk_size` is not positive.
        """
        if not rows:
            return 0
        if not self.is_connected:
            return len(rows)

        self._require_allowed_table(table, "bulk insert")
        if chunk_size < 1:
            # A negative step would silently skip every row; zero would fail
            # inside range() with a less helpful message.
            raise ValueError("chunk_size must be a positive integer")

        inserted = 0
        for start in range(0, len(rows), chunk_size):
            chunk = rows[start:start + chunk_size]
            # Bind the chunk as a default so the callable never sees a later one.
            response = await self._run_sync(
                lambda chunk=chunk: self.client.table(table).insert(chunk).execute()
            )
            inserted += len(response.data or [])
        return inserted

    async def select(
        self,
        table: str,
        columns: str = "*",
        filters: Optional[dict] = None,
        limit: Optional[int] = None,
        order_by: Optional[str] = None,
        ascending: bool = True,
    ) -> list:
        """Select records from a table.

        `filters` are applied as equality conditions. Returns [] when not
        connected or when the response has no data.
        Raises ValueError if `table` is not in the allowlist.
        """
        if not self.is_connected:
            return []

        self._require_allowed_table(table, "select")
        query = self._apply_query_options(
            self.client.table(table).select(columns),
            filters,
            order_by,
            ascending,
            limit,
        )
        response = await self._run_sync(lambda: query.execute())
        return response.data or []

    async def select_in(
        self,
        table: str,
        column: str,
        values: list,
        columns: str = "*",
        filters: Optional[dict] = None,
        limit: Optional[int] = None,
        order_by: Optional[str] = None,
        ascending: bool = True,
    ) -> list:
        """Select records where column is in values.

        Returns [] without querying when `values` is empty or when not
        connected. Raises ValueError if `table` is not in the allowlist.
        """
        if not self.is_connected:
            return []
        if not values:
            return []

        self._require_allowed_table(table, "select")
        query = self._apply_query_options(
            self.client.table(table).select(columns).in_(column, values),
            filters,
            order_by,
            ascending,
            limit,
        )
        response = await self._run_sync(lambda: query.execute())
        return response.data or []

    async def select_one(self, table: str, id: str, columns: str = "*") -> Optional[dict]:
        """Select a single record by ID.

        Returns None when not connected. Raises ValueError if `table` is not
        in the allowlist.
        NOTE(review): uses `.single()`; how a missing row is reported is
        decided by the client library - confirm before relying on None.
        """
        if not self.is_connected:
            return None

        self._require_allowed_table(table, "select")
        response = await self._run_sync(
            lambda: self.client.table(table).select(columns).eq("id", id).single().execute()
        )
        return response.data

    async def update(self, table: str, id: str, data: dict) -> dict:
        """Update a record by ID.

        Returns the first updated row, or {} if the response has no data.
        Raises ValueError if `table` is not in the allowlist.
        """
        if not self.is_connected:
            return {"id": id, **data}

        self._require_allowed_table(table, "update")
        response = await self._run_sync(
            lambda: self.client.table(table).update(data).eq("id", id).execute()
        )
        return response.data[0] if response.data else {}

    async def delete(self, table: str, id: str) -> bool:
        """Delete a record by ID.

        Returns True once the call completes (or when not connected).
        Raises ValueError if `table` is not in the allowlist.
        """
        if not self.is_connected:
            return True

        self._require_allowed_table(table, "delete")
        await self._run_sync(lambda: self.client.table(table).delete().eq("id", id).execute())
        return True

    async def delete_where(self, table: str, filters: dict) -> bool:
        """Delete records matching equality filters.

        Raises ValueError when connected and `filters` is empty (an
        unfiltered delete is refused) or `table` is not in the allowlist.
        """
        if not self.is_connected:
            return True
        if not filters:
            raise ValueError("delete_where requires filters")

        self._require_allowed_table(table, "delete")
        query = self.client.table(table).delete()
        for key, value in filters.items():
            query = query.eq(key, value)
        await self._run_sync(lambda: query.execute())
        return True

    # ==================== Bucket Operations ====================

    async def create_bucket(self, name: str, public: bool = True) -> bool:
        """Create a storage bucket.

        Returns True on success, when the bucket already exists, or when not
        connected; returns False (after printing the error) otherwise.
        """
        if not self.is_connected:
            return True

        try:
            # The storage API expects 'id' and optionally 'name' in the body
            # Latest supabase-py uses create_bucket(id, options)
            await self._run_sync(lambda: self.client.storage.create_bucket(
                id=name,
                options={"public": public},
            ))
            return True
        except Exception as e:
            # If it already exists, that's fine
            if "already exists" in str(e).lower():
                return True
            print(f"Error creating bucket {name}: {e}")
            return False

    async def ensure_bucket(self, name: str, public: bool = True) -> bool:
        """Ensure a bucket exists, create if not.

        Returns False (after printing the error) if listing buckets fails.
        """
        if not self.is_connected:
            return True

        try:
            buckets = await self._run_sync(lambda: self.client.storage.list_buckets())
            # Buckets can be objects or dicts depending on library version
            for entry in buckets:
                entry_name = entry.name if hasattr(entry, 'name') else entry.get('name')
                entry_id = entry.id if hasattr(entry, 'id') else entry.get('id')
                if entry_name == name or entry_id == name:
                    return True
            return await self.create_bucket(name, public)
        except Exception as e:
            print(f"Error ensuring bucket {name}: {e}")
            return False

    # ==================== Storage Operations ====================

    async def upload_file(
        self,
        bucket: str,
        path: str,
        file_data: bytes,
        content_type: str = "video/mp4",
    ) -> str:
        """Upload a file to Supabase Storage.

        Returns the public object URL built from the configured Supabase URL.
        When not connected, writes the bytes under the configured upload
        directory and returns that local path instead.
        """
        if not self.is_connected:
            # Local file storage fallback
            settings = get_settings()
            local_path = os.path.join(settings.upload_dir, path)
            parent_dir = os.path.dirname(local_path)
            if parent_dir:
                # makedirs("") raises, so only create a directory when there is one.
                os.makedirs(parent_dir, exist_ok=True)
            with open(local_path, "wb") as f:
                f.write(file_data)
            return local_path

        await self._run_sync(lambda: self.client.storage.from_(bucket).upload(
            path,
            file_data,
            {"content-type": content_type},
        ))
        return f"{self._settings.supabase_url}/storage/v1/object/public/{bucket}/{path}"

    async def get_file_url(self, bucket: str, path: str, expires_in: int = 3600) -> str:
        """Get a signed URL for a file.

        Returns "/uploads/<path>" when not connected, and an empty string if
        the response carries no signed URL.
        """
        if not self.is_connected:
            return f"/uploads/{path}"

        response = await self._run_sync(
            lambda: self.client.storage.from_(bucket).create_signed_url(path, expires_in)
        )
        return self._signed_url_from_response(response)

    async def delete_file(self, bucket: str, path: str) -> bool:
        """Delete a file from storage.

        When not connected, removes the file from the configured upload
        directory if it exists.
        """
        if not self.is_connected:
            settings = get_settings()
            local_path = os.path.join(settings.upload_dir, path)
            if os.path.exists(local_path):
                os.remove(local_path)
            return True

        await self._run_sync(lambda: self.client.storage.from_(bucket).remove([path]))
        return True

    async def upload_file_from_path(
        self,
        bucket: str,
        storage_path: str,
        local_path: str,
        content_type: str = "video/mp4",
    ) -> str:
        """
        Upload a file from disk to Supabase Storage (upsert).

        The file is read in a worker thread so the event loop is not blocked,
        but its full contents are held in memory for the upload; memory use
        grows with file size.
        Returns the storage path on success.
        Falls back to returning the local path if Supabase is not connected.
        """
        if not self.is_connected:
            return local_path

        def _read_bytes() -> bytes:
            with open(local_path, "rb") as f:
                return f.read()

        # Read once, outside the retry wrapper, so a reconnect retry reuses
        # the same bytes instead of hitting the disk again.
        file_bytes = await anyio.to_thread.run_sync(_read_bytes)

        def _upload():
            return self.client.storage.from_(bucket).upload(
                storage_path,
                file_bytes,
                {"content-type": content_type, "upsert": "true"},
            )

        await self._run_sync(_upload)
        return storage_path

    async def get_long_lived_url(
        self,
        bucket: str,
        storage_path: str,
        expires_in: int = 60 * 60 * 24 * 7,  # 7 days
    ) -> str:
        """
        Generate a signed URL valid for `expires_in` seconds (default 7 days).
        Falls back to the local /personal-output path when not connected.
        """
        if not self.is_connected:
            filename = storage_path.split("/")[-1]
            return f"/personal-output/{filename}"

        response = await self._run_sync(
            lambda: self.client.storage.from_(bucket).create_signed_url(
                storage_path, expires_in
            )
        )
        # Supabase SDK returns dict with 'signedURL' key
        return self._signed_url_from_response(response)
# Singleton instance
_supabase_service: Optional[SupabaseService] = None


def get_supabase_service() -> SupabaseService:
    """Return the shared SupabaseService, creating it on the first call."""
    global _supabase_service
    if _supabase_service is not None:
        return _supabase_service
    _supabase_service = SupabaseService()
    return _supabase_service
|