Spaces:
Sleeping
Sleeping
| """Supabase Storage-based storage for analytics data""" | |
| import json | |
| import os | |
| from datetime import datetime | |
| from typing import Dict, Any, Optional, List | |
| import uuid | |
class StorageBucketDB:
    """Store analytics "tables" as JSON files in a Supabase Storage bucket.

    Each table is a single ``<table_name>.json`` object inside ``BUCKET_NAME``.
    Tables that were downloaded or saved successfully are kept in an
    in-memory cache so repeated reads do not hit storage again.
    """

    BUCKET_NAME = 'analytics-data'

    def __init__(self, supabase_client):
        """Keep the client and its ``storage`` handle and start with an empty cache.

        Args:
            supabase_client: Object exposing a ``storage`` attribute whose
                ``from_(bucket)`` result offers ``download``, ``update`` and
                ``upload``.
        """
        self.supabase = supabase_client
        self.storage = supabase_client.storage
        # In-memory cache for faster reads
        self._cache = {}
        print(f"[StorageBucketDB] Initialized with bucket: {self.BUCKET_NAME}")

    def _get_path(self, table_name: str) -> str:
        """Return the object path inside the bucket for ``table_name``."""
        return f"{table_name}.json"

    def _load_table(self, table_name: str) -> Dict:
        """Return the contents of ``table_name``, preferring the cache.

        A successful download is parsed as JSON and cached. An empty download
        is cached as an empty table. If the download or the parsing raises,
        an empty dict is returned but NOT cached, so a transient storage
        error does not make the table look empty for the rest of the process.

        NOTE(review): a write that follows a failed load still starts from an
        empty table and will overwrite whatever is stored remotely. Telling
        "object not found" apart from other failures needs the storage
        client's exception types, which are not visible here.

        Returns:
            The decoded table; always a dict on the failure/empty paths.
        """
        if table_name in self._cache:
            return self._cache[table_name]

        try:
            path = self._get_path(table_name)
            response = self.storage.from_(self.BUCKET_NAME).download(path)
            if response:
                data = json.loads(response.decode('utf-8'))
                # len() doubles as a sanity check: a scalar payload raises
                # here and is handled like any other unreadable table.
                print(f"[StorageBucketDB] Loaded {table_name}: {len(data)} records")
                self._cache[table_name] = data
                return data
        except Exception as e:
            print(f"[StorageBucketDB] Table {table_name} not found or empty: {e}")
            return {}

        # The object exists but is empty: that is a definitive answer, cache it.
        self._cache[table_name] = {}
        return self._cache[table_name]

    def _save_table(self, table_name: str, data: Dict):
        """Serialize ``data`` to JSON, write it to the bucket, then cache it.

        ``update`` is attempted first; if it raises, ``upload`` is tried so
        that a table which does not exist yet gets created. The cache is only
        replaced after the write succeeded.

        Raises:
            Exception: whatever serialization or the final storage call
                raised, after logging it.
        """
        try:
            path = self._get_path(table_name)
            content = json.dumps(data, indent=2, default=str).encode('utf-8')
            bucket = self.storage.from_(self.BUCKET_NAME)
            # Try to update first, then upload if not exists
            try:
                bucket.update(path, content, {
                    'content-type': 'application/json'
                })
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that
                # KeyboardInterrupt / SystemExit are not answered with a
                # second network call.
                bucket.upload(path, content, {
                    'content-type': 'application/json'
                })
            self._cache[table_name] = data
            print(f"[StorageBucketDB] Saved {table_name}: {len(data)} records")
        except Exception as e:
            print(f"[StorageBucketDB] Error saving {table_name}: {e}")
            raise

    def table(self, table_name: str) -> 'StorageTable':
        """Return a table-like query builder bound to ``table_name``."""
        return StorageTable(self, table_name)
