understanding committed on
Commit
6d0fb50
·
verified ·
1 Parent(s): 9292da9

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +51 -35
app.py CHANGED
@@ -1,6 +1,5 @@
1
  import time
2
  import re
3
- import socket
4
  import ipaddress
5
  from urllib.parse import urlparse
6
 
@@ -8,6 +7,9 @@ import httpx
8
  import dns.resolver
9
  import gradio as gr
10
 
 
 
 
11
  PRIVATE_NETS = [
12
  ipaddress.ip_network("0.0.0.0/8"),
13
  ipaddress.ip_network("10.0.0.0/8"),
@@ -15,44 +17,53 @@ PRIVATE_NETS = [
15
  ipaddress.ip_network("169.254.0.0/16"),
16
  ipaddress.ip_network("172.16.0.0/12"),
17
  ipaddress.ip_network("192.168.0.0/16"),
18
- ipaddress.ip_network("224.0.0.0/4"),
19
- ipaddress.ip_network("240.0.0.0/4"),
20
  ipaddress.ip_network("::1/128"),
21
- ipaddress.ip_network("fc00::/7"),
22
- ipaddress.ip_network("fe80::/10"),
23
  ]
24
 
25
- DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))*$")
 
 
 
26
 
27
- # Reuse one client (less event-loop weirdness + faster)
28
  CLIENT = httpx.Client(
29
  follow_redirects=True,
30
  timeout=8.0,
31
  headers={"User-Agent": "HF-Connectivity-Checker/1.0"},
32
  )
33
 
34
- def _is_private_ip(ip_str: str) -> bool:
35
  try:
36
  ip = ipaddress.ip_address(ip_str)
37
  return any(ip in net for net in PRIVATE_NETS)
38
  except Exception:
39
  return True
40
 
41
- def _parse_target(target: str):
 
 
 
 
42
  t = (target or "").strip()
43
  if not t:
44
- return ("", "", "")
45
 
46
  if t.startswith("http://") or t.startswith("https://"):
47
  u = urlparse(t)
48
  return ("url", t, u.hostname or "")
49
 
 
50
  try:
51
  ipaddress.ip_address(t)
52
  return ("ip", t, t)
53
  except Exception:
54
  pass
55
 
 
56
  d = t.rstrip(".")
57
  if DOMAIN_RE.match(d):
58
  return ("domain", d, d)
@@ -61,6 +72,7 @@ def _parse_target(target: str):
61
 
62
  def resolve_dns(host: str):
63
  out = {"A": [], "AAAA": [], "CNAME": []}
 
64
  if not host:
65
  return out
66
 
@@ -86,9 +98,11 @@ def try_http(url: str):
86
  try:
87
  start = time.time()
88
  r = CLIENT.head(url)
 
89
  if r.status_code in (405, 403) or r.status_code >= 500:
90
  r = CLIENT.get(url, headers={"Range": "bytes=0-1024"})
91
  elapsed = int((time.time() - start) * 1000)
 
