import asyncio
import json
import os
import shutil
import subprocess
import tempfile
import time

import gradio as gr
import requests
from urllib3.exceptions import InsecureRequestWarning
|
|
# Snapshot requests in this file are sent with verify=False; silence the
# InsecureRequestWarning that urllib3 raises for unverified HTTPS requests.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
|
|
def ensure_rust_binary():
    """Return the path of the scanner executable, building it first if needed.

    Looks for ``./rust-scanner`` (``rust-scanner.exe`` on Windows). When it is
    missing, ``cargo`` is located on ``PATH`` or in ``~/.cargo/bin``; if no
    cargo is found the Rust toolchain is installed through rustup. The project
    in the current directory is then built with ``cargo build --release`` and
    the resulting binary is copied next to this script.

    Returns:
        The relative path of the executable, or ``None`` when the toolchain
        could not be installed or the build failed. Progress and failures are
        reported with ``print``.
    """
    is_windows = os.name == "nt"
    exe_suffix = ".exe" if is_windows else ""
    binary_name = f"rust-scanner{exe_suffix}"

    bin_path = os.path.join(".", binary_name)
    if os.path.exists(bin_path):
        return bin_path

    print("Scanner engine not found. Building...")

    cargo_bin_dir = os.path.expanduser(os.path.join("~", ".cargo", "bin"))
    default_cargo = os.path.join(cargo_bin_dir, f"cargo{exe_suffix}")

    cargo_path = shutil.which("cargo")
    if not cargo_path and os.path.exists(default_cargo):
        cargo_path = default_cargo

    if not cargo_path:
        print("Rust toolchain not found. Installing rustup...")
        try:
            # Download the installer completely before running it, so a
            # truncated or failed download is never handed to the shell.
            installer = subprocess.run(
                ["curl", "--proto", "=https", "--tlsv1.2", "-sSf", "https://sh.rustup.rs"],
                check=True,
                stdout=subprocess.PIPE,
            ).stdout
            subprocess.run(["sh", "-s", "--", "-y"], input=installer, check=True)
        except (OSError, subprocess.CalledProcessError) as e:
            print(f"rustup installation failed: {e}")

        cargo_path = default_cargo
        if not os.path.exists(cargo_path):
            print("Cargo not available after install.")
            return None

    print("Compiling (this takes ~30–60s on first run)...")

    # Put ~/.cargo/bin first on PATH for the build, using the platform's
    # separator (";" on Windows, ":" elsewhere).
    env = os.environ.copy()
    path_entries = [cargo_bin_dir]
    if env.get("PATH"):
        path_entries.append(env["PATH"])
    env["PATH"] = os.pathsep.join(path_entries)

    try:
        subprocess.run([cargo_path, "build", "--release"], check=True, env=env)

        source_bin = os.path.join("target", "release", binary_name)
        if not os.path.exists(source_bin):
            print("Build succeeded but binary missing.")
            return None

        shutil.copy(source_bin, bin_path)
    except (OSError, subprocess.CalledProcessError) as e:
        print(f"Compilation failed: {e}")
        return None

    if not is_windows:
        try:
            os.chmod(bin_path, 0o755)
        except OSError as e:
            # Best effort: the copy normally keeps the executable bit already.
            print(f"Could not mark {bin_path} executable: {e} (non-fatal)")

    print("Engine compiled successfully.")
    return bin_path
|
|
# Raise the soft open-file limit before the scanner is started.
# NOTE(review): presumably this is so the scanner child process, which
# inherits the limit, can hold thousands of sockets at once — confirm.
try:
    if os.name != "nt":
        import resource

        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)

        # RLIM_INFINITY is a sentinel (it can be negative), so it must not be
        # fed to min(); an unlimited hard limit simply does not cap the target.
        target = 131072
        if hard != resource.RLIM_INFINITY:
            target = min(target, hard)

        if soft == resource.RLIM_INFINITY or soft >= target:
            # Never lower a soft limit that is already high enough.
            print(f"RLIMIT_NOFILE: {soft} (already sufficient)")
        else:
            resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
            print(f"RLIMIT_NOFILE: {soft} → {target}")
