import asyncio
import json
import os
import shutil
import subprocess
import sys
import time

import gradio as gr
import requests
from urllib3.exceptions import InsecureRequestWarning

# Snapshot requests in this file are issued with verify=False; silence the
# warning urllib3 would otherwise emit for each of them.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


# =========================================================
# Rust binary setup (WSL + Windows + HF safe)
# =========================================================
def ensure_rust_binary():
    """Return the path of the scanner executable, building it if missing.

    Looks for ``./rust-scanner`` (``.exe`` on Windows). When it is absent,
    locates ``cargo`` (installing rustup if no toolchain is found), runs
    ``cargo build --release`` in the current directory and copies the
    resulting binary next to this script.

    Returns:
        The relative path to the executable, or ``None`` when the toolchain
        could not be obtained or the build/copy failed. Failures are printed
        rather than raised.
    """
    binary_name = "rust-scanner"
    cargo_name = "cargo"
    if os.name == "nt":
        # Windows executables carry an explicit extension.
        binary_name += ".exe"
        cargo_name += ".exe"

    bin_path = os.path.join(".", binary_name)
    if os.path.exists(bin_path):
        return bin_path

    print("Scanner engine not found. Initializing build process...")

    # Default rustup install location; used as a fallback when cargo is not
    # on PATH and again after a fresh rustup install.
    cargo_bin_dir = os.path.expanduser(os.path.join("~", ".cargo", "bin"))
    rustup_cargo = os.path.join(cargo_bin_dir, cargo_name)

    cargo_path = shutil.which("cargo")
    if not cargo_path and os.path.exists(rustup_cargo):
        cargo_path = rustup_cargo

    if not cargo_path:
        print("Rust toolchain not found. Installing rustup...")
        # NOTE(security): this pipes a downloaded script straight into sh.
        # The command is a constant (no user input reaches the shell), but it
        # still executes remote code; the exit status is not inspected because
        # the existence check below decides whether the install worked.
        os.system("curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y")
        cargo_path = rustup_cargo

        if not os.path.exists(cargo_path):
            print("Cargo still not available.")
            return None

    print("Compiling Rust engine for release...")

    try:
        env = os.environ.copy()
        # os.pathsep is ':' on POSIX and ';' on Windows; a hard-coded ':'
        # would corrupt PATH on Windows.
        env["PATH"] = os.pathsep.join(
            part for part in (cargo_bin_dir, env.get("PATH", "")) if part
        )

        subprocess.run(
            [cargo_path, "build", "--release"],
            check=True,
            env=env,
        )

        source_bin = os.path.join("target", "release", binary_name)
        if not os.path.exists(source_bin):
            print("Build succeeded but binary missing.")
            return None

        # Cross-platform copy of the build artifact next to the script.
        shutil.copy(source_bin, bin_path)

        if os.name != "nt":
            try:
                os.chmod(bin_path, 0o755)
            except OSError:
                # Best effort: the copied file may already be executable.
                pass

        print("Engine compiled successfully.")
        return bin_path

    except (subprocess.SubprocessError, OSError) as e:
        print(f"Compilation failed: {e}")
        return None


# =========================================================
# SAFE RLIMIT (WSL ONLY, prevents Windows crash)
# =========================================================
def _raise_fd_limit(target=100000):
    """Best-effort raise of the soft open-file limit on POSIX systems.

    The soft limit is raised towards ``target`` but never above the current
    hard limit, and existing limits are never lowered. Does nothing on
    Windows, where the ``resource`` module does not exist.
    """
    if os.name == "nt":
        return
    try:
        import resource

        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        if hard != resource.RLIM_INFINITY:
            target = min(target, hard)
        if soft == resource.RLIM_INFINITY or soft >= target:
            return
        resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    except (ImportError, ValueError, OSError):
        # Not permitted / not supported here: keep the current limits.
        pass


_raise_fd_limit()

EXECUTABLE_PATH = ensure_rust_binary()


# =========================================================
# Async discovery
# =========================================================
def _parse_scanner_line(line):
    """Decode one stdout line of the scanner into a dict, or return None."""
    try:
        msg = json.loads(line.decode().strip())
    except ValueError:
        # Covers both invalid UTF-8 and invalid JSON.
        return None
    return msg if isinstance(msg, dict) else None


