# Watchtower / app.py
# mehtabchandio's picture
# Update app.py
# 27c6876 verified
# Optional: uncomment in Colab if packages missing
# !pip install gradio requests python-whois pandas matplotlib reportlab pillow openpyxl
import os
import uuid
import time
import json
from io import BytesIO
from datetime import datetime, timezone
from typing import List
import requests
import socket
import ssl
import whois
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image as PILImage
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from reportlab.lib import colors
from reportlab.lib.utils import ImageReader
import gradio as gr
# ---------------- CONFIG ----------------
CONFIG = {
    # VirusTotal API key, read from the VT_API_KEY environment variable so the
    # secret is never committed with the source. If empty, VT checks are skipped.
    # NOTE(review): the key that used to be hard-coded here has been published
    # with the file and should be revoked/rotated.
    "VT_API_KEY": os.environ.get("VT_API_KEY", "").strip(),
    "TIMEOUT": 45,  # network timeout in seconds (>=40s as requested)
    "TMP_DIR": "/tmp",  # directory where exported report files are written
}
# Make sure the export directory exists before any report is written.
os.makedirs(CONFIG["TMP_DIR"], exist_ok=True)
# ---------------- Helpers / Checks ----------------
def normalize_url(url: str) -> str:
    """Return *url* stripped of surrounding whitespace and with a scheme.

    Args:
        url: User-supplied address, with or without an ``http(s)://`` prefix.

    Returns:
        The trimmed URL; ``http://`` is prepended when no http/https scheme
        is present. The scheme test is case-insensitive so that an input such
        as ``HTTPS://example.com`` is not prefixed a second time.

    Raises:
        ValueError: If *url* is empty, ``None`` or contains only whitespace.
    """
    # Strip first so that whitespace-only input is rejected instead of
    # being turned into the meaningless URL "http://".
    cleaned = url.strip() if url else ""
    if not cleaned:
        raise ValueError("Empty URL")
    if not cleaned.lower().startswith(("http://", "https://")):
        cleaned = "http://" + cleaned
    return cleaned
def check_website_status(url: str) -> dict:
    """Request *url* and report whether the site answered successfully.

    Returns:
        ``{"status": "Up" | "Down", "http_status": <int>}`` where "Up" means
        the final response (redirects are followed) has ``ok`` set, or
        ``{"error": <message>}`` when the request raised.
    """
    try:
        response = requests.get(
            url,
            timeout=CONFIG["TIMEOUT"],
            allow_redirects=True,
        )
        if response.ok:
            state = "Up"
        else:
            state = "Down"
        return {"status": state, "http_status": response.status_code}
    except Exception as exc:
        return {"error": str(exc)}
def check_ssl_expiration(url: str) -> dict:
    """Report when the TLS certificate served for *url*'s host expires.

    The host is taken from the parsed URL (not from string splitting), so
    ports, credentials and ``//`` sequences inside the path or query do not
    corrupt it. The connection goes to port 443 unless the URL is an
    ``https`` URL that names an explicit port.

    Returns:
        ``{"expires_on": <ISO-8601 UTC timestamp>, "days_left": <int>}`` on
        success, otherwise ``{"error": <message>}``.
    """
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(normalize_url(url))
        hostname = parts.hostname
        if not hostname:
            return {"error": "Could not determine hostname from URL"}
        # Only trust an explicit port when the URL itself is HTTPS; a port on
        # a plain-HTTP URL is not expected to speak TLS.
        port = parts.port if (parts.scheme == "https" and parts.port) else 443
        ctx = ssl.create_default_context()
        with socket.create_connection((hostname, port), timeout=CONFIG["TIMEOUT"]) as sock:
            with ctx.wrap_socket(sock, server_hostname=hostname) as ssock:
                cert = ssock.getpeercert()
        not_after = cert.get("notAfter")
        if not not_after:
            return {"error": "No notAfter in certificate"}
        # e.g. 'Jul 30 12:00:00 2025 GMT'; cert_time_to_seconds parses this
        # format independently of the process locale (strptime's %b is not).
        expiry_dt = datetime.fromtimestamp(ssl.cert_time_to_seconds(not_after), tz=timezone.utc)
        days_left = (expiry_dt - datetime.now(timezone.utc)).days
        return {"expires_on": expiry_dt.isoformat(), "days_left": int(days_left)}
    except Exception as e:
        return {"error": str(e)}
