understanding committed on
Commit
efc210c
·
verified ·
1 Parent(s): 031309b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +74 -116
app.py CHANGED
@@ -8,9 +8,6 @@ import httpx
8
  import dns.resolver
9
  import gradio as gr
10
 
11
- # -----------------------
12
- # Safety: SSRF protection
13
- # -----------------------
14
  PRIVATE_NETS = [
15
  ipaddress.ip_network("0.0.0.0/8"),
16
  ipaddress.ip_network("10.0.0.0/8"),
@@ -18,14 +15,21 @@ PRIVATE_NETS = [
18
  ipaddress.ip_network("169.254.0.0/16"),
19
  ipaddress.ip_network("172.16.0.0/12"),
20
  ipaddress.ip_network("192.168.0.0/16"),
21
- ipaddress.ip_network("224.0.0.0/4"), # multicast
22
- ipaddress.ip_network("240.0.0.0/4"), # reserved
23
  ipaddress.ip_network("::1/128"),
24
- ipaddress.ip_network("fc00::/7"), # unique local
25
- ipaddress.ip_network("fe80::/10"), # link-local
26
  ]
27
 
28
- DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))*\.?$")
 
 
 
 
 
 
 
29
 
30
  def _is_private_ip(ip_str: str) -> bool:
31
  try:
@@ -34,46 +38,32 @@ def _is_private_ip(ip_str: str) -> bool:
34
  except Exception:
35
  return True
36
 
37
- def _normalize_target(target: str) -> tuple[str, str]:
38
- """
39
- Returns (kind, value):
40
- kind: "domain" | "ip" | "url"
41
- value: normalized host (domain/ip) or full url
42
- """
43
  t = (target or "").strip()
44
  if not t:
45
- return ("", "")
46
 
47
- # If user passed full URL
48
  if t.startswith("http://") or t.startswith("https://"):
49
  u = urlparse(t)
50
- host = u.hostname or ""
51
- return ("url", t), host
52
 
53
- # IP?
54
  try:
55
  ipaddress.ip_address(t)
56
- return ("ip", t), t
57
  except Exception:
58
  pass
59
 
60
- # Domain?
61
- # Allow IDN via idna encoding by socket/dns; here just basic sanity
62
- d = t.strip().rstrip(".")
63
  if DOMAIN_RE.match(d):
64
- return ("domain", d), d
65
 
66
- return ("unknown", t), t
67
 
68
  def resolve_dns(host: str):
69
- """
70
- Return DNS data: A/AAAA/CNAME if possible.
71
- """
72
  out = {"A": [], "AAAA": [], "CNAME": []}
73
  if not host:
74
  return out
75
 
76
- # A/AAAA
77
  for rtype in ["A", "AAAA"]:
78
  try:
79
  start = time.time()
@@ -83,7 +73,6 @@ def resolve_dns(host: str):
83
  except Exception as e:
84
  out[f"{rtype}_error"] = str(e)
85
 
86
- # CNAME
87
  try:
88
  ans = dns.resolver.resolve(host, "CNAME", lifetime=3)
89
  out["CNAME"] = [r.target.to_text().rstrip(".") for r in ans]
@@ -92,56 +81,39 @@ def resolve_dns(host: str):
92
 
93
  return out
94
 
95
- def check_http(url: str, timeout_s: float = 8.0):
96
- """
97
- Try HTTPS then HTTP. Return details.
98
- """
99
- results = []
100
-
101
- def _try(one_url: str):
102
- info = {"url": one_url}
103
- try:
104
- start = time.time()
105
- with httpx.Client(follow_redirects=True, timeout=timeout_s, headers={"User-Agent": "HF-Connectivity-Checker/1.0"}) as client:
106
- r = client.head(one_url)
107
- # Some servers don't like HEAD; fallback to GET small
108
- if r.status_code in (405, 403) or r.status_code >= 500:
109
- r = client.get(one_url, headers={"Range": "bytes=0-1024"})
110
- elapsed = int((time.time() - start) * 1000)
111
-
112
- info.update({
113
- "ok": True,
114
- "status_code": r.status_code,
115
- "final_url": str(r.url),
116
- "latency_ms": elapsed,
117
- "server": r.headers.get("server", ""),
118
- "via": r.headers.get("via", ""),
119
- "cf_ray": r.headers.get("cf-ray", ""),
120
- })
121
- except httpx.ConnectTimeout:
122
- info.update({"ok": False, "error": "connect_timeout"})
123
- except httpx.ReadTimeout:
124
- info.update({"ok": False, "error": "read_timeout"})
125
- except httpx.ConnectError as e:
126
- info.update({"ok": False, "error": f"connect_error: {e}"})
127
- except httpx.HTTPError as e:
128
- info.update({"ok": False, "error": f"http_error: {e}"})
129
- except Exception as e:
130
- info.update({"ok": False, "error": f"unknown_error: {e}"})
131
- return info
132
-
133
- results.append(_try(url))
134
- return results
135
 
