# app/storage.py
# Supabase Storage integration for persisting generated audio files.
# Uploads audio to the tts-audio public bucket and returns a public URL.
# Called as a background thread from run_synthesis — non-blocking.

import os
import threading
from collections.abc import Callable
from pathlib import Path

from supabase import create_client, Client

# --- storage layout ---
# single bucket holds both the audio files and the evaluation log
_BUCKET_NAME = "tts-audio"
_CSV_OBJECT_NAME = "eval_log.csv"

# --- cleanup config ---
# soft limit: trigger cleanup when total audio in bucket exceeds this
_BUCKET_SIZE_LIMIT_BYTES = 800 * 1024 * 1024  # 800MB

# --- client setup ---
_client: Client | None = None


def _get_client() -> Client:
    """
    Return the module-wide Supabase client, creating it on first use.

    Raises:
        ValueError: if SUPABASE_URL or SUPABASE_ANON_KEY is unset or empty.
    """
    global _client
    if _client is None:
        url = os.getenv("SUPABASE_URL")
        key = os.getenv("SUPABASE_ANON_KEY")
        if not url or not key:
            raise ValueError(
                "SUPABASE_URL and SUPABASE_ANON_KEY must be set in .env"
            )
        _client = create_client(url, key)
    return _client


def _bucket():
    """Return the storage handle for the tts-audio bucket."""
    return _get_client().storage.from_(_BUCKET_NAME)


def upload_audio(local_path: str, filename: str) -> str | None:
    """
    Upload an audio file to Supabase tts-audio bucket.
    Returns the public URL on success, None on failure.

    Any exception (missing env vars, unreadable file, upload error) is
    logged to stdout and turned into a None return.

    Args:
        local_path: full path to local audio file
        filename: destination filename in bucket
            (e.g. '2026-04-14_kokoro_K-2.wav')
    """
    try:
        bucket = _bucket()
        data = Path(local_path).read_bytes()

        # detect content type: .mp3 is MPEG audio, everything else is sent as WAV
        ext = Path(local_path).suffix.lower()
        content_type = "audio/mpeg" if ext == ".mp3" else "audio/wav"

        bucket.upload(
            path=filename,
            file=data,
            file_options={"content-type": content_type, "upsert": "true"},
        )

        # build public URL
        return bucket.get_public_url(filename)

    except Exception as e:
        print(f"[Storage] Upload failed for {filename}: {e}")
        return None


def upload_audio_background(
    local_path: str,
    filename: str,
    callback: Callable[[str | None], None] | None = None,
) -> None:
    """
    Upload audio in a background thread — non-blocking.
    Optionally calls callback(url) when done, where url is None on failure.

    Args:
        local_path: full path to local audio file
        filename: destination filename in bucket
        callback: optional function(url: str | None) called after upload
    """
    def _run():
        url = upload_audio(local_path, filename)
        if callback:
            callback(url)

    # daemon thread: an in-flight upload does not keep the process alive
    thread = threading.Thread(target=_run, daemon=True)
    thread.start()


def upload_csv(local_path: str) -> bool:
    """
    Upload eval_log.csv to Supabase tts-audio bucket.
    Uses upsert so it overwrites the existing file.
    Returns True on success, False on failure.

    Args:
        local_path: path of the local CSV file to upload
    """
    try:
        bucket = _bucket()
        data = Path(local_path).read_bytes()
        bucket.upload(
            path=_CSV_OBJECT_NAME,
            file=data,
            file_options={"content-type": "text/csv", "upsert": "true"},
        )
        print("[Storage] eval_log.csv uploaded to Supabase")
        return True
    except Exception as e:
        print(f"[Storage] CSV upload failed: {e}")
        return False


def download_csv(local_path: str) -> bool:
    """
    Download eval_log.csv from Supabase tts-audio bucket to local path.
    Returns True on success, False on failure.

    Args:
        local_path: where to write the downloaded CSV; parent directories
            are created when the path has a directory component
    """
    try:
        payload = _bucket().download(_CSV_OBJECT_NAME)

        # a bare filename has an empty dirname, and os.makedirs("") raises
        parent = os.path.dirname(local_path)
        if parent:
            os.makedirs(parent, exist_ok=True)

        with open(local_path, "wb") as f:
            f.write(payload)
        print("[Storage] eval_log.csv downloaded from Supabase")
        return True
    except Exception as e:
        print(f"[Storage] CSV download failed (will use local fallback): {e}")
        return False