def check_domain_registration(url: str) -> dict:
    """Look up the WHOIS creation date of *url*'s host and derive its age.

    The host is taken from the parsed URL so that ports, credentials and
    ``//`` sequences inside the path or query are never sent to WHOIS.

    Returns:
        ``{"creation_date": <ISO-8601>, "age_days": <int>}`` when WHOIS yields
        a ``datetime``; ``{"creation_date": <str>}`` when it yields some other
        value; ``{"error": <message>}`` when the lookup fails or no creation
        date is present.
    """
    from urllib.parse import urlsplit

    try:
        host = urlsplit(normalize_url(url)).hostname
        if not host:
            return {"error": "Could not determine hostname from URL"}
        w = whois.whois(host)
        created = w.creation_date
        # Some registries report several creation dates; use the first one.
        if isinstance(created, list):
            created = created[0] if created else None
        if created is None:
            return {"error": "whois created date not found"}
        if isinstance(created, datetime):
            created_dt = created
            # Treat naive timestamps as UTC so they can be subtracted from an
            # aware "now".
            if created_dt.tzinfo is None:
                created_dt = created_dt.replace(tzinfo=timezone.utc)
            age_days = (datetime.now(timezone.utc) - created_dt).days
            return {"creation_date": created_dt.isoformat(), "age_days": int(age_days)}
        return {"creation_date": str(created)}
    except Exception as e:
        return {"error": str(e)}
def check_similar_domains(url: str) -> dict:
    """Check whether look-alike ``.com`` / ``.net`` / ``.org`` domains resolve.

    The candidate name is the first label of the host after a leading
    ``www.`` is removed (``www.example.com`` -> ``example``), and the scanned
    domain itself is left out of the candidates so that a site is not
    reported as a look-alike of itself.

    Returns:
        ``{"variants": {<domain>: {"resolved": True, "ip": <str>} |
        {"resolved": False}}}`` or ``{"error": <message>}``.
    """
    from urllib.parse import urlsplit

    try:
        host = urlsplit(normalize_url(url)).hostname
        if not host:
            return {"error": "Could not determine hostname from URL"}
        own_domain = host[4:] if host.startswith("www.") else host
        # NOTE(review): this is still a first-label heuristic; sub-domains
        # such as blog.example.com yield "blog" as the base name.
        base = own_domain.split(".")[0]
        found = {}
        for tld in ("com", "net", "org"):
            candidate = f"{base}.{tld}"
            if candidate == own_domain:
                continue  # the target itself is not a "similar" domain
            try:
                ip = socket.gethostbyname(candidate)
                found[candidate] = {"resolved": True, "ip": ip}
            except Exception:
                # Resolution failure simply means the variant is not in use.
                found[candidate] = {"resolved": False}
        return {"variants": found}
    except Exception as e:
        return {"error": str(e)}
def check_threat_intel(url: str) -> dict:
    """Query the VirusTotal v3 domain endpoint for *url*'s host.

    The host is taken from the parsed URL so that ports, credentials and
    ``//`` sequences inside the path or query never end up in the API path.

    Returns:
        ``{"note": ...}`` when no API key is configured;
        ``{"malicious": int, "suspicious": int, "harmless": int}`` taken from
        ``last_analysis_stats`` on HTTP 200; ``{"error": <message>}`` on any
        other status code or when the request raises.
    """
    from urllib.parse import urlsplit

    api_key = CONFIG.get("VT_API_KEY") or ""
    if not api_key:
        return {"note": "VirusTotal API key not configured"}
    try:
        domain = urlsplit(normalize_url(url)).hostname
        if not domain:
            return {"error": "Could not determine hostname from URL"}
        headers = {"x-apikey": api_key}
        resp = requests.get(
            f"https://www.virustotal.com/api/v3/domains/{domain}",
            headers=headers,
            timeout=CONFIG["TIMEOUT"],
        )
        if resp.status_code != 200:
            return {"error": f"VirusTotal returned {resp.status_code}"}
        data = resp.json()
        stats = data.get("data", {}).get("attributes", {}).get("last_analysis_stats", {})
        malicious = int(stats.get("malicious", 0))
        suspicious = int(stats.get("suspicious", 0))
        harmless = int(stats.get("harmless", 0))
        return {"malicious": malicious, "suspicious": suspicious, "harmless": harmless}
    except Exception as e:
        return {"error": str(e)}
