understanding committed on
Commit
031309b
·
verified ·
1 Parent(s): 325cc16

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +268 -0
app.py ADDED
@@ -0,0 +1,268 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import time
2
+ import re
3
+ import socket
4
+ import ipaddress
5
+ from urllib.parse import urlparse
6
+
7
+ import httpx
8
+ import dns.resolver
9
+ import gradio as gr
10
+
11
+ # -----------------------
12
+ # Safety: SSRF protection
13
+ # -----------------------
14
+ PRIVATE_NETS = [
15
+ ipaddress.ip_network("0.0.0.0/8"),
16
+ ipaddress.ip_network("10.0.0.0/8"),
17
+ ipaddress.ip_network("127.0.0.0/8"),
18
+ ipaddress.ip_network("169.254.0.0/16"),
19
+ ipaddress.ip_network("172.16.0.0/12"),
20
+ ipaddress.ip_network("192.168.0.0/16"),
21
+ ipaddress.ip_network("224.0.0.0/4"), # multicast
22
+ ipaddress.ip_network("240.0.0.0/4"), # reserved
23
+ ipaddress.ip_network("::1/128"),
24
+ ipaddress.ip_network("fc00::/7"), # unique local
25
+ ipaddress.ip_network("fe80::/10"), # link-local
26
+ ]
27
+
28
+ DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))*\.?$")
29
+
30
+ def _is_private_ip(ip_str: str) -> bool:
31
+ try:
32
+ ip = ipaddress.ip_address(ip_str)
33
+ return any(ip in net for net in PRIVATE_NETS)
34
+ except Exception:
35
+ return True
36
+
37
+ def _normalize_target(target: str) -> tuple[str, str]:
38
+ """
39
+ Returns (kind, value):
40
+ kind: "domain" | "ip" | "url"
41
+ value: normalized host (domain/ip) or full url
42
+ """
43
+ t = (target or "").strip()
44
+ if not t:
45
+ return ("", "")
46
+
47
+ # If user passed full URL
48
+ if t.startswith("http://") or t.startswith("https://"):
49
+ u = urlparse(t)
50
+ host = u.hostname or ""
51
+ return ("url", t), host
52
+
53
+ # IP?
54
+ try:
55
+ ipaddress.ip_address(t)
56
+ return ("ip", t), t
57
+ except Exception:
58
+ pass
59
+
60
+ # Domain?
61
+ # Allow IDN via idna encoding by socket/dns; here just basic sanity
62
+ d = t.strip().rstrip(".")
63
+ if DOMAIN_RE.match(d):
64
+ return ("domain", d), d
65
+
66
+ return ("unknown", t), t
67
+
68
def resolve_dns(host: str):
    """
    Look up A, AAAA and CNAME records for host.

    Args:
        host: Hostname to resolve. An empty value skips all lookups.

    Returns:
        A dict that always has the list-valued keys "A", "AAAA" and "CNAME".
        For each of A/AAAA, a successful lookup adds "<rtype>_ms" (lookup
        time in milliseconds) and a failed one adds "<rtype>_error" (the
        exception text). CNAME targets are stored without a trailing dot.
    """
    out = {"A": [], "AAAA": [], "CNAME": []}
    if not host:
        return out

    # A/AAAA
    for rtype in ("A", "AAAA"):
        try:
            # perf_counter is monotonic, so the measured duration cannot be
            # skewed by wall-clock adjustments.
            start = time.perf_counter()
            ans = dns.resolver.resolve(host, rtype, lifetime=3)
            out[rtype] = [r.to_text() for r in ans]
            out[f"{rtype}_ms"] = int((time.perf_counter() - start) * 1000)
        except Exception as e:
            out[f"{rtype}_error"] = str(e)

    # CNAME is best-effort: a failed lookup is deliberately ignored and
    # simply leaves the list empty.
    try:
        ans = dns.resolver.resolve(host, "CNAME", lifetime=3)
        out["CNAME"] = [r.target.to_text().rstrip(".") for r in ans]
    except Exception:
        pass

    return out
