tts-eval-framework / app /storage.py
aankitdas's picture
added storage limit guard
229a3e3
# app/storage.py
# Supabase Storage integration for persisting generated audio files.
# Uploads audio to the tts-audio public bucket and returns a public URL.
# Called as a background thread from run_synthesis — non-blocking.
import os
import threading
from pathlib import Path
from supabase import create_client, Client
# --- cleanup config ---
# Soft limit: cleanup_bucket_if_needed() starts deleting the oldest audio
# files once the total audio size in the bucket exceeds this many bytes.
_BUCKET_SIZE_LIMIT_BYTES = 800 * 1024 * 1024 # 800MB
# --- client setup ---
# Cached Supabase client; stays None until _get_client() creates it on
# first use, after which every call reuses the same instance.
_client: Client | None = None
def _get_client() -> Client:
    """
    Return the module-wide Supabase client, creating it on first use.
    Credentials are read from the SUPABASE_URL and SUPABASE_ANON_KEY
    environment variables the first time this runs; later calls return
    the cached client without re-reading the environment.
    Raises:
        ValueError: if either environment variable is missing or empty.
    """
    global _client
    # fast path: a client was already built by an earlier call
    if _client is not None:
        return _client
    supabase_url = os.getenv("SUPABASE_URL")
    anon_key = os.getenv("SUPABASE_ANON_KEY")
    if not (supabase_url and anon_key):
        raise ValueError(
            "SUPABASE_URL and SUPABASE_ANON_KEY must be set in .env"
        )
    _client = create_client(supabase_url, anon_key)
    return _client
def upload_audio(local_path: str, filename: str) -> str | None:
"""
Upload an audio file to Supabase tts-audio bucket.
Returns the public URL on success, None on failure.
Args:
local_path: full path to local audio file
filename: destination filename in bucket (e.g. '2026-04-14_kokoro_K-2.wav')
"""
try:
client = _get_client()
with open(local_path, "rb") as f:
data = f.read()
# detect content type
ext = Path(local_path).suffix.lower()
content_type = "audio/mpeg" if ext == ".mp3" else "audio/wav"
client.storage.from_("tts-audio").upload(
path=filename,
file=data,
file_options={"content-type": content_type, "upsert": "true"},
)
# build public URL
result = client.storage.from_("tts-audio").get_public_url(filename)
return result
except Exception as e:
print(f"[Storage] Upload failed for {filename}: {e}")
return None
def upload_audio_background(local_path: str, filename: str, callback=None) -> None:
    """
    Fire-and-forget wrapper around upload_audio().
    Starts a daemon thread that performs the upload and returns
    immediately. When a callback is supplied it is invoked from that
    thread with the upload result.
    Args:
        local_path: full path to local audio file
        filename: destination filename in bucket
        callback: optional function(url: str | None); receives the public
            URL, or None when the upload failed
    """
    def _run():
        public_url = upload_audio(local_path, filename)
        if callback:
            callback(public_url)

    # daemon=True so a pending upload never keeps the process alive
    threading.Thread(target=_run, daemon=True).start()
def upload_csv(local_path: str) -> bool:
    """
    Push the local eval log to the Supabase tts-audio bucket.
    The file is always stored under the fixed name eval_log.csv, with
    upsert enabled so an existing copy is overwritten.
    Args:
        local_path: path of the CSV file to upload
    Returns:
        True when the upload succeeded, False on any error (the error is
        printed, never raised).
    """
    try:
        supabase = _get_client()
        payload = Path(local_path).read_bytes()
        options = {"content-type": "text/csv", "upsert": "true"}
        supabase.storage.from_("tts-audio").upload(
            path="eval_log.csv",
            file=payload,
            file_options=options,
        )
        print("[Storage] eval_log.csv uploaded to Supabase")
        return True
    except Exception as exc:
        print(f"[Storage] CSV upload failed: {exc}")
        return False