def check_sensitive_info(url: str) -> dict:
    """Scan the page at *url* for wording that asks for financial/identity data.

    Returns:
        ``{"suspicious_terms": [<term>, ...]}`` listing every watched phrase
        that occurs in the lower-cased page text (in watch-list order), or
        ``{"error": <message>}`` when the request raises.
    """
    try:
        response = requests.get(url, timeout=CONFIG["TIMEOUT"], allow_redirects=True)
        page = response.text.lower()
        watch_list = [
            "credit card",
            "cvv",
            "card number",
            "social security",
            "ssn",
            "bank account",
            "routing number",
        ]
        hits = []
        for term in watch_list:
            if term in page:
                hits.append(term)
        return {"suspicious_terms": hits}
    except Exception as exc:
        return {"error": str(exc)}
def check_cookies(url: str) -> dict:
    """Collect the response cookies for *url* and look for a consent banner.

    Consent is considered detected when the lower-cased page text contains
    "cookie" together with "consent" or "accept".

    Returns:
        ``{"cookies": <dict>, "cookie_consent_detected": <bool>}`` or
        ``{"error": <message>}`` when the request raises.
    """
    try:
        response = requests.get(url, timeout=CONFIG["TIMEOUT"], allow_redirects=True)
        jar = response.cookies.get_dict()
        page = response.text.lower()
        if "cookie" not in page:
            has_banner = False
        else:
            has_banner = "consent" in page or "accept" in page
        return {"cookies": jar, "cookie_consent_detected": bool(has_banner)}
    except Exception as exc:
        return {"error": str(exc)}
def check_owasp_light(url: str) -> dict:
    """Run a very light, text-based scan of the page at *url*.

    Looks for the word "password" and for a few script-related markers in
    the lower-cased page text.

    Returns:
        ``{"password_inputs_found": <bool>, "xss_indicators": <bool>}`` or
        ``{"error": <message>}`` when the request raises.
    """
    try:
        response = requests.get(url, timeout=CONFIG["TIMEOUT"], allow_redirects=True)
        page = response.text.lower()
        has_password = "password" in page
        has_script_marker = False
        for marker in ["eval(", "innerhtml", "document.write", "<script"]:
            if marker in page:
                has_script_marker = True
                break
        return {
            "password_inputs_found": bool(has_password),
            "xss_indicators": bool(has_script_marker),
        }
    except Exception as exc:
        return {"error": str(exc)}
def check_plugins_heuristic(url: str) -> dict:
    """Probe a few well-known CMS login/admin paths on *url*'s host.

    The probe base is built from the parsed URL: it keeps the target's own
    scheme and port, drops any credentials, and is not confused by ``//``
    sequences inside the path or query.

    Returns:
        A dict keyed by probed path whose values are
        ``{"status_code": <int>, "ok": <bool>}`` or ``{"error": <message>}``
        for that path; ``{"error": <message>}`` when the URL itself cannot be
        processed.
    """
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(normalize_url(url))
        if not parts.hostname:
            return {"error": "Could not determine hostname from URL"}
        # netloc without "user:password@" keeps host, port and IPv6 brackets.
        netloc = parts.netloc.rpartition("@")[2]
        base = f"{parts.scheme}://{netloc}"
        results = {}
        for path in ["/wp-login.php", "/administrator/", "/user/login"]:
            try:
                # Short fixed timeout: these are optional probes, not the main fetch.
                r = requests.get(base + path, timeout=10, allow_redirects=True)
                results[path] = {"status_code": r.status_code, "ok": r.ok}
            except Exception as e:
                results[path] = {"error": str(e)}
        return results
    except Exception as e:
        return {"error": str(e)}