94
+
95
def check_http(url: str, timeout_s: float = 8.0):
    """
    Probe a single URL over HTTP(S) and describe the outcome.

    Only the URL given is requested; trying alternative schemes is left to
    the caller. A HEAD request is sent first, and a small ranged GET is used
    as a fallback when the server answers 403, 405 or any 5xx.

    Args:
        url: Absolute URL to request.
        timeout_s: Timeout in seconds handed to the httpx client.

    Returns:
        A one-element list holding a dict. It always contains "url" and
        "ok". On success it also has "status_code", "final_url",
        "latency_ms", "server", "via" and "cf_ray"; on failure it has
        "error" with a short reason string.
    """
    info = {"url": url}
    try:
        # perf_counter is monotonic, so latency is immune to clock changes.
        start = time.perf_counter()
        # NOTE(review): follow_redirects=True means the final request can go
        # to a host that was never checked by the SSRF filter (a public site
        # may redirect to an internal address). Confirm whether redirect
        # targets need validating as well.
        with httpx.Client(
            follow_redirects=True,
            timeout=timeout_s,
            headers={"User-Agent": "HF-Connectivity-Checker/1.0"},
        ) as client:
            r = client.head(url)
            # Some servers don't like HEAD; fallback to GET small
            if r.status_code in (405, 403) or r.status_code >= 500:
                r = client.get(url, headers={"Range": "bytes=0-1024"})
            elapsed = int((time.perf_counter() - start) * 1000)

        info.update({
            "ok": True,
            "status_code": r.status_code,
            "final_url": str(r.url),
            "latency_ms": elapsed,
            "server": r.headers.get("server", ""),
            "via": r.headers.get("via", ""),
            "cf_ray": r.headers.get("cf-ray", ""),
        })
    except httpx.ConnectTimeout:
        info.update({"ok": False, "error": "connect_timeout"})
    except httpx.ReadTimeout:
        info.update({"ok": False, "error": "read_timeout"})
    except httpx.TimeoutException:
        # Remaining timeout kinds (write/pool); the word "timeout" is kept in
        # the message so they are still reported as timeouts downstream.
        info.update({"ok": False, "error": "timeout"})
    except httpx.ConnectError as e:
        info.update({"ok": False, "error": f"connect_error: {e}"})
    except httpx.HTTPError as e:
        info.update({"ok": False, "error": f"http_error: {e}"})
    except Exception as e:
        info.update({"ok": False, "error": f"unknown_error: {e}"})

    return [info]
135
+
136
def classify(dns_info, http_info):
    """
    Turn DNS and HTTP probe results into a one-line verdict.

    Args:
        dns_info: Dict of DNS results; the keys "A_error", "AAAA_error" and
            "CNAME" are consulted.
        http_info: List of per-URL result dicts using the keys "ok",
            "status_code" and "error".

    Returns:
        A status string: DNS_FAIL, one of the REACHABLE variants, or one of
        the NOT_REACHABLE variants.
    """
    # Resolution counts as failed only when both address lookups errored and
    # no CNAME was found.
    both_lookups_failed = all(dns_info.get(key) for key in ("A_error", "AAAA_error"))
    if both_lookups_failed and not dns_info.get("CNAME"):
        return "DNS_FAIL (HF can't resolve)"

    # The first successful HTTP attempt decides the verdict.
    first_success = next((entry for entry in http_info if entry.get("ok")), None)
    if first_success is not None:
        status = first_success.get("status_code", 0)
        if status == 451:
            return "REACHABLE but LEGAL_RESTRICTION (451)"
        if status in (401, 403):
            return f"REACHABLE but ACCESS_DENIED ({status})"
        return f"REACHABLE ({status})"

    # Nothing succeeded: look for common block-ish signals in the errors.
    messages = [entry["error"] for entry in http_info if entry.get("error")]
    combined = " | ".join(messages)
    if "timeout" in combined:
        return "NOT_REACHABLE (timeout / possible block)"
    if "tls" in combined.lower():
        return "NOT_REACHABLE (TLS issue)"
    reason = combined if combined else "unknown"
    return f"NOT_REACHABLE ({reason})"
