# FCT / database / storage_bucket_db.py
# Parthnuwal7
# Supabase
# 2a317a3
"""Supabase Storage-based storage for analytics data"""
import json
import os
from datetime import datetime
from typing import Dict, Any, Optional, List
import uuid
class StorageBucketDB:
    """Store analytics "tables" as JSON files in a Supabase Storage bucket.

    Each table is one JSON object saved as ``<table_name>.json`` inside
    ``BUCKET_NAME``. Tables are cached in memory after the first load or
    save, so later reads in the same process do not hit storage again.
    """

    BUCKET_NAME = 'analytics-data'

    def __init__(self, supabase_client):
        """Keep a handle on the client and its ``storage`` API.

        Args:
            supabase_client: Client object exposing a ``storage`` attribute
                whose ``from_(bucket)`` result offers ``download``,
                ``update`` and ``upload``.
        """
        self.supabase = supabase_client
        self.storage = supabase_client.storage
        print(f"[StorageBucketDB] Initialized with bucket: {self.BUCKET_NAME}")
        # In-memory cache for faster reads: table name -> table dict.
        self._cache = {}

    def _get_path(self, table_name: str) -> str:
        """Return the object path inside the bucket for ``table_name``."""
        return f"{table_name}.json"

    def _load_table(self, table_name: str) -> Dict:
        """Return the table's contents, loading them from storage if needed.

        Any download or parse failure is reported on stdout and treated as
        an empty table. The empty result is cached as well, so a failed
        load is not retried for the lifetime of this instance.

        Returns:
            The parsed JSON content of the table file, or ``{}`` when the
            file is missing, empty, or unreadable.
        """
        if table_name in self._cache:
            return self._cache[table_name]

        try:
            path = self._get_path(table_name)
            response = self.storage.from_(self.BUCKET_NAME).download(path)
            if response:
                data = json.loads(response.decode('utf-8'))
                self._cache[table_name] = data
                print(f"[StorageBucketDB] Loaded {table_name}: {len(data)} records")
                return data
        except Exception as e:
            print(f"[StorageBucketDB] Table {table_name} not found or empty: {e}")

        # Missing, empty, or unreadable file: always hand back a dict so
        # callers can iterate the result without a None check.
        self._cache[table_name] = {}
        return {}

    def _save_table(self, table_name: str, data: Dict):
        """Serialize ``data`` to JSON and write it to the table's file.

        The write first tries to update an existing object and falls back
        to uploading a new one if the update fails. The cache is refreshed
        only after the write succeeded.

        Raises:
            Exception: Whatever the storage client raised when both the
                update and the upload failed (logged, then re-raised).
        """
        try:
            path = self._get_path(table_name)
            # default=str stringifies values json cannot encode natively.
            content = json.dumps(data, indent=2, default=str).encode('utf-8')
            file_options = {'content-type': 'application/json'}
            bucket = self.storage.from_(self.BUCKET_NAME)
            # Try to update first, then upload if not exists.
            try:
                bucket.update(path, content, file_options)
            except Exception:
                # Only ordinary errors trigger the fallback; interpreter
                # exits and Ctrl-C must propagate instead of starting
                # another network call.
                bucket.upload(path, content, file_options)
            self._cache[table_name] = data
            print(f"[StorageBucketDB] Saved {table_name}: {len(data)} records")
        except Exception as e:
            print(f"[StorageBucketDB] Error saving {table_name}: {e}")
            raise

    def table(self, table_name: str) -> 'StorageTable':
        """Return a table-like query builder bound to ``table_name``."""
        return StorageTable(self, table_name)
