import gradio as gr import asyncio import json import time import requests import os import subprocess import shutil from urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) def ensure_rust_binary(): binary_name = "rust-scanner" if os.name == "nt": binary_name += ".exe" bin_path = os.path.join(".", binary_name) if os.path.exists(bin_path): return bin_path print("Scanner engine not found. Building...") cargo_path = shutil.which("cargo") if not cargo_path: fallback = os.path.expanduser("~/.cargo/bin/cargo") if os.path.exists(fallback): cargo_path = fallback if not cargo_path: print("Rust toolchain not found. Installing rustup...") os.system("curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y") cargo_path = os.path.expanduser("~/.cargo/bin/cargo") if not os.path.exists(cargo_path): print("Cargo not available after install.") return None print("Compiling (this takes ~30–60s on first run)...") try: env = os.environ.copy() env["PATH"] = f"{os.path.expanduser('~/.cargo/bin')}:{env.get('PATH', '')}" subprocess.run([cargo_path, "build", "--release"], check=True, env=env) source_bin = os.path.join("target", "release", binary_name) if not os.path.exists(source_bin): print("Build succeeded but binary missing.") return None shutil.copy(source_bin, bin_path) if os.name != "nt": try: os.chmod(bin_path, 0o755) except Exception: pass print("Engine compiled successfully.") return bin_path except Exception as e: print(f"Compilation failed: {e}") return None try: if os.name != "nt": import resource soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) target = min(131072, hard) resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard)) print(f"RLIMIT_NOFILE: {soft} → {target}") except Exception as e: print(f"Could not raise RLIMIT_NOFILE: {e} (non-fatal)") EXECUTABLE_PATH = ensure_rust_binary() # ========================================================= async def run_discovery(cidr: str, tcp_concurrency: int, http_concurrency: int, rate_limit: int): if not EXECUTABLE_PATH: yield [], "Error: Rust engine failed to compile. Check server logs." return start_time = time.time() results = [] combos_path = "combos.txt" if not os.path.exists(combos_path): with open(combos_path, "w") as f: f.write( "admin:admin\nadmin:12345\nadmin:password\n" "root:root\nadmin:\nadmin:1234\nadmin:admin123\n" ) cmd = [ EXECUTABLE_PATH, cidr, combos_path, str(tcp_concurrency), str(http_concurrency), str(rate_limit), ] try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) except Exception as e: yield results, f"Error launching scanner: {e}" return last_ui_update = 0.0 pass1_total = None # set on first Progress message in_pass2 = False pass2_seen = 0 while True: line = await process.stdout.readline() if not line: break try: msg = json.loads(line.decode().strip()) if msg["type"] == "Progress": scanned = msg["scanned"] total = msg["total"] now = time.time() # Detect pass boundary: total resets to a smaller number if pass1_total is None: pass1_total = total elif total < pass1_total and not in_pass2: in_pass2 = True if now - last_ui_update > 0.25: elapsed = now - start_time pct = (scanned / total * 100) if total > 0 else 0 rate = scanned / elapsed if elapsed > 0 else 0 eta = (total - scanned) / rate if rate > 0 else 0 if in_pass2: label = ( f"Pass 2 (retry): {scanned:,}/{total:,} ({pct:.1f}%) | " f"{rate:,.0f} targets/s | {len(results)} found" ) else: label = ( f"Pass 1: {scanned:,}/{total:,} ({pct:.1f}%) | " f"{rate:,.0f} targets/s | ETA {eta:.0f}s" ) yield results, label last_ui_update = now elif msg["type"] == "Found": results.append([ msg["ip"], msg["url"], msg["auth"], msg["status"], str(msg["port"]), ]) yield results, f"Found {len(results)} device(s)…" except Exception: continue await process.wait() elapsed = time.time() - start_time yield results, f"✓ Complete — {len(results)} found in {elapsed:.1f}s" # ========================================================= # Snapshot capture # ========================================================= def get_snapshot(evt: gr.SelectData, table_data): try: row = table_data.iloc[evt.index[0]] url = row[1] auth = row[2] paths = [ "/ISAPI/Streaming/Channels/101/picture", "/cgi-bin/snapshot.cgi", "/jpg/image.jpg", "/onvif/snapshot", "/snapshot.jpg", "/image/jpeg.cgi", "/cgi-bin/mjpg/video.cgi", ] for p in paths: try: parts = auth.split(":") creds = (parts[0], parts[1]) if len(parts) >= 2 else None r = requests.get( f"{url}{p}", auth=creds, timeout=2.0, verify=False ) if r.status_code == 200 and r.content[:2] == b"\xff\xd8": return r.content except Exception: continue except Exception: pass return None # ========================================================= # UI # ========================================================= with gr.Blocks(title="Rust Network Scanner") as app: gr.Markdown("# Rust Network Scanner") with gr.Row(): cidr = gr.Textbox(label="Target CIDR", value="192.168.1.0/24", scale=5) scan_btn = gr.Button("▶ Start Scan", variant="primary", scale=1) with gr.Row(): tcp_concurrency = gr.Slider( label="TCP Concurrency", minimum=100, maximum=10000, step=100, value=3000, info="Phase 1 simultaneous probes. 2000–5000 suits most systems.", ) http_concurrency = gr.Slider( label="HTTP Concurrency", minimum=10, maximum=2000, step=10, value=500, info="Phase 2 HTTP slots. Keep ≤ 500 on typical hardware.", ) rate_limit = gr.Slider( label="Rate Limit (SYNs/sec)", minimum=0, maximum=10000, step=100, value=2000, info="0 = unlimited. Lower if your router drops packets.", ) status_label = gr.Textbox(label="Status", interactive=False) with gr.Row(): with gr.Column(scale=3): table = gr.Dataframe( headers=["IP Address", "URL", "Credentials", "Status", "Port"], interactive=False, ) with gr.Column(scale=2): preview = gr.Image(label="Snapshot Preview") scan_btn.click( fn=run_discovery, inputs=[cidr, tcp_concurrency, http_concurrency, rate_limit], outputs=[table, status_label], ) table.select( fn=get_snapshot, inputs=[table], outputs=[preview], ) app.queue() # ========================================================= # Launch # ========================================================= if __name__ == "__main__": port = int(os.getenv("PORT", 7860)) app.launch( server_name="0.0.0.0", server_port=port, share=True, show_error=True, prevent_thread_lock=False, inbrowser=False, )