Spaces:
Sleeping
Sleeping
File size: 6,664 Bytes
2a317a3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 | """Supabase Storage-based storage for analytics data"""
import json
import os
from datetime import datetime
from typing import Dict, Any, Optional, List
import uuid
class StorageBucketDB:
"""Uses Supabase Storage bucket to store analytics JSON files"""
BUCKET_NAME = 'analytics-data'
def __init__(self, supabase_client):
self.supabase = supabase_client
self.storage = supabase_client.storage
print(f"[StorageBucketDB] Initialized with bucket: {self.BUCKET_NAME}")
# In-memory cache for faster reads
self._cache = {}
def _get_path(self, table_name: str) -> str:
"""Get storage path for a table"""
return f"{table_name}.json"
def _load_table(self, table_name: str) -> Dict:
"""Load table data from storage"""
if table_name in self._cache:
return self._cache[table_name]
try:
path = self._get_path(table_name)
response = self.storage.from_(self.BUCKET_NAME).download(path)
if response:
data = json.loads(response.decode('utf-8'))
self._cache[table_name] = data
print(f"[StorageBucketDB] Loaded {table_name}: {len(data)} records")
return data
except Exception as e:
print(f"[StorageBucketDB] Table {table_name} not found or empty: {e}")
self._cache[table_name] = {}
return {}
def _save_table(self, table_name: str, data: Dict):
"""Save table data to storage"""
try:
path = self._get_path(table_name)
content = json.dumps(data, indent=2, default=str).encode('utf-8')
# Try to update first, then upload if not exists
try:
self.storage.from_(self.BUCKET_NAME).update(path, content, {
'content-type': 'application/json'
})
except:
self.storage.from_(self.BUCKET_NAME).upload(path, content, {
'content-type': 'application/json'
})
self._cache[table_name] = data
print(f"[StorageBucketDB] Saved {table_name}: {len(data)} records")
except Exception as e:
print(f"[StorageBucketDB] Error saving {table_name}: {e}")
raise
def table(self, table_name: str) -> 'StorageTable':
"""Return a table-like interface"""
return StorageTable(self, table_name)
class StorageTable:
"""Mimics Supabase table interface using storage bucket"""
def __init__(self, db: StorageBucketDB, table_name: str):
self.db = db
self.table_name = table_name
self._query = {}
self._single = False
def select(self, columns: str = '*') -> 'StorageTable':
return self
def eq(self, column: str, value: Any) -> 'StorageTable':
self._query[column] = value
return self
def maybe_single(self) -> 'StorageTable':
self._single = True
return self
def single(self) -> 'StorageTable':
self._single = True
return self
def insert(self, record: Dict) -> 'StorageTable':
self._insert_data = record
return self
def upsert(self, record: Dict) -> 'StorageTable':
self._upsert_data = record
return self
def update(self, record: Dict) -> 'StorageTable':
self._update_data = record
return self
def execute(self) -> 'StorageResult':
"""Execute the query or write operation"""
# Handle write operations
if hasattr(self, '_insert_data'):
return self._do_insert()
if hasattr(self, '_upsert_data'):
return self._do_upsert()
if hasattr(self, '_update_data'):
return self._do_update()
# Handle read operation
data = self.db._load_table(self.table_name)
if self._query:
results = []
for key, record in data.items():
match = all(record.get(k) == v for k, v in self._query.items())
if match:
results.append(record)
if self._single and results:
return StorageResult(results[0])
elif self._single:
return StorageResult(None)
return StorageResult(results)
else:
return StorageResult(list(data.values()))
def _do_insert(self) -> 'StorageResult':
record = self._insert_data
data = self.db._load_table(self.table_name)
if 'id' not in record:
record['id'] = str(uuid.uuid4())
key = record.get('student_id', record.get('id'))
record['created_at'] = datetime.utcnow().isoformat()
data[key] = record
self.db._save_table(self.table_name, data)
print(f"[StorageBucketDB] Inserted into {self.table_name}: {key}")
return StorageResult([record])
def _do_upsert(self) -> 'StorageResult':
record = self._upsert_data
data = self.db._load_table(self.table_name)
key = record.get('student_id', str(uuid.uuid4()))
if key in data:
existing = data[key]
existing.update(record)
existing['updated_at'] = datetime.utcnow().isoformat()
record = existing
else:
record['id'] = str(uuid.uuid4())
record['created_at'] = datetime.utcnow().isoformat()
data[key] = record
self.db._save_table(self.table_name, data)
print(f"[StorageBucketDB] Upserted into {self.table_name}: {key}")
return StorageResult([record])
def _do_update(self) -> 'StorageResult':
updates = self._update_data
data = self.db._load_table(self.table_name)
updated = []
for key, record in data.items():
match = all(record.get(k) == v for k, v in self._query.items())
if match:
record.update(updates)
record['updated_at'] = datetime.utcnow().isoformat()
updated.append(record)
self.db._save_table(self.table_name, data)
print(f"[StorageBucketDB] Updated {len(updated)} records in {self.table_name}")
return StorageResult(updated)
class StorageResult:
"""Mimics Supabase result object"""
def __init__(self, data: Any):
self.data = data
def execute(self) -> 'StorageResult':
return self
|