except (ImportError, OSError, ValueError) as e:
    print(f"Could not raise RLIMIT_NOFILE: {e} (non-fatal)")


# Resolved once at import time; None when the scanner could not be built.
EXECUTABLE_PATH = ensure_rust_binary()
|
|
| |
async def _drain_stream(stream, keep_bytes=4096):
    """Read *stream* to EOF and return only its last *keep_bytes* bytes."""
    tail = b""
    while True:
        chunk = await stream.read(65536)
        if not chunk:
            return tail
        tail = (tail + chunk)[-keep_bytes:]


async def run_discovery(cidr: str, tcp_concurrency: int, http_concurrency: int, rate_limit: int):
    """Run the scanner on *cidr* and stream ``(rows, status)`` updates.

    The scanner executable is started with the target, the credentials file
    ``combos.txt`` (created with a default list when missing) and the three
    tuning values. Its stdout is read line by line as JSON messages:

    * ``{"type": "Progress", "scanned": ..., "total": ...}`` updates the status
      text, at most about four times per second;
    * ``{"type": "Found", "ip", "url", "auth", "status", "port"}`` appends a
      row to the result table.

    Lines that are not valid JSON, or messages lacking the expected fields,
    are skipped.

    Args:
        cidr: Target passed to the scanner as its first argument.
        tcp_concurrency: Passed to the scanner as an integer string.
        http_concurrency: Passed to the scanner as an integer string.
        rate_limit: Passed to the scanner as an integer string.

    Yields:
        ``(rows, status)`` tuples, where ``rows`` is the list of
        ``[ip, url, auth, status, port]`` rows found so far and ``status`` is
        a human-readable progress or result message.
    """
    if not EXECUTABLE_PATH:
        yield [], "Error: Rust engine failed to compile. Check server logs."
        return

    target = (cidr or "").strip()
    if not target:
        yield [], "Error: no target CIDR given."
        return

    start_time = time.time()
    results = []

    combos_path = "combos.txt"
    if not os.path.exists(combos_path):
        with open(combos_path, "w", encoding="utf-8") as f:
            f.write(
                "admin:admin\nadmin:12345\nadmin:password\n"
                "root:root\nadmin:\nadmin:1234\nadmin:admin123\n"
            )

    try:
        cmd = [
            EXECUTABLE_PATH,
            target,
            combos_path,
            # int() so a slider value such as 3000.0 is sent as "3000".
            str(int(tcp_concurrency)),
            str(int(http_concurrency)),
            str(int(rate_limit)),
        ]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except Exception as e:
        # Boundary with the UI: report any launch failure instead of raising.
        yield results, f"Error launching scanner: {e}"
        return

    # stderr must be consumed while stdout is read; otherwise a scanner that
    # writes enough diagnostics fills the pipe and blocks forever.
    stderr_task = asyncio.create_task(_drain_stream(process.stderr))

    last_ui_update = 0.0
    pass1_total = None
    in_pass2 = False
    pass_started = start_time

    try:
        while True:
            line = await process.stdout.readline()
            if not line:
                break

            try:
                msg = json.loads(line)
            except ValueError:
                # Not JSON (or not decodable text): ignore the line.
                continue
            if not isinstance(msg, dict):
                continue

            kind = msg.get("type")

            if kind == "Progress":
                label = None
                now = time.time()
                try:
                    scanned = msg["scanned"]
                    total = msg["total"]

                    # A total smaller than the first one seen marks the
                    # start of the retry pass.
                    if pass1_total is None:
                        pass1_total = total
                    elif total < pass1_total and not in_pass2:
                        in_pass2 = True
                        # Assumes the scanner restarts `scanned` for the
                        # retry pass (the per-pass percentage already relies
                        # on that), so the rate is measured per pass too.
                        pass_started = now

                    if now - last_ui_update > 0.25:
                        elapsed = now - pass_started
                        pct = (scanned / total * 100) if total > 0 else 0
                        rate = scanned / elapsed if elapsed > 0 else 0
                        eta = (total - scanned) / rate if rate > 0 else 0

                        if in_pass2:
                            label = (
                                f"Pass 2 (retry): {scanned:,}/{total:,} ({pct:.1f}%) | "
                                f"{rate:,.0f} targets/s | {len(results)} found"
                            )
                        else:
                            label = (
                                f"Pass 1: {scanned:,}/{total:,} ({pct:.1f}%) | "
                                f"{rate:,.0f} targets/s | ETA {eta:.0f}s"
                            )
                except (KeyError, TypeError, ValueError):
                    # Missing or non-numeric fields: skip this message.
                    continue

                if label is not None:
                    yield results, label
                    last_ui_update = now

            elif kind == "Found":
                try:
                    row = [
                        msg["ip"],
                        msg["url"],
                        msg["auth"],
                        msg["status"],
                        str(msg["port"]),
                    ]
                except KeyError:
                    continue
                results.append(row)
                yield results, f"Found {len(results)} device(s)…"

        return_code = await process.wait()
        stderr_tail = await stderr_task
    finally:
        # Reached with a live process only when the generator is closed or
        # cancelled mid-scan; do not leave the scanner running unattended.
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
        if not stderr_task.done():
            stderr_task.cancel()

    elapsed = time.time() - start_time
    if return_code != 0:
        detail_lines = stderr_tail.decode(errors="replace").strip().splitlines()
        detail = f": {detail_lines[-1].strip()}" if detail_lines else ""
        yield results, (
            f"✗ Scanner exited with code {return_code} after {elapsed:.1f}s — "
            f"{len(results)} found{detail}"
        )
    else:
        yield results, f"✓ Complete — {len(results)} found in {elapsed:.1f}s"
