# Domain / app.py
# understanding's picture
# Update app.py
# e4cf520 verified
import time
import re
import ipaddress
from urllib.parse import urlparse, urlunparse
import httpx
import dns.resolver
import dns.exception
import dns.rcode
import gradio as gr
# -----------------------
# SSRF protection
# -----------------------
# Deny-list of address ranges that must never be probed (SSRF protection).
# An address is rejected when it falls inside any of these networks.
PRIVATE_NETS = [
    # --- IPv4 ---
    ipaddress.ip_network("0.0.0.0/8"),        # "this network"
    ipaddress.ip_network("10.0.0.0/8"),       # RFC 1918 private
    ipaddress.ip_network("100.64.0.0/10"),    # carrier-grade NAT shared space
    ipaddress.ip_network("127.0.0.0/8"),      # loopback
    ipaddress.ip_network("169.254.0.0/16"),   # link-local
    ipaddress.ip_network("172.16.0.0/12"),    # RFC 1918 private
    ipaddress.ip_network("192.0.0.0/24"),     # IETF protocol assignments
    ipaddress.ip_network("192.0.2.0/24"),     # documentation (TEST-NET-1)
    ipaddress.ip_network("192.168.0.0/16"),   # RFC 1918 private
    ipaddress.ip_network("198.18.0.0/15"),    # benchmarking
    ipaddress.ip_network("198.51.100.0/24"),  # documentation (TEST-NET-2)
    ipaddress.ip_network("203.0.113.0/24"),   # documentation (TEST-NET-3)
    ipaddress.ip_network("224.0.0.0/4"),      # multicast
    ipaddress.ip_network("240.0.0.0/4"),      # reserved / broadcast
    # --- IPv6 ---
    ipaddress.ip_network("::/128"),           # unspecified address
    ipaddress.ip_network("::1/128"),          # loopback
    # IPv4-mapped IPv6 (e.g. ::ffff:127.0.0.1) would otherwise slip past the
    # IPv4 entries above, because an IPv6 address is never "in" an IPv4 net.
    ipaddress.ip_network("::ffff:0:0/96"),
    ipaddress.ip_network("2001:db8::/32"),    # documentation
    ipaddress.ip_network("fc00::/7"),         # unique local
    ipaddress.ip_network("fe80::/10"),        # link-local
    ipaddress.ip_network("ff00::/8"),         # multicast
]
# Hostname validator. The pieces below concatenate into a single pattern:
#   - the whole name is 1-253 characters long;
#   - every dot-separated label is 1-63 letters/digits/hyphens;
#   - no label starts or ends with a hyphen.
DOMAIN_RE = re.compile(
    r"^(?=.{1,253}$)"                          # overall length limit
    r"(?!-)[A-Za-z0-9-]{1,63}(?<!-)"           # first label
    r"(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))*"      # any further ".label" parts
    r"$"
)
# Single HTTP client shared by all probes in this module.
# NOTE(review): with follow_redirects=True the client follows redirects on its
# own unless a caller overrides it per request, so redirect targets do not
# pass through the private-IP checks applied to the original host.
CLIENT = httpx.Client(
    headers={"User-Agent": "HF-Connectivity-Checker/2.0"},
    timeout=10.0,
    follow_redirects=True,
)
def is_private_ip(ip_str: str) -> bool:
    """Return True when ``ip_str`` must not be probed (SSRF protection).

    An address is rejected when it is inside any network of ``PRIVATE_NETS``
    or when the ``ipaddress`` module classifies it as private, loopback,
    link-local, multicast, reserved or unspecified. Anything that cannot be
    parsed as an IP address is also rejected (fail closed).
    """
    try:
        ip = ipaddress.ip_address(ip_str)
    except (ValueError, TypeError):
        # Unparseable input is treated as unsafe rather than allowed through.
        return True

    # An IPv4-mapped IPv6 literal such as ::ffff:127.0.0.1 reaches the
    # embedded IPv4 host, so judge it by that IPv4 address. Only IPv6
    # address objects have the ``ipv4_mapped`` attribute.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped

    if any(ip in net for net in PRIVATE_NETS):
        return True

    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    )
