understanding committed on
Commit
e4cf520
·
verified ·
1 Parent(s): 6d0fb50

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +209 -84
app.py CHANGED
@@ -1,14 +1,17 @@
1
  import time
2
  import re
3
  import ipaddress
4
- from urllib.parse import urlparse
5
 
6
  import httpx
7
  import dns.resolver
 
 
8
  import gradio as gr
9
 
 
10
  # -----------------------
11
- # Safety: SSRF protection
12
  # -----------------------
13
  PRIVATE_NETS = [
14
  ipaddress.ip_network("0.0.0.0/8"),
@@ -17,25 +20,24 @@ PRIVATE_NETS = [
17
  ipaddress.ip_network("169.254.0.0/16"),
18
  ipaddress.ip_network("172.16.0.0/12"),
19
  ipaddress.ip_network("192.168.0.0/16"),
20
- ipaddress.ip_network("224.0.0.0/4"), # multicast
21
- ipaddress.ip_network("240.0.0.0/4"), # reserved
22
  ipaddress.ip_network("::1/128"),
23
- ipaddress.ip_network("fc00::/7"), # unique local
24
- ipaddress.ip_network("fe80::/10"), # link-local
25
  ]
26
 
27
- # Basic domain sanity (not perfect IDN validation, but enough for UI)
28
  DOMAIN_RE = re.compile(
29
  r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))*$"
30
  )
31
 
32
- # Reuse one client (faster + avoids repeated loop churn)
33
  CLIENT = httpx.Client(
34
  follow_redirects=True,
35
- timeout=8.0,
36
- headers={"User-Agent": "HF-Connectivity-Checker/1.0"},
37
  )
38
 
 
39
  def is_private_ip(ip_str: str) -> bool:
40
  try:
41
  ip = ipaddress.ip_address(ip_str)
@@ -43,9 +45,10 @@ def is_private_ip(ip_str: str) -> bool:
43
  except Exception:
44
  return True
45
 
 
46
  def parse_target(target: str):
47
  """
48
- Returns: (kind, raw, host)
49
  kind: url | domain | ip | unknown | empty
50
  """
51
  t = (target or "").strip()
@@ -56,97 +59,219 @@ def parse_target(target: str):
56
  u = urlparse(t)
57
  return ("url", t, u.hostname or "")
58
 
59
- # IP?
60
  try:
61
  ipaddress.ip_address(t)
62
  return ("ip", t, t)
63
  except Exception:
64
  pass
65
 
66
- # Domain?
67
  d = t.rstrip(".")
68
  if DOMAIN_RE.match(d):
69
  return ("domain", d, d)
70
 
71
  return ("unknown", t, "")
72
 
73
- def resolve_dns(host: str):
74
- out = {"A": [], "AAAA": [], "CNAME": []}
 
 
 
 
 
 
 
 
 
 
 
75
 
76
  if not host:
 
 
77
  return out
78
 
79
- for rtype in ["A", "AAAA"]:
 
 
 
80
  try:
81
  start = time.time()
82
- ans = dns.resolver.resolve(host, rtype, lifetime=3)
83
- out[rtype] = [r.to_text() for r in ans]
84
- out[f"{rtype}_ms"] = int((time.time() - start) * 1000)
 
 
 
 
 
 
 
 
85
  except Exception as e:
86
- out[f"{rtype}_error"] = str(e)
 
 
 
 
 
 
 
 
 
 
87
 
 
88
  try:
89
- ans = dns.resolver.resolve(host, "CNAME", lifetime=3)
90
- out["CNAME"] = [r.target.to_text().rstrip(".") for r in ans]
91
  except Exception:
92
  pass
93
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94
  return out
95
 
96
- def try_http(url: str):
97
- info = {"url": url}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98
  try:
99
  start = time.time()
100
- r = CLIENT.head(url)
101
- # Some sites block HEAD; fallback to tiny GET
102
- if r.status_code in (405, 403) or r.status_code >= 500:
103
- r = CLIENT.get(url, headers={"Range": "bytes=0-1024"})
104
- elapsed = int((time.time() - start) * 1000)
 
 
 
 
105
 