|
|
|
|
| |
| |
| |
def _fetch_jpeg(session, url, creds, max_bytes=10 * 1024 * 1024):
    """Return the body of *url* if it is a JPEG of at most *max_bytes*, else ``None``."""
    try:
        # stream=True so an endless response (e.g. an MJPEG stream) can be
        # abandoned after the first chunk instead of being downloaded forever.
        with session.get(
            url, auth=creds, timeout=2.0, verify=False, stream=True
        ) as response:
            if response.status_code != 200:
                return None
            body = bytearray()
            for chunk in response.iter_content(chunk_size=65536):
                body.extend(chunk)
                if len(body) >= 2 and body[:2] != b"\xff\xd8":
                    return None  # not a JPEG (missing SOI marker)
                if len(body) > max_bytes:
                    return None
    except (requests.RequestException, ValueError):
        return None
    return bytes(body) if body[:2] == b"\xff\xd8" else None


def get_snapshot(evt: gr.SelectData, table_data):
    """Fetch a JPEG snapshot from the device in the selected results row.

    The URL and credentials are read from the second and third columns of the
    selected row. A fixed list of snapshot paths is tried in order and the
    first response that is an HTTP 200 JPEG wins.

    Args:
        evt: Gradio selection event; ``evt.index[0]`` is used as the row index.
        table_data: Current table contents; accessed through ``.iloc``.

    Returns:
        Path of a temporary ``.jpg`` file containing the snapshot, or ``None``
        when the row cannot be read or no path returns a JPEG. A file path is
        returned rather than raw bytes because the Image component accepts
        arrays, PIL images and paths, not ``bytes``.
    """
    try:
        row = table_data.iloc[evt.index[0]]
        # .iloc: plain row[1] on a label-indexed Series is deprecated.
        url = row.iloc[1]
        auth = row.iloc[2]
    except (AttributeError, IndexError, KeyError, TypeError):
        return None

    if not isinstance(url, str) or not url:
        return None

    # Split on the first ":" only, so passwords containing ":" stay intact.
    creds = None
    if isinstance(auth, str) and ":" in auth:
        username, _, password = auth.partition(":")
        creds = (username, password)

    snapshot_paths = (
        "/ISAPI/Streaming/Channels/101/picture",
        "/cgi-bin/snapshot.cgi",
        "/jpg/image.jpg",
        "/onvif/snapshot",
        "/snapshot.jpg",
        "/image/jpeg.cgi",
        "/cgi-bin/mjpg/video.cgi",
    )

    with requests.Session() as session:
        for path in snapshot_paths:
            jpeg = _fetch_jpeg(session, f"{url}{path}", creds)
            if jpeg is None:
                continue
            try:
                # delete=False: the file must outlive this call so the UI can
                # read it; it is left in the system temp directory.
                with tempfile.NamedTemporaryFile(
                    prefix="snapshot_", suffix=".jpg", delete=False
                ) as out:
                    out.write(jpeg)
            except OSError as e:
                print(f"Could not store snapshot: {e}")
                return None
            return out.name
    return None
