""" Link Audit Tool — Gradio UI for Hugging Face Spaces """ import gradio as gr import pandas as pd import time import json import os import threading import tempfile from datetime import datetime from audit_engine import audit_page, DEFAULT_BODY_SELECTORS, DEFAULT_SUGGESTION_MAP from report_generator import generate_report from db import ( get_client, create_run, get_all_runs, get_all_page_results, get_completed_urls, get_pending_urls, get_completed_count, save_batch_results, update_run_status, delete_run, ) SUPABASE_URL = os.environ.get("SUPABASE_URL", "") SUPABASE_KEY = os.environ.get("SUPABASE_KEY", "") sb = None if SUPABASE_URL and SUPABASE_KEY: try: sb = get_client(SUPABASE_URL, SUPABASE_KEY) sb.table("audit_runs").select("id").limit(1).execute() print("✅ Supabase connected") except Exception as e: print(f"❌ Supabase failed: {e}") sb = None class AuditState: def __init__(self): self.lock = threading.Lock() self.paused = False self.running = False self.run_id = None def request_pause(self): with self.lock: self.paused = True def resume(self): with self.lock: self.paused = False def is_paused(self): with self.lock: return self.paused def set_running(self, val, run_id=None): with self.lock: self.running = val if run_id: self.run_id = run_id def is_running(self): with self.lock: return self.running audit_state = AuditState() # ─── Global runs cache for dropdown ─── _runs_cache = [] def _refresh_cache(): global _runs_cache if sb is None: _runs_cache = [] return _runs_cache = get_all_runs(sb) or [] def _get_run_id_by_label(label): """Look up run ID from dropdown label.""" for r in _runs_cache: st = r.get('status', '?') expected = f"{r.get('name','?')} [{st.upper()}] ({r.get('completed_urls',0)}/{r.get('total_urls',0)})" if label == expected: return r['id'] # Maybe it's a raw UUID if label and len(label) > 30: return label return None # ═══════════════════════════════════════════════════ def run_audit(file, pasted_urls, domain, batch_size, timeout, delay, workers): 
if sb is None: yield "❌ Supabase not connected.", "" return urls = [] if file is not None: try: fpath = file.name if hasattr(file, 'name') else file df = pd.read_csv(fpath) if str(fpath).endswith('.csv') else pd.read_excel(fpath) url_col = None for col in df.columns: sample = str(df[col].iloc[0]).strip().lower() if sample.startswith('http') or domain in sample: url_col = col; break if not url_col: url_col = df.columns[0] urls = [u for u in df[url_col].dropna().astype(str).str.strip().tolist() if u.startswith('http')] except Exception as e: yield f"❌ File error: {e}", ""; return elif pasted_urls and pasted_urls.strip(): urls = [u.strip() for u in pasted_urls.strip().split('\n') if u.strip().startswith('http')] if not urls: yield "⚠ No valid URLs.", ""; return seen = set() unique = [] for u in urls: if u not in seen: seen.add(u); unique.append(u) urls = unique run_name = f"{domain} Audit — {datetime.now().strftime('%b %d %H:%M')} — {len(urls)} pages" run_id = create_run(sb, run_name, domain, len(urls), urls) audit_state.set_running(True, run_id) audit_state.resume() total = len(urls) batch_size, timeout, workers = int(batch_size), int(timeout), int(workers) start_time = time.time() batch_num = 0 log = [] yield f"🚀 {run_name}\n📦 {total} URLs · Batch: {batch_size}", "▶️ Running..." 
try: for bs in range(0, total, batch_size): if audit_state.is_paused(): c = get_completed_count(sb, run_id) update_run_status(sb, run_id, "paused", c) log.append(f"⏸️ PAUSED at {c}/{total}") audit_state.set_running(False) yield "\n".join(log[-40:]), f"⏸️ Paused — {c}/{total}"; return be = min(bs + batch_size, total) batch_urls = urls[bs:be] batch_num += 1 batch_results = [] for j, url in enumerate(batch_urls): if audit_state.is_paused(): if batch_results: save_batch_results(sb, run_id, batch_results) c = get_completed_count(sb, run_id) update_run_status(sb, run_id, "paused", c) log.append(f"⏸️ PAUSED at {c}/{total}") audit_state.set_running(False) yield "\n".join(log[-40:]), f"⏸️ Paused — {c}/{total}"; return gi = bs + j + 1 elapsed = time.time() - start_time eta = (elapsed / gi) * (total - gi) eta_s = f"{int(eta//60)}m{int(eta%60)}s" if eta > 60 else f"{eta:.0f}s" result = audit_page(url, domain, DEFAULT_BODY_SELECTORS, suggestion_map=DEFAULT_SUGGESTION_MAP, timeout=timeout, concurrent_workers=workers) batch_results.append(result) short = url.replace('https://www.', '').replace('https://', '')[:70] if result['error']: log.append(f"❌ [{gi}/{total}] {short} — {result['error'][:50]}") else: b = result['broken_int_count'] + result['broken_ext_count'] fc = result['follow_flag_count'] d = result['duplicate_count'] fl = [] if b: fl.append(f"🔴{b}broken") if fc: fl.append(f"🟡{fc}flags") if d: fl.append(f"🟣{d}dups") fs = " ".join(fl) if fl else "✅" log.append(f"[{gi}/{total}] {short} — Int:{result['int_count']} Ext:{result['ext_count']} {fs}") yield "\n".join(log[-40:]), f"📊 {gi}/{total} ({gi*100//total}%) Batch{batch_num} ETA:{eta_s}" if j < len(batch_urls) - 1: time.sleep(delay) if batch_results: try: save_batch_results(sb, run_id, batch_results) c = get_completed_count(sb, run_id) update_run_status(sb, run_id, "running", c) log.append(f"💾 Batch {batch_num} saved — {c}/{total}") except Exception as e: log.append(f"⚠ Save error: {str(e)[:60]}") yield "\n".join(log[-40:]), 
f"💾 Batch {batch_num} saved" del batch_results log.append("🔍 Orphan analysis...") yield "\n".join(log[-40:]), "🔍 Orphan analysis..." all_pages = get_all_page_results(sb, run_id) all_results = [p['result'] for p in all_pages] targets, pg_urls = set(), set() for r in all_results: pg_urls.add(r['url'].rstrip('/').split('?')[0]) for lk in r.get('internal_links', []): targets.add(lk['url'].rstrip('/').split('?')[0]) orphans = sorted([p for p in pg_urls if p not in targets]) summary = { 'total_pages': len(all_results), 'total_int': sum(r.get('int_count',0) for r in all_results), 'total_ext': sum(r.get('ext_count',0) for r in all_results), 'total_broken': sum(r.get('broken_int_count',0)+r.get('broken_ext_count',0) for r in all_results), 'total_redirects': sum(r.get('redirect_int_count',0)+r.get('redirect_ext_count',0) for r in all_results), 'total_flags': sum(r.get('follow_flag_count',0) for r in all_results), 'total_dups': sum(r.get('duplicate_count',0) for r in all_results), 'total_sug': sum(len(r.get('suggestions',[])) for r in all_results), 'orphan_count': len(orphans), 'orphan_urls': orphans[:100], } update_run_status(sb, run_id, "completed", len(all_results), summary) tt = time.time() - start_time log.append(f"✅ DONE! {len(all_results)} pages in {tt:.0f}s · {len(orphans)} orphans") log.append(f"Broken:{summary['total_broken']} Redirects:{summary['total_redirects']} Flags:{summary['total_flags']} Dups:{summary['total_dups']}") log.append("→ Past Runs → Refresh → Generate Report") audit_state.set_running(False) yield "\n".join(log[-40:]), f"✅ Done — {len(all_results)} pages in {tt:.0f}s" except Exception as e: log.append(f"❌ {str(e)}") audit_state.set_running(False) try: c = get_completed_count(sb, run_id); update_run_status(sb, run_id, "paused", c) except: pass yield "\n".join(log[-40:]), "❌ Error — progress saved" def pause_audit(): if audit_state.is_running(): audit_state.request_pause() return "⏸️ Stopping after current page..." return "No audit running." 
# ═══════════════════════════════════════════════════
def resume_audit(run_label, domain, batch_size, timeout, delay, workers):
    """Continue a paused run, auditing only URLs not yet completed.

    Yields:
        (log_text, status_text) tuples consumed by the Resume Textboxes.
    """
    if sb is None:
        yield "❌ Supabase not connected.", ""
        return
    if not run_label:
        yield "⚠ Select a run first (click Refresh, then pick from dropdown).", ""
        return
    run_id = _get_run_id_by_label(run_label)
    if not run_id:
        yield f"❌ Could not find run for: {run_label}", ""
        return

    all_urls = get_pending_urls(sb, run_id)
    # Set for O(1) membership tests when filtering the remaining URLs.
    done = set(get_completed_urls(sb, run_id))
    remaining = [u for u in all_urls if u not in done]
    if not remaining:
        update_run_status(sb, run_id, "completed", len(done))
        yield "✅ Already complete!", ""
        return

    # Prefer the domain stored on the run record over the textbox value.
    try:
        rd = next((r for r in _runs_cache if r['id'] == run_id), None)
        if rd:
            domain = rd.get('domain', domain)
    except Exception:
        pass

    audit_state.set_running(True, run_id)
    audit_state.resume()
    update_run_status(sb, run_id, "running")

    total = len(all_urls)
    batch_size, timeout, workers = int(batch_size), int(timeout), int(workers)
    start_time = time.time()
    bn = 0
    log = [f"▶️ Resuming — {len(remaining)} left ({len(done)} done)"]
    yield "\n".join(log), f"Resuming: {len(done)}/{total}"

    try:
        for bs in range(0, len(remaining), batch_size):
            # Pause check between batches.
            if audit_state.is_paused():
                c = get_completed_count(sb, run_id)
                update_run_status(sb, run_id, "paused", c)
                log.append(f"⏸️ PAUSED {c}/{total}")
                audit_state.set_running(False)
                yield "\n".join(log[-40:]), f"⏸️ Paused {c}/{total}"
                return

            be = min(bs + batch_size, len(remaining))
            bu = remaining[bs:be]
            bn += 1
            br = []

            for j, url in enumerate(bu):
                # Pause check between pages — flush partial batch first.
                if audit_state.is_paused():
                    if br:
                        save_batch_results(sb, run_id, br)
                    c = get_completed_count(sb, run_id)
                    update_run_status(sb, run_id, "paused", c)
                    log.append(f"⏸️ PAUSED {c}/{total}")
                    audit_state.set_running(False)
                    yield "\n".join(log[-40:]), f"⏸️ Paused {c}/{total}"
                    return

                gi = len(done) + bs + j + 1  # global 1-based index incl. prior progress
                elapsed = time.time() - start_time
                proc = bs + j + 1            # pages processed this session (for ETA)
                eta = (elapsed / proc) * (len(remaining) - proc)
                eta_s = f"{int(eta//60)}m{int(eta%60)}s" if eta > 60 else f"{eta:.0f}s"

                result = audit_page(
                    url, domain, DEFAULT_BODY_SELECTORS,
                    suggestion_map=DEFAULT_SUGGESTION_MAP,
                    timeout=timeout, concurrent_workers=workers,
                )
                br.append(result)

                short = url.replace('https://www.', '').replace('https://', '')[:70]
                if result['error']:
                    log.append(f"❌ [{gi}/{total}] {short}")
                else:
                    b = result['broken_int_count'] + result['broken_ext_count']
                    log.append(f"[{gi}/{total}] {short} {'🔴'+str(b) if b else '✅'}")
                yield "\n".join(log[-40:]), f"📊 {gi}/{total} ({gi*100//total}%) ETA:{eta_s}"

                if j < len(bu) - 1:
                    time.sleep(delay)

            if br:
                save_batch_results(sb, run_id, br)
                c = get_completed_count(sb, run_id)
                update_run_status(sb, run_id, "running", c)
                log.append(f"💾 Batch {bn} — {c}/{total}")

        # ── Post-crawl orphan analysis (same definition as run_audit) ──
        log.append("🔍 Orphan analysis...")
        yield "\n".join(log[-40:]), "🔍 Orphans..."
        ap = get_all_page_results(sb, run_id)
        ar = [p['result'] for p in ap]
        tgt, pg = set(), set()
        for r in ar:
            pg.add(r['url'].rstrip('/').split('?')[0])
            for lk in r.get('internal_links', []):
                tgt.add(lk['url'].rstrip('/').split('?')[0])
        orph = sorted(p for p in pg if p not in tgt)

        fs = {
            'total_pages': len(ar),
            'total_int': sum(r.get('int_count', 0) for r in ar),
            'total_ext': sum(r.get('ext_count', 0) for r in ar),
            'total_broken': sum(
                r.get('broken_int_count', 0) + r.get('broken_ext_count', 0) for r in ar
            ),
            'total_redirects': sum(
                r.get('redirect_int_count', 0) + r.get('redirect_ext_count', 0) for r in ar
            ),
            'total_flags': sum(r.get('follow_flag_count', 0) for r in ar),
            'total_dups': sum(r.get('duplicate_count', 0) for r in ar),
            'total_sug': sum(len(r.get('suggestions', [])) for r in ar),
            'orphan_count': len(orph),
            'orphan_urls': orph[:100],
        }
        update_run_status(sb, run_id, "completed", len(ar), fs)

        tt = time.time() - start_time
        log.append(f"✅ DONE! {len(ar)} pages in {tt:.0f}s · {len(orph)} orphans")
        audit_state.set_running(False)
        yield "\n".join(log[-40:]), f"✅ Done — {len(ar)} pages"
    except Exception as e:
        log.append(f"❌ {str(e)}")
        audit_state.set_running(False)
        try:
            c = get_completed_count(sb, run_id)
            update_run_status(sb, run_id, "paused", c)
        except Exception:
            pass
        yield "\n".join(log[-40:]), "❌ Error"