106
  info.update({
107
  "ok": True,
108
  "status_code": r.status_code,
109
  "final_url": str(r.url),
110
- "latency_ms": elapsed,
 
111
  "server": r.headers.get("server", ""),
112
- "via": r.headers.get("via", ""),
113
  "cf_ray": r.headers.get("cf-ray", ""),
 
114
  })
 
 
115
  except httpx.ConnectTimeout:
116
- info.update({"ok": False, "error": "connect_timeout"})
117
  except httpx.ReadTimeout:
118
- info.update({"ok": False, "error": "read_timeout"})
119
  except httpx.ConnectError as e:
120
- info.update({"ok": False, "error": f"connect_error: {e}"})
121
  except httpx.HTTPError as e:
122
- info.update({"ok": False, "error": f"http_error: {e}"})
123
  except Exception as e:
124
- info.update({"ok": False, "error": f"unknown_error: {e}"})
 
125
  return info
126
 
127
- def classify(dns_info, http_info):
128
- # DNS fail
129
- if dns_info.get("A_error") and dns_info.get("AAAA_error") and not dns_info.get("CNAME"):
130
- return "DNS_FAIL (HF can't resolve)"
131
 
132
- # HTTP ok?
133
- ok = [x for x in http_info if x.get("ok")]
134
- if ok:
135
- code = ok[0].get("status_code", 0)
 
 
 
 
 
 
 
 
 
136
  if code in (401, 403):
137
- return f"REACHABLE but ACCESS_DENIED ({code})"
138
  if code == 451:
139
- return "REACHABLE but LEGAL_RESTRICTION (451)"
140
- return f"REACHABLE ({code})"
141
-
142
- errs = " | ".join(x.get("error", "") for x in http_info if x.get("error"))
 
 
 
 
 
 
 
143
  if "timeout" in errs:
144
- return "NOT_REACHABLE (timeout / possible block)"
145
- if "tls" in errs.lower():
146
- return "NOT_REACHABLE (TLS issue)"
147
- return f"NOT_REACHABLE ({errs or 'unknown'})"
148
 
149
- def check_one(target: str):
 
150
  kind, raw, host = parse_target(target)
151
 
152
  if kind == "empty":
@@ -154,89 +279,89 @@ def check_one(target: str):
154
  if kind == "unknown" or not host:
155
  return {"error": "Invalid input"}
156
 
157
- # IP checks
 
158
  try:
159
  ipaddress.ip_address(host)
160
  if is_private_ip(host):
161
  return {"error": "Blocked: private/reserved IP not allowed (SSRF protection)."}
162
- dns_info = {"A": [host], "AAAA": [], "CNAME": []}
163
- host_for_http = host
164
  except Exception:
165
- dns_info = resolve_dns(host)
166
- ips = (dns_info.get("A") or []) + (dns_info.get("AAAA") or [])
167
  for ip in ips:
168
  if is_private_ip(ip):
169
  return {"error": "Blocked: resolves to private/reserved IP (SSRF protection)."}
170
- host_for_http = host
171
 
172
- # Only 2 tries to keep it safe & fast
173
- urls = []
174
- if kind == "url":
175
- urls.append(raw)
176
- urls.append(f"https://{host_for_http}")
177
- urls.append(f"http://{host_for_http}")
178
 
179
- http_results = [try_http(urls[0]), try_http(urls[1])]
180
- status = classify(dns_info, http_results)
181
 
182
  return {
183
  "input": (target or "").strip(),
184
- "host": host_for_http,
185
- "dns": dns_info,
 
186
  "http": http_results,
187
  "status": status,
188
  "note": "Checked from Hugging Face Space network (egress).",
189
  }
190
 
191
- def bulk_check(base_domain: str, subdomains_text: str):
 
192
  base = (base_domain or "").strip().rstrip(".")
193
  if not base:
194
  return []
195
 
196
  lines = [x.strip() for x in (subdomains_text or "").splitlines() if x.strip()]
197
  targets = []
198
- for s in lines[:200]: # limit
199
  targets.append(s if "." in s else f"{s}.{base}")
200
 
201
  rows = []
202
  for t in targets:
203
- r = check_one(t)
204
  dns = r.get("dns", {}) if isinstance(r, dict) else {}
205
  http = r.get("http", [{}]) if isinstance(r, dict) else [{}]
 
206
  rows.append([
207
  t,
208
  r.get("status") or r.get("error", "error"),
 
209
  ",".join(dns.get("A", [])),
210
- ",".join(dns.get("AAAA", [])),
211
- str(http[0].get("status_code", "")),
212
  ])
213
  return rows
214
 
 
215
  with gr.Blocks(title="HF Domain IP Checker") as demo:
216
  gr.Markdown(
217
- "## HF Domain/IP Connectivity Checker\n"
218
- "DNS + HTTP reachability **from this Hugging Face Space**.\n"
219
  "- Subdomains are checked only from your provided list.\n"
220
- "- Private/reserved IPs are blocked (SSRF protection)."
221
  )
222
 
223
  with gr.Tab("Single Check"):
224
- inp = gr.Textbox(label="Domain / IP / URL", placeholder="example.com OR https://example.com OR 1.2.3.4")
 
225
  btn = gr.Button("Check")
226
  out = gr.JSON(label="Result")
227
- btn.click(check_one, inputs=inp, outputs=out)
228
 
229
- with gr.Tab("Subdomain List (Your List Only)"):
230
  base = gr.Textbox(label="Base domain", placeholder="example.com")
231
  subs = gr.Textbox(label="Subdomains (one per line)", lines=10, placeholder="www\napi\ncdn\nor full: api.example.com")
 
232
  btn2 = gr.Button("Bulk Check")
233
  table = gr.Dataframe(
234
- headers=["target", "status", "A", "AAAA", "http_code"],
235
  datatype=["str", "str", "str", "str", "str"],
236
  row_count=5,
237
  label="Results",
238
  )
239
- btn2.click(bulk_check, inputs=[base, subs], outputs=table)
240
 
241
- # SSR off for stability
242
  demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)
 
1
  import time
2
  import re
3
  import ipaddress
4
+ from urllib.parse import urlparse, urlunparse
5
 
6
  import httpx
7
  import dns.resolver
8
+ import dns.exception
9
+ import dns.rcode
10
  import gradio as gr
11
 
12
+
13
  # -----------------------
14
+ # SSRF protection
15
  # -----------------------
16
  PRIVATE_NETS = [
17
  ipaddress.ip_network("0.0.0.0/8"),
 
20
  ipaddress.ip_network("169.254.0.0/16"),
21
  ipaddress.ip_network("172.16.0.0/12"),
22
  ipaddress.ip_network("192.168.0.0/16"),
23
+ ipaddress.ip_network("224.0.0.0/4"),
24
+ ipaddress.ip_network("240.0.0.0/4"),
25
  ipaddress.ip_network("::1/128"),
26
+ ipaddress.ip_network("fc00::/7"),
27
+ ipaddress.ip_network("fe80::/10"),
28
  ]
29
 
 
30
  DOMAIN_RE = re.compile(
31
  r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))*$"
32
  )
33
 
 
34
  CLIENT = httpx.Client(
35
  follow_redirects=True,
36
+ timeout=10.0,
37
+ headers={"User-Agent": "HF-Connectivity-Checker/2.0"},
38
  )
39
 
40
+
41
  def is_private_ip(ip_str: str) -> bool:
42
  try:
43
  ip = ipaddress.ip_address(ip_str)
 
45
  except Exception:
46
  return True
47
 
48
+
49
  def parse_target(target: str):
50
  """
51
+ Returns (kind, raw, host)
52
  kind: url | domain | ip | unknown | empty
53
  """
54
  t = (target or "").strip()
 
59
  u = urlparse(t)
60
  return ("url", t, u.hostname or "")
61
 
 
62
  try:
63
  ipaddress.ip_address(t)
64
  return ("ip", t, t)
65
  except Exception:
66
  pass
67
 
 
68
  d = t.rstrip(".")
69
  if DOMAIN_RE.match(d):
70
  return ("domain", d, d)
71
 
72
  return ("unknown", t, "")
73
 
