"""
Supabase Database Helper
Saves/loads audit results in batches.
═══════════════════════════════════════════════════════════════
RUN THIS SQL IN SUPABASE SQL EDITOR (one-time setup):
═══════════════════════════════════════════════════════════════
CREATE TABLE audit_runs (
    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    name TEXT NOT NULL,
    domain TEXT NOT NULL,
    total_urls INTEGER DEFAULT 0,
    completed_urls INTEGER DEFAULT 0,
    status TEXT DEFAULT 'running' CHECK (status IN ('running', 'paused', 'completed', 'error')),
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now(),
    summary JSONB DEFAULT '{}'::jsonb,
    pending_urls JSONB DEFAULT '[]'::jsonb
);

CREATE TABLE audit_pages (
    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    run_id UUID REFERENCES audit_runs(id) ON DELETE CASCADE,
    url TEXT NOT NULL,
    result JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX idx_audit_pages_run_id ON audit_pages(run_id);
CREATE INDEX idx_audit_pages_url ON audit_pages(url);
CREATE INDEX idx_audit_runs_status ON audit_runs(status);
-- NOTE: these policies are fully permissive (USING (true)); tighten them for
-- anything beyond single-tenant use.
ALTER TABLE audit_runs ENABLE ROW LEVEL SECURITY;
ALTER TABLE audit_pages ENABLE ROW LEVEL SECURITY;
CREATE POLICY "Allow all on audit_runs" ON audit_runs FOR ALL USING (true);
CREATE POLICY "Allow all on audit_pages" ON audit_pages FOR ALL USING (true);
═══════════════════════════════════════════════════════════════
"""

import json
from datetime import datetime, timezone

from supabase import create_client, Client

def get_client(url: str, key: str) -> Client:
    return create_client(url, key)

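# A convenience sketch, not part of the original module: build a client from
# environment variables. The names SUPABASE_URL and SUPABASE_KEY are
# assumptions for illustration, not something the supabase client mandates.
def get_client_from_env() -> Client:
    import os  # local import keeps this sketch self-contained

    return get_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_KEY"])
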
# ─── Run Management ───
def create_run(client: Client, name: str, domain: str, total_urls: int, all_urls: list) -> str:
    """Insert a new audit run and return its id."""
    data = {
        "name": name,
        "domain": domain,
        "total_urls": total_urls,
        "completed_urls": 0,
        "status": "running",
        # pending_urls is a jsonb column; the client serializes the list itself,
        # so passing it directly avoids double-encoding it as a JSON string.
        "pending_urls": all_urls,
    }
    response = client.table("audit_runs").insert(data).execute()
    return response.data[0]["id"]

def get_run(client: Client, run_id: str):
    response = client.table("audit_runs").select("*").eq("id", run_id).single().execute()
    return response.data

def get_all_runs(client: Client):
    response = client.table("audit_runs").select("*").order("created_at", desc=True).execute()
    return response.data

def update_run_status(client: Client, run_id: str, status: str, completed: int = None, summary: dict = None):
    data = {"status": status, "updated_at": datetime.now(timezone.utc).isoformat()}
    if completed is not None:
        data["completed_urls"] = completed
    if summary is not None:
        # Round-trip through json to coerce non-serializable values to strings.
        data["summary"] = json.loads(json.dumps(summary, default=str))
    client.table("audit_runs").update(data).eq("id", run_id).execute()

def delete_run(client: Client, run_id: str):
    # Delete pages explicitly, although ON DELETE CASCADE would also cover them.
    client.table("audit_pages").delete().eq("run_id", run_id).execute()
    client.table("audit_runs").delete().eq("id", run_id).execute()

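# A minimal lifecycle sketch, assuming `client` comes from get_client; the
# names and numbers below are illustrative, not from the source:
#   run_id = create_run(client, "example audit", "example.com", len(urls), urls)
#   update_run_status(client, run_id, "paused", completed=42)
#   delete_run(client, run_id)
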
# ─── Page Results ───
def save_batch_results(client: Client, run_id: str, batch_results: list):
    """Insert one batch of page results, chunked to keep requests small."""
    rows = []
    for r in batch_results:
        # Coerce non-serializable values (datetimes, etc.) to strings.
        clean = json.loads(json.dumps(r, default=str))
        rows.append({"run_id": run_id, "url": r["url"], "result": clean})
    for i in range(0, len(rows), 50):
        chunk = rows[i:i + 50]
        client.table("audit_pages").insert(chunk).execute()

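# Usage sketch (hypothetical values): each result dict must carry a "url" key,
# since save_batch_results reads r["url"] for the dedicated column.
#   save_batch_results(client, run_id, [{"url": "https://example.com/", "issues": []}])
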
def get_completed_urls(client: Client, run_id: str) -> set:
    """Return every URL already stored for this run, paging past the row cap."""
    urls = set()
    offset = 0
    page_size = 1000
    while True:
        response = (
            client.table("audit_pages")
            .select("url")
            .eq("run_id", run_id)
            .range(offset, offset + page_size - 1)
            .execute()
        )
        if not response.data:
            break
        urls.update(p["url"] for p in response.data)
        if len(response.data) < page_size:
            break
        offset += page_size
    return urls

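# A resume sketch under the same assumptions as above (`client` and `run_id`
# come from earlier calls): the URLs still to audit are the pending list minus
# whatever is already stored.
#   done = get_completed_urls(client, run_id)
#   todo = [u for u in get_pending_urls(client, run_id) if u not in done]
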
def get_completed_count(client: Client, run_id: str) -> int:
    response = (
        client.table("audit_pages")
        .select("id", count="exact")
        .eq("run_id", run_id)
        .execute()
    )
    return response.count or 0

def get_all_page_results(client: Client, run_id: str) -> list:
    """Return all stored page results for a run, oldest first."""
    all_pages = []
    offset = 0
    page_size = 500
    while True:
        response = (
            client.table("audit_pages")
            .select("url, result")
            .eq("run_id", run_id)
            .order("created_at", desc=False)
            .range(offset, offset + page_size - 1)
            .execute()
        )
        if not response.data:
            break
        all_pages.extend(response.data)
        if len(response.data) < page_size:
            break
        offset += page_size
    return all_pages

def get_pending_urls(client: Client, run_id: str) -> list:
    run = get_run(client, run_id)
    pending = run.get("pending_urls")
    # Older runs may have stored the list as a JSON-encoded string.
    if isinstance(pending, str):
        return json.loads(pending)
    return pending or []
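

# A minimal end-to-end smoke test, not part of the original module. It assumes
# SUPABASE_URL and SUPABASE_KEY are set in the environment and the tables from
# the docstring exist; every value below is illustrative.
if __name__ == "__main__":
    import os  # local import keeps this sketch self-contained

    client = get_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_KEY"])
    urls = ["https://example.com/", "https://example.com/about"]
    run_id = create_run(client, "demo run", "example.com", len(urls), urls)
    save_batch_results(client, run_id, [{"url": u, "status": 200} for u in urls])
    update_run_status(client, run_id, "completed", completed=len(urls))
    print(get_completed_count(client, run_id), "pages stored for run", run_id)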