""" Supabase Database Helper Saves/loads audit results in batches. ═══════════════════════════════════════════════════════════════ RUN THIS SQL IN SUPABASE SQL EDITOR (one-time setup): ═══════════════════════════════════════════════════════════════ CREATE TABLE audit_runs ( id UUID DEFAULT gen_random_uuid() PRIMARY KEY, name TEXT NOT NULL, domain TEXT NOT NULL, total_urls INTEGER DEFAULT 0, completed_urls INTEGER DEFAULT 0, status TEXT DEFAULT 'running' CHECK (status IN ('running', 'paused', 'completed', 'error')), created_at TIMESTAMPTZ DEFAULT now(), updated_at TIMESTAMPTZ DEFAULT now(), summary JSONB DEFAULT '{}'::jsonb, pending_urls JSONB DEFAULT '[]'::jsonb ); CREATE TABLE audit_pages ( id UUID DEFAULT gen_random_uuid() PRIMARY KEY, run_id UUID REFERENCES audit_runs(id) ON DELETE CASCADE, url TEXT NOT NULL, result JSONB NOT NULL, created_at TIMESTAMPTZ DEFAULT now() ); CREATE INDEX idx_audit_pages_run_id ON audit_pages(run_id); CREATE INDEX idx_audit_pages_url ON audit_pages(url); CREATE INDEX idx_audit_runs_status ON audit_runs(status); ALTER TABLE audit_runs ENABLE ROW LEVEL SECURITY; ALTER TABLE audit_pages ENABLE ROW LEVEL SECURITY; CREATE POLICY "Allow all on audit_runs" ON audit_runs FOR ALL USING (true); CREATE POLICY "Allow all on audit_pages" ON audit_pages FOR ALL USING (true); ═══════════════════════════════════════════════════════════════ """ import json from datetime import datetime from supabase import create_client, Client def get_client(url: str, key: str) -> Client: return create_client(url, key) # ─── Run Management ─── def create_run(client: Client, name: str, domain: str, total_urls: int, all_urls: list) -> str: data = { "name": name, "domain": domain, "total_urls": total_urls, "completed_urls": 0, "status": "running", "pending_urls": json.dumps(all_urls), } response = client.table("audit_runs").insert(data).execute() return response.data[0]["id"] def get_run(client: Client, run_id: str): response = client.table("audit_runs").select("*").eq("id", run_id).single().execute() return response.data def get_all_runs(client: Client): response = client.table("audit_runs").select("*").order("created_at", desc=True).execute() return response.data def update_run_status(client: Client, run_id: str, status: str, completed: int = None, summary: dict = None): data = {"status": status, "updated_at": datetime.utcnow().isoformat()} if completed is not None: data["completed_urls"] = completed if summary is not None: data["summary"] = json.loads(json.dumps(summary, default=str)) client.table("audit_runs").update(data).eq("id", run_id).execute() def delete_run(client: Client, run_id: str): client.table("audit_pages").delete().eq("run_id", run_id).execute() client.table("audit_runs").delete().eq("id", run_id).execute() # ─── Page Results ─── def save_batch_results(client: Client, run_id: str, batch_results: list): rows = [] for r in batch_results: clean = json.loads(json.dumps(r, default=str)) rows.append({"run_id": run_id, "url": r['url'], "result": clean}) for i in range(0, len(rows), 50): chunk = rows[i:i+50] client.table("audit_pages").insert(chunk).execute() def get_completed_urls(client: Client, run_id: str) -> set: urls = set() offset = 0 page_size = 1000 while True: response = ( client.table("audit_pages") .select("url") .eq("run_id", run_id) .range(offset, offset + page_size - 1) .execute() ) if not response.data: break urls.update(p["url"] for p in response.data) if len(response.data) < page_size: break offset += page_size return urls def get_completed_count(client: Client, 
run_id: str) -> int: response = ( client.table("audit_pages") .select("id", count="exact") .eq("run_id", run_id) .execute() ) return response.count or 0 def get_all_page_results(client: Client, run_id: str) -> list: all_pages = [] offset = 0 page_size = 500 while True: response = ( client.table("audit_pages") .select("url, result") .eq("run_id", run_id) .order("created_at", desc=False) .range(offset, offset + page_size - 1) .execute() ) if not response.data: break all_pages.extend(response.data) if len(response.data) < page_size: break offset += page_size return all_pages def get_pending_urls(client: Client, run_id: str) -> list: run = get_run(client, run_id) pending = run.get("pending_urls") if isinstance(pending, str): return json.loads(pending) return pending or []
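
# ─── Usage Sketch ───
# A minimal end-to-end sketch, for illustration only. It assumes SUPABASE_URL
# and SUPABASE_KEY environment variables, and the hard-coded batch below is a
# stand-in for real audit results — substitute your own crawler/auditor.

if __name__ == "__main__":
    import os

    client = get_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_KEY"])

    urls = ["https://example.com/", "https://example.com/about"]
    run_id = create_run(client, "demo run", "example.com", len(urls), urls)

    # Resume-safe: skip anything a previous (crashed) run already saved.
    done = get_completed_urls(client, run_id)
    todo = [u for u in get_pending_urls(client, run_id) if u not in done]

    # Stand-in results; each dict must at least carry a "url" key.
    batch = [{"url": u, "status": "ok"} for u in todo]
    save_batch_results(client, run_id, batch)

    update_run_status(
        client, run_id, "completed",
        completed=get_completed_count(client, run_id),
        summary={"finished_at": datetime.now(timezone.utc)},
    )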