74
+
75
def dns_check(host: str):
    """
    Resolve A/AAAA/CNAME records for *host* and classify the outcome.

    Returns a dict with keys:
        host   - the queried name
        status - OK | INVALID | NXDOMAIN | TIMEOUT | SERVFAIL/NONAMESERVERS
                 | NOANSWER | ERROR
        A / AAAA / CNAME - lists of record strings (CNAME is best-effort)
        A_ms / AAAA_ms   - lookup latency in ms (present only when > 0)
        detail - human-readable explanation of the status
    """
    out = {
        "host": host,
        "status": "UNKNOWN",
        "A": [],
        "AAAA": [],
        "CNAME": [],
        "detail": "",
    }

    if not host:
        out["status"] = "INVALID"
        out["detail"] = "Empty host"
        return out

    r = dns.resolver.Resolver()
    r.lifetime = 3.0  # total time budget per query, seconds

    def _resolve(rtype: str):
        """Run one query; return (status, values, latency_ms, error_text)."""
        try:
            start = time.time()
            ans = r.resolve(host, rtype)
            ms = int((time.time() - start) * 1000)
            return ("OK", [x.to_text() for x in ans], ms, "")
        except dns.resolver.NXDOMAIN as e:
            return ("NXDOMAIN", [], 0, str(e))
        except dns.resolver.NoAnswer as e:
            return ("NOANSWER", [], 0, str(e))
        except dns.resolver.NoNameservers as e:
            return ("NONAMESERVERS", [], 0, str(e))
        except dns.exception.Timeout as e:
            return ("TIMEOUT", [], 0, str(e))
        except Exception as e:
            return ("ERROR", [], 0, str(e))

    a_stat, a_vals, a_ms, a_err = _resolve("A")
    aaaa_stat, aaaa_vals, aaaa_ms, aaaa_err = _resolve("AAAA")

    out["A"] = a_vals
    out["AAAA"] = aaaa_vals
    if a_ms:
        out["A_ms"] = a_ms
    if aaaa_ms:
        out["AAAA_ms"] = aaaa_ms

    # CNAME is best-effort: a failure here never changes the classification.
    try:
        ans = r.resolve(host, "CNAME")
        out["CNAME"] = [x.target.to_text().rstrip(".") for x in ans]
    except Exception:
        pass

    # Any record at all means the name resolves.
    if a_vals or aaaa_vals or out["CNAME"]:
        out["status"] = "OK"
        out["detail"] = "Resolved"
        return out

    # Both lookups failed: pick the most informative status.
    # priority: NXDOMAIN > TIMEOUT > NONAMESERVERS > NOANSWER > ERROR
    # BUGFIX: detail now comes from the same lookup that produced the chosen
    # status (previously it always preferred the A-record error, which could
    # describe a different failure than the reported status).
    failures = [(a_stat, a_err), (aaaa_stat, aaaa_err)]
    label = {
        "NXDOMAIN": "NXDOMAIN",
        "TIMEOUT": "TIMEOUT",
        "NONAMESERVERS": "SERVFAIL/NONAMESERVERS",
        "NOANSWER": "NOANSWER",
    }
    for wanted in ("NXDOMAIN", "TIMEOUT", "NONAMESERVERS", "NOANSWER"):
        for stat, err in failures:
            if stat == wanted:
                out["status"] = label[wanted]
                out["detail"] = err
                return out

    out["status"] = "ERROR"
    out["detail"] = a_err or aaaa_err
    return out
158
 
159
+
160
def build_probe_urls(kind: str, raw: str, host: str, path: str):
    """
    Return at most two unique URLs to probe.

    For a full URL input the raw URL is probed first, with an
    https://<host><path> fallback; for a domain/IP input we probe
    https then http on the given host + path.
    """
    # Normalise the probe path: default to "/", force a leading slash.
    probe_path = (path or "/").strip()
    if not probe_path.startswith("/"):
        probe_path = "/" + probe_path

    if kind == "url":
        # Keep the user's URL as the primary probe; fall back to a clean
        # https URL on the same host (no query) as the second probe.
        parsed_host = urlparse(raw).hostname or host
        candidates = [raw, f"https://{parsed_host}{probe_path}"]
    else:
        candidates = [
            f"https://{host}{probe_path}",
            f"http://{host}{probe_path}",
        ]

    # Order-preserving de-duplication, capped at two probes.
    return list(dict.fromkeys(candidates))[:2]
