File size: 16,822 Bytes
c6abe34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e9b83cd
 
c6abe34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d51c677
 
 
 
 
 
c6abe34
 
d51c677
 
 
c6abe34
 
 
 
 
 
 
 
 
d51c677
 
 
 
 
 
 
 
 
 
c6abe34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
"""
Supabase client service for database and storage operations.
"""
from typing import Optional
import anyio

from app.config import get_settings

# Try to import supabase, provide fallback for local development
try:
    from supabase import create_client, Client
    SUPABASE_AVAILABLE = True
except ImportError:
    SUPABASE_AVAILABLE = False
    Client = None


class SupabaseService:
    """
    Supabase client wrapper providing database and storage operations.
    
    Falls back to local mock operations if Supabase is not configured.
    """
    
    def __init__(self):
        self._client: Optional[Client] = None
        self._settings = get_settings()
        self._initialized = False
        # Allowlist of tables this service is permitted to touch. This reduces
        # the blast radius of accidental or malicious misuse.
        self._allowed_tables: set[str] = {
            "users",
            "videos",
            "organizations",
            "players",
            "teams",
            "analysis_results",
            "detections",
            "analytics",
            "notifications",
            "activities",
            "matches",
            "schedules",
            "announcements",
            "personal_analyses",
            "match_stat_uploads",
            "match_player_stats",
            "match_team_stats",
            "organizations_staff",
            "clips",
        }
    
    def _initialize(self) -> None:
        """Initialize the Supabase client if not already done."""
        if self._initialized:
            return
            
        if not SUPABASE_AVAILABLE:
            print("Warning: Supabase library not installed. Running in local mode.")
            self._initialized = True
            return
            
        if not self._settings.supabase_url or not self._settings.supabase_key:
            print("Warning: Supabase credentials not configured. Running in local mode.")
            self._initialized = True
            return
            
        try:
            # Prefer service key if available for administrative backend operations
            key = self._settings.supabase_service_key or self._settings.supabase_key
            self._client = create_client(
                self._settings.supabase_url,
                key
            )
            self._initialized = True
        except Exception as e:
            print(f"Warning: Failed to initialize Supabase client: {e}")
            self._initialized = True
    
    @property
    def client(self) -> Optional[Client]:
        """Get the Supabase client instance."""
        if not self._initialized:
            self._initialize()
        return self._client
    
    @property
    def is_connected(self) -> bool:
        """Check if Supabase is connected and available."""
        return self.client is not None

    async def _run_sync(self, fn):
        """
        Run a blocking function in a thread to avoid blocking the event loop.
        Includes a single-retry mechanism for 'Server disconnected' errors.
        """
        try:
            return await anyio.to_thread.run_sync(fn)
        except Exception as e:
            # Check for common network/protocol errors that suggest a stale connection
            err_str = str(e).lower()
            is_disconnect = "disconnected" in err_str or "protocolerror" in err_str or "connection closed" in err_str
            
            if is_disconnect:
                print(f"⚠️ Supabase connection lost ({e}). Attempting to reconnect...")
                self._initialized = False  # Force re-initialization
                self._initialize()
                # Retry once
                try:
                    return await anyio.to_thread.run_sync(fn)
                except Exception as retry_e:
                    print(f"❌ Supabase reconnection failed: {retry_e}")
                    raise retry_e
            raise e
    
    # ==================== Auth Operations ====================
    
    async def sign_up(self, email: str, password: str, metadata: dict = None) -> dict:
        """Register a new user with Supabase Auth."""
        if not self.is_connected:
            # Local mock for development
            return {
                "user": {
                    "id": "mock-user-id",
                    "email": email,
                    "user_metadata": metadata or {}
                }
            }
        
        response = await self._run_sync(lambda: self.client.auth.sign_up({
            "email": email,
            "password": password,
            "options": {"data": metadata or {}}
        }))
        return response
    
    async def sign_in(self, email: str, password: str) -> dict:
        """Sign in a user with email/password."""
        if not self.is_connected:
            return {
                "user": {"id": "mock-user-id", "email": email},
                "session": {"access_token": "mock-token"}
            }
        
        response = await self._run_sync(lambda: self.client.auth.sign_in_with_password({
            "email": email,
            "password": password
        }))
        return response
    
    async def get_user(self, token: str) -> Optional[dict]:
        """Get user from JWT token."""
        if not self.is_connected:
            return {"id": "mock-user-id", "email": "mock@example.com"}
        
        response = await self._run_sync(lambda: self.client.auth.get_user(token))
        return response.user if response else None

    async def delete_user_auth(self, user_id: str) -> bool:
        """
        Delete a user from Supabase Auth.
        Requires high-privilege service role or admin access.
        """
        if not self.is_connected:
            return True
            
        try:
            # We use the admin API to delete the user
            await self._run_sync(lambda: self.client.auth.admin.delete_user(user_id))
            return True
        except Exception as e:
            print(f"Error deleting user from auth: {e}")
            return False
    
    # ==================== Database Operations ====================
    
    async def insert(self, table: str, data: dict) -> dict:
        """Insert a record into a table."""
        if not self.is_connected:
            return {"id": "mock-id", **data}
        if table not in self._allowed_tables:
            raise ValueError(f"Table '{table}' is not allowed for insert operations")
        
        response = await self._run_sync(lambda: self.client.table(table).insert(data).execute())
        return response.data[0] if response.data else {}

    async def insert_many(self, table: str, rows: list[dict], chunk_size: int = 500) -> int:
        """Insert many records (chunked). Returns inserted row count (best-effort)."""
        if not rows:
            return 0
        if not self.is_connected:
            return len(rows)
        if table not in self._allowed_tables:
            raise ValueError(f"Table '{table}' is not allowed for bulk insert operations")

        inserted = 0
        for i in range(0, len(rows), chunk_size):
            chunk = rows[i : i + chunk_size]
            response = await self._run_sync(lambda: self.client.table(table).insert(chunk).execute())
            inserted += len(response.data or [])
        return inserted
    
    async def select(
        self, 
        table: str, 
        columns: str = "*",
        filters: dict = None,
        limit: int = None,
        order_by: str = None,
        ascending: bool = True
    ) -> list:
        """Select records from a table."""
        if not self.is_connected:
            return []
        if table not in self._allowed_tables:
            raise ValueError(f"Table '{table}' is not allowed for select operations")
        
        query = self.client.table(table).select(columns)
        
        if filters:
            for key, value in filters.items():
                query = query.eq(key, value)
        
        if order_by:
            query = query.order(order_by, desc=not ascending)
        
        if limit:
            query = query.limit(limit)
        
        response = await self._run_sync(lambda: query.execute())
        return response.data or []
    
    async def select_in(
        self,
        table: str,
        column: str,
        values: list,
        columns: str = "*",
        filters: dict = None,
        limit: int = None,
        order_by: str = None,
        ascending: bool = True,
    ) -> list:
        """Select records where column is in values."""
        if not self.is_connected:
            return []
        if not values:
            return []
        if table not in self._allowed_tables:
            raise ValueError(f"Table '{table}' is not allowed for select operations")

        query = self.client.table(table).select(columns).in_(column, values)
        if filters:
            for key, value in filters.items():
                query = query.eq(key, value)
        if order_by:
            query = query.order(order_by, desc=not ascending)
        if limit:
            query = query.limit(limit)

        response = await self._run_sync(lambda: query.execute())
        return response.data or []

    async def select_one(self, table: str, id: str, columns: str = "*") -> Optional[dict]:
        """Select a single record by ID."""
        if not self.is_connected:
            return None
        if table not in self._allowed_tables:
            raise ValueError(f"Table '{table}' is not allowed for select operations")
        
        response = await self._run_sync(
            lambda: self.client.table(table).select(columns).eq("id", id).single().execute()
        )
        return response.data
    
    async def update(self, table: str, id: str, data: dict) -> dict:
        """Update a record by ID."""
        if not self.is_connected:
            return {"id": id, **data}
        if table not in self._allowed_tables:
            raise ValueError(f"Table '{table}' is not allowed for update operations")
        
        response = await self._run_sync(lambda: self.client.table(table).update(data).eq("id", id).execute())
        return response.data[0] if response.data else {}
    
    async def delete(self, table: str, id: str) -> bool:
        """Delete a record by ID."""
        if not self.is_connected:
            return True
        if table not in self._allowed_tables:
            raise ValueError(f"Table '{table}' is not allowed for delete operations")
        
        await self._run_sync(lambda: self.client.table(table).delete().eq("id", id).execute())
        return True

    async def delete_where(self, table: str, filters: dict) -> bool:
        """Delete records matching equality filters."""
        if not self.is_connected:
            return True
        if not filters:
            raise ValueError("delete_where requires filters")
        if table not in self._allowed_tables:
            raise ValueError(f"Table '{table}' is not allowed for delete operations")

        query = self.client.table(table).delete()
        for key, value in filters.items():
            query = query.eq(key, value)

        await self._run_sync(lambda: query.execute())
        return True
    
    # ==================== Bucket Operations ====================
    
    async def create_bucket(self, name: str, public: bool = True) -> bool:
        """Create a storage bucket."""
        if not self.is_connected:
            return True
        try:
            # The storage API expects 'id' and optionally 'name' in the body
            # Latest supabase-py uses create_bucket(id, options)
            await self._run_sync(lambda: self.client.storage.create_bucket(
                id=name, 
                options={"public": public}
            ))
            return True
        except Exception as e:
            # If it already exists, that's fine
            if "already exists" in str(e).lower():
                return True
            print(f"Error creating bucket {name}: {e}")
            return False

    async def ensure_bucket(self, name: str, public: bool = True) -> bool:
        """Ensure a bucket exists, create if not."""
        if not self.is_connected:
            return True
        try:
            buckets = await self._run_sync(lambda: self.client.storage.list_buckets())
            # Buckets can be objects or dicts depending on library version
            exists = False
            for b in buckets:
                b_name = b.name if hasattr(b, 'name') else b.get('name')
                b_id = b.id if hasattr(b, 'id') else b.get('id')
                if b_name == name or b_id == name:
                    exists = True
                    break
            
            if exists:
                return True
            return await self.create_bucket(name, public)
        except Exception as e:
            print(f"Error ensuring bucket {name}: {e}")
            return False

    # ==================== Storage Operations ====================
    
    async def upload_file(
        self, 
        bucket: str, 
        path: str, 
        file_data: bytes,
        content_type: str = "video/mp4"
    ) -> str:
        """Upload a file to Supabase Storage."""
        if not self.is_connected:
            # Local file storage fallback
            import os
            settings = get_settings()
            local_path = os.path.join(settings.upload_dir, path)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            with open(local_path, "wb") as f:
                f.write(file_data)
            return local_path
        
        await self._run_sync(lambda: self.client.storage.from_(bucket).upload(
            path,
            file_data,
            {"content-type": content_type}
        ))
        return f"{self._settings.supabase_url}/storage/v1/object/public/{bucket}/{path}"
    
    async def get_file_url(self, bucket: str, path: str, expires_in: int = 3600) -> str:
        """Get a signed URL for a file."""
        if not self.is_connected:
            return f"/uploads/{path}"
        
        response = await self._run_sync(lambda: self.client.storage.from_(bucket).create_signed_url(path, expires_in))
        return response.get("signedURL", "")
    
    async def delete_file(self, bucket: str, path: str) -> bool:
        """Delete a file from storage."""
        if not self.is_connected:
            import os
            settings = get_settings()
            local_path = os.path.join(settings.upload_dir, path)
            if os.path.exists(local_path):
                os.remove(local_path)
            return True
        
        await self._run_sync(lambda: self.client.storage.from_(bucket).remove([path]))
        return True

    async def upload_file_from_path(
        self,
        bucket: str,
        storage_path: str,
        local_path: str,
        content_type: str = "video/mp4",
    ) -> str:
        """
        Upload a file from disk to Supabase Storage by streaming it.
        Avoids loading the entire file into memory — safe for large videos.
        Returns the storage path on success.
        Falls back to serving from local path if Supabase is not connected.
        """
        import os
        if not self.is_connected:
            return local_path

        with open(local_path, "rb") as f:
            file_bytes = f.read()

        def _upload():
            return self.client.storage.from_(bucket).upload(
                storage_path,
                file_bytes,
                {"content-type": content_type, "upsert": "true"},
            )

        await self._run_sync(_upload)
        return storage_path

    async def get_long_lived_url(
        self,
        bucket: str,
        storage_path: str,
        expires_in: int = 60 * 60 * 24 * 7,  # 7 days
    ) -> str:
        """
        Generate a signed URL valid for `expires_in` seconds (default 7 days).
        Falls back to the local /personal-output path when not connected.
        """
        if not self.is_connected:
            filename = storage_path.split("/")[-1]
            return f"/personal-output/{filename}"

        response = await self._run_sync(
            lambda: self.client.storage.from_(bucket).create_signed_url(
                storage_path, expires_in
            )
        )
        # Supabase SDK returns dict with 'signedURL' key
        return response.get("signedURL") or response.get("signed_url") or ""


# Singleton instance
_supabase_service: Optional[SupabaseService] = None


def get_supabase_service() -> SupabaseService:
    """Get the singleton Supabase service instance."""
    global _supabase_service
    if _supabase_service is None:
        _supabase_service = SupabaseService()
    return _supabase_service