class StorageTable:
    """Mimics a subset of the Supabase table interface on top of a bucket.

    A builder collects equality filters and at most one pending write
    (insert / upsert / update); ``execute()`` then runs it against the
    table dict obtained from ``db._load_table``. Records are stored in
    that dict keyed by their ``student_id`` (or a generated id).
    """

    def __init__(self, db: 'StorageBucketDB', table_name: str):
        """Bind the builder to ``db`` and ``table_name`` with no filters."""
        self.db = db
        self.table_name = table_name
        self._query = {}
        self._single = False
        # Pending write payloads; None means "not requested".
        self._insert_data = None
        self._upsert_data = None
        self._update_data = None

    def select(self, columns: str = '*') -> 'StorageTable':
        """Start a read. ``columns`` is accepted but ignored: full records
        are always returned."""
        return self

    def eq(self, column: str, value: Any) -> 'StorageTable':
        """Add an equality filter; multiple filters are AND-ed together."""
        self._query[column] = value
        return self

    def maybe_single(self) -> 'StorageTable':
        """Make a read return the first matching record, or None."""
        self._single = True
        return self

    def single(self) -> 'StorageTable':
        """Same behavior as ``maybe_single`` in this implementation."""
        self._single = True
        return self

    def insert(self, record: Dict) -> 'StorageTable':
        """Queue ``record`` for insertion on ``execute()``."""
        self._insert_data = record
        return self

    def upsert(self, record: Dict) -> 'StorageTable':
        """Queue ``record`` for insert-or-merge on ``execute()``."""
        self._upsert_data = record
        return self

    def update(self, record: Dict) -> 'StorageTable':
        """Queue field updates for every record matching the filters."""
        self._update_data = record
        return self

    def _matches(self, record: Dict) -> bool:
        """Return True when ``record`` satisfies every ``eq`` filter.

        With no filters set this is True for every record.
        """
        return all(record.get(k) == v for k, v in self._query.items())

    def execute(self) -> 'StorageResult':
        """Execute the queued write, or the read if no write is queued.

        Writes take precedence in the order insert, upsert, update.

        Returns:
            For reads: a result whose ``data`` is the list of matching
            records, or - after ``single()`` / ``maybe_single()`` - the
            first matching record or None. For writes: see the ``_do_*``
            helpers.
        """
        # Handle write operations
        if self._insert_data is not None:
            return self._do_insert()
        if self._upsert_data is not None:
            return self._do_upsert()
        if self._update_data is not None:
            return self._do_update()

        # Handle read operation
        data = self.db._load_table(self.table_name)
        rows = [record for record in data.values() if self._matches(record)]
        # Honor single()/maybe_single() whether or not filters were given.
        if self._single:
            return StorageResult(rows[0] if rows else None)
        return StorageResult(rows)

    def _do_insert(self) -> 'StorageResult':
        """Store the queued record and save the table.

        The caller's dict is modified in place: an ``id`` is generated if
        missing and ``created_at`` is set. An existing record under the
        same key is replaced.
        """
        record = self._insert_data
        data = self.db._load_table(self.table_name)
        if 'id' not in record:
            record['id'] = str(uuid.uuid4())
        # student_id is the preferred storage key; fall back to the id.
        key = record.get('student_id', record.get('id'))
        # Naive UTC timestamp, kept so stored values keep their format.
        record['created_at'] = datetime.utcnow().isoformat()
        data[key] = record
        self.db._save_table(self.table_name, data)
        print(f"[StorageBucketDB] Inserted into {self.table_name}: {key}")
        return StorageResult([record])

    def _do_upsert(self) -> 'StorageResult':
        """Merge the queued record into an existing one, or create it.

        Records are matched by ``student_id``. A record without one is
        always stored as new under a random key. New records keep a
        caller-supplied ``id`` and only get a generated one when absent.
        """
        record = self._upsert_data
        data = self.db._load_table(self.table_name)
        key = record.get('student_id', str(uuid.uuid4()))
        if key in data:
            existing = data[key]
            existing.update(record)
            existing['updated_at'] = datetime.utcnow().isoformat()
            record = existing
        else:
            # Do not clobber an id the caller already chose.
            record.setdefault('id', str(uuid.uuid4()))
            record['created_at'] = datetime.utcnow().isoformat()
        data[key] = record
        self.db._save_table(self.table_name, data)
        print(f"[StorageBucketDB] Upserted into {self.table_name}: {key}")
        return StorageResult([record])

    def _do_update(self) -> 'StorageResult':
        """Apply the queued fields to every record matching the filters.

        With no filters, every record in the table is updated. The table
        is written back only when at least one record changed.
        """
        updates = self._update_data
        data = self.db._load_table(self.table_name)
        updated = []
        for record in data.values():
            if self._matches(record):
                record.update(updates)
                record['updated_at'] = datetime.utcnow().isoformat()
                updated.append(record)
        # Skip the storage round trip (and a needless overwrite of the
        # table file) when nothing matched.
        if updated:
            self.db._save_table(self.table_name, data)
        print(f"[StorageBucketDB] Updated {len(updated)} records in {self.table_name}")
        return StorageResult(updated)
class StorageResult:
    """Result wrapper exposing its payload as ``data``, like a Supabase result."""

    def __init__(self, data: Any):
        """Store ``data`` unchanged; it may be a list, one record, or None."""
        self.data = data

    def execute(self) -> 'StorageResult':
        """Return this same result, so a trailing ``.execute()`` is harmless."""
        return self