def parse_target(target: str):
    """
    Classify user input and extract the host to check.

    Returns a tuple ``(kind, raw, host)`` where ``kind`` is one of
    ``url | domain | ip | unknown | empty``:

    - ``empty``:   input is missing or whitespace only.
    - ``url``:     input starts with ``http://`` or ``https://`` (any letter
                   case); ``raw`` is the input with the scheme lower-cased and
                   ``host`` is the parsed hostname ("" if there is none).
    - ``ip``:      input is an IPv4/IPv6 literal.
    - ``domain``:  input (minus trailing dots) matches ``DOMAIN_RE``.
    - ``unknown``: anything else, including URLs that cannot be parsed.
    """
    t = (target or "").strip()
    if not t:
        return ("empty", "", "")

    if t.lower().startswith(("http://", "https://")):
        try:
            # urlparse raises ValueError on malformed authorities such as
            # "http://[" (unterminated IPv6 bracket).
            u = urlparse(t)
            host = u.hostname or ""
        except ValueError:
            return ("unknown", t, "")
        # urlparse lower-cases the scheme; reuse it so "HTTP://x" is probed
        # as "http://x" while the rest of the URL stays exactly as typed.
        raw = u.scheme + t[len(u.scheme):]
        return ("url", raw, host)

    try:
        ipaddress.ip_address(t)
    except ValueError:
        pass
    else:
        return ("ip", t, t)

    d = t.rstrip(".")
    if DOMAIN_RE.match(d):
        return ("domain", d, d)
    return ("unknown", t, "")
def dns_check(host: str):
    """
    Resolve A, AAAA and (best-effort) CNAME records for ``host``.

    Returns a dict with keys ``host``, ``status``, ``A``, ``AAAA``, ``CNAME``
    and ``detail``; ``A_ms`` / ``AAAA_ms`` are added for lookups that
    succeeded. ``status`` is ``INVALID`` for an empty host, ``OK`` when any
    record was found, otherwise the most informative failure of the A/AAAA
    lookups, in the priority order
    NXDOMAIN > TIMEOUT > SERVFAIL/NONAMESERVERS > NOANSWER > ERROR.
    ``detail`` carries the error text of the lookup that produced ``status``.
    """
    out = {
        "host": host,
        "status": "UNKNOWN",
        "A": [],
        "AAAA": [],
        "CNAME": [],
        "detail": "",
    }
    if not host:
        out["status"] = "INVALID"
        out["detail"] = "Empty host"
        return out

    resolver = dns.resolver.Resolver()
    resolver.lifetime = 3.0

    def _resolve(rtype: str):
        """Return (status, values, elapsed_ms or None, error_text) for one record type."""
        start = time.perf_counter()
        try:
            ans = resolver.resolve(host, rtype)
            values = [x.to_text() for x in ans]
        except dns.resolver.NXDOMAIN as e:
            return ("NXDOMAIN", [], None, str(e))
        except dns.resolver.NoAnswer as e:
            return ("NOANSWER", [], None, str(e))
        except dns.resolver.NoNameservers as e:
            return ("NONAMESERVERS", [], None, str(e))
        except dns.exception.Timeout as e:
            return ("TIMEOUT", [], None, str(e))
        except Exception as e:
            return ("ERROR", [], None, str(e))
        elapsed_ms = int((time.perf_counter() - start) * 1000)
        return ("OK", values, elapsed_ms, "")

    a_stat, a_vals, a_ms, a_err = _resolve("A")
    aaaa_stat, aaaa_vals, aaaa_ms, aaaa_err = _resolve("AAAA")
    out["A"] = a_vals
    out["AAAA"] = aaaa_vals
    # ``None`` marks a failed lookup, so a legitimate 0 ms timing is still reported.
    if a_ms is not None:
        out["A_ms"] = a_ms
    if aaaa_ms is not None:
        out["AAAA_ms"] = aaaa_ms

    # CNAME is informational only: any failure here is deliberately ignored.
    try:
        ans = resolver.resolve(host, "CNAME")
        out["CNAME"] = [x.target.to_text().rstrip(".") for x in ans]
    except Exception:
        pass

    if a_vals or aaaa_vals or out["CNAME"]:
        out["status"] = "OK"
        out["detail"] = "Resolved"
        return out

    # Nothing resolved: report the most informative failure, and take the
    # detail text from the lookup that actually produced that failure.
    outcomes = [(a_stat, a_err), (aaaa_stat, aaaa_err)]
    fallback_detail = a_err or aaaa_err
    priority = (
        ("NXDOMAIN", "NXDOMAIN"),
        ("TIMEOUT", "TIMEOUT"),
        ("NONAMESERVERS", "SERVFAIL/NONAMESERVERS"),
        ("NOANSWER", "NOANSWER"),
    )
    for stat, label in priority:
        matching = [err for s, err in outcomes if s == stat]
        if matching:
            out["status"] = label
            out["detail"] = next((err for err in matching if err), fallback_detail)
            return out

    out["status"] = "ERROR"
    out["detail"] = fallback_detail
    return out
