# app.py — Link Audit Tool (commit 828b2ac)
"""
Link Audit Tool β€” Gradio UI for Hugging Face Spaces
"""
import gradio as gr
import pandas as pd
import time
import json
import os
import threading
import tempfile
from datetime import datetime
from audit_engine import audit_page, DEFAULT_BODY_SELECTORS, DEFAULT_SUGGESTION_MAP
from report_generator import generate_report
from db import (
get_client, create_run, get_all_runs, get_all_page_results,
get_completed_urls, get_pending_urls, get_completed_count,
save_batch_results, update_run_status, delete_run,
)
SUPABASE_URL = os.environ.get("SUPABASE_URL", "")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY", "")

# Connect to Supabase at import time. A lightweight probe query verifies that
# the credentials and the audit_runs table actually work; on any failure the
# app keeps running with sb=None and every handler degrades gracefully.
sb = None
if SUPABASE_URL and SUPABASE_KEY:
    try:
        sb = get_client(SUPABASE_URL, SUPABASE_KEY)
        # Fail fast if the table is unreachable or the key is invalid.
        sb.table("audit_runs").select("id").limit(1).execute()
        print("βœ… Supabase connected")
    except Exception as e:
        print(f"❌ Supabase failed: {e}")
        sb = None
class AuditState:
    """Thread-safe flags coordinating the single in-flight audit.

    The Gradio worker generator polls ``is_paused`` between pages; the pause
    button sets the flag from another request thread, hence the lock.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.paused = False   # pause requested; worker stops after current page
        self.running = False  # an audit generator is currently active
        self.run_id = None    # id of the most recent run started/resumed

    def request_pause(self):
        """Ask the worker loop to stop after the page currently in flight."""
        with self.lock:
            self.paused = True

    def resume(self):
        """Clear any outstanding pause request."""
        with self.lock:
            self.paused = False

    def is_paused(self):
        with self.lock:
            return self.paused

    def set_running(self, val, run_id=None):
        """Mark the audit running/stopped; remember ``run_id`` when given."""
        with self.lock:
            self.running = val
            if run_id:
                self.run_id = run_id

    def is_running(self):
        with self.lock:
            return self.running


audit_state = AuditState()
# ─── Global runs cache for dropdown ───
# Populated by _refresh_cache(); the dropdown shows plain label strings and
# _get_run_id_by_label resolves a label back to its run UUID via this cache.
_runs_cache = []


def _refresh_cache():
    """Reload the run list from Supabase into the module-level cache."""
    global _runs_cache
    _runs_cache = (get_all_runs(sb) or []) if sb is not None else []


def _get_run_id_by_label(label):
    """Look up run ID from dropdown label."""
    for r in _runs_cache:
        st = r.get('status', '?')
        expected = f"{r.get('name','?')} [{st.upper()}] ({r.get('completed_urls',0)}/{r.get('total_urls',0)})"
        if label == expected:
            return r['id']
    # Maybe it's a raw UUID
    if label and len(label) > 30:
        return label
    return None
# ═══════════════════════════════════════════════════
def run_audit(file, pasted_urls, domain, batch_size, timeout, delay, workers):
    """Crawl every supplied URL in batches, streaming progress to the UI.

    Generator used as a Gradio event handler: yields ``(log_text, status_text)``
    tuples. URLs come from an uploaded CSV/Excel file (preferred) or from the
    pasted textbox. Results are persisted batch-by-batch so a pause or crash
    loses at most one batch of work.

    Args:
        file: Gradio File upload (or None).
        pasted_urls: newline-separated URLs from the textbox.
        domain: the site's own domain (used to classify internal links).
        batch_size, timeout, delay, workers: crawl tuning knobs from the UI.
    """
    if sb is None:
        yield "❌ Supabase not connected.", ""
        return
    # ── Gather candidate URLs (file upload wins over pasted text) ──
    urls = []
    if file is not None:
        try:
            fpath = file.name if hasattr(file, 'name') else file
            df = pd.read_csv(fpath) if str(fpath).endswith('.csv') else pd.read_excel(fpath)
            # Guard: an empty sheet would otherwise raise IndexError below and
            # surface as a confusing "File error".
            if df.empty:
                yield "⚠ No valid URLs.", ""
                return
            # Pick the first column whose first cell looks like a URL.
            url_col = None
            for col in df.columns:
                sample = str(df[col].iloc[0]).strip().lower()
                if sample.startswith('http') or domain in sample:
                    url_col = col
                    break
            if not url_col:
                url_col = df.columns[0]
            urls = [u for u in df[url_col].dropna().astype(str).str.strip().tolist() if u.startswith('http')]
        except Exception as e:
            yield f"❌ File error: {e}", ""
            return
    elif pasted_urls and pasted_urls.strip():
        urls = [u.strip() for u in pasted_urls.strip().split('\n') if u.strip().startswith('http')]
    if not urls:
        yield "⚠ No valid URLs.", ""
        return
    # Deduplicate while preserving order.
    seen = set()
    unique = []
    for u in urls:
        if u not in seen:
            seen.add(u)
            unique.append(u)
    urls = unique
    run_name = f"{domain} Audit β€” {datetime.now().strftime('%b %d %H:%M')} β€” {len(urls)} pages"
    run_id = create_run(sb, run_name, domain, len(urls), urls)
    audit_state.set_running(True, run_id)
    audit_state.resume()
    total = len(urls)
    batch_size, timeout, workers = int(batch_size), int(timeout), int(workers)
    start_time = time.time()
    batch_num = 0
    log = []
    yield f"πŸš€ {run_name}\nπŸ“¦ {total} URLs Β· Batch: {batch_size}", "▢️ Running..."
    try:
        for bs in range(0, total, batch_size):
            # Honour a pause request between batches.
            if audit_state.is_paused():
                c = get_completed_count(sb, run_id)
                update_run_status(sb, run_id, "paused", c)
                log.append(f"⏸️ PAUSED at {c}/{total}")
                audit_state.set_running(False)
                yield "\n".join(log[-40:]), f"⏸️ Paused β€” {c}/{total}"
                return
            be = min(bs + batch_size, total)
            batch_urls = urls[bs:be]
            batch_num += 1
            batch_results = []
            for j, url in enumerate(batch_urls):
                # Honour a pause request between pages; flush the partial batch
                # first so no crawled page is lost.
                if audit_state.is_paused():
                    if batch_results:
                        save_batch_results(sb, run_id, batch_results)
                    c = get_completed_count(sb, run_id)
                    update_run_status(sb, run_id, "paused", c)
                    log.append(f"⏸️ PAUSED at {c}/{total}")
                    audit_state.set_running(False)
                    yield "\n".join(log[-40:]), f"⏸️ Paused β€” {c}/{total}"
                    return
                gi = bs + j + 1  # 1-based global page index
                elapsed = time.time() - start_time
                eta = (elapsed / gi) * (total - gi)
                eta_s = f"{int(eta//60)}m{int(eta%60)}s" if eta > 60 else f"{eta:.0f}s"
                result = audit_page(url, domain, DEFAULT_BODY_SELECTORS,
                                    suggestion_map=DEFAULT_SUGGESTION_MAP, timeout=timeout, concurrent_workers=workers)
                batch_results.append(result)
                short = url.replace('https://www.', '').replace('https://', '')[:70]
                if result['error']:
                    log.append(f"❌ [{gi}/{total}] {short} β€” {result['error'][:50]}")
                else:
                    b = result['broken_int_count'] + result['broken_ext_count']
                    fc = result['follow_flag_count']
                    d = result['duplicate_count']
                    fl = []
                    if b:
                        fl.append(f"πŸ”΄{b}broken")
                    if fc:
                        fl.append(f"🟑{fc}flags")
                    if d:
                        fl.append(f"🟣{d}dups")
                    fs = " ".join(fl) if fl else "βœ…"
                    log.append(f"[{gi}/{total}] {short} β€” Int:{result['int_count']} Ext:{result['ext_count']} {fs}")
                yield "\n".join(log[-40:]), f"πŸ“Š {gi}/{total} ({gi*100//total}%) Batch{batch_num} ETA:{eta_s}"
                if j < len(batch_urls) - 1:
                    time.sleep(delay)
            if batch_results:
                try:
                    save_batch_results(sb, run_id, batch_results)
                    c = get_completed_count(sb, run_id)
                    update_run_status(sb, run_id, "running", c)
                    log.append(f"πŸ’Ύ Batch {batch_num} saved β€” {c}/{total}")
                except Exception as e:
                    # Keep crawling even if one save fails; data is retried on resume.
                    log.append(f"⚠ Save error: {str(e)[:60]}")
                yield "\n".join(log[-40:]), f"πŸ’Ύ Batch {batch_num} saved"
            del batch_results  # release page payloads before the next batch
        # ── Post-pass: orphan pages (crawled but never linked internally) ──
        log.append("πŸ” Orphan analysis...")
        yield "\n".join(log[-40:]), "πŸ” Orphan analysis..."
        all_pages = get_all_page_results(sb, run_id)
        all_results = [p['result'] for p in all_pages]
        targets, pg_urls = set(), set()
        for r in all_results:
            # Normalise: strip trailing slash and query string before comparing.
            pg_urls.add(r['url'].rstrip('/').split('?')[0])
            for lk in r.get('internal_links', []):
                targets.add(lk['url'].rstrip('/').split('?')[0])
        orphans = sorted([p for p in pg_urls if p not in targets])
        summary = {
            'total_pages': len(all_results), 'total_int': sum(r.get('int_count', 0) for r in all_results),
            'total_ext': sum(r.get('ext_count', 0) for r in all_results),
            'total_broken': sum(r.get('broken_int_count', 0) + r.get('broken_ext_count', 0) for r in all_results),
            'total_redirects': sum(r.get('redirect_int_count', 0) + r.get('redirect_ext_count', 0) for r in all_results),
            'total_flags': sum(r.get('follow_flag_count', 0) for r in all_results),
            'total_dups': sum(r.get('duplicate_count', 0) for r in all_results),
            'total_sug': sum(len(r.get('suggestions', [])) for r in all_results),
            'orphan_count': len(orphans), 'orphan_urls': orphans[:100],
        }
        update_run_status(sb, run_id, "completed", len(all_results), summary)
        tt = time.time() - start_time
        log.append(f"βœ… DONE! {len(all_results)} pages in {tt:.0f}s Β· {len(orphans)} orphans")
        log.append(f"Broken:{summary['total_broken']} Redirects:{summary['total_redirects']} Flags:{summary['total_flags']} Dups:{summary['total_dups']}")
        log.append("β†’ Past Runs β†’ Refresh β†’ Generate Report")
        audit_state.set_running(False)
        yield "\n".join(log[-40:]), f"βœ… Done β€” {len(all_results)} pages in {tt:.0f}s"
    except Exception as e:
        log.append(f"❌ {str(e)}")
        audit_state.set_running(False)
        # Best effort: mark the run paused so it stays resumable.
        try:
            c = get_completed_count(sb, run_id)
            update_run_status(sb, run_id, "paused", c)
        except Exception:
            pass
        yield "\n".join(log[-40:]), "❌ Error β€” progress saved"
def pause_audit():
    """Request a graceful stop; the worker halts after the page in flight."""
    if not audit_state.is_running():
        return "No audit running."
    audit_state.request_pause()
    return "⏸️ Stopping after current page..."
# ═══════════════════════════════════════════════════
def resume_audit(run_label, domain, batch_size, timeout, delay, workers):
    """Continue a paused run, crawling only URLs without a saved result.

    Generator used as a Gradio event handler: yields ``(log_text, status_text)``
    tuples, mirroring :func:`run_audit` but skipping already-completed pages.

    Args:
        run_label: dropdown label (or raw UUID) identifying the run.
        domain: fallback domain; the run's stored domain takes precedence.
        batch_size, timeout, delay, workers: crawl tuning knobs from the UI.
    """
    if sb is None:
        yield "❌ Supabase not connected.", ""
        return
    if not run_label:
        yield "⚠ Select a run first (click Refresh, then pick from dropdown).", ""
        return
    run_id = _get_run_id_by_label(run_label)
    if not run_id:
        yield f"❌ Could not find run for: {run_label}", ""
        return
    # NOTE(review): despite its name, get_pending_urls appears to return the
    # run's FULL URL list (total/remaining are derived from it) — confirm in db.py.
    all_urls = get_pending_urls(sb, run_id)
    done = get_completed_urls(sb, run_id)
    done_set = set(done)  # O(1) membership for the remaining-URL filter
    remaining = [u for u in all_urls if u not in done_set]
    if not remaining:
        update_run_status(sb, run_id, "completed", len(done))
        yield "βœ… Already complete!", ""
        return
    # Prefer the domain stored with the run over the UI textbox value.
    rd = next((r for r in _runs_cache if r.get('id') == run_id), None)
    if rd:
        domain = rd.get('domain', domain)
    audit_state.set_running(True, run_id)
    audit_state.resume()
    update_run_status(sb, run_id, "running")
    total = len(all_urls)
    batch_size, timeout, workers = int(batch_size), int(timeout), int(workers)
    start_time = time.time()
    bn = 0
    log = [f"▢️ Resuming β€” {len(remaining)} left ({len(done)} done)"]
    yield "\n".join(log), f"Resuming: {len(done)}/{total}"
    try:
        for bs in range(0, len(remaining), batch_size):
            # Honour a pause request between batches.
            if audit_state.is_paused():
                c = get_completed_count(sb, run_id)
                update_run_status(sb, run_id, "paused", c)
                log.append(f"⏸️ PAUSED {c}/{total}")
                audit_state.set_running(False)
                yield "\n".join(log[-40:]), f"⏸️ Paused {c}/{total}"
                return
            be = min(bs + batch_size, len(remaining))
            bu = remaining[bs:be]
            bn += 1
            br = []
            for j, url in enumerate(bu):
                # Honour a pause request between pages; flush partial batch first.
                if audit_state.is_paused():
                    if br:
                        save_batch_results(sb, run_id, br)
                    c = get_completed_count(sb, run_id)
                    update_run_status(sb, run_id, "paused", c)
                    log.append(f"⏸️ PAUSED {c}/{total}")
                    audit_state.set_running(False)
                    yield "\n".join(log[-40:]), f"⏸️ Paused {c}/{total}"
                    return
                gi = len(done) + bs + j + 1  # 1-based index into the full run
                elapsed = time.time() - start_time
                proc = bs + j + 1  # pages processed this session (ETA basis)
                eta = (elapsed / proc) * (len(remaining) - proc)
                eta_s = f"{int(eta//60)}m{int(eta%60)}s" if eta > 60 else f"{eta:.0f}s"
                result = audit_page(url, domain, DEFAULT_BODY_SELECTORS,
                                    suggestion_map=DEFAULT_SUGGESTION_MAP, timeout=timeout, concurrent_workers=workers)
                br.append(result)
                short = url.replace('https://www.', '').replace('https://', '')[:70]
                if result['error']:
                    log.append(f"❌ [{gi}/{total}] {short}")
                else:
                    b = result['broken_int_count'] + result['broken_ext_count']
                    log.append(f"[{gi}/{total}] {short} {'πŸ”΄'+str(b) if b else 'βœ…'}")
                yield "\n".join(log[-40:]), f"πŸ“Š {gi}/{total} ({gi*100//total}%) ETA:{eta_s}"
                if j < len(bu) - 1:
                    time.sleep(delay)
            if br:
                save_batch_results(sb, run_id, br)
                c = get_completed_count(sb, run_id)
                update_run_status(sb, run_id, "running", c)
                log.append(f"πŸ’Ύ Batch {bn} β€” {c}/{total}")
            del br  # release page payloads before the next batch
        # ── Final pass: recompute orphans and the run summary ──
        log.append("πŸ” Orphan analysis...")
        yield "\n".join(log[-40:]), "πŸ” Orphans..."
        ap = get_all_page_results(sb, run_id)
        ar = [p['result'] for p in ap]
        tgt, pg = set(), set()
        for r in ar:
            # Normalise: strip trailing slash and query string before comparing.
            pg.add(r['url'].rstrip('/').split('?')[0])
            for lk in r.get('internal_links', []):
                tgt.add(lk['url'].rstrip('/').split('?')[0])
        orph = sorted([p for p in pg if p not in tgt])
        fs = {
            'total_pages': len(ar), 'total_int': sum(r.get('int_count', 0) for r in ar),
            'total_ext': sum(r.get('ext_count', 0) for r in ar),
            'total_broken': sum(r.get('broken_int_count', 0) + r.get('broken_ext_count', 0) for r in ar),
            'total_redirects': sum(r.get('redirect_int_count', 0) + r.get('redirect_ext_count', 0) for r in ar),
            'total_flags': sum(r.get('follow_flag_count', 0) for r in ar),
            'total_dups': sum(r.get('duplicate_count', 0) for r in ar),
            'total_sug': sum(len(r.get('suggestions', [])) for r in ar),
            'orphan_count': len(orph), 'orphan_urls': orph[:100],
        }
        update_run_status(sb, run_id, "completed", len(ar), fs)
        tt = time.time() - start_time
        log.append(f"βœ… DONE! {len(ar)} pages in {tt:.0f}s Β· {len(orph)} orphans")
        audit_state.set_running(False)
        yield "\n".join(log[-40:]), f"βœ… Done β€” {len(ar)} pages"
    except Exception as e:
        log.append(f"❌ {str(e)}")
        audit_state.set_running(False)
        # Best effort: record progress so the run stays resumable.
        try:
            c = get_completed_count(sb, run_id)
            update_run_status(sb, run_id, "paused", c)
        except Exception:
            pass
        yield "\n".join(log[-40:]), "❌ Error"
# ═══════════════════════════════════════════════════
# PAST RUNS
# ═══════════════════════════════════════════════════
def load_runs_html():
    """Render the saved-runs table as HTML (refreshes the module cache first)."""
    _refresh_cache()
    if not _runs_cache:
        return "<p>No saved runs.</p>"
    status_colors = {'completed': '#059669', 'paused': '#d97706', 'running': '#2563eb'}
    status_bgs = {'completed': 'rgba(5,150,105,0.1)', 'paused': 'rgba(217,119,6,0.1)', 'running': 'rgba(37,99,235,0.1)'}
    parts = [
        '<table style="width:100%;border-collapse:collapse;font-size:13px;">',
        '<tr style="background:#f1f5f9;"><th style="padding:8px;text-align:left;">Run</th><th style="padding:8px;text-align:center;">Status</th><th style="padding:8px;text-align:center;">Pages</th><th style="padding:8px;text-align:center;">Broken</th><th style="padding:8px;text-align:center;">Flags</th><th style="padding:8px;text-align:center;">Dups</th><th style="padding:8px;text-align:center;">Orphans</th></tr>',
    ]
    for run in _runs_cache:
        summary = run.get('summary', {}) or {}
        status = run.get('status', '?')
        color = status_colors.get(status, '#888')
        bg = status_bgs.get(status, 'rgba(136,136,136,0.1)')
        created = run.get('created_at', '')[:16].replace('T', ' ')
        parts.append(f'<tr style="border-bottom:1px solid #e2e8f0;"><td style="padding:8px;"><b>{run.get("name","?")}</b><br><span style="font-size:10px;color:#94a3b8;">{created}</span></td><td style="padding:8px;text-align:center;"><span style="background:{bg};color:{color};padding:2px 8px;border-radius:10px;font-size:10px;font-weight:700;">{status.upper()}</span></td><td style="padding:8px;text-align:center;font-weight:700;">{run.get("completed_urls",0)}/{run.get("total_urls",0)}</td><td style="padding:8px;text-align:center;color:#dc2626;font-weight:700;">{summary.get("total_broken","β€”")}</td><td style="padding:8px;text-align:center;color:#dc2626;font-weight:700;">{summary.get("total_flags","β€”")}</td><td style="padding:8px;text-align:center;color:#db2777;font-weight:700;">{summary.get("total_dups","β€”")}</td><td style="padding:8px;text-align:center;color:#dc2626;font-weight:700;">{summary.get("orphan_count","β€”")}</td></tr>')
    parts.append('</table>')
    return ''.join(parts)
def load_runs_choices():
    """Return plain list of label strings for dropdown. Uses cache from load_runs_html."""
    return [
        f"{r.get('name','?')} [{r.get('status', '?').upper()}] ({r.get('completed_urls',0)}/{r.get('total_urls',0)})"
        for r in _runs_cache
    ]
def generate_report_for_run(run_label, domain):
    """Build the HTML report for the selected run; returns (file_path, status)."""
    if sb is None or not run_label:
        return None, "❌ No run selected."
    run_id = _get_run_id_by_label(run_label)
    if not run_id:
        return None, "❌ Run not found."
    try:
        run = next((r for r in _runs_cache if r['id'] == run_id), None)
        pages = get_all_page_results(sb, run_id)
        if not pages:
            return None, "⚠ No data."
        results = [p['result'] for p in pages]
        summary = (run.get('summary', {}) or {}) if run else {}
        # The run's stored domain wins over the UI textbox value.
        report_domain = run.get('domain', domain) if run else domain
        report_html = generate_report(results, summary.get('orphan_urls', []), report_domain)
        # delete=False: Gradio serves the file after this handler returns.
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.html', prefix='Audit_')
        tmp.write(report_html.encode('utf-8'))
        tmp.close()
        return tmp.name, f"βœ… Report β€” {len(results)} pages"
    except Exception as e:
        return None, f"❌ {str(e)}"
def generate_csv_for_run(run_label):
    """Export per-page link counts for the selected run as a CSV file."""
    if sb is None or not run_label:
        return None, "❌ No run selected."
    run_id = _get_run_id_by_label(run_label)
    if not run_id:
        return None, "❌ Run not found."
    try:
        pages = get_all_page_results(sb, run_id)
        if not pages:
            return None, "⚠ No data."
        rows = []
        for page in pages:
            r = page['result']
            rows.append({
                'URL': r.get('url', ''),
                'Internal': r.get('int_count', 0),
                'External': r.get('ext_count', 0),
                'Broken': r.get('broken_int_count', 0) + r.get('broken_ext_count', 0),
                'Redirects': r.get('redirect_int_count', 0) + r.get('redirect_ext_count', 0),
                'Flags': r.get('follow_flag_count', 0),
                'Dups': r.get('duplicate_count', 0),
            })
        # delete=False: Gradio serves the file after this handler returns.
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.csv', prefix='Audit_')
        pd.DataFrame(rows).to_csv(tmp.name, index=False)
        tmp.close()
        return tmp.name, f"βœ… CSV β€” {len(rows)} rows"
    except Exception as e:
        return None, f"❌ {str(e)}"
def delete_selected_run(run_label):
    """Delete the selected run (and its stored results) from Supabase."""
    if sb is None or not run_label:
        return "❌ No run selected."
    run_id = _get_run_id_by_label(run_label)
    if not run_id:
        return "❌ Run not found."
    try:
        delete_run(sb, run_id)
        return "πŸ—‘οΈ Deleted. Click Refresh."
    except Exception as e:
        return f"❌ {str(e)}"
# ═══════════════════════════════════════════════════
# UI
# ═══════════════════════════════════════════════════
with gr.Blocks(title="Link Audit Tool", theme=gr.themes.Soft()) as app:
    # ── Header banner + connection indicator ──
    gr.HTML("""<div style="background:linear-gradient(135deg,#1e3a5f,#2563eb);padding:24px 28px;border-radius:12px;color:white;margin-bottom:16px;">