def dns_enumeration(url: str) -> dict:
    """Resolve *url*'s host name to an IPv4 address.

    The host is taken from the parsed URL so that ports, credentials and
    ``//`` sequences inside the path or query are not passed to the resolver.

    Returns:
        ``{"hostname": <str>, "resolved_ip": <str or None>}`` where the IP is
        ``None`` when resolution fails, or ``{"error": <message>}`` when the
        URL cannot be processed.
    """
    from urllib.parse import urlsplit

    try:
        host = urlsplit(normalize_url(url)).hostname
        if not host:
            return {"error": "Could not determine hostname from URL"}
        try:
            ip = socket.gethostbyname(host)
        except Exception:
            # An unresolvable host is a valid finding, not a failure of the check.
            ip = None
        return {"hostname": host, "resolved_ip": ip}
    except Exception as e:
        return {"error": str(e)}
# ---------------- Mapping ----------------
# Display label -> check function. The labels are shown in the UI and are also
# the keys of the results dict, so other functions look results up by them.
CHECK_FUNCTIONS = dict(
    [
        ("Website Status (Up/Down)", check_website_status),
        ("SSL Certificate Expiration", check_ssl_expiration),
        ("Domain Registration Date", check_domain_registration),
        ("Similar Domain Names", check_similar_domains),
        ("Threat Intelligence (VirusTotal)", check_threat_intel),
        ("Sensitive Info Request Check", check_sensitive_info),
        ("Cookie/Consent Check", check_cookies),
        ("OWASP Top-10 Lightweight Scan", check_owasp_light),
        ("Plugins/CMS Heuristics", check_plugins_heuristic),
        ("DNS Enumeration", dns_enumeration),
    ]
)
# Output sections the user can request in the UI.
OUTPUT_CHOICES = [
    "Quick Summary",
    "Detailed Summary",
    "Visualization (PNG)",
]
# ---------------- Summaries & Safety ----------------
def format_quick_summary(results: dict) -> str:
    """Render one short line per check result.

    Dict results containing "status" are shown as status plus HTTP code,
    dict results containing "error" as an error line, other dicts as JSON
    truncated to 300 characters, and non-dict values as text truncated to
    300 characters. Returns "No findings." for an empty *results*.
    """

    def render(name, outcome) -> str:
        if not isinstance(outcome, dict):
            return f"{name}: {str(outcome)[:300]}"
        if "status" in outcome:
            return f"{name}: {outcome.get('status')} (HTTP {outcome.get('http_status')})"
        if "error" in outcome:
            return f"{name}: ERROR - {outcome.get('error')}"
        return f"{name}: {json.dumps(outcome, default=str)[:300]}"

    rendered = [render(name, outcome) for name, outcome in results.items()]
    if not rendered:
        return "No findings."
    return "\n".join(rendered)
def format_detailed_summary(results: dict) -> str:
    """Return *results* as indented JSON, falling back to ``str(results)``.

    Values JSON cannot encode are stringified; if encoding still fails the
    plain string form of the dict is returned instead.
    """
    try:
        rendered = json.dumps(results, indent=2, default=str)
    except Exception:
        rendered = str(results)
    return rendered