async def run_discovery(cidr: str):
    """Run the scanner on ``cidr`` and stream ``(rows, status)`` updates.

    This is an async generator. The scanner executable is started with the
    target and the path of ``combos.txt`` (created with three default
    credential pairs when missing). Its stdout is read line by line as JSON:

    * ``{"type": "Progress", "scanned": ..., "total": ...}`` produces a
      status update, throttled to one every 0.2 seconds;
    * ``{"type": "Found", "ip", "url", "auth", "status", "port"}`` appends a
      row to the result table and produces an update.

    Lines that are not valid JSON objects or lack the expected keys are
    skipped.

    Args:
        cidr: Target passed to the scanner as its first argument.

    Yields:
        ``(results, status_text)`` where ``results`` is a list of
        ``[ip, url, auth, status, port]`` rows. The same list object is
        yielded each time and grows as devices are found.
    """
    if not EXECUTABLE_PATH:
        yield [], "Error: Rust engine failed to compile. Check logs."
        return

    target = (cidr or "").strip()
    # A leading '-' could be interpreted by the scanner as an option.
    if not target or target.startswith("-"):
        yield [], "Error: invalid target CIDR."
        return

    start_time = time.monotonic()
    results = []

    combos_path = "combos.txt"
    if not os.path.exists(combos_path):
        with open(combos_path, "w") as f:
            f.write("admin:admin\nadmin:12345\nroot:root")

    try:
        process = await asyncio.create_subprocess_exec(
            EXECUTABLE_PATH,
            target,
            combos_path,
            stdout=asyncio.subprocess.PIPE,
            # stderr is never consumed; a PIPE here could fill up and block
            # the scanner, so discard it instead.
            stderr=asyncio.subprocess.DEVNULL,
        )
    except Exception as e:
        yield results, f"Error launching scanner: {str(e)}"
        return

    last_ui_update = float("-inf")

    try:
        while True:
            line = await process.stdout.readline()
            if not line:
                break

            msg = _parse_scanner_line(line)
            if msg is None:
                continue

            # Build the update inside the try, but yield outside of it so
            # that generator close/cancellation is never swallowed.
            status = None
            try:
                if msg["type"] == "Progress":
                    scanned, total = msg["scanned"], msg["total"]
                    now = time.monotonic()
                    if now - last_ui_update > 0.2:
                        progress_val = (scanned / total) * 100 if total > 0 else 0
                        status = f"Scanning: {scanned}/{total} ({progress_val:.1f}%)"
                        last_ui_update = now
                elif msg["type"] == "Found":
                    results.append([
                        msg["ip"],
                        msg["url"],
                        msg["auth"],
                        msg["status"],
                        msg["port"],
                    ])
                    status = f"Found: {len(results)} devices..."
            except (KeyError, TypeError, ValueError):
                # Malformed message: skip it, as before.
                continue

            if status is not None:
                yield results, status

        await process.wait()
    finally:
        # Reached with the process still running only when the generator is
        # closed or cancelled mid-scan; do not leave the scanner behind.
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
            await process.wait()

    yield results, f"Scan complete! {len(results)} found in {time.monotonic() - start_time:.2f}s"


# =========================================================
# Snapshot
# =========================================================
def get_snapshot(evt: gr.SelectData, table_data):
    """Fetch a JPEG snapshot for the table row that was selected.

    Tries a fixed list of snapshot paths on the row's URL, using the
    row's ``user:password`` credentials for HTTP auth when present, and
    accepts the first 200 response whose body starts with the JPEG magic
    bytes. TLS certificate verification is disabled for these requests.

    Args:
        evt: Gradio select event; ``evt.index[0]`` is used as the row number.
        table_data: Table contents; indexed with ``.iloc``, so a pandas
            DataFrame is expected.

    Returns:
        Path of a temporary ``.jpg`` file holding the snapshot, or ``None``
        when the row cannot be read or no path returns a JPEG.
    """
    import tempfile

    try:
        row = table_data.iloc[evt.index[0]]
        # Positional access: columns are (IP, URL, Credentials, Status, Port).
        url, auth = row.iloc[1], row.iloc[2]
    except (AttributeError, IndexError, KeyError, TypeError):
        return None

    credentials = None
    if isinstance(auth, str):
        # Split on the first ':' only so passwords may contain colons.
        user, sep, password = auth.partition(":")
        if sep:
            credentials = (user, password)

    paths = [
        "/ISAPI/Streaming/Channels/101/picture",
        "/cgi-bin/snapshot.cgi",
        "/jpg/image.jpg",
        "/onvif/snapshot",
    ]

    for p in paths:
        try:
            r = requests.get(
                f"{url}{p}",
                auth=credentials,
                timeout=1.5,
                verify=False,
            )
        except (requests.RequestException, ValueError):
            continue

        if r.status_code == 200 and r.content[:2] == b'\xff\xd8':
            # gr.Image takes a file path, numpy array or PIL image as output,
            # not raw bytes, so hand it a file. The file is not removed here.
            try:
                with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as fh:
                    fh.write(r.content)
            except OSError:
                return None
            return fh.name

    return None


# =========================================================
# UI
# =========================================================
with gr.Blocks(title="Rust Scanner") as app:
    gr.Markdown("# Rust Network Scanner")

    with gr.Row():
        cidr = gr.Textbox(label="Target CIDR", value="192.168.1.0/24", scale=4)
        scan_btn = gr.Button("Start Scan", variant="primary", scale=1)

    status_label = gr.Textbox(label="Status", interactive=False)

    with gr.Row():
        with gr.Column(scale=3):
            table = gr.Dataframe(
                headers=["IP Address", "URL", "Credentials", "Status", "Port"],
                interactive=False
            )
        with gr.Column(scale=2):
            preview = gr.Image(label="Snapshot Preview")

    # Stream scan updates into the table and the status box.
    scan_btn.click(
        fn=run_discovery,
        inputs=[cidr],
        outputs=[table, status_label]
    )

    # Selecting a row loads a snapshot preview for that device.
    table.select(
        fn=get_snapshot,
        inputs=[table],
        outputs=[preview]
    )


# =========================================================
# Launch (WSL + HF + Windows compatible)
# =========================================================
if __name__ == "__main__":
    port = int(os.getenv("PORT", 7860))

    app.queue()
    # NOTE(review): server_name="0.0.0.0" plus share=True makes this scanner
    # reachable from other hosts and through a public share link - confirm
    # that this exposure is intended.
    app.launch(
        server_name="0.0.0.0",
        server_port=port,
        share=True,
        show_error=True,
        prevent_thread_lock=False,
        inbrowser=False
    )