# ═══════════════════════════════════════════════════
# PAST RUNS
# ═══════════════════════════════════════════════════
def load_runs_html():
    """Render the saved runs as an HTML table; also refreshes the runs cache.

    NOTE(review): the table markup below was reconstructed — the original HTML
    string literals were corrupted in the source (tags stripped). Column order,
    status colors and cell text follow the surviving fragments; verify styling
    against the intended design.
    """
    _refresh_cache()
    if not _runs_cache:
        return "<p style='color:#888;padding:12px;'>No saved runs.</p>"

    html = '<table style="width:100%;border-collapse:collapse;font-size:14px;">'
    html += (
        '<tr><th>Run</th><th>Status</th><th>Pages</th><th>Broken</th>'
        '<th>Flags</th><th>Dups</th><th>Orphans</th></tr>'
    )
    for r in _runs_cache:
        s = r.get('summary', {}) or {}
        st = r.get('status', '?')
        sc = {'completed': '#059669', 'paused': '#d97706', 'running': '#2563eb'}.get(st, '#888')
        bg = {
            'completed': 'rgba(5,150,105,0.1)',
            'paused': 'rgba(217,119,6,0.1)',
            'running': 'rgba(37,99,235,0.1)',
        }.get(st, 'rgba(136,136,136,0.1)')
        cr = r.get('created_at', '')[:16].replace('T', ' ')  # "YYYY-MM-DD HH:MM"
        html += (
            f'<tr style="border-bottom:1px solid #ddd;">'
            f'<td style="padding:6px;">{r.get("name","?")}<br>'
            f'<small style="color:#888;">{cr}</small></td>'
            f'<td style="padding:6px;color:{sc};background:{bg};">{st.upper()}</td>'
            f'<td style="padding:6px;">{r.get("completed_urls",0)}/{r.get("total_urls",0)}</td>'
            f'<td style="padding:6px;">{s.get("total_broken","—")}</td>'
            f'<td style="padding:6px;">{s.get("total_flags","—")}</td>'
            f'<td style="padding:6px;">{s.get("total_dups","—")}</td>'
            f'<td style="padding:6px;">{s.get("orphan_count","—")}</td>'
            f'</tr>'
        )
    html += '</table>'
    return html