def evaluate_and_build_recommendation(results: dict) -> str:
    """Turn the raw check results into a newline-separated recommendation.

    Args:
        results: Mapping of check label to that check's result dict.

    Returns:
        One message per finding. "This URL looks safe to use." is returned
        only when no finding was produced *and* no check failed.

    Fixes over the previous behaviour:
        * A VirusTotal result carrying a "note" (check skipped) is reported
          as skipped instead of as "malicious=0, suspicious=0".
        * A failed OWASP check (``{"error": ...}``) no longer counts as
          "indicators found"; only the two indicator flags are inspected.
        * Checks that failed are listed explicitly, so a scan whose checks
          all errored is not summarised as safe.
    """
    vt_label = "Threat Intelligence (VirusTotal)"
    messages: List[str] = []

    # VirusTotal
    vt = results.get(vt_label)
    if isinstance(vt, dict):
        if vt.get("error"):
            messages.append(f"⚠️ VirusTotal check error: {vt.get('error')}")
        elif vt.get("note"):
            messages.append(f"ℹ️ VirusTotal check skipped: {vt.get('note')}")
        else:
            mal = int(vt.get("malicious", 0) or 0)
            sus = int(vt.get("suspicious", 0) or 0)
            if (mal + sus) >= 2:
                messages.append("❌ VirusTotal flagged multiple detections — DO NOT use this URL.")
            else:
                messages.append(f"ℹ️ VirusTotal detections: malicious={mal}, suspicious={sus}")

    # Website status
    ws = results.get("Website Status (Up/Down)")
    if isinstance(ws, dict) and ws.get("status") == "Down":
        messages.append("🔴 Website appears DOWN — treat as potentially unsafe.")

    # SSL expiry
    sslv = results.get("SSL Certificate Expiration")
    if isinstance(sslv, dict) and "days_left" in sslv:
        try:
            if int(sslv["days_left"]) < 40:
                messages.append("⚠️ SSL certificate expires in <40 days — renew soon.")
        except (TypeError, ValueError):
            pass  # non-numeric value: nothing to compare

    # Domain age
    who = results.get("Domain Registration Date")
    if isinstance(who, dict) and "age_days" in who:
        try:
            if int(who["age_days"]) < 120:
                messages.append("⚠️ Domain registered within last 120 days — verify legitimacy.")
        except (TypeError, ValueError):
            pass  # non-numeric value: nothing to compare

    # Plugins/CMS heuristics
    plugins = results.get("Plugins/CMS Heuristics")
    if isinstance(plugins, dict):
        for path, info in plugins.items():
            if isinstance(info, dict) and info.get("status_code") == 200:
                messages.append(f"⚠️ Attention: {path} returns 200 — admin/login page exposed; restrict access.")

    # Similar domains
    sim = results.get("Similar Domain Names")
    if isinstance(sim, dict) and isinstance(sim.get("variants"), dict):
        deceptive = [
            d for d, meta in sim["variants"].items()
            if isinstance(meta, dict) and meta.get("resolved")
        ]
        if deceptive:
            messages.append(f"⚠️ Similar domains resolved: {', '.join(deceptive)} — possible deception.")

    # Sensitive info
    sens = results.get("Sensitive Info Request Check")
    if isinstance(sens, dict) and sens.get("suspicious_terms"):
        messages.append("⚠️ Attention: Site requests sensitive/banking info (terms found).")

    # OWASP: look only at the indicator flags, never at an "error" string.
    owasp = results.get("OWASP Top-10 Lightweight Scan")
    if isinstance(owasp, dict) and (owasp.get("password_inputs_found") or owasp.get("xss_indicators")):
        messages.append("⚠️ OWASP Top-10 indicators found — developer attention required.")

    # Checks that could not run must not be silently read as "no findings".
    # VirusTotal errors already have their own message above.
    for label, outcome in results.items():
        if label != vt_label and isinstance(outcome, dict) and outcome.get("error"):
            messages.append(f"⚠️ {label} could not be completed: {outcome.get('error')}")

    if not messages:
        messages.append("🟢 This URL looks safe to use.")
    return "\n".join(messages)