def upload_csv_background(local_path: str) -> None:
    """Upload CSV in background thread — non-blocking."""
    thread = threading.Thread(target=upload_csv, args=(local_path,), daemon=True)
    thread.start()


def _entry_size(entry: dict) -> int:
    """Return the byte size recorded for a bucket listing entry, or 0 if unknown."""
    # "metadata" can be present but None, so a .get() default is not enough
    metadata = entry.get("metadata") or {}
    return metadata.get("size") or 0


def _delete_oldest_until_under_limit(bucket, audio_files: list, total_bytes: int) -> list:
    """Delete entries in filename order until usage fits the limit; return deleted names."""
    freed_bytes = 0
    deleted_names = []

    # sort by filename (timestamp prefix ensures chronological order)
    for entry in sorted(audio_files, key=lambda f: f["name"]):
        if total_bytes - freed_bytes <= _BUCKET_SIZE_LIMIT_BYTES:
            break

        name = entry["name"]
        size = _entry_size(entry)
        if not size:
            # nothing measurable to reclaim — deleting it cannot help
            continue

        try:
            bucket.remove([name])
        except Exception as e:
            print(f"[Storage] Cleanup: failed to delete {name}: {e}")
            continue

        freed_bytes += size
        deleted_names.append(name)
        print(f"[Storage] Cleanup: deleted {name} ({size / 1024 / 1024:.2f}MB)")

    print(f"[Storage] Cleanup: deleted {len(deleted_names)} files, freed {freed_bytes / 1024 / 1024:.1f}MB")
    return deleted_names


def _remove_rows_for_deleted_files(csv_local_path: str, deleted_names: list) -> None:
    """Drop CSV rows whose audio_url points at a deleted file, then re-upload the CSV."""
    try:
        # imported lazily so pandas is only loaded when a cleanup actually runs
        import pandas as pd

        if not os.path.exists(csv_local_path):
            return

        df = pd.read_csv(csv_local_path, dtype={"audio_url": str})

        # reconstruct the public URL tail to match against the csv
        deleted_urls = {f"tts-audio/{name}" for name in deleted_names}

        # drop rows whose audio_url contains a deleted filename
        original_len = len(df)
        df = df[~df["audio_url"].apply(
            lambda url: any(d in str(url) for d in deleted_urls)
        )]
        rows_removed = original_len - len(df)

        df.to_csv(csv_local_path, index=False)
        print(f"[Storage] Cleanup: removed {rows_removed} rows from CSV")

        # re-upload cleaned CSV
        upload_csv(csv_local_path)

    except Exception as e:
        print(f"[Storage] Cleanup: CSV update failed: {e}")


def cleanup_bucket_if_needed(csv_local_path: str) -> None:
    """
    Check total size of audio files in tts-audio bucket.
    If over _BUCKET_SIZE_LIMIT_BYTES, delete oldest files by filename timestamp
    until back under limit. Removes corresponding rows from local CSV
    and re-uploads it to Supabase.
    Skips eval_log.csv when calculating size and deleting.

    Entries without a recorded size count as 0 bytes and are never deleted.
    All failures are logged to stdout; nothing is raised.

    Args:
        csv_local_path: path of the local eval log CSV to prune
    """
    try:
        bucket = _bucket()

        # list all files in bucket
        # NOTE(review): list() is called without paging options; if the
        # storage API caps the page size, the total below is an undercount
        # — confirm against the client documentation.
        files = bucket.list()
        if not files:
            return

        # filter out CSV — only count audio files
        audio_files = [f for f in files if f["name"] != _CSV_OBJECT_NAME]

        total_bytes = sum(_entry_size(f) for f in audio_files)
        if total_bytes <= _BUCKET_SIZE_LIMIT_BYTES:
            return

        print(f"[Storage] Cleanup triggered: {total_bytes / 1024 / 1024:.1f}MB exceeds {_BUCKET_SIZE_LIMIT_BYTES / 1024 / 1024:.0f}MB limit")

        deleted_names = _delete_oldest_until_under_limit(
            bucket, audio_files, total_bytes
        )
        if deleted_names:
            _remove_rows_for_deleted_files(csv_local_path, deleted_names)

    except Exception as e:
        print(f"[Storage] Cleanup check failed: {e}")


def cleanup_bucket_background(csv_local_path: str) -> None:
    """Run bucket cleanup in background thread — non-blocking."""
    thread = threading.Thread(
        target=cleanup_bucket_if_needed,
        args=(csv_local_path,),
        daemon=True,
    )
    thread.start()