136
  def classify(dns_info, http_info):
137
- """
138
- Simple classification from HF viewpoint.
139
- """
140
- # DNS failures
141
  if dns_info.get("A_error") and dns_info.get("AAAA_error") and not dns_info.get("CNAME"):
142
  return "DNS_FAIL (HF can't resolve)"
143
 
144
- # HTTP checks
145
  ok = [x for x in http_info if x.get("ok")]
146
  if ok:
147
  code = ok[0].get("status_code", 0)
@@ -151,7 +123,6 @@ def classify(dns_info, http_info):
151
  return "REACHABLE but LEGAL_RESTRICTION (451)"
152
  return f"REACHABLE ({code})"
153
 
154
- # Common block-ish signals
155
  errs = " | ".join(x.get("error", "") for x in http_info if x.get("error"))
156
  if "timeout" in errs:
157
  return "NOT_REACHABLE (timeout / possible block)"
@@ -160,56 +131,44 @@ def classify(dns_info, http_info):
160
  return f"NOT_REACHABLE ({errs or 'unknown'})"
161
 
162
  def check_one(target: str):
163
- (kind, _), host = _normalize_target(target)
164
 
165
- if not target or not host:
166
- return {"error": "Please enter a domain / IP / URL"}
 
 
167
 
168
- # For URL, pull host and rebuild with scheme attempts
169
- if kind == "unknown":
170
- return {"error": "Invalid input. Enter domain, IP, or full URL."}
171
-
172
- # If host is IP: block private/reserved
173
  try:
174
  ipaddress.ip_address(host)
175
  if _is_private_ip(host):
176
- return {"error": "Blocked: private/reserved IP not allowed (SSRF protection)."}
177
  dns_info = {"A": [host], "AAAA": [], "CNAME": []}
178
  host_for_http = host
179
  except Exception:
180
- # Domain: resolve DNS
181
  dns_info = resolve_dns(host)
182
- # If resolved IPs contain private -> block (SSRF protection)
183
  ips = (dns_info.get("A") or []) + (dns_info.get("AAAA") or [])
184
  for ip in ips:
185
  if _is_private_ip(ip):
186
  return {"error": "Blocked: resolves to private/reserved IP (SSRF protection)."}
187
  host_for_http = host
188
 
189
- # Build URLs to try
190
  urls = []
191
  if kind == "url":
192
- # Try exactly as given first
193
- urls.append(target.strip())
194
- # Also try forcing https/http with host only
195
- urls.append(f"https://{host_for_http}")
196
- urls.append(f"http://{host_for_http}")
197
- else:
198
- urls = [f"https://{host_for_http}", f"http://{host_for_http}"]
199
-
200
- http_results = []
201
- for u in urls[:2]: # keep tight: only 2 tries
202
- http_results.extend(check_http(u))
203
 
 
204
  status = classify(dns_info, http_results)
205
 
206
  return {
207
  "input": target.strip(),
208
  "host": host_for_http,
209
  "dns": dns_info,
210
- "http": http_results[:2],
211
  "status": status,
212
- "note": "Result is from Hugging Face Space network (egress)."
213
  }
214
 
215
  def bulk_check(domain: str, subdomains_text: str):
@@ -218,23 +177,21 @@ def bulk_check(domain: str, subdomains_text: str):
218
  return []
219
 
220
  lines = [x.strip() for x in (subdomains_text or "").splitlines() if x.strip()]