194
+
195
+
196
def http_probe(url: str):
    """
    Issue one ranged GET probe against *url* using the shared CLIENT.

    On success the returned dict has ok=True plus status_code, final_url,
    latency_ms, content_type, server, cf_ray and a short body snippet;
    on failure ok stays False and "error" carries a short reason string.
    """
    result = {"url": url, "ok": False}
    try:
        t0 = time.time()
        resp = CLIENT.get(url, headers={"Range": "bytes=0-2048"})
        latency = int((time.time() - t0) * 1000)

        content_type = resp.headers.get("content-type", "")
        # Body decoding can fail on odd encodings; treat that as "no snippet".
        try:
            body_preview = resp.text[:250]
        except Exception:
            body_preview = ""

        result.update({
            "ok": True,
            "status_code": resp.status_code,
            "final_url": str(resp.url),
            "latency_ms": latency,
            "content_type": content_type,
            "server": resp.headers.get("server", ""),
            "cf_ray": resp.headers.get("cf-ray", ""),
            "snippet": body_preview,
        })
        return result

    except httpx.ConnectTimeout:
        result["error"] = "connect_timeout"
    except httpx.ReadTimeout:
        result["error"] = "read_timeout"
    except httpx.ConnectError as e:
        result["error"] = f"connect_error: {e}"
    except httpx.HTTPError as e:
        result["error"] = f"http_error: {e}"
    except Exception as e:
        result["error"] = f"unknown_error: {e}"

    return result
238
 
 
 
 
 
239
 
240
def overall_status(dns_result, http_results):
    """
    Collapse a DNS result and a list of HTTP probe results into one
    human-readable verdict string (resolvable? reachable? protected? down?).
    """
    resolution = dns_result.get("status", "UNKNOWN")

    # Hard DNS failures short-circuit everything else.
    if resolution in ("NXDOMAIN", "TIMEOUT", "SERVFAIL/NONAMESERVERS", "ERROR"):
        return f"DNS_{resolution} (HF can't resolve reliably)"

    # First successful HTTP probe decides the verdict.
    first_ok = next((probe for probe in http_results if probe.get("ok")), None)
    if first_ok is not None:
        code = first_ok.get("status_code", 0)
        if code in (401, 403):
            return f"REACHABLE_BUT_PROTECTED ({code})"
        if code == 451:
            return "REACHABLE_BUT_RESTRICTED (451)"
        if 200 <= code < 300:
            return f"API_ACCESSIBLE ({code})"
        if 300 <= code < 400:
            return f"REACHABLE_REDIRECT ({code})"
        if code == 404:
            return "REACHABLE_BUT_NOT_FOUND (404) (domain ok, path missing)"
        return f"REACHABLE_OTHER ({code})"

    # No probe succeeded although DNS is OK: a block, a bad route, or the
    # origin is down — summarise the collected error strings.
    failure_text = " | ".join(
        probe.get("error", "") for probe in http_results if probe.get("error")
    )
    if "timeout" in failure_text:
        return "HTTP_TIMEOUT (possible block / route issue / origin down)"
    if "No address associated" in failure_text:
        return "DNS_ISSUE (no address)"
    return f"HTTP_FAIL ({failure_text or 'unknown'})"
272
 
273
+
274
+ def check_one(target: str, path: str):
275
  kind, raw, host = parse_target(target)
276
 
277
  if kind == "empty":
 
279
  if kind == "unknown" or not host:
280
  return {"error": "Invalid input"}
281
 
282
+ # DNS
283
+ # If IP -> skip DNS, but block private/reserved
284
  try:
285
  ipaddress.ip_address(host)
286
  if is_private_ip(host):
287
  return {"error": "Blocked: private/reserved IP not allowed (SSRF protection)."}
288
+ dns_result = {"host": host, "status": "OK", "A": [host], "AAAA": [], "CNAME": [], "detail": "IP input"}
 
