recon-test / app.py
dia2diab's picture
v3 network discovery
6d05684
import os,subprocess,json,socket,gradio as gr
def recon():
r={}
# 1. DNS-based K8s service discovery
r["dns_srv"] = {}
svc_queries = [
"*.default.svc.cluster.local",
"*.kube-system.svc.cluster.local",
"kubernetes.default.svc.cluster.local",
"kube-dns.kube-system.svc.cluster.local",
]
for q in svc_queries:
try:
addrs = socket.getaddrinfo(q, None)
r["dns_srv"][q] = [a[4][0] for a in addrs][:5]
except Exception as e:
r["dns_srv"][q] = str(e)
# 2. Try nslookup/dig for SRV records
for query in ["_https._tcp.kubernetes.default.svc.cluster.local", "any kubernetes.default.svc.cluster.local"]:
try:
p = subprocess.run(["nslookup", query.split()[-1], "10.108.0.2"], capture_output=True, text=True, timeout=5)
r[f"nslookup_{query.split()[-1]}"] = p.stdout[:300]
except Exception as e:
r[f"nslookup_{query.split()[-1]}"] = str(e)
# 3. Try to reach pods on same subnet (10.108.28.x)
r["subnet_scan"] = {}
my_ip = "10.108.28.16"
base = ".".join(my_ip.split(".")[:3]) + "."
for i in [1, 2, 5, 10, 15, 17, 20, 50, 100]:
target = f"{base}{i}"
if target == my_ip: continue
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
rc = s.connect_ex((target, 7860))
r["subnet_scan"][f"{target}:7860"] = "OPEN" if rc == 0 else f"CLOSED rc={rc}"
s.close()
except Exception as e:
r["subnet_scan"][f"{target}:7860"] = str(e)
# 4. Check if we can reach other HF internal services via DNS
r["internal_dns"] = {}
for host in [
"hub.internal", "api.internal", "datasets-server.internal",
"mongo.internal", "redis.internal", "postgres.internal",
"elasticsearch.internal", "hub", "api", "datasets-server",
"moon", "moon.internal", "hub.hf.space", "api.hf.space"
]:
try: r["internal_dns"][host] = socket.gethostbyname(host)
except: pass # skip non-resolving
# 5. Check egress - can we reach the internet?
try:
p = subprocess.run(["curl","-s","-m","3","https://httpbin.org/ip"], capture_output=True, text=True, timeout=5)
r["egress_ip"] = p.stdout[:200]
except Exception as e: r["egress_ip"] = str(e)
# 6. Check /proc/self/mountinfo for volume mounts
try:
with open("/proc/self/mountinfo") as f:
r["mounts"] = f.read()[:1000]
except Exception as e: r["mounts"] = str(e)
# 7. Outbound port test - which ports are allowed?
r["egress_ports"] = {}
for port in [80, 443, 8080, 8443, 3306, 5432, 6379, 27017, 9200]:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(2)
rc = s.connect_ex(("httpbin.org", port))
r["egress_ports"][str(port)] = "OPEN" if rc == 0 else f"CLOSED rc={rc}"
s.close()
except Exception as e:
r["egress_ports"][str(port)] = str(e)
return json.dumps(r, indent=2)
gr.Interface(fn=recon, inputs=None, outputs="text", title="Recon v3").launch()