def _bracket_ipv6(host: str) -> str:
    """Wrap a bare IPv6 literal in ``[]`` so it is valid inside a URL authority."""
    if ":" in host and not host.startswith("["):
        return f"[{host}]"
    return host


def build_probe_urls(kind: str, raw: str, host: str, path: str):
    """
    Build the list of URLs to probe: at most two, without duplicates.

    - ``kind == "url"``: the user's URL as given, then ``https://host + path``
      as a fallback (dropped when it equals the user's URL).
    - any other kind (domain / ip): ``https://host + path`` then
      ``http://host + path``.

    ``path`` defaults to ``/`` and gets a leading ``/`` if it lacks one.
    IPv6 literals are bracketed so the generated URLs are well-formed.
    """
    path = (path or "/").strip()
    if not path.startswith("/"):
        path = "/" + path

    if kind == "url":
        # .hostname drops any port and brackets; fall back to the caller's host.
        host_only = urlparse(raw).hostname or host
        candidates = [raw, f"https://{_bracket_ipv6(host_only)}{path}"]
    else:
        authority = _bracket_ipv6(host)
        candidates = [f"https://{authority}{path}", f"http://{authority}{path}"]

    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(candidates))[:2]
# Upper bound on redirect hops followed by a single probe.
_PROBE_MAX_REDIRECTS = 5
# Upper bound on response-body bytes read to build the snippet.
_PROBE_MAX_BODY_BYTES = 4096


def _redirect_host_blocked(host: str) -> bool:
    """Return True when a redirect target must not be fetched (SSRF guard)."""
    if not host:
        return True
    try:
        ipaddress.ip_address(host)
    except ValueError:
        # Hostname: block it if any address it resolves to is private/reserved.
        resolved = dns_check(host)
        addrs = (resolved.get("A") or []) + (resolved.get("AAAA") or [])
        return any(is_private_ip(addr) for addr in addrs)
    return is_private_ip(host)


def _read_snippet(response, max_bytes: int, max_chars: int = 250) -> str:
    """Read at most ``max_bytes`` of a streamed body and return a short text preview."""
    buf = bytearray()
    for chunk in response.iter_bytes():
        buf.extend(chunk)
        if len(buf) >= max_bytes:
            break
    data = bytes(buf[:max_bytes])
    charset = response.charset_encoding or "utf-8"
    try:
        text = data.decode(charset, errors="replace")
    except LookupError:
        # The server declared a charset Python does not know.
        text = data.decode("utf-8", errors="replace")
    return text[:max_chars]