221
- # Allow either full subdomain or just prefix
222
  targets = []
223
- for s in lines[:200]: # limit
224
- if "." in s:
225
- targets.append(s)
226
- else:
227
- targets.append(f"{s}.{base}")
228
 
229
  rows = []
230
  for t in targets:
231
  r = check_one(t)
 
 
232
  rows.append({
233
  "target": t,
234
  "status": r.get("status") or r.get("error", "error"),
235
- "A": ",".join((r.get("dns", {}) or {}).get("A", [])) if isinstance(r, dict) else "",
236
- "AAAA": ",".join((r.get("dns", {}) or {}).get("AAAA", [])) if isinstance(r, dict) else "",
237
- "http_code": (r.get("http", [{}])[0].get("status_code") if r.get("http") else ""),
238
  })
239
  return rows
240
 
@@ -242,12 +199,12 @@ with gr.Blocks(title="HF Domain/IP Connectivity Checker") as demo:
242
  gr.Markdown(
243
  "## HF Domain/IP Connectivity Checker\n"
244
  "Checks DNS + HTTP reachability **from this Hugging Face Space**.\n"
245
- "- No brute-force scanning. Subdomains must be **user-provided list**.\n"
246
- "- Private/reserved IPs blocked for safety."
247
  )
248
 
249
  with gr.Tab("Single Check"):
250
- inp = gr.Textbox(label="Domain / IP / URL", placeholder="example.com OR https://example.com OR 1.2.3.4")
251
  btn = gr.Button("Check")
252
  out = gr.JSON(label="Result")
253
  btn.click(check_one, inputs=inp, outputs=out)
@@ -260,9 +217,10 @@ with gr.Blocks(title="HF Domain/IP Connectivity Checker") as demo:
260
  headers=["target", "status", "A", "AAAA", "http_code"],
261
  datatype=["str", "str", "str", "str", "str"],
262
  row_count=5,
263
- col_count=(5, "fixed"),
264
- label="Results"
265
  )
266
  btn2.click(bulk_check, inputs=[base, subs], outputs=table)
267
 
268
- demo.launch()
 
 
8
  import dns.resolver
9
  import gradio as gr
10
 
 
 
 
11
  PRIVATE_NETS = [
12
  ipaddress.ip_network("0.0.0.0/8"),
13
  ipaddress.ip_network("10.0.0.0/8"),
 
15
  ipaddress.ip_network("169.254.0.0/16"),
16
  ipaddress.ip_network("172.16.0.0/12"),
17
  ipaddress.ip_network("192.168.0.0/16"),
18
+ ipaddress.ip_network("224.0.0.0/4"),
19
+ ipaddress.ip_network("240.0.0.0/4"),
20
  ipaddress.ip_network("::1/128"),
21
+ ipaddress.ip_network("fc00::/7"),
22
+ ipaddress.ip_network("fe80::/10"),
23
  ]
24
 
25
# Hostname syntax check: 1-253 characters overall, made of dot-separated
# labels of 1-63 letters/digits/hyphens where no label starts or ends with
# a hyphen. A trailing dot is not accepted by this pattern.
DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))*$")

# Single module-level HTTP client reused by every probe instead of building
# one per request: it follows redirects, applies an 8-second timeout, and
# sends a fixed User-Agent header.
CLIENT = httpx.Client(
    follow_redirects=True,
    timeout=8.0,
    headers={"User-Agent": "HF-Connectivity-Checker/1.0"},
)
33
 
34
  def _is_private_ip(ip_str: str) -> bool:
35
  try:
 
38
  except Exception:
39
  return True
40
 
41
+ def _parse_target(target: str):
 
 
 
 
 
42
  t = (target or "").strip()
43
  if not t:
44
+ return ("", "", "")
45
 
 
46
  if t.startswith("http://") or t.startswith("https://"):
47
  u = urlparse(t)
48
+ return ("url", t, u.hostname or "")
 
49
 
 
50
  try:
51
  ipaddress.ip_address(t)
52
+ return ("ip", t, t)
53
  except Exception:
54
  pass
55
 
56
+ d = t.rstrip(".")
 
 
57
  if DOMAIN_RE.match(d):
