|
|
"""
|
|
|
Supabase Storage Integration for Photo Selection App
|
|
|
Handles persistent storage of dataset metadata (not photos) in Supabase.
|
|
|
Also provides global embedding cache for CLIP/SigLIP embeddings.
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import json
|
|
|
import base64
|
|
|
import hashlib
|
|
|
import numpy as np
|
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
|
|
|
|
|
# Supabase connection settings. The key is read from the environment and is
# empty by default; the client is only created when a key is present.
SUPABASE_URL = os.environ.get('SUPABASE_URL', 'https://cqnyibiopjcwuxmyqbgy.supabase.co/')
# Normalize to a trailing slash so path joins elsewhere behave consistently.
if not SUPABASE_URL.endswith('/'):
    SUPABASE_URL += '/'
SUPABASE_KEY = os.environ.get('SUPABASE_KEY', '')
# Storage bucket that holds one folder per dataset.
BUCKET_NAME = 'datasets'
|
|
|
|
|
|
|
|
|
# Lazily-created singleton client; populated on first successful connection.
_supabase_client = None


def get_supabase_client():
    """Get or create Supabase client.

    Returns the cached client when one exists; otherwise builds one from
    SUPABASE_URL / SUPABASE_KEY. Returns None when the key is missing,
    the supabase package is not installed, or the connection fails.
    """
    global _supabase_client

    # Without a key there is nothing to connect with.
    if not SUPABASE_KEY:
        print("[Supabase] No SUPABASE_KEY found in environment")
        return None

    if _supabase_client is not None:
        return _supabase_client

    try:
        from supabase import create_client
    except ImportError:
        print("[Supabase] supabase-py not installed. Run: pip install supabase")
        return None

    try:
        _supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY)
        print(f"[Supabase] Connected to {SUPABASE_URL}")
    except Exception as e:
        print(f"[Supabase] Connection error: {e}")
        return None

    return _supabase_client
|
|
|
|
|
|
|
|
|
def is_supabase_available() -> bool:
    """Check if Supabase is configured and available."""
    client = get_supabase_client()
    return client is not None
|
|
|
|
|
|
|
|
|
def _get_dataset_registry(client) -> Optional[List[str]]:
    """
    Get the list of dataset names from the registry file.

    Args:
        client: Supabase client instance.

    Returns:
        List of dataset names; [] if the registry file doesn't exist yet;
        None if the registry exists but couldn't be read safely (so callers
        don't accidentally overwrite it with an empty list).
    """
    try:
        storage = client.storage.from_(BUCKET_NAME)
        response = storage.download("_registry.json")
        registry = json.loads(response.decode('utf-8'))
        return registry.get('datasets', [])
    except Exception as e:
        error_str = str(e).lower()

        # A missing file is expected on first use; any other failure is a
        # real error and must not be confused with an empty registry.
        if 'not found' in error_str or '404' in error_str or 'does not exist' in error_str:
            print("[Supabase] Registry file doesn't exist yet, starting fresh")
            return []
        else:
            print(f"[Supabase] ERROR reading registry: {e}")
            return None
|
|
|
|
|
|
|
|
|
def _update_dataset_registry(client, dataset_name: str, action: str = 'add'):
    """Update the registry file with dataset names.

    Args:
        client: Supabase client instance.
        dataset_name: Dataset to add or remove.
        action: 'add' or 'remove'; anything else is a no-op.
    """
    try:
        storage = client.storage.from_(BUCKET_NAME)

        datasets = _get_dataset_registry(client)

        # None signals the registry exists but couldn't be read — bail out
        # rather than risk clobbering it with incomplete data.
        if datasets is None:
            print(f"[Supabase] Skipping registry update - couldn't read existing registry safely")
            return

        if action == 'add' and dataset_name not in datasets:
            datasets.append(dataset_name)
        elif action == 'remove' and dataset_name in datasets:
            datasets.remove(dataset_name)
        else:
            # Nothing to change (already present/absent, or unknown action).
            return

        payload = json.dumps({'datasets': datasets}, indent=2).encode('utf-8')
        options = {"content-type": "application/json"}

        # Overwrite an existing registry first; fall back to a fresh upload
        # when the file doesn't exist yet.
        try:
            storage.update(path="_registry.json", file=payload, file_options=options)
        except Exception:
            storage.upload(path="_registry.json", file=payload, file_options=options)

        print(f"[Supabase] Registry updated: {action} '{dataset_name}'")
    except Exception as e:
        print(f"[Supabase] Error updating registry: {e}")
|
|
|
|
|
|
|
|
|
def _upsert_object(storage, path: str, data: bytes, content_type: str):
    """Upload *data* to *path* in the bucket, overwriting if it exists."""
    options = {"content-type": content_type}
    try:
        return storage.upload(path=path, file=data, file_options=options)
    except Exception:
        # upload() fails when the object already exists; fall back to
        # update() so re-saving a dataset overwrites the previous version
        # (same pattern as _update_dataset_registry).
        return storage.update(path=path, file=data, file_options=options)


def save_dataset_to_supabase(
    dataset_name: str,
    embeddings_data: bytes,
    face_results: dict,
    metadata: dict
) -> bool:
    """
    Save dataset files to Supabase Storage.

    Args:
        dataset_name: Unique name for the dataset (folder name)
        embeddings_data: Binary data of reference_embeddings.npz
        face_results: Dictionary of face detection results
        metadata: Dataset metadata dictionary

    Returns:
        True if successful, False otherwise
    """
    client = get_supabase_client()
    if not client:
        print("[Supabase] Client not available, skipping cloud save")
        return False

    try:
        storage = client.storage.from_(BUCKET_NAME)

        # (path, bytes, content-type) for each of the three dataset files.
        files_to_upload = [
            (f"{dataset_name}/reference_embeddings.npz",
             embeddings_data,
             "application/octet-stream"),
            (f"{dataset_name}/face_results.json",
             json.dumps(face_results, indent=2).encode('utf-8'),
             "application/json"),
            (f"{dataset_name}/metadata.json",
             json.dumps(metadata, indent=2).encode('utf-8'),
             "application/json"),
        ]
        for path, data, content_type in files_to_upload:
            result = _upsert_object(storage, path, data, content_type)
            print(f"[Supabase] Uploaded {path}: {result}")

        # Keep the registry in sync so listing by registry also works.
        _update_dataset_registry(client, dataset_name, action='add')

        print(f"[Supabase] Dataset '{dataset_name}' saved successfully")
        return True

    except Exception as e:
        print(f"[Supabase] Error saving dataset: {e}")
        return False
|
|
|
|
|
|
|
|
|
def load_dataset_from_supabase(dataset_name: str) -> Optional[Dict[str, Any]]:
    """
    Load dataset files from Supabase Storage.

    Args:
        dataset_name: Name of the dataset to load

    Returns:
        Dictionary with 'embeddings_data', 'face_results', 'metadata' or None if failed
    """
    client = get_supabase_client()
    if not client:
        print("[Supabase] Client not available")
        return None

    try:
        storage = client.storage.from_(BUCKET_NAME)
        loaded: Dict[str, Any] = {}

        # Raw .npz bytes — parsed by the caller, not here.
        embeddings_path = f"{dataset_name}/reference_embeddings.npz"
        loaded['embeddings_data'] = storage.download(embeddings_path)
        print(f"[Supabase] Downloaded {embeddings_path}")

        face_results_path = f"{dataset_name}/face_results.json"
        raw = storage.download(face_results_path)
        loaded['face_results'] = json.loads(raw.decode('utf-8'))
        print(f"[Supabase] Downloaded {face_results_path}")

        metadata_path = f"{dataset_name}/metadata.json"
        raw = storage.download(metadata_path)
        loaded['metadata'] = json.loads(raw.decode('utf-8'))
        print(f"[Supabase] Downloaded {metadata_path}")

        print(f"[Supabase] Dataset '{dataset_name}' loaded successfully")
        return loaded

    except Exception as e:
        print(f"[Supabase] Error loading dataset: {e}")
        return None
|
|
|
|
|
|
|
|
|
def list_datasets_from_supabase() -> List[Dict[str, Any]]:
    """
    List all datasets by scanning folders directly in Supabase Storage.
    No registry file needed - each folder = a dataset.

    Returns:
        List of dataset metadata dictionaries
    """
    client = get_supabase_client()
    if not client:
        print("[Supabase] Client not available", flush=True)
        return []

    try:
        storage = client.storage.from_(BUCKET_NAME)

        entries = storage.list(path="", options={"limit": 1000})
        print(f"[Supabase] storage.list() returned {len(entries)} items", flush=True)

        found: List[Dict[str, Any]] = []

        for entry in entries:
            folder_name = entry.get("name", "")
            if not folder_name:
                continue

            # Internal files (e.g. the registry) start with '_' or '.'.
            if folder_name.startswith(("_", ".")):
                print(f"[Supabase] Skipping: {folder_name}", flush=True)
                continue

            print(f"[Supabase] Found folder: {folder_name}", flush=True)

            try:
                metadata_path = f"{folder_name}/metadata.json"
                metadata = json.loads(storage.download(metadata_path).decode("utf-8"))
                metadata["folder_name"] = folder_name
                metadata["source"] = "supabase"
                found.append(metadata)
                print(f"[Supabase] Loaded dataset: {folder_name}", flush=True)

            except Exception as e:
                # Folder without metadata.json — still surface it with
                # placeholder fields so callers can show something.
                print(f"[Supabase] No metadata for {folder_name}: {e}", flush=True)
                found.append({
                    "name": folder_name,
                    "folder_name": folder_name,
                    "source": "supabase",
                    "total_photos": 0,
                    "created_at": None
                })

        print(f"[Supabase] Total datasets found: {len(found)}", flush=True)
        return found

    except Exception as e:
        print(f"[Supabase] Error listing datasets: {e}", flush=True)
        import traceback
        traceback.print_exc()
        return []
|
|
|
|
|
|
|
|
|
def delete_dataset_from_supabase(dataset_name: str) -> bool:
    """
    Delete a dataset from Supabase Storage.

    Args:
        dataset_name: Name of the dataset to delete

    Returns:
        True if successful, False otherwise
    """
    client = get_supabase_client()
    if not client:
        print("[Supabase] Client not available")
        return False

    try:
        storage = client.storage.from_(BUCKET_NAME)

        # Enumerate the folder's objects first, then remove them in one call.
        contents = storage.list(path=dataset_name, options={"limit": 1000})
        print(f"[Supabase] Found {len(contents)} files in '{dataset_name}' to delete")

        paths = [
            f"{dataset_name}/{entry['name']}"
            for entry in contents
            if entry.get('name')
        ]

        if paths:
            removed = storage.remove(paths)
            print(f"[Supabase] Deleted {len(paths)} files from '{dataset_name}': {removed}")

        # Keep the registry consistent with the bucket contents.
        _update_dataset_registry(client, dataset_name, action='remove')

        print(f"[Supabase] Dataset '{dataset_name}' deleted successfully")
        return True

    except Exception as e:
        print(f"[Supabase] Error deleting dataset: {e}")
        return False
|
|
|
|
|
|
|
|
|
def check_dataset_exists_in_supabase(dataset_name: str) -> bool:
    """
    Check if a dataset exists in Supabase.

    Args:
        dataset_name: Name of the dataset to check

    Returns:
        True if exists, False otherwise
    """
    client = get_supabase_client()
    if not client:
        return False

    try:
        # A dataset "exists" when its folder contains at least one object.
        files = client.storage.from_(BUCKET_NAME).list(path=dataset_name, options={"limit": 1000})
        return len(files) > 0
    except Exception:
        # Was a bare `except:`, which would also swallow KeyboardInterrupt
        # and SystemExit; Exception is the broadest we should catch here.
        return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Postgres table used as a content-addressed embedding cache; rows are
# upserted on (image_hash, embedding_model) — see save_embeddings_batch.
EMBEDDING_TABLE = 'image_embeddings'
|
|
|
|
|
|
|
|
|
def compute_file_hash(filepath: str) -> Optional[str]:
    """
    Compute MD5 hash of a file.

    Args:
        filepath: Path to the image file

    Returns:
        MD5 hash string or None if error
    """
    try:
        digest = hashlib.md5()
        with open(filepath, 'rb') as f:
            # Stream in 8 KB chunks so large images don't load into memory.
            while True:
                chunk = f.read(8192)
                if not chunk:
                    break
                digest.update(chunk)
        return digest.hexdigest()
    except Exception as e:
        print(f"[EmbeddingCache] Error hashing {filepath}: {e}")
        return None
|
|
|
|
|
|
|
|
|
def _embedding_to_base64(embedding: np.ndarray) -> str:
|
|
|
"""Convert numpy embedding to base64 string for storage."""
|
|
|
return base64.b64encode(embedding.astype(np.float32).tobytes()).decode('utf-8')
|
|
|
|
|
|
|
|
|
def _base64_to_embedding(b64_str: str, dim: int) -> np.ndarray:
|
|
|
"""Convert base64 string back to numpy embedding."""
|
|
|
bytes_data = base64.b64decode(b64_str)
|
|
|
return np.frombuffer(bytes_data, dtype=np.float32).reshape(dim)
|
|
|
|
|
|
|
|
|
def get_cached_embeddings_batch(
    image_hashes: List[str],
    embedding_model: str
) -> Dict[str, np.ndarray]:
    """
    Get cached embeddings for multiple images in one query.

    Args:
        image_hashes: List of MD5 hashes to look up
        embedding_model: Model name ('siglip' or 'clip')

    Returns:
        Dict mapping hash -> embedding for found entries
    """
    client = get_supabase_client()
    if not client or not image_hashes:
        return {}

    try:
        # One IN-query for the whole batch instead of a lookup per image.
        response = (
            client.table(EMBEDDING_TABLE)
            .select('image_hash, embedding, embedding_dim')
            .in_('image_hash', image_hashes)
            .eq('embedding_model', embedding_model)
            .execute()
        )

        found = {
            row['image_hash']: _base64_to_embedding(row['embedding'], row['embedding_dim'])
            for row in response.data
        }

        print(f"[EmbeddingCache] Found {len(found)}/{len(image_hashes)} cached embeddings for {embedding_model}")
        return found

    except Exception as e:
        print(f"[EmbeddingCache] Error fetching batch: {e}")
        return {}
|
|
|
|
|
|
|
|
|
def save_embeddings_batch(
    embeddings: Dict[str, np.ndarray],
    image_hashes: Dict[str, str],
    embedding_model: str
) -> int:
    """
    Save multiple embeddings to cache.

    Args:
        embeddings: Dict mapping filename -> embedding
        image_hashes: Dict mapping filename -> hash
        embedding_model: Model name ('siglip' or 'clip')

    Returns:
        Number of embeddings saved
    """
    client = get_supabase_client()
    if not client or not embeddings:
        return 0

    try:
        # Build upsert rows, skipping entries with no hash or no embedding.
        rows = []
        for filename, vector in embeddings.items():
            file_hash = image_hashes.get(filename)
            if not file_hash or vector is None:
                continue
            rows.append({
                'image_hash': file_hash,
                'embedding_model': embedding_model,
                'embedding': _embedding_to_base64(vector),
                'embedding_dim': len(vector)
            })

        if not rows:
            return 0

        # Upsert in chunks; a failed chunk is logged and skipped so one bad
        # batch doesn't abort the rest.
        chunk_size = 100
        saved = 0
        for start in range(0, len(rows), chunk_size):
            batch = rows[start:start + chunk_size]
            try:
                client.table(EMBEDDING_TABLE).upsert(
                    batch,
                    on_conflict='image_hash,embedding_model'
                ).execute()
                saved += len(batch)
            except Exception as e:
                print(f"[EmbeddingCache] Error saving chunk {start//chunk_size}: {e}")

        print(f"[EmbeddingCache] Saved {saved} new embeddings for {embedding_model}")
        return saved

    except Exception as e:
        print(f"[EmbeddingCache] Error saving batch: {e}")
        return 0
|
|
|
|
|
|
|
|
|
def get_embedding_cache_stats() -> Dict[str, Any]:
    """Get statistics about the embedding cache.

    Returns:
        Dict with 'available', plus 'siglip_count', 'clip_count' and
        'total' when the cache is reachable; on failure, 'available' is
        False and 'error' holds the message.
    """
    client = get_supabase_client()
    if not client:
        return {'available': False}

    try:
        # The original issued an extra select() whose result was never
        # used — removed; only the two per-model counts are needed.
        def _count_for(model: str) -> int:
            # count='exact' requests only the row count, not the rows.
            response = client.table(EMBEDDING_TABLE).select(
                'id', count='exact'
            ).eq('embedding_model', model).execute()
            return response.count or 0

        siglip_count = _count_for('siglip')
        clip_count = _count_for('clip')

        return {
            'available': True,
            'siglip_count': siglip_count,
            'clip_count': clip_count,
            'total': siglip_count + clip_count
        }

    except Exception as e:
        print(f"[EmbeddingCache] Error getting stats: {e}")
        return {'available': False, 'error': str(e)}
|
|
|
|