def http_probe(url: str):
    """
    GET probe (better for API than HEAD).

    Returns a dict that always has ``url`` and ``ok``. On success it also has
    ``status_code``, ``final_url``, ``latency_ms``, ``content_type``,
    ``server``, ``cf_ray`` and ``snippet``; on failure it has ``error``.

    Redirects are followed one hop at a time so that every redirect target is
    checked against the private/reserved-address rules before it is fetched.
    The body is streamed and only its first few kilobytes are read, so a
    server that ignores the ``Range`` header cannot force a large download.
    """
    info = {"url": url, "ok": False}
    current = url
    try:
        start = time.perf_counter()
        for _hop in range(_PROBE_MAX_REDIRECTS + 1):
            with CLIENT.stream(
                "GET",
                current,
                headers={"Range": "bytes=0-2048"},
                follow_redirects=False,
            ) as r:
                # httpx sets next_request when the response is a redirect
                # that was not followed automatically.
                redirect = r.next_request
                if redirect is not None:
                    if _redirect_host_blocked(redirect.url.host):
                        info["error"] = (
                            "blocked_redirect: redirect target is a "
                            "private/reserved address (SSRF protection)"
                        )
                        return info
                    current = str(redirect.url)
                    continue

                snippet = _read_snippet(r, _PROBE_MAX_BODY_BYTES)
                ms = int((time.perf_counter() - start) * 1000)
                info.update({
                    "ok": True,
                    "status_code": r.status_code,
                    "final_url": str(r.url),
                    "latency_ms": ms,
                    "content_type": r.headers.get("content-type", ""),
                    "server": r.headers.get("server", ""),
                    "cf_ray": r.headers.get("cf-ray", ""),
                    "snippet": snippet,
                })
                return info
        info["error"] = f"http_error: exceeded {_PROBE_MAX_REDIRECTS} redirects"
    except httpx.ConnectTimeout:
        info["error"] = "connect_timeout"
    except httpx.ReadTimeout:
        info["error"] = "read_timeout"
    except httpx.ConnectError as e:
        info["error"] = f"connect_error: {e}"
    except httpx.HTTPError as e:
        info["error"] = f"http_error: {e}"
    except Exception as e:
        info["error"] = f"unknown_error: {e}"
    return info
def overall_status(dns_result, http_results):
    """
    Collapse the DNS result and the HTTP probe results into one verdict string.

    A hard DNS failure wins outright. Otherwise the first successful HTTP
    probe decides the verdict from its status code. With no successful probe,
    the collected error texts decide between timeout, DNS issue and generic
    HTTP failure.
    """
    resolver_state = dns_result.get("status", "UNKNOWN")
    if resolver_state in ("NXDOMAIN", "TIMEOUT", "SERVFAIL/NONAMESERVERS", "ERROR"):
        return f"DNS_{resolver_state} (HF can't resolve reliably)"

    successes = [probe for probe in http_results if probe.get("ok")]
    if successes:
        code = successes[0].get("status_code", 0)
        if code in (401, 403):
            verdict = f"REACHABLE_BUT_PROTECTED ({code})"
        elif code == 451:
            verdict = "REACHABLE_BUT_RESTRICTED (451)"
        elif 200 <= code < 300:
            verdict = f"API_ACCESSIBLE ({code})"
        elif 300 <= code < 400:
            verdict = f"REACHABLE_REDIRECT ({code})"
        elif code == 404:
            verdict = "REACHABLE_BUT_NOT_FOUND (404) (domain ok, path missing)"
        else:
            verdict = f"REACHABLE_OTHER ({code})"
        return verdict

    # DNS looked fine but no probe succeeded: classify by the error texts.
    failure_texts = [probe.get("error", "") for probe in http_results if probe.get("error")]
    errs = " | ".join(failure_texts)
    if "timeout" in errs:
        return "HTTP_TIMEOUT (possible block / route issue / origin down)"
    if "No address associated" in errs:
        return "DNS_ISSUE (no address)"
    return f"HTTP_FAIL ({errs or 'unknown'})"
def check_one(target: str, path: str):
    """
    Run the full check (DNS + HTTP probes) for one domain, IP or URL.

    Returns ``{"error": ...}`` for empty/invalid input or when the target is,
    or resolves to, a private/reserved address. Otherwise returns a dict with
    ``input``, ``probe_path``, ``host``, ``dns``, ``http``, ``status`` and
    ``note``.
    """
    kind, raw, host = parse_target(target)
    if kind == "empty":
        return {"error": "Enter a domain / IP / URL"}
    if kind == "unknown" or not host:
        return {"error": "Invalid input"}

    try:
        literal = ipaddress.ip_address(host)
    except ValueError:
        literal = None

    if literal is not None:
        # IP literal: nothing to resolve, but it must still pass the SSRF check.
        if is_private_ip(host):
            return {"error": "Blocked: private/reserved IP not allowed (SSRF protection)."}
        is_v6 = literal.version == 6
        dns_result = {
            "host": host,
            "status": "OK",
            # Report the literal under the record type matching its family.
            "A": [] if is_v6 else [host],
            "AAAA": [host] if is_v6 else [],
            "CNAME": [],
            "detail": "IP input",
        }
    else:
        dns_result = dns_check(host)
        resolved = (dns_result.get("A") or []) + (dns_result.get("AAAA") or [])
        if any(is_private_ip(ip) for ip in resolved):
            return {"error": "Blocked: resolves to private/reserved IP (SSRF protection)."}

    # NOTE(review): the probe URLs carry the hostname, not the addresses
    # validated above, so the HTTP client resolves the name again on its own.
    urls = build_probe_urls(kind, raw, host, path)
    http_results = [http_probe(u) for u in urls]
    status = overall_status(dns_result, http_results)
    return {
        "input": (target or "").strip(),
        "probe_path": (path or "/").strip(),
        "host": host,
        "dns": dns_result,
        "http": http_results,
        "status": status,
        "note": "Checked from Hugging Face Space network (egress).",
    }