161
+
162
def check_one(target: str):
    """
    Run the full DNS + HTTP reachability check for one domain, IP or URL.

    Args:
        target: Raw user input (domain, IP address, or http/https URL).

    Returns:
        On success a dict with "input", "host", "dns", "http", "status" and
        "note". On rejected input a dict with a single "error" key (empty or
        invalid input, or a target that is or resolves to a private/reserved
        address).
    """
    raw = (target or "").strip()
    # Reject empty input before normalizing so that blank or None values
    # produce a friendly message instead of an exception.
    if not raw:
        return {"error": "Please enter a domain / IP / URL"}

    (kind, _), host = _normalize_target(raw)
    if not host:
        return {"error": "Please enter a domain / IP / URL"}
    if kind == "unknown":
        return {"error": "Invalid input. Enter domain, IP, or full URL."}

    try:
        literal = ipaddress.ip_address(host)
    except ValueError:
        literal = None

    if literal is not None:
        # Host is an IP literal: block private/reserved, no DNS needed.
        if _is_private_ip(host):
            return {"error": "Blocked: private/reserved IP not allowed (SSRF protection)."}
        dns_info = {"A": [], "AAAA": [], "CNAME": []}
        is_v6 = literal.version == 6
        dns_info["AAAA" if is_v6 else "A"].append(host)
        # IPv6 literals must be bracketed inside a URL authority.
        url_host = f"[{host}]" if is_v6 else host
    else:
        # Domain: resolve DNS and refuse if any address is private.
        dns_info = resolve_dns(host)
        resolved = (dns_info.get("A") or []) + (dns_info.get("AAAA") or [])
        if any(_is_private_ip(ip) for ip in resolved):
            return {"error": "Blocked: resolves to private/reserved IP (SSRF protection)."}
        url_host = host

    # Build URLs to try: for a URL input, exactly as given first, then the
    # bare host over https/http. dict.fromkeys drops exact duplicates (e.g.
    # the input already being "https://host") while preserving order.
    candidates = [f"https://{url_host}", f"http://{url_host}"]
    if kind == "url":
        candidates.insert(0, raw)
    urls = list(dict.fromkeys(candidates))[:2]  # keep tight: only 2 tries

    http_results = []
    for u in urls:
        http_results.extend(check_http(u))

    return {
        "input": raw,
        "host": host,
        "dns": dns_info,
        "http": http_results[:2],
        "status": classify(dns_info, http_results),
        "note": "Result is from Hugging Face Space network (egress).",
    }
214
+
215
def bulk_check(domain: str, subdomains_text: str):
    """
    Check a user-supplied list of subdomains and summarize each result.

    Args:
        domain: Base domain appended to bare prefixes (e.g. "example.com").
        subdomains_text: One entry per line. An entry containing a dot is
            used as-is; otherwise it is treated as a prefix of the base
            domain. Only the first 200 non-blank lines are processed.

    Returns:
        A list of row dicts with the keys "target", "status", "A", "AAAA"
        and "http_code"; an empty list when the base domain is blank.
    """
    base = (domain or "").strip().rstrip(".")
    if not base:
        return []

    entries = [ln.strip() for ln in (subdomains_text or "").splitlines() if ln.strip()]
    # Allow either full subdomain or just prefix
    targets = [s if "." in s else f"{s}.{base}" for s in entries[:200]]  # limit

    # NOTE(review): rows are dicts; confirm that gr.Dataframe renders a list
    # of dicts as intended rather than expecting a list of lists.
    rows = []
    for t in targets:
        r = check_one(t)
        dns_info = r.get("dns") or {}
        # Report the code of the first successful attempt, i.e. the same
        # attempt the status text is derived from; blank when none succeeded.
        http_code = next(
            (
                h["status_code"]
                for h in (r.get("http") or [])
                if h.get("ok") and "status_code" in h
            ),
            "",
        )
        rows.append({
            "target": t,
            "status": r.get("status") or r.get("error", "error"),
            "A": ",".join(dns_info.get("A") or []),
            "AAAA": ",".join(dns_info.get("AAAA") or []),
            "http_code": http_code,
        })
    return rows
240
+
241
# Gradio front end: one tab for a single target, one for a user-supplied
# subdomain list. Each button is wired straight to the matching checker.
with gr.Blocks(title="HF Domain/IP Connectivity Checker") as demo:
    gr.Markdown(
        "## HF Domain/IP Connectivity Checker\n"
        "Checks DNS + HTTP reachability **from this Hugging Face Space**.\n"
        "- No brute-force scanning. Subdomains must be **user-provided list**.\n"
        "- Private/reserved IPs blocked for safety."
    )

    with gr.Tab("Single Check"):
        inp = gr.Textbox(
            label="Domain / IP / URL",
            placeholder="example.com OR https://example.com OR 1.2.3.4",
        )
        btn = gr.Button("Check")
        out = gr.JSON(label="Result")
        # The result dict from check_one is shown as JSON.
        btn.click(fn=check_one, inputs=inp, outputs=out)

    with gr.Tab("Subdomains List (Your List Only)".replace("Subdomains", "Subdomain")):
        base = gr.Textbox(label="Base domain", placeholder="example.com")
        subs = gr.Textbox(
            label="Subdomains (one per line)",
            lines=10,
            placeholder="www\napi\ncdn\nor full: api.example.com",
        )
        btn2 = gr.Button("Bulk Check")
        table = gr.Dataframe(
            label="Results",
            headers=["target", "status", "A", "AAAA", "http_code"],
            datatype=["str", "str", "str", "str", "str"],
            row_count=5,
            col_count=(5, "fixed"),
        )
        # One table row per checked target.
        btn2.click(fn=bulk_check, inputs=[base, subs], outputs=table)

demo.launch()