def download_csv(local_path: str) -> bool:
    """
    Download eval_log.csv from Supabase tts-audio bucket to local path.
    Missing parent directories of local_path are created. Never raises:
    any error is printed and reported as False so the caller can fall
    back to its local copy.
    Args:
        local_path: where to write the downloaded CSV; may be a bare
            filename in the current directory
    Returns:
        True on success, False on failure.
    """
    try:
        client = _get_client()
        response = client.storage.from_("tts-audio").download("eval_log.csv")
        # os.path.dirname() is "" for a bare filename and os.makedirs("")
        # raises, which used to turn a successful download into a failure.
        parent_dir = os.path.dirname(local_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(local_path, "wb") as f:
            f.write(response)
        print("[Storage] eval_log.csv downloaded from Supabase")
        return True
    except Exception as e:
        print(f"[Storage] CSV download failed (will use local fallback): {e}")
        return False
def upload_csv_background(local_path: str) -> None:
    """
    Run upload_csv(local_path) on a daemon thread and return immediately.
    The upload result is not reported back to the caller.
    """
    threading.Thread(
        target=upload_csv,
        kwargs={"local_path": local_path},
        daemon=True,
    ).start()
def _object_size(entry: dict) -> int:
    """Return the size in bytes recorded for a bucket listing entry, or 0 if absent."""
    # "metadata" can be present but None, so a .get() default is not enough
    metadata = entry.get("metadata") or {}
    return metadata.get("size") or 0


def _list_bucket_files(client, page_size: int = 1000) -> list:
    """
    Return every entry at the root of the tts-audio bucket.
    Pages through the listing with limit/offset instead of relying on a
    single list() call, which only returns the first page (100 entries by
    default per the Supabase docs) and would undercount a large bucket.
    """
    bucket = client.storage.from_("tts-audio")
    entries = []
    seen_names = set()
    offset = 0
    while True:
        page = bucket.list(options={"limit": page_size, "offset": offset}) or []
        # stop on an empty page, or on a page with nothing new (guards
        # against looping forever if the offset were ever ignored)
        fresh = [entry for entry in page if entry["name"] not in seen_names]
        if not fresh:
            break
        entries.extend(fresh)
        seen_names.update(entry["name"] for entry in fresh)
        offset += len(page)
    return entries


def _remove_deleted_rows(csv_local_path: str, deleted_names: list) -> None:
    """Drop CSV rows whose audio_url points at a deleted file, then re-upload the CSV."""
    import pandas as pd

    if not os.path.exists(csv_local_path):
        return
    df = pd.read_csv(csv_local_path, dtype={"audio_url": str})
    # match on the "tts-audio/<name>" part of the public URL
    fragments = [f"tts-audio/{name}" for name in deleted_names]
    is_deleted = df["audio_url"].apply(
        lambda url: any(fragment in str(url) for fragment in fragments)
    ).astype(bool)  # keeps the mask boolean even when the CSV has no rows
    kept = df[~is_deleted]
    rows_removed = len(df) - len(kept)
    kept.to_csv(csv_local_path, index=False)
    print(f"[Storage] Cleanup: removed {rows_removed} rows from CSV")
    # re-upload cleaned CSV
    upload_csv(csv_local_path)


def cleanup_bucket_if_needed(csv_local_path: str) -> None:
    """
    Check total size of audio files in tts-audio bucket.
    If over _BUCKET_SIZE_LIMIT_BYTES, delete oldest files by filename
    timestamp until back under limit. Removes corresponding rows from
    local CSV and re-uploads it to Supabase.
    Skips eval_log.csv when calculating size and deleting.
    Best-effort: every error is printed and swallowed, so this never
    raises.
    Args:
        csv_local_path: path of the local eval log CSV to prune
    """
    try:
        client = _get_client()
        # list all files in bucket (all pages, not just the first)
        files = _list_bucket_files(client)
        if not files:
            return
        # Only real audio objects take part: skip the CSV, and skip entries
        # without metadata (how Supabase lists folder placeholders — they
        # have no size, and "deleting" one would wrongly drop CSV rows for
        # files that still exist inside it).
        audio_files = [
            f for f in files
            if f["name"] != "eval_log.csv" and f.get("metadata") is not None
        ]
        # calculate total size
        total_bytes = sum(_object_size(f) for f in audio_files)
        if total_bytes <= _BUCKET_SIZE_LIMIT_BYTES:
            return
        print(f"[Storage] Cleanup triggered: {total_bytes / 1024 / 1024:.1f}MB exceeds {_BUCKET_SIZE_LIMIT_BYTES / 1024 / 1024:.0f}MB limit")
        # sort by filename (timestamp prefix ensures chronological order)
        audio_files.sort(key=lambda f: f["name"])
        # delete oldest files until under limit
        freed_bytes = 0
        deleted_names = []
        for f in audio_files:
            if total_bytes - freed_bytes <= _BUCKET_SIZE_LIMIT_BYTES:
                break
            name = f["name"]
            size = _object_size(f)
            try:
                client.storage.from_("tts-audio").remove([name])
            except Exception as e:
                # one failed delete must not stop the rest of the cleanup
                print(f"[Storage] Cleanup: failed to delete {name}: {e}")
                continue
            freed_bytes += size
            deleted_names.append(name)
            print(f"[Storage] Cleanup: deleted {name} ({size / 1024 / 1024:.2f}MB)")
        print(f"[Storage] Cleanup: deleted {len(deleted_names)} files, freed {freed_bytes / 1024 / 1024:.1f}MB")
        if not deleted_names:
            return
        # remove corresponding rows from CSV
        try:
            _remove_deleted_rows(csv_local_path, deleted_names)
        except Exception as e:
            print(f"[Storage] Cleanup: CSV update failed: {e}")
    except Exception as e:
        print(f"[Storage] Cleanup check failed: {e}")
def cleanup_bucket_background(csv_local_path: str) -> None:
    """
    Run cleanup_bucket_if_needed(csv_local_path) on a daemon thread.
    Returns immediately; the cleanup outcome is not reported back.
    """
    worker = threading.Thread(
        target=cleanup_bucket_if_needed,
        kwargs={"csv_local_path": csv_local_path},
        daemon=True,
    )
    worker.start()