def load_runs_choices():
    """Update the run dropdown with label strings built from the cached runs.

    Uses the cache populated by load_runs_html(); returns a gr.update so the
    dropdown's *choices* are replaced (returning a bare list would only set
    its value).
    """
    choices = []
    for r in _runs_cache:
        st = r.get('status', '?')
        label = (
            f"{r.get('name','?')} [{st.upper()}] "
            f"({r.get('completed_urls',0)}/{r.get('total_urls',0)})"
        )
        choices.append(label)
    return gr.update(choices=choices)


def generate_report_for_run(run_label, domain):
    """Build the HTML audit report for the selected run.

    Returns:
        (file_path_or_None, status_message) for the File and Textbox outputs.
    """
    if sb is None or not run_label:
        return None, "❌ No run selected."
    run_id = _get_run_id_by_label(run_label)
    if not run_id:
        return None, "❌ Run not found."
    try:
        run = next((r for r in _runs_cache if r['id'] == run_id), None)
        pages = get_all_page_results(sb, run_id)
        if not pages:
            return None, "⚠ No data."
        results = [p['result'] for p in pages]
        s = (run.get('summary', {}) or {}) if run else {}
        rh = generate_report(
            results,
            s.get('orphan_urls', []),
            run.get('domain', domain) if run else domain,
        )
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.html', prefix='Audit_')
        tmp.write(rh.encode('utf-8'))
        tmp.close()
        return tmp.name, f"✅ Report — {len(results)} pages"
    except Exception as e:
        return None, f"❌ {str(e)}"