def bulk_check(base_domain: str, subdomains_text: str, path: str):
    """
    Check up to 200 user-supplied names and return one table row per target.

    Each non-empty line of ``subdomains_text`` is a target: a line containing
    a dot is used as-is, otherwise it is treated as a label and
    ``.<base_domain>`` is appended. Returns ``[]`` when ``base_domain`` is
    empty. Each row is
    ``[target, overall_status, dns_status, A_records, http_code]``.
    """
    base = (base_domain or "").strip().rstrip(".")
    if not base:
        return []

    entries = [ln.strip() for ln in (subdomains_text or "").splitlines() if ln.strip()]
    targets = [s if "." in s else f"{s}.{base}" for s in entries[:200]]

    rows = []
    for t in targets:
        result = check_one(t, path)
        # Named dns_info (not dns) to avoid shadowing the imported dns package.
        dns_info = result.get("dns") or {}
        probes = result.get("http") or []
        # Show the code of the first successful probe: the same probe that
        # overall_status bases its verdict on.
        first_ok = next((p for p in probes if p.get("ok")), None)
        code = first_ok.get("status_code", "") if first_ok else ""
        rows.append([
            t,
            result.get("status") or result.get("error", "error"),
            dns_info.get("status", ""),
            ",".join(dns_info.get("A", [])),
            str(code),
        ])
    return rows
# Gradio UI: one tab for a single target, one tab for a user-supplied list.
with gr.Blocks(title="HF Domain IP Checker") as demo:
    gr.Markdown(
        value=(
            "## HF Domain/IP + API Accessibility Checker\n"
            "✅ DNS resolve + ✅ API reachable check **from this Hugging Face Space**.\n"
            "- Subdomains are checked only from your provided list.\n"
            "- Private/reserved IPs blocked (SSRF protection)."
        )
    )

    # --- Single target: result shown as raw JSON from check_one ---
    with gr.Tab("Single Check"):
        inp = gr.Textbox(
            label="Domain / IP / URL",
            placeholder="example.com OR https://example.com/api OR 1.2.3.4",
        )
        path = gr.Textbox(
            label="Probe path (optional)",
            value="/",
            placeholder="/ OR /health OR /api",
        )
        btn = gr.Button(value="Check")
        out = gr.JSON(label="Result")
        btn.click(fn=check_one, inputs=[inp, path], outputs=out)

    # --- Bulk: one table row per target, produced by bulk_check ---
    with gr.Tab("Bulk (Your list only)"):
        base = gr.Textbox(label="Base domain", placeholder="example.com")
        subs = gr.Textbox(
            label="Subdomains (one per line)",
            lines=10,
            placeholder="www\napi\ncdn\nor full: api.example.com",
        )
        path2 = gr.Textbox(
            label="Probe path for all",
            value="/",
            placeholder="/health (recommended for API)",
        )
        btn2 = gr.Button(value="Bulk Check")
        table = gr.Dataframe(
            headers=["target", "overall_status", "dns_status", "A_records", "http_code"],
            datatype=["str", "str", "str", "str", "str"],
            row_count=5,
            label="Results",
        )
        btn2.click(fn=bulk_check, inputs=[base, subs, path2], outputs=table)

# Listen on all interfaces on port 7860.
demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)