58
+ return ("domain", d, d)
59
 
60
+ return ("unknown", t, "")
61
 
62
  def resolve_dns(host: str):
 
 
 
63
  out = {"A": [], "AAAA": [], "CNAME": []}
64
  if not host:
65
  return out
66
 
 
67
  for rtype in ["A", "AAAA"]:
68
  try:
69
  start = time.time()
 
73
  except Exception as e:
74
  out[f"{rtype}_error"] = str(e)
75
 
 
76
  try:
77
  ans = dns.resolver.resolve(host, "CNAME", lifetime=3)
78
  out["CNAME"] = [r.target.to_text().rstrip(".") for r in ans]
 
81
 
82
  return out
83
 
84
def try_http(url: str):
    """Probe *url* with the shared ``CLIENT`` and summarize the outcome.

    A HEAD request is sent first. If the server answers 403, 405 or any 5xx
    status, the probe is retried as a GET carrying a ``Range`` header.

    Args:
        url: Absolute http(s) URL to probe.

    Returns:
        A dict that always contains ``"url"`` and ``"ok"``. On success it
        also holds ``status_code``, ``final_url``, ``latency_ms``,
        ``server``, ``via`` and ``cf_ray``; on failure it holds ``error``.
        Exceptions raised by the request are converted into the ``error``
        field rather than propagated.
    """
    info = {"url": url}
    try:
        # perf_counter is monotonic, so the latency cannot go negative or
        # jump if the wall clock is adjusted mid-request.
        start = time.perf_counter()
        r = CLIENT.head(url)
        if r.status_code in (405, 403) or r.status_code >= 500:
            # Some servers reject HEAD. Stream the fallback GET so only the
            # status line and headers are read: a server that ignores the
            # Range header would otherwise make us download the full body.
            with CLIENT.stream("GET", url, headers={"Range": "bytes=0-1024"}) as r:
                pass
        elapsed = int((time.perf_counter() - start) * 1000)
        info.update({
            "ok": True,
            "status_code": r.status_code,
            "final_url": str(r.url),
            "latency_ms": elapsed,
            "server": r.headers.get("server", ""),
            "via": r.headers.get("via", ""),
            "cf_ray": r.headers.get("cf-ray", ""),
        })
    except httpx.ConnectTimeout:
        info.update({"ok": False, "error": "connect_timeout"})
    except httpx.ReadTimeout:
        info.update({"ok": False, "error": "read_timeout"})
    except httpx.TimeoutException as e:
        # Write/pool timeouts: keep the word "timeout" in the message so
        # they are reported as timeouts instead of generic HTTP errors.
        info.update({"ok": False, "error": f"timeout: {type(e).__name__}"})
    except httpx.ConnectError as e:
        info.update({"ok": False, "error": f"connect_error: {e}"})
    except httpx.HTTPError as e:
        info.update({"ok": False, "error": f"http_error: {e}"})
    except Exception as e:
        # Last-resort boundary: a probe must never crash the UI callback.
        info.update({"ok": False, "error": f"unknown_error: {e}"})
    return info
 
 
 
 
 
 
 
 
 
 
 
 
112
 
113
  def classify(dns_info, http_info):
 
 
 
 
114
  if dns_info.get("A_error") and dns_info.get("AAAA_error") and not dns_info.get("CNAME"):
115
  return "DNS_FAIL (HF can't resolve)"
116
 
 
117
  ok = [x for x in http_info if x.get("ok")]
118
  if ok:
119
  code = ok[0].get("status_code", 0)
 
123
  return "REACHABLE but LEGAL_RESTRICTION (451)"
124
  return f"REACHABLE ({code})"
125
 
 
126
  errs = " | ".join(x.get("error", "") for x in http_info if x.get("error"))
127
  if "timeout" in errs:
128
  return "NOT_REACHABLE (timeout / possible block)"
 
131
  return f"NOT_REACHABLE ({errs or 'unknown'})"
132
 