class StorageTable:
    """Chainable, Supabase-like query builder over one JSON-backed table.

    A builder collects equality filters (``eq``), an optional "single row"
    flag and at most one pending write (``insert`` / ``upsert`` / ``update``).
    Nothing touches storage until ``execute`` is called.

    Writes operate on a copy of the loaded table and hand that copy to
    ``db._save_table``; if saving raises, previously loaded data is left
    untouched. The dict passed by the caller is copied, never mutated.
    """

    # Fields consulted, in order, to derive the key a record is stored under.
    _KEY_FIELDS = ('student_id', 'id')

    def __init__(self, db: 'StorageBucketDB', table_name: str):
        """Bind the builder to ``db`` and ``table_name`` with no filters set."""
        self.db = db
        self.table_name = table_name
        self._query = {}
        self._single = False
        # Pending write payloads; ``None`` means "not requested".
        self._insert_data = None
        self._upsert_data = None
        self._update_data = None

    def select(self, columns: str = '*') -> 'StorageTable':
        """Start a read. ``columns`` is accepted but ignored: whole records are returned."""
        return self

    def eq(self, column: str, value: Any) -> 'StorageTable':
        """Require ``record[column] == value``; repeated calls are AND-ed."""
        self._query[column] = value
        return self

    def maybe_single(self) -> 'StorageTable':
        """Make a read return the first matching record, or ``None`` if none match."""
        self._single = True
        return self

    def single(self) -> 'StorageTable':
        """Same as ``maybe_single``: first matching record or ``None``."""
        self._single = True
        return self

    def insert(self, record: Dict) -> 'StorageTable':
        """Queue ``record`` to be inserted on ``execute``."""
        self._insert_data = record
        return self

    def upsert(self, record: Dict) -> 'StorageTable':
        """Queue ``record`` to be merged into, or added to, the table on ``execute``."""
        self._upsert_data = record
        return self

    def update(self, record: Dict) -> 'StorageTable':
        """Queue ``record`` as field updates for every row matching the filters."""
        self._update_data = record
        return self

    def execute(self) -> 'StorageResult':
        """Run the pending write if there is one, otherwise run the read.

        Precedence when several writes were queued: insert, upsert, update.
        """
        if self._insert_data is not None:
            return self._do_insert()
        if self._upsert_data is not None:
            return self._do_upsert()
        if self._update_data is not None:
            return self._do_update()
        return StorageResult(self._select_rows())

    def _matches(self, record) -> bool:
        """Return True when ``record`` satisfies every ``eq`` filter (vacuously True with none)."""
        return all(record.get(col) == val for col, val in self._query.items())

    def _select_rows(self):
        """Return the read payload: a list of records, or one record / ``None`` in single mode.

        The single flag is honoured even when no filter was set, in which
        case the first stored record is returned.
        """
        table = self.db._load_table(self.table_name)
        rows = [rec for rec in table.values() if self._matches(rec)]
        if self._single:
            return rows[0] if rows else None
        return rows

    @classmethod
    def _record_key(cls, record: Dict) -> Optional[str]:
        """Return the storage key for ``record``: ``student_id``, else ``id``, else ``None``.

        The key is stringified because JSON object keys are always strings;
        an int key would no longer equal itself after a save/load round trip.
        """
        for field in cls._KEY_FIELDS:
            value = record.get(field)
            if value is not None:
                return str(value)
        return None

    def _do_insert(self) -> 'StorageResult':
        """Add the queued record, replacing any record stored under the same key.

        A missing ``id`` is generated; ``created_at`` is always (re)set.
        """
        record = dict(self._insert_data)
        if record.get('id') is None:
            record['id'] = str(uuid.uuid4())
        record['created_at'] = datetime.utcnow().isoformat()
        key = self._record_key(record)

        table = dict(self.db._load_table(self.table_name))
        table[key] = record
        self.db._save_table(self.table_name, table)
        print(f"[StorageBucketDB] Inserted into {self.table_name}: {key}")
        return StorageResult([record])

    def _do_upsert(self) -> 'StorageResult':
        """Merge the queued record into the stored one with the same key, or add it.

        On merge, queued fields win and ``updated_at`` is set. On add, a
        caller-supplied ``id`` is kept (one is generated only if absent) and
        ``created_at`` is set.
        """
        record = dict(self._upsert_data)
        now = datetime.utcnow().isoformat()
        key = self._record_key(record)

        table = dict(self.db._load_table(self.table_name))
        existing = table.get(key) if key is not None else None
        if existing is not None:
            record = {**existing, **record, 'updated_at': now}
        else:
            if record.get('id') is None:
                record['id'] = str(uuid.uuid4())
            if key is None:
                # No student_id either: key by id, as insert does.
                key = record['id']
            record['created_at'] = now

        table[key] = record
        self.db._save_table(self.table_name, table)
        print(f"[StorageBucketDB] Upserted into {self.table_name}: {key}")
        return StorageResult([record])

    def _do_update(self) -> 'StorageResult':
        """Apply the queued fields to every record matching the filters.

        With no filters every record matches. Storage is only written when at
        least one record changed.
        """
        changes = self._update_data
        now = datetime.utcnow().isoformat()

        current = self.db._load_table(self.table_name)
        table = dict(current)
        updated = []
        for key, record in current.items():
            if self._matches(record):
                merged = {**record, **changes, 'updated_at': now}
                table[key] = merged
                updated.append(merged)

        if updated:
            self.db._save_table(self.table_name, table)
        print(f"[StorageBucketDB] Updated {len(updated)} records in {self.table_name}")
        return StorageResult(updated)
class StorageResult:
    """Minimal stand-in for a Supabase response object.

    The payload handed to the constructor is exposed unchanged as ``data``.
    """

    def __init__(self, data: Any):
        """Store ``data`` (a list of records, a single record, or ``None``) as-is."""
        self.data = data

    def execute(self) -> 'StorageResult':
        """Return this same result, so calling ``execute()`` on a result is a harmless no-op."""
        return self