# ---------------- Visualization & Exports ----------------
def _safety_score(value) -> int:
    """Map one check result to 1 (likely safe) or 0 (potential risk).

    Dicts: 0 when an "error" is present; 1 only for status "Up"; for
    VirusTotal-style dicts 0 when malicious + suspicious >= 2; otherwise 1
    when the stringified values contain one of a few positive tokens.
    Lists score 1 only when empty, strings 1 when they contain a positive
    token, and every other type scores 0.
    """
    if isinstance(value, dict):
        if value.get("error"):
            return 0
        if "status" in value:
            return 1 if value.get("status") == "Up" else 0
        if "malicious" in value or "suspicious" in value:
            mal = int(value.get("malicious", 0) or 0)
            sus = int(value.get("suspicious", 0) or 0)
            return 0 if (mal + sus) >= 2 else 1
        # Substring heuristic over the values (not a truthiness test).
        joined = " ".join(str(x) for x in value.values()).lower()
        return 1 if any(tok in joined for tok in ("ok", "up", "harmless", "safe", "true")) else 0
    if isinstance(value, list):
        return 0 if value else 1
    if isinstance(value, str):
        lowered = value.lower()
        return 1 if any(tok in lowered for tok in ("safe", "up", "ok")) else 0
    return 0


def generate_visualization_pil(results: dict) -> PILImage.Image:
    """Render a horizontal red/green bar chart of the check results.

    Each check gets one bar of length 1 (green, likely safe) or 0 (red,
    potential risk) as computed by ``_safety_score``.

    Returns:
        The chart as an RGBA PIL image, or ``None`` when scoring or plotting
        raises. The figure is always closed, including on failure, so figures
        do not accumulate across requests.
    """
    fig = None
    try:
        labels = list(results)
        values = [_safety_score(v) for v in results.values()]
        fig_h = max(2, len(labels) * 0.35)
        # Use an explicit figure/axes pair instead of pyplot's implicit
        # "current figure".
        fig, ax = plt.subplots(figsize=(8, fig_h))
        positions = list(range(len(labels)))
        bars = ax.barh(positions, values, align="center")
        for bar, score in zip(bars, values):
            bar.set_color("green" if score == 1 else "red")
        ax.set_yticks(positions)
        ax.set_yticklabels(labels)
        ax.set_xlim(-0.1, 1.1)
        ax.set_xlabel("Safety (1 = likely safe, 0 = potential risk)")
        ax.set_title("Website Security Visualization")
        fig.tight_layout()
        buf = BytesIO()
        fig.savefig(buf, format="png", bbox_inches="tight")
        buf.seek(0)
        # convert() forces the image data to be read from the buffer now.
        return PILImage.open(buf).convert("RGBA")
    except Exception:
        return None
    finally:
        if fig is not None:
            plt.close(fig)
def export_csv_file(results: dict) -> str:
    """Write *results* to a uniquely named CSV file and return its path.

    The file has the columns "Check" and "Result"; each result is stored as
    a JSON string. The file is created inside ``CONFIG["TMP_DIR"]``.
    """
    rows = []
    for check_name, outcome in results.items():
        rows.append({"Check": check_name, "Result": json.dumps(outcome, default=str)})
    frame = pd.DataFrame(rows)
    out_path = os.path.join(CONFIG["TMP_DIR"], f"websec_{uuid.uuid4().hex}.csv")
    frame.to_csv(out_path, index=False)
    return out_path
def export_excel_file(results: dict) -> str:
    """Write *results* to a uniquely named .xlsx file and return its path.

    The sheet has the columns "Check" and "Result"; each result is stored as
    a JSON string. The file is created inside ``CONFIG["TMP_DIR"]`` and is
    written with the openpyxl engine.
    """
    rows = []
    for check_name, outcome in results.items():
        rows.append({"Check": check_name, "Result": json.dumps(outcome, default=str)})
    frame = pd.DataFrame(rows)
    out_path = os.path.join(CONFIG["TMP_DIR"], f"websec_{uuid.uuid4().hex}.xlsx")
    frame.to_excel(out_path, index=False, engine="openpyxl")
    return out_path