def generate_csv_for_run(run_label):
    """Export per-page link counts for the selected run as a CSV download."""
    if sb is None or not run_label:
        return None, "❌ No run selected."
    run_id = _get_run_id_by_label(run_label)
    if not run_id:
        return None, "❌ Run not found."
    try:
        pages = get_all_page_results(sb, run_id)
        if not pages:
            return None, "⚠ No data."
        rows = [
            {
                'URL': p['result'].get('url', ''),
                'Internal': p['result'].get('int_count', 0),
                'External': p['result'].get('ext_count', 0),
                'Broken': p['result'].get('broken_int_count', 0)
                          + p['result'].get('broken_ext_count', 0),
                'Redirects': p['result'].get('redirect_int_count', 0)
                             + p['result'].get('redirect_ext_count', 0),
                'Flags': p['result'].get('follow_flag_count', 0),
                'Dups': p['result'].get('duplicate_count', 0),
            }
            for p in pages
        ]
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.csv', prefix='Audit_')
        pd.DataFrame(rows).to_csv(tmp.name, index=False)
        tmp.close()
        return tmp.name, f"✅ CSV — {len(rows)} rows"
    except Exception as e:
        return None, f"❌ {str(e)}"


def delete_selected_run(run_label):
    """Delete the selected run from Supabase; the table refreshes on demand."""
    if sb is None or not run_label:
        return "❌ No run selected."
    run_id = _get_run_id_by_label(run_label)
    if not run_id:
        return "❌ Run not found."
    try:
        delete_run(sb, run_id)
        return "🗑️ Deleted. Click Refresh."
    except Exception as e:
        return f"❌ {str(e)}"