<p style="font-size:10px;font-weight:700;letter-spacing:1.5px;text-transform:uppercase;color:#93c5fd;margin-bottom:8px;">SEO LINK AUDIT TOOL</p>
<h1 style="margin:0 0 4px 0;font-size:24px;">πŸ”— Bulk Link Audit</h1>
<p style="margin:0;opacity:0.8;font-size:13px;">Upload URLs β†’ batch crawl β†’ pause/resume β†’ generate report</p></div>""")
    conn = "πŸ—„οΈ βœ… Supabase Connected" if sb else "πŸ—„οΈ ❌ Not Connected"
    gr.Markdown(f"**{conn}**")

    with gr.Tabs():
        # ── Tab 1: start a fresh audit ──
        with gr.Tab("πŸ” New Audit"):
            with gr.Row():
                with gr.Column(scale=2):
                    file_input = gr.File(label="Upload Excel / CSV", file_types=[".xlsx", ".csv", ".xls"])
                    pasted_urls = gr.Textbox(label="Or paste URLs (one per line)", lines=5)
                with gr.Column(scale=1):
                    domain_input = gr.Textbox(label="Your Domain", value="edstellar.com")
                    batch_size_input = gr.Slider(5, 50, value=25, step=5, label="Batch Size")
                    timeout_input = gr.Slider(5, 60, value=15, step=5, label="Timeout (s)")
                    delay_input = gr.Slider(0, 5, value=1.0, step=0.5, label="Delay (s)")
                    workers_input = gr.Slider(1, 10, value=5, step=1, label="Parallel checks")
            with gr.Row():
                run_btn = gr.Button("πŸš€ Run Audit", variant="primary", scale=2)
                pause_btn = gr.Button("⏸️ Pause", variant="stop", scale=1)
            progress_text = gr.Textbox(label="Status", interactive=False)
            log_output = gr.Textbox(label="Audit Log", lines=20, interactive=False)

            run_btn.click(api_name=False, fn=run_audit,
                          inputs=[file_input, pasted_urls, domain_input, batch_size_input, timeout_input, delay_input, workers_input],
                          outputs=[log_output, progress_text])
            pause_btn.click(api_name=False, fn=pause_audit, outputs=[progress_text])

        # ── Tab 2: browse, resume, export, delete past runs ──
        with gr.Tab("πŸ“ Past Runs"):
            refresh_btn = gr.Button("πŸ”„ Refresh", variant="secondary")
            runs_html = gr.HTML(value="<p>Click Refresh to load.</p>")
            # Dropdown holds plain string labels (no tuples, no UUIDs as values);
            # the UUID is resolved from the label when an action runs.
            run_dropdown = gr.Dropdown(label="Select Run", choices=[], interactive=True, allow_custom_value=True)
            with gr.Row():
                report_btn = gr.Button("πŸ“Š HTML Report", variant="primary")
                csv_btn = gr.Button("πŸ“‹ CSV", variant="secondary")
                resume_btn = gr.Button("▢️ Resume", variant="primary")
                delete_btn = gr.Button("πŸ—‘οΈ Delete", variant="stop")
            action_status = gr.Textbox(label="Status", interactive=False)
            with gr.Row():
                report_file = gr.File(label="Report Download", interactive=False)
                csv_file = gr.File(label="CSV Download", interactive=False)
            gr.Markdown("---\n### Resume Controls")
            resume_progress = gr.Textbox(label="Resume Status", interactive=False)
            resume_log = gr.Textbox(label="Resume Log", lines=15, interactive=False)
            resume_pause_btn = gr.Button("⏸️ Pause Resume", variant="stop")

            # Refresh chains: load table HTML (refreshes the cache), then the dropdown.
            refresh_btn.click(api_name=False, fn=load_runs_html, outputs=[runs_html]).then(
                api_name=False, fn=load_runs_choices, outputs=[run_dropdown])
            report_btn.click(api_name=False, fn=generate_report_for_run, inputs=[run_dropdown, domain_input], outputs=[report_file, action_status])
            csv_btn.click(api_name=False, fn=generate_csv_for_run, inputs=[run_dropdown], outputs=[csv_file, action_status])
            delete_btn.click(api_name=False, fn=delete_selected_run, inputs=[run_dropdown], outputs=[action_status])
            resume_btn.click(api_name=False, fn=resume_audit,
                             inputs=[run_dropdown, domain_input, batch_size_input, timeout_input, delay_input, workers_input],
                             outputs=[resume_log, resume_progress])
            resume_pause_btn.click(api_name=False, fn=pause_audit, outputs=[resume_progress])


if __name__ == "__main__":
    app.queue().launch(server_name="0.0.0.0", server_port=7860)