def export_pdf_file(results: dict, recommendation: str, viz_pil: PILImage.Image = None) -> str:
    """Write a PDF report (results, recommendation, optional chart).

    Args:
        results: Mapping of check label to result; each entry is printed as
            ``label: <json>`` and wrapped at 90 characters.
        recommendation: Newline-separated recommendation text; ``None`` is
            treated as empty.
        viz_pil: Optional chart image to embed at the bottom of the last page.

    Returns:
        Path of the PDF created inside ``CONFIG["TMP_DIR"]``.

    Fixes over the previous behaviour:
        * The chart is embedded straight from memory via ``ImageReader``
          instead of through a temporary PNG that was never deleted.
        * A new page is started when the chart would overlap text.
        * Font and colour are re-applied after every page break.
        * The section heading is always black, and lines marked "🔴" are
          drawn red like the other risk lines.
        * A chart that cannot be embedded is reported in the PDF instead of
          being skipped silently.
    """
    filename = f"websec_{uuid.uuid4().hex}.pdf"
    path = os.path.join(CONFIG["TMP_DIR"], filename)
    c = canvas.Canvas(path, pagesize=letter)
    w, h = letter
    max_chars = 90
    risk_markers = ("⚠️", "❌", "🔴")

    def draw_wrapped(text, y, color, bottom):
        """Draw *text* in max_chars-sized pieces from *y* down; return the next free y."""
        for start in range(0, len(text), max_chars):
            if y < bottom:
                c.showPage()
                y = h - 50
            # showPage() resets the graphics state, so set font and colour
            # for every piece rather than once per section.
            c.setFont("Helvetica", 10)
            c.setFillColor(color)
            c.drawString(40, y, text[start:start + max_chars])
            y -= 12
        return y

    # Title
    c.setFont("Helvetica-Bold", 16)
    c.drawCentredString(w / 2, h - 50, "Website Security Scan Report")

    # One wrapped entry per check.
    y = h - 80
    for k, v in results.items():
        # NOTE(review): this substring test also matches key names such as
        # "suspicious" / "suspicious_terms", so those entries are always red.
        rendered = str(v).lower()
        is_risky = any(word in rendered for word in ("error", "suspicious", "down"))
        line_color = colors.red if is_risky else colors.black
        y = draw_wrapped(f"{k}: {json.dumps(v, default=str)}", y, line_color, 120)

    # Recommendation section (risk lines red, everything else green).
    if y < 160:
        c.showPage()
        y = h - 50
    c.setFont("Helvetica-Bold", 12)
    c.setFillColor(colors.black)
    c.drawString(40, y - 20, "Safety Recommendation:")
    y -= 40
    for line in (recommendation or "").split("\n"):
        is_risk_line = any(marker in line for marker in risk_markers)
        y = draw_wrapped(line, y, colors.red if is_risk_line else colors.green, 80)

    # Chart occupies y = 40..240 on the current page.
    if viz_pil is not None:
        if y < 260:
            c.showPage()
            y = h - 50
        try:
            c.drawImage(
                ImageReader(viz_pil),
                40,
                40,
                width=w - 80,
                height=200,
                preserveAspectRatio=True,
                mask="auto",
            )
        except Exception as exc:
            # Keep the report usable, but make the failure visible in it.
            c.setFont("Helvetica", 10)
            c.setFillColor(colors.red)
            c.drawString(40, y, f"Visualization could not be embedded: {exc}"[:max_chars])
    c.save()
    return path