92
  info.update({
93
  "ok": True,
94
  "status_code": r.status_code,
@@ -111,9 +125,11 @@ def try_http(url: str):
111
  return info
112
 
113
  def classify(dns_info, http_info):
 
114
  if dns_info.get("A_error") and dns_info.get("AAAA_error") and not dns_info.get("CNAME"):
115
  return "DNS_FAIL (HF can't resolve)"
116
 
 
117
  ok = [x for x in http_info if x.get("ok")]
118
  if ok:
119
  code = ok[0].get("status_code", 0)
@@ -131,54 +147,55 @@ def classify(dns_info, http_info):
131
  return f"NOT_REACHABLE ({errs or 'unknown'})"
132
 
133
  def check_one(target: str):
134
- kind, raw, host = _parse_target(target)
135
 
136
- if kind == "":
137
  return {"error": "Enter a domain / IP / URL"}
138
  if kind == "unknown" or not host:
139
  return {"error": "Invalid input"}
140
 
141
- # IP safety
142
  try:
143
  ipaddress.ip_address(host)
144
- if _is_private_ip(host):
145
- return {"error": "Blocked: private/reserved IP not allowed."}
146
  dns_info = {"A": [host], "AAAA": [], "CNAME": []}
147
  host_for_http = host
148
  except Exception:
149
  dns_info = resolve_dns(host)
150
  ips = (dns_info.get("A") or []) + (dns_info.get("AAAA") or [])
151
  for ip in ips:
152
- if _is_private_ip(ip):
153
  return {"error": "Blocked: resolves to private/reserved IP (SSRF protection)."}
154
  host_for_http = host
155
 
 
156
  urls = []
157
  if kind == "url":
158
  urls.append(raw)
159
  urls.append(f"https://{host_for_http}")
160
  urls.append(f"http://{host_for_http}")
161
 
162
- http_results = [try_http(urls[0]), try_http(urls[1])] # only two tries
163
  status = classify(dns_info, http_results)
164
 
165
  return {
166
- "input": target.strip(),
167
  "host": host_for_http,
168
  "dns": dns_info,
169
  "http": http_results,
170
  "status": status,
171
- "note": "Checked from Hugging Face Space network."
172
  }
173
 
174
- def bulk_check(domain: str, subdomains_text: str):
175
- base = (domain or "").strip().rstrip(".")
176
  if not base:
177
  return []
178
 
179
  lines = [x.strip() for x in (subdomains_text or "").splitlines() if x.strip()]
180
  targets = []
181
- for s in lines[:200]:
182
  targets.append(s if "." in s else f"{s}.{base}")
183
 
184
  rows = []
@@ -186,21 +203,21 @@ def bulk_check(domain: str, subdomains_text: str):
186
  r = check_one(t)
187
  dns = r.get("dns", {}) if isinstance(r, dict) else {}
188
  http = r.get("http", [{}]) if isinstance(r, dict) else [{}]
189
- rows.append({
190
- "target": t,
191
- "status": r.get("status") or r.get("error", "error"),
192
- "A": ",".join(dns.get("A", [])),
193
- "AAAA": ",".join(dns.get("AAAA", [])),
194
- "http_code": http[0].get("status_code", ""),
195
- })
196
  return rows
197
 
198
- with gr.Blocks(title="HF Domain/IP Connectivity Checker") as demo:
199
  gr.Markdown(
200
  "## HF Domain/IP Connectivity Checker\n"
201
- "Checks DNS + HTTP reachability **from this Hugging Face Space**.\n"
202
- "- Subdomains must be **your provided list** (no brute-force scanning).\n"
203
- "- Private/reserved IPs blocked (SSRF protection)."
204
  )
205
 
206
  with gr.Tab("Single Check"):
@@ -217,10 +234,9 @@ with gr.Blocks(title="HF Domain/IP Connectivity Checker") as demo:
217
  headers=["target", "status", "A", "AAAA", "http_code"],
218
  datatype=["str", "str", "str", "str", "str"],
219
  row_count=5,
220
- column_count=(5, "fixed"), # ✅ new param
221
  label="Results",
222
  )
223
  btn2.click(bulk_check, inputs=[base, subs], outputs=table)
224
 
225
- # SSR off to avoid Python 3.13 loop cleanup errors
226
  demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)
 
1
  import time
2
  import re
 
3
  import ipaddress
4
  from urllib.parse import urlparse
5
 
 
7
  import dns.resolver
8
  import gradio as gr
9
 
10
+ # -----------------------
11
+ # Safety: SSRF protection
12
+ # -----------------------
13
  PRIVATE_NETS = [
14
  ipaddress.ip_network("0.0.0.0/8"),
15
  ipaddress.ip_network("10.0.0.0/8"),
 
17
  ipaddress.ip_network("169.254.0.0/16"),
18
  ipaddress.ip_network("172.16.0.0/12"),
19
  ipaddress.ip_network("192.168.0.0/16"),
20
+ ipaddress.ip_network("224.0.0.0/4"), # multicast
21
+ ipaddress.ip_network("240.0.0.0/4"), # reserved
22
  ipaddress.ip_network("::1/128"),
23
+ ipaddress.ip_network("fc00::/7"), # unique local
24
+ ipaddress.ip_network("fe80::/10"), # link-local
25
  ]
26
 
27
+ # Basic domain sanity (not perfect IDN validation, but enough for UI)
28
+ DOMAIN_RE = re.compile(
29
+ r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))*$"
30
+ )
31
 
32
+ # Reuse one client (faster + avoids repeated loop churn)
33
  CLIENT = httpx.Client(
34
  follow_redirects=True,
35
  timeout=8.0,
36
  headers={"User-Agent": "HF-Connectivity-Checker/1.0"},
37
  )
38
 
39
+ def is_private_ip(ip_str: str) -> bool:
40
  try:
41
  ip = ipaddress.ip_address(ip_str)
42
  return any(ip in net for net in PRIVATE_NETS)
43
  except Exception:
44
  return True
45
 
46
+ def parse_target(target: str):
47
+ """
48
+ Returns: (kind, raw, host)
49
+ kind: url | domain | ip | unknown | empty
50
+ """
51
  t = (target or "").strip()
52
  if not t:
53
+ return ("empty", "", "")
54
 
55
  if t.startswith("http://") or t.startswith("https://"):
56
  u = urlparse(t)
57
  return ("url", t, u.hostname or "")
58
 
59
+ # IP?
60
  try:
61
  ipaddress.ip_address(t)
