Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """CLI β API request sniffer. | |
| Captures all Flow UI API requests for endpoint discovery. | |
| Usage: | |
| python -m cli.sniff | |
| python -m cli.sniff --save sniffed.json | |
| """ | |
| import asyncio | |
| import argparse | |
| import json | |
| import logging | |
| import os | |
| import sys | |
| from http.server import HTTPServer, BaseHTTPRequestHandler | |
| import threading | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| for _pkg in ["websockets"]: | |
| try: | |
| __import__(_pkg) | |
| except ImportError: | |
| os.system(f"{sys.executable} -m pip install {_pkg} -q") | |
| import websockets | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%H:%M:%S") | |
| log = logging.getLogger("cli.sniff") | |
| all_requests = [] | |
| IGNORE = {"batchLog", "frontendEvents", "fetchUserRecommendations", "flowAgent/applets", | |
| "savedSharedApplets", "models/statuses"} | |
| def make_handler(save_file): | |
| class Handler(BaseHTTPRequestHandler): | |
| def do_POST(self): | |
| length = int(self.headers.get("Content-Length", 0)) | |
| body = json.loads(self.rfile.read(length)) if length else {} | |
| if body.get("type") == "sniffed_video_request": | |
| url = body.get("url", "") | |
| if not any(n in url for n in IGNORE): | |
| method = body.get("method", "?") | |
| payload = body.get("payload", "") | |
| log.info("β" * 60) | |
| log.info("π %s %s", method, url.split("?")[0]) | |
| if payload and payload != "(empty)": | |
| try: | |
| parsed = json.loads(payload) | |
| log.info(" %s", json.dumps(parsed, indent=2)[:2000]) | |
| except (json.JSONDecodeError, TypeError): | |
| log.info(" %s", str(payload)[:1000]) | |
| entry = { | |
| "url": url, | |
| "method": method, | |
| "payload": payload, | |
| "timestamp": body.get("timestamp"), | |
| } | |
| all_requests.append(entry) | |
| if save_file: | |
| with open(save_file, "w") as f: | |
| json.dump(all_requests, f, indent=2) | |
| self.send_response(200) | |
| self.send_header("Content-Type", "application/json") | |
| self.send_header("Access-Control-Allow-Origin", "*") | |
| self.end_headers() | |
| self.wfile.write(b'{"ok":true}') | |
| def do_OPTIONS(self): | |
| self.send_response(200) | |
| self.send_header("Access-Control-Allow-Origin", "*") | |
| self.send_header("Access-Control-Allow-Methods", "POST") | |
| self.send_header("Access-Control-Allow-Headers", "Content-Type") | |
| self.end_headers() | |
| def log_message(self, *a): | |
| pass | |
| return Handler | |
| async def on_connect(ws): | |
| log.info("β Extension connected!") | |
| async for raw in ws: | |
| data = json.loads(raw) | |
| if data.get("type") == "token_captured": | |
| log.info("π Token captured") | |
| async def run(args): | |
| Handler = make_handler(args.save) | |
| srv = HTTPServer(("127.0.0.1", args.port), Handler) | |
| threading.Thread(target=srv.serve_forever, daemon=True).start() | |
| log.info("β‘ WS server on ws://127.0.0.1:%d", args.ws_port) | |
| log.info("β‘ HTTP callback on http://127.0.0.1:%d", args.port) | |
| if args.save: | |
| log.info("πΎ Saving to: %s", args.save) | |
| log.info("π Open Flow UI and perform any action...") | |
| log.info("β" * 60) | |
| async with websockets.serve(on_connect, "127.0.0.1", args.ws_port): | |
| await asyncio.Future() | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Flow API Sniffer") | |
| parser.add_argument("--save", "-s", help="Save to JSON file") | |
| parser.add_argument("--port", type=int, default=8100) | |
| parser.add_argument("--ws-port", type=int, default=9222) | |
| args = parser.parse_args() | |
| asyncio.run(run(args)) | |
| if __name__ == "__main__": | |
| main() | |