133
def check_one(target: str):
    """Run the DNS + HTTP reachability check for a single target.

    Args:
        target: A domain name, an IP literal, or a full http(s) URL.

    Returns:
        ``{"error": <message>}`` when the input is empty or invalid, or when
        the host is (or resolves to) a private/reserved address. Otherwise a
        dict with ``input``, ``host``, ``dns``, ``http`` (at most two probe
        results), ``status`` and ``note``.
    """
    kind, raw, host = _parse_target(target)

    if kind == "":
        return {"error": "Enter a domain / IP / URL"}
    if kind == "unknown" or not host:
        return {"error": "Invalid input"}

    try:
        ip_obj = ipaddress.ip_address(host)
    except ValueError:
        ip_obj = None  # not an IP literal, so treat host as a domain name

    if ip_obj is not None:
        # IP safety
        if _is_private_ip(host):
            return {"error": "Blocked: private/reserved IP not allowed."}
        is_v6 = ip_obj.version == 6
        dns_info = {"A": [], "AAAA": [], "CNAME": []}
        # File the literal under the record type matching its address family.
        dns_info["AAAA" if is_v6 else "A"].append(host)
        # An IPv6 literal must be bracketed to form a valid URL authority.
        http_host = f"[{host}]" if is_v6 else host
    else:
        dns_info = resolve_dns(host)
        ips = (dns_info.get("A") or []) + (dns_info.get("AAAA") or [])
        if any(_is_private_ip(ip) for ip in ips):
            return {"error": "Blocked: resolves to private/reserved IP (SSRF protection)."}
        http_host = host

    # NOTE(review): only the initial host is validated here. If the HTTP
    # client follows redirects, redirect targets are not re-checked against
    # the private-address list - confirm whether that is acceptable.

    # A full URL is probed exactly as given first; the scheme-only fallbacks
    # are always appended, and only the first two candidates are tried.
    urls = [raw] if kind == "url" else []
    urls += [f"https://{http_host}", f"http://{http_host}"]

    http_results = [try_http(u) for u in urls[:2]]  # only two tries
    status = classify(dns_info, http_results)

    return {
        "input": target.strip(),
        "host": host,
        "dns": dns_info,
        "http": http_results,
        "status": status,
        "note": "Checked from Hugging Face Space network."
    }
173
 
174
  def bulk_check(domain: str, subdomains_text: str):
 
177
  return []
178
 
179
  lines = [x.strip() for x in (subdomains_text or "").splitlines() if x.strip()]
 
180
  targets = []
181
+ for s in lines[:200]:
182
+ targets.append(s if "." in s else f"{s}.{base}")
 
 
 
183
 
184
  rows = []
185
  for t in targets:
186
  r = check_one(t)
187
+ dns = r.get("dns", {}) if isinstance(r, dict) else {}
188
+ http = r.get("http", [{}]) if isinstance(r, dict) else [{}]
189
  rows.append({
190
  "target": t,
191
  "status": r.get("status") or r.get("error", "error"),
192
+ "A": ",".join(dns.get("A", [])),
193
+ "AAAA": ",".join(dns.get("AAAA", [])),
194
+ "http_code": http[0].get("status_code", ""),
195
  })
196
  return rows
197
 
 
199
  gr.Markdown(
200
  "## HF Domain/IP Connectivity Checker\n"
201
  "Checks DNS + HTTP reachability **from this Hugging Face Space**.\n"
202
+ "- Subdomains must be **your provided list** (no brute-force scanning).\n"
203
+ "- Private/reserved IPs blocked (SSRF protection)."
204
  )
205
 
206
  with gr.Tab("Single Check"):
207
+ inp = gr.Textbox(label="Domain / IP / URL", placeholder="example.com OR https://example.com OR 1.2.3.4")
208
  btn = gr.Button("Check")
209
  out = gr.JSON(label="Result")
210
  btn.click(check_one, inputs=inp, outputs=out)
 
217
  headers=["target", "status", "A", "AAAA", "http_code"],
218
  datatype=["str", "str", "str", "str", "str"],
219
  row_count=5,
220
+ column_count=(5, "fixed"), # ✅ new param
221
+ label="Results",
222
  )
223
  btn2.click(bulk_check, inputs=[base, subs], outputs=table)
224
 
225
+ # ✅ SSR off to avoid Python 3.13 loop cleanup errors
226
+ demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)