# ---------------- UI runner ----------------
def ui_run(url: str, selected_checks: list, output_options: list, run_all: bool = False):
    """Run the requested checks against *url* and build every UI output.

    Args:
        url: Target address typed by the user.
        selected_checks: Labels (keys of ``CHECK_FUNCTIONS``) to run; ignored
            when *run_all* is true.
        output_options: Requested sections from ``OUTPUT_CHOICES``.
        run_all: Run every registered check.

    Returns:
        ``(quick, detailed, viz, csv_path, excel_path, pdf_path, safety_msg)``.
        The three text/image sections are empty/``None`` unless requested;
        the CSV, Excel and PDF files are always produced. On invalid input
        the first and last items carry the message and the rest are
        empty/``None``.
    """
    # Blank and whitespace-only input are both rejected here.
    if not url or not url.strip():
        msg = "Please provide a valid URL."
        return msg, "", None, None, None, None, msg
    try:
        url_norm = normalize_url(url)
    except Exception as e:
        msg = f"Invalid URL: {e}"
        return msg, "", None, None, None, None, msg

    chosen = list(CHECK_FUNCTIONS.keys()) if run_all else list(selected_checks or [])
    if not chosen:
        msg = "No checks selected."
        return msg, "", None, None, None, None, msg

    results = {}
    for chk in chosen:
        func = CHECK_FUNCTIONS.get(chk)
        if not func:
            results[chk] = {"error": "Unknown check"}
            continue
        try:
            time.sleep(0.1)  # small throttle between checks
            results[chk] = func(url_norm)
        except Exception as e:
            results[chk] = {"error": str(e)}

    # Build recommendation (uses results including VT)
    safety_msg = evaluate_and_build_recommendation(results)

    wanted = set(output_options or [])
    quick = format_quick_summary(results) if "Quick Summary" in wanted else ""
    detailed = format_detailed_summary(results) if "Detailed Summary" in wanted else ""
    viz = generate_visualization_pil(results) if "Visualization (PNG)" in wanted else None

    # The download files are produced on every run. The former conditional
    # expressions had identical branches, so this is the same behaviour.
    csv_path = export_csv_file(results)
    excel_path = export_excel_file(results)
    pdf_path = export_pdf_file(results, safety_msg, viz)
    return quick, detailed, viz, csv_path, excel_path, pdf_path, safety_msg
# ---------------- Build Gradio UI ----------------
# Layout: inputs on top, action buttons, tabbed text/image results, then the
# three download slots. Both buttons call ui_run; they differ only in the
# constant run_all flag passed through a gr.State.
with gr.Blocks() as demo:
    gr.Markdown(
        "## 🛡️ Web Security Scanner — Dashboard\n"
        "Select checks and outputs, then Run Selected or Run All.\n"
        "Network timeout set to >=40s."
    )
    url_input = gr.Textbox(label="Target URL", placeholder="https://example.com", lines=1)
    checks_box = gr.CheckboxGroup(choices=list(CHECK_FUNCTIONS.keys()), label="Select Checks")
    outputs_box = gr.CheckboxGroup(choices=OUTPUT_CHOICES, label="Output Options")

    with gr.Row():
        run_selected_btn = gr.Button("Run Selected Checks", variant="primary")
        run_all_btn = gr.Button("Run All Checks", variant="secondary")

    with gr.Tabs():
        with gr.Tab("Quick Summary"):
            quick_out = gr.Textbox(label="Quick Summary", lines=8)
        with gr.Tab("Detailed Summary"):
            detailed_out = gr.Textbox(label="Detailed Summary (JSON)", lines=12)
        with gr.Tab("Visualization"):
            viz_out = gr.Image(label="Visualization Chart", type="pil")
        with gr.Tab("Safety Recommendation"):
            safety_out = gr.Textbox(label="Safety Recommendation", lines=6)

    csv_file_out = gr.File(label="Download CSV")
    excel_file_out = gr.File(label="Download Excel")
    pdf_file_out = gr.File(label="Download PDF (with chart & recommendation)")

    # Same output wiring for both buttons, in the order ui_run returns them.
    _result_components = [
        quick_out,
        detailed_out,
        viz_out,
        csv_file_out,
        excel_file_out,
        pdf_file_out,
        safety_out,
    ]

    _selected_only = gr.State(False)
    run_selected_btn.click(
        fn=ui_run,
        inputs=[url_input, checks_box, outputs_box, _selected_only],
        outputs=list(_result_components),
    )

    _everything = gr.State(True)
    run_all_btn.click(
        fn=ui_run,
        inputs=[url_input, checks_box, outputs_box, _everything],
        outputs=list(_result_components),
    )

if __name__ == "__main__":
    demo.launch()