"""Supabase Storage-based storage for analytics data""" import json import os from datetime import datetime from typing import Dict, Any, Optional, List import uuid class StorageBucketDB: """Uses Supabase Storage bucket to store analytics JSON files""" BUCKET_NAME = 'analytics-data' def __init__(self, supabase_client): self.supabase = supabase_client self.storage = supabase_client.storage print(f"[StorageBucketDB] Initialized with bucket: {self.BUCKET_NAME}") # In-memory cache for faster reads self._cache = {} def _get_path(self, table_name: str) -> str: """Get storage path for a table""" return f"{table_name}.json" def _load_table(self, table_name: str) -> Dict: """Load table data from storage""" if table_name in self._cache: return self._cache[table_name] try: path = self._get_path(table_name) response = self.storage.from_(self.BUCKET_NAME).download(path) if response: data = json.loads(response.decode('utf-8')) self._cache[table_name] = data print(f"[StorageBucketDB] Loaded {table_name}: {len(data)} records") return data except Exception as e: print(f"[StorageBucketDB] Table {table_name} not found or empty: {e}") self._cache[table_name] = {} return {} def _save_table(self, table_name: str, data: Dict): """Save table data to storage""" try: path = self._get_path(table_name) content = json.dumps(data, indent=2, default=str).encode('utf-8') # Try to update first, then upload if not exists try: self.storage.from_(self.BUCKET_NAME).update(path, content, { 'content-type': 'application/json' }) except: self.storage.from_(self.BUCKET_NAME).upload(path, content, { 'content-type': 'application/json' }) self._cache[table_name] = data print(f"[StorageBucketDB] Saved {table_name}: {len(data)} records") except Exception as e: print(f"[StorageBucketDB] Error saving {table_name}: {e}") raise def table(self, table_name: str) -> 'StorageTable': """Return a table-like interface""" return StorageTable(self, table_name) class StorageTable: """Mimics Supabase table interface using storage bucket""" def __init__(self, db: StorageBucketDB, table_name: str): self.db = db self.table_name = table_name self._query = {} self._single = False def select(self, columns: str = '*') -> 'StorageTable': return self def eq(self, column: str, value: Any) -> 'StorageTable': self._query[column] = value return self def maybe_single(self) -> 'StorageTable': self._single = True return self def single(self) -> 'StorageTable': self._single = True return self def insert(self, record: Dict) -> 'StorageTable': self._insert_data = record return self def upsert(self, record: Dict) -> 'StorageTable': self._upsert_data = record return self def update(self, record: Dict) -> 'StorageTable': self._update_data = record return self def execute(self) -> 'StorageResult': """Execute the query or write operation""" # Handle write operations if hasattr(self, '_insert_data'): return self._do_insert() if hasattr(self, '_upsert_data'): return self._do_upsert() if hasattr(self, '_update_data'): return self._do_update() # Handle read operation data = self.db._load_table(self.table_name) if self._query: results = [] for key, record in data.items(): match = all(record.get(k) == v for k, v in self._query.items()) if match: results.append(record) if self._single and results: return StorageResult(results[0]) elif self._single: return StorageResult(None) return StorageResult(results) else: return StorageResult(list(data.values())) def _do_insert(self) -> 'StorageResult': record = self._insert_data data = self.db._load_table(self.table_name) if 'id' not in record: record['id'] = str(uuid.uuid4()) key = record.get('student_id', record.get('id')) record['created_at'] = datetime.utcnow().isoformat() data[key] = record self.db._save_table(self.table_name, data) print(f"[StorageBucketDB] Inserted into {self.table_name}: {key}") return StorageResult([record]) def _do_upsert(self) -> 'StorageResult': record = self._upsert_data data = self.db._load_table(self.table_name) key = record.get('student_id', str(uuid.uuid4())) if key in data: existing = data[key] existing.update(record) existing['updated_at'] = datetime.utcnow().isoformat() record = existing else: record['id'] = str(uuid.uuid4()) record['created_at'] = datetime.utcnow().isoformat() data[key] = record self.db._save_table(self.table_name, data) print(f"[StorageBucketDB] Upserted into {self.table_name}: {key}") return StorageResult([record]) def _do_update(self) -> 'StorageResult': updates = self._update_data data = self.db._load_table(self.table_name) updated = [] for key, record in data.items(): match = all(record.get(k) == v for k, v in self._query.items()) if match: record.update(updates) record['updated_at'] = datetime.utcnow().isoformat() updated.append(record) self.db._save_table(self.table_name, data) print(f"[StorageBucketDB] Updated {len(updated)} records in {self.table_name}") return StorageResult(updated) class StorageResult: """Mimics Supabase result object""" def __init__(self, data: Any): self.data = data def execute(self) -> 'StorageResult': return self