# ═══════════════════════════════════════════════════
# UI
# ═══════════════════════════════════════════════════
with gr.Blocks(title="Link Audit Tool", theme=gr.themes.Soft()) as app:
    # NOTE(review): banner markup reconstructed — the original HTML string was
    # corrupted in the source (tags stripped); the visible text is preserved.
    gr.HTML("""
    <div style="text-align:center;padding:16px 0;">
      <div style="font-size:12px;letter-spacing:3px;color:#888;">SEO LINK AUDIT TOOL</div>
      <h1 style="margin:6px 0;">🔗 Bulk Link Audit</h1>
      <p style="color:#666;margin:0;">Upload URLs → batch crawl → pause/resume → generate report</p>
    </div>
    """)
    conn = "🗄️ ✅ Supabase Connected" if sb else "🗄️ ❌ Not Connected"
    gr.Markdown(f"**{conn}**")

    with gr.Tabs():
        with gr.Tab("🔍 New Audit"):
            with gr.Row():
                with gr.Column(scale=2):
                    file_input = gr.File(
                        label="Upload Excel / CSV",
                        file_types=[".xlsx", ".csv", ".xls"],
                    )
                    pasted_urls = gr.Textbox(label="Or paste URLs (one per line)", lines=5)
                with gr.Column(scale=1):
                    domain_input = gr.Textbox(label="Your Domain", value="edstellar.com")
                    batch_size_input = gr.Slider(5, 50, value=25, step=5, label="Batch Size")
                    timeout_input = gr.Slider(5, 60, value=15, step=5, label="Timeout (s)")
                    delay_input = gr.Slider(0, 5, value=1.0, step=0.5, label="Delay (s)")
                    workers_input = gr.Slider(1, 10, value=5, step=1, label="Parallel checks")
            with gr.Row():
                run_btn = gr.Button("🚀 Run Audit", variant="primary", scale=2)
                pause_btn = gr.Button("⏸️ Pause", variant="stop", scale=1)
            progress_text = gr.Textbox(label="Status", interactive=False)
            log_output = gr.Textbox(label="Audit Log", lines=20, interactive=False)

            run_btn.click(
                api_name=False,
                fn=run_audit,
                inputs=[file_input, pasted_urls, domain_input, batch_size_input,
                        timeout_input, delay_input, workers_input],
                outputs=[log_output, progress_text],
            )
            pause_btn.click(api_name=False, fn=pause_audit, outputs=[progress_text])

        with gr.Tab("📁 Past Runs"):
            refresh_btn = gr.Button("🔄 Refresh", variant="secondary")
            runs_html = gr.HTML(value="<p style='color:#888;'>Click Refresh to load.</p>")
            # Dropdown uses plain string labels (no tuples, no UUIDs as values);
            # the UUID is looked up from the label when needed.
            run_dropdown = gr.Dropdown(
                label="Select Run", choices=[], interactive=True, allow_custom_value=True,
            )
            with gr.Row():
                report_btn = gr.Button("📊 HTML Report", variant="primary")
                csv_btn = gr.Button("📋 CSV", variant="secondary")
                resume_btn = gr.Button("▶️ Resume", variant="primary")
                delete_btn = gr.Button("🗑️ Delete", variant="stop")
            action_status = gr.Textbox(label="Status", interactive=False)
            with gr.Row():
                report_file = gr.File(label="Report Download", interactive=False)
                csv_file = gr.File(label="CSV Download", interactive=False)
            gr.Markdown("---\n### Resume Controls")
            resume_progress = gr.Textbox(label="Resume Status", interactive=False)
            resume_log = gr.Textbox(label="Resume Log", lines=15, interactive=False)
            resume_pause_btn = gr.Button("⏸️ Pause Resume", variant="stop")

            # Refresh: load the HTML table first (which refreshes the cache),
            # then rebuild the dropdown choices from that cache.
            refresh_btn.click(
                api_name=False, fn=load_runs_html, outputs=[runs_html],
            ).then(
                api_name=False, fn=load_runs_choices, outputs=[run_dropdown],
            )
            report_btn.click(
                api_name=False,
                fn=generate_report_for_run,
                inputs=[run_dropdown, domain_input],
                outputs=[report_file, action_status],
            )
            csv_btn.click(
                api_name=False,
                fn=generate_csv_for_run,
                inputs=[run_dropdown],
                outputs=[csv_file, action_status],
            )
            delete_btn.click(
                api_name=False,
                fn=delete_selected_run,
                inputs=[run_dropdown],
                outputs=[action_status],
            )
            resume_btn.click(
                api_name=False,
                fn=resume_audit,
                inputs=[run_dropdown, domain_input, batch_size_input,
                        timeout_input, delay_input, workers_input],
                outputs=[resume_log, resume_progress],
            )
            resume_pause_btn.click(api_name=False, fn=pause_audit, outputs=[resume_progress])


if __name__ == "__main__":
    # queue() is required so generator handlers can stream progress updates.
    app.queue().launch(server_name="0.0.0.0", server_port=7860)