|
|
|
|
| |
| |
| |
# Declarative UI: target + start button, three tuning sliders, a status line,
# and the results table next to a snapshot preview.
with gr.Blocks(title="Rust Network Scanner") as app:
    gr.Markdown("# Rust Network Scanner")

    # Scan target and trigger.
    with gr.Row():
        cidr = gr.Textbox(value="192.168.1.0/24", label="Target CIDR", scale=5)
        scan_btn = gr.Button("▶ Start Scan", scale=1, variant="primary")

    # Tuning values handed to run_discovery.
    with gr.Row():
        tcp_concurrency = gr.Slider(
            minimum=100,
            maximum=10000,
            step=100,
            value=3000,
            label="TCP Concurrency",
            info="Phase 1 simultaneous probes. 2000–5000 suits most systems.",
        )
        http_concurrency = gr.Slider(
            minimum=10,
            maximum=2000,
            step=10,
            value=500,
            label="HTTP Concurrency",
            info="Phase 2 HTTP slots. Keep ≤ 500 on typical hardware.",
        )
        rate_limit = gr.Slider(
            minimum=0,
            maximum=10000,
            step=100,
            value=2000,
            label="Rate Limit (SYNs/sec)",
            info="0 = unlimited. Lower if your router drops packets.",
        )

    status_label = gr.Textbox(interactive=False, label="Status")

    # Results on the left, snapshot of the selected row on the right.
    with gr.Row():
        with gr.Column(scale=3):
            table = gr.Dataframe(
                interactive=False,
                headers=["IP Address", "URL", "Credentials", "Status", "Port"],
            )
        with gr.Column(scale=2):
            preview = gr.Image(label="Snapshot Preview")

    # Starting a scan streams (rows, status) updates into the table and status.
    scan_btn.click(
        run_discovery,
        inputs=[cidr, tcp_concurrency, http_concurrency, rate_limit],
        outputs=[table, status_label],
    )

    # Selecting a table cell loads a snapshot for that row.
    table.select(
        get_snapshot,
        inputs=[table],
        outputs=[preview],
    )

app.queue()
|
|
|
|
| |
| |
| |
if __name__ == "__main__":
    # Port comes from the PORT environment variable, defaulting to 7860.
    port = int(os.getenv("PORT", "7860"))

    # NOTE(review): share=True publishes this UI through a public tunnel, and
    # the UI can start network scans from this host. Sharing stays on by
    # default to keep the existing behaviour, but can now be switched off
    # with GRADIO_SHARE=0 (also "false", "no" or "off").
    share_setting = os.getenv("GRADIO_SHARE", "true").strip().lower()
    share_enabled = share_setting not in {"0", "false", "no", "off"}

    app.launch(
        server_name="0.0.0.0",
        server_port=port,
        share=share_enabled,
        show_error=True,
        prevent_thread_lock=False,
        inbrowser=False,
    )