289
  except Exception:
290
+ dns_result = dns_check(host)
291
+ ips = (dns_result.get("A") or []) + (dns_result.get("AAAA") or [])
292
  for ip in ips:
293
  if is_private_ip(ip):
294
  return {"error": "Blocked: resolves to private/reserved IP (SSRF protection)."}
 
295
 
296
+ # HTTP/API probes
297
+ urls = build_probe_urls(kind, raw, host, path)
298
+ http_results = [http_probe(urls[0]), http_probe(urls[1])] if len(urls) > 1 else [http_probe(urls[0])]
 
 
 
299
 
300
+ status = overall_status(dns_result, http_results)
 
301
 
302
  return {
303
  "input": (target or "").strip(),
304
+ "probe_path": (path or "/").strip(),
305
+ "host": host,
306
+ "dns": dns_result,
307
  "http": http_results,
308
  "status": status,
309
  "note": "Checked from Hugging Face Space network (egress).",
310
  }
311
 
312
+
313
def bulk_check(base_domain: str, subdomains_text: str, path: str):
    """
    Run check_one over a user-supplied subdomain list (capped at 200).

    Bare labels ("www") are qualified with the base domain; entries that
    already contain a dot are used verbatim. Returns table rows:
    [target, overall/error status, dns status, A records, http code].
    """
    root = (base_domain or "").strip().rstrip(".")
    if not root:
        return []

    entries = [ln.strip() for ln in (subdomains_text or "").splitlines() if ln.strip()]
    # Qualify bare labels with the base domain; hard cap at 200 targets.
    targets = [e if "." in e else f"{e}.{root}" for e in entries[:200]]

    rows = []
    for target in targets:
        result = check_one(target, path)
        dns_part = result.get("dns", {}) if isinstance(result, dict) else {}
        http_part = result.get("http", [{}]) if isinstance(result, dict) else [{}]
        rows.append([
            target,
            result.get("status") or result.get("error", "error"),
            dns_part.get("status", ""),
            ",".join(dns_part.get("A", [])),
            str(http_part[0].get("status_code", "")),
        ])
    return rows
337
 
338
+
339
# --- Gradio UI: two tabs sharing the same check pipeline ---
with gr.Blocks(title="HF Domain IP Checker") as demo:
    gr.Markdown(
        "## HF Domain/IP + API Accessibility Checker\n"
        "DNS resolve + API reachable check **from this Hugging Face Space**.\n"
        "- Subdomains are checked only from your provided list.\n"
        "- Private/reserved IPs blocked (SSRF protection)."
    )

    with gr.Tab("Single Check"):
        # One target (domain / IP / URL) plus an optional probe path;
        # the result is the raw dict from check_one rendered as JSON.
        inp = gr.Textbox(label="Domain / IP / URL", placeholder="example.com OR https://example.com/api OR 1.2.3.4")
        path = gr.Textbox(label="Probe path (optional)", value="/", placeholder="/ OR /health OR /api")
        btn = gr.Button("Check")
        out = gr.JSON(label="Result")
        btn.click(check_one, inputs=[inp, path], outputs=out)

    with gr.Tab("Bulk (Your list only)"):
        # The user supplies the subdomain list; bulk_check qualifies bare
        # labels with the base domain and fills the results table.
        base = gr.Textbox(label="Base domain", placeholder="example.com")
        subs = gr.Textbox(label="Subdomains (one per line)", lines=10, placeholder="www\napi\ncdn\nor full: api.example.com")
        path2 = gr.Textbox(label="Probe path for all", value="/", placeholder="/health (recommended for API)")
        btn2 = gr.Button("Bulk Check")
        table = gr.Dataframe(
            headers=["target", "overall_status", "dns_status", "A_records", "http_code"],
            datatype=["str", "str", "str", "str", "str"],
            row_count=5,
            label="Results",
        )
        btn2.click(bulk_check, inputs=[base, subs, path2], outputs=table)


# 0.0.0.0:7860 is the standard Hugging Face Spaces binding.
# NOTE(review): ssr_mode=False — presumably disabled for stability on Spaces;
# confirm against the Space runtime before changing.
demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)