62
  return ("ip", t, t)
63
  except Exception:
64
  pass
65
 
66
+ # Domain?
67
  d = t.rstrip(".")
68
  if DOMAIN_RE.match(d):
69
  return ("domain", d, d)
 
72
 
73
  def resolve_dns(host: str):
74
  out = {"A": [], "AAAA": [], "CNAME": []}
75
+
76
  if not host:
77
  return out
78
 
 
98
  try:
99
  start = time.time()
100
  r = CLIENT.head(url)
101
+ # Some sites block HEAD; fallback to tiny GET
102
  if r.status_code in (405, 403) or r.status_code >= 500:
103
  r = CLIENT.get(url, headers={"Range": "bytes=0-1024"})
104
  elapsed = int((time.time() - start) * 1000)
105
+
106
  info.update({
107
  "ok": True,
108
  "status_code": r.status_code,
 
125
  return info
126
 
127
  def classify(dns_info, http_info):
128
+ # DNS fail
129
  if dns_info.get("A_error") and dns_info.get("AAAA_error") and not dns_info.get("CNAME"):
130
  return "DNS_FAIL (HF can't resolve)"
131
 
132
+ # HTTP ok?
133
  ok = [x for x in http_info if x.get("ok")]
134
  if ok:
135
  code = ok[0].get("status_code", 0)
 
147
  return f"NOT_REACHABLE ({errs or 'unknown'})"
148
 
149
  def check_one(target: str):
150
+ kind, raw, host = parse_target(target)
151
 
152
+ if kind == "empty":
153
  return {"error": "Enter a domain / IP / URL"}
154
  if kind == "unknown" or not host:
155
  return {"error": "Invalid input"}
156
 
157
+ # IP checks
158
  try:
159
  ipaddress.ip_address(host)
160
+ if is_private_ip(host):
161
+ return {"error": "Blocked: private/reserved IP not allowed (SSRF protection)."}
162
  dns_info = {"A": [host], "AAAA": [], "CNAME": []}
163
  host_for_http = host
164
  except Exception:
165
  dns_info = resolve_dns(host)
166
  ips = (dns_info.get("A") or []) + (dns_info.get("AAAA") or [])
167
  for ip in ips:
168
+ if is_private_ip(ip):
169
  return {"error": "Blocked: resolves to private/reserved IP (SSRF protection)."}
170
  host_for_http = host
171
 
172
+ # Only 2 tries to keep it safe & fast
173
  urls = []
174
  if kind == "url":
175
  urls.append(raw)
176
  urls.append(f"https://{host_for_http}")
177
  urls.append(f"http://{host_for_http}")
178
 
179
+ http_results = [try_http(urls[0]), try_http(urls[1])]
180
  status = classify(dns_info, http_results)
181
 
182
  return {
183
+ "input": (target or "").strip(),
184
  "host": host_for_http,
185
  "dns": dns_info,
186
  "http": http_results,
187
  "status": status,
188
+ "note": "Checked from Hugging Face Space network (egress).",
189
  }
190
 
191
+ def bulk_check(base_domain: str, subdomains_text: str):
192
+ base = (base_domain or "").strip().rstrip(".")
193
  if not base:
194
  return []
195
 
196
  lines = [x.strip() for x in (subdomains_text or "").splitlines() if x.strip()]
197
  targets = []
198
+ for s in lines[:200]: # limit
199
  targets.append(s if "." in s else f"{s}.{base}")
200
 
201
  rows = []
 
203
  r = check_one(t)
204
  dns = r.get("dns", {}) if isinstance(r, dict) else {}
205
  http = r.get("http", [{}]) if isinstance(r, dict) else [{}]
206
+ rows.append([
207
+ t,
208
+ r.get("status") or r.get("error", "error"),
209
+ ",".join(dns.get("A", [])),
210
+ ",".join(dns.get("AAAA", [])),
211
+ str(http[0].get("status_code", "")),
212
+ ])
213
  return rows
214
 
215
+ with gr.Blocks(title="HF Domain IP Checker") as demo:
216
  gr.Markdown(
217
  "## HF Domain/IP Connectivity Checker\n"
218
+ "DNS + HTTP reachability **from this Hugging Face Space**.\n"
219
+ "- Subdomains are checked only from your provided list.\n"
220
+ "- Private/reserved IPs are blocked (SSRF protection)."
221
  )
222
 
223
  with gr.Tab("Single Check"):
 
234
  headers=["target", "status", "A", "AAAA", "http_code"],
235
  datatype=["str", "str", "str", "str", "str"],
236
  row_count=5,
 
237
  label="Results",
238
  )
239
  btn2.click(bulk_check, inputs=[base, subs], outputs=table)
240
 
241
+ # SSR off for stability
242
  demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)