| import gradio as gr |
| import asyncio |
| import json |
| import time |
| import requests |
| import os |
| import subprocess |
| import sys |
| import shutil |
| from urllib3.exceptions import InsecureRequestWarning |
|
|
# Snapshot requests in this file are sent with verify=False, so silence the
# InsecureRequestWarning urllib3 would otherwise emit for each of them.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
|
|
| |
| |
| |
def ensure_rust_binary():
    """Return the path of the scanner executable, building it if it is missing.

    The executable is ``rust-scanner`` (``rust-scanner.exe`` on Windows) in the
    current working directory.  When it is absent, ``cargo`` is located on
    ``PATH`` or under ``~/.cargo/bin`` (rustup is installed if neither exists),
    ``cargo build --release`` is run in the current directory, and the binary
    is copied from ``target/release`` to the working directory.

    Returns:
        The relative path of the executable, or ``None`` if the toolchain is
        unavailable, the build fails, or the built binary cannot be found or
        copied.
    """
    exe_suffix = ".exe" if os.name == "nt" else ""
    binary_name = "rust-scanner" + exe_suffix
    bin_path = os.path.join(".", binary_name)

    if os.path.exists(bin_path):
        return bin_path

    print("Scanner engine not found. Initializing build process...")

    # rustup installs its tools here; the executable carries ".exe" on Windows.
    cargo_bin_dir = os.path.expanduser(os.path.join("~", ".cargo", "bin"))
    rustup_cargo = os.path.join(cargo_bin_dir, "cargo" + exe_suffix)

    cargo_path = shutil.which("cargo")
    if not cargo_path and os.path.exists(rustup_cargo):
        cargo_path = rustup_cargo

    if not cargo_path:
        print("Rust toolchain not found. Installing rustup...")
        # Fixed command string with no user input; it needs a shell for the
        # pipe.  Its outcome is verified by the existence check below.
        os.system("curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y")
        cargo_path = rustup_cargo

    if not os.path.exists(cargo_path):
        print("Cargo still not available.")
        return None

    print("Compiling Rust engine for release...")

    # Put the rustup bin directory first on PATH, using the platform's own
    # separator (":" on POSIX, ";" on Windows).
    env = os.environ.copy()
    env["PATH"] = os.pathsep.join([cargo_bin_dir, env.get("PATH", "")])

    source_bin = os.path.join("target", "release", binary_name)
    try:
        subprocess.run([cargo_path, "build", "--release"], check=True, env=env)

        if not os.path.exists(source_bin):
            print("Build succeeded but binary missing.")
            return None

        shutil.copy(source_bin, bin_path)
    except (subprocess.SubprocessError, OSError) as e:
        # Covers a failing build, an unlaunchable cargo, and copy errors
        # (shutil.Error is a subclass of OSError).
        print(f"Compilation failed: {e}")
        return None

    if os.name != "nt":
        try:
            os.chmod(bin_path, 0o755)
        except OSError as e:
            # Best effort: shutil.copy already carried over the mode bits.
            print(f"Could not mark {bin_path} as executable: {e}")

    print("Engine compiled successfully.")
    return bin_path
|
|
|
|
| |
| |
| |
def _raise_open_file_limit(target=100000):
    """Raise the soft open-file limit toward *target* without exceeding the hard limit.

    Does nothing on Windows.  The hard limit is left untouched and an existing
    soft limit that is already higher is never lowered.  Failures are reported
    and otherwise ignored, because the limit is only an optimisation.
    """
    if os.name == "nt":
        return
    try:
        import resource

        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        if soft == resource.RLIM_INFINITY:
            return
        # An unprivileged process may not exceed its hard limit, so clamp.
        wanted = target if hard == resource.RLIM_INFINITY else min(target, hard)
        if wanted > soft:
            resource.setrlimit(resource.RLIMIT_NOFILE, (wanted, hard))
    except (ImportError, ValueError, OSError) as exc:
        print(f"Could not raise open-file limit: {exc}")


# NOTE(review): presumably raised so the scanner child process, which inherits
# this limit, can hold many sockets open at once -- confirm.
_raise_open_file_limit()

# Resolved (and built if necessary) once at import time; None means no scanner.
EXECUTABLE_PATH = ensure_rust_binary()
|
|
|
|
| |
| |
| |
def _parse_scanner_line(line):
    """Decode one stdout line of the scanner into a dict, or None if it is not a JSON object."""
    try:
        msg = json.loads(line.decode().strip())
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
        return None
    return msg if isinstance(msg, dict) else None


async def run_discovery(cidr: str):
    """Run the scanner on *cidr* and stream its results to the UI.

    The scanner is started as ``EXECUTABLE_PATH <cidr> combos.txt`` and is
    expected to print one JSON object per line on stdout:

    * ``{"type": "Progress", "scanned": ..., "total": ...}``
    * ``{"type": "Found", "ip": ..., "url": ..., "auth": ..., "status": ...,
      "port": ...}``

    Lines that are not valid JSON objects, or that lack the expected keys, are
    skipped.  ``combos.txt`` is created with three default credential pairs if
    it does not exist.

    Yields:
        ``(rows, status)`` tuples, where ``rows`` is the list of
        ``[ip, url, auth, status, port]`` rows found so far and ``status`` is
        a human-readable progress or error message.
    """
    if not EXECUTABLE_PATH:
        yield [], "Error: Rust engine failed to compile. Check logs."
        return

    started = time.monotonic()
    results = []

    combos_path = "combos.txt"
    if not os.path.exists(combos_path):
        with open(combos_path, "w", encoding="utf-8") as f:
            f.write("admin:admin\nadmin:12345\nroot:root")

    try:
        # stderr is inherited rather than piped: nothing here reads it, and an
        # unread pipe would block the child once its buffer fills.
        process = await asyncio.create_subprocess_exec(
            EXECUTABLE_PATH,
            cidr,
            combos_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=None,
        )
    except Exception as e:
        yield results, f"Error launching scanner: {str(e)}"
        return

    # -inf guarantees that the first progress message is always shown.
    last_ui_update = float("-inf")

    try:
        while True:
            try:
                line = await process.stdout.readline()
            except ValueError:
                # The line exceeded the stream buffer limit; asyncio has
                # already discarded it, so carry on with the next one.
                continue
            if not line:
                break

            msg = _parse_scanner_line(line)
            if msg is None:
                continue

            # Only data extraction is guarded.  The yields stay outside any
            # except clause so that GeneratorExit / CancelledError raised at a
            # yield point are never swallowed.
            kind = msg.get("type")
            if kind == "Progress":
                if time.monotonic() - last_ui_update <= 0.2:
                    continue  # throttle UI refreshes to about five per second
                try:
                    scanned, total = msg["scanned"], msg["total"]
                    progress_val = (scanned / total) * 100 if total > 0 else 0
                    status = f"Scanning: {scanned}/{total} ({progress_val:.1f}%)"
                except (KeyError, TypeError, ValueError):
                    continue
                yield results, status
                last_ui_update = time.monotonic()

            elif kind == "Found":
                try:
                    row = [msg[key] for key in ("ip", "url", "auth", "status", "port")]
                except KeyError:
                    continue
                results.append(row)
                yield results, f"Found: {len(results)} devices..."

        await process.wait()
    finally:
        # Reached with the child still running only when the generator is
        # closed or cancelled before the scan finishes; do not leak it.
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
            await process.wait()

    elapsed = time.monotonic() - started
    if process.returncode:
        yield results, (
            f"Scanner exited with code {process.returncode}; "
            f"{len(results)} found in {elapsed:.2f}s"
        )
    else:
        yield results, f"Scan complete! {len(results)} found in {elapsed:.2f}s"
|
|
|
|
| |
| |
| |
def get_snapshot(evt: gr.SelectData, table_data):
    """Fetch a JPEG snapshot from the device in the selected table row.

    Args:
        evt: Selection event; ``evt.index[0]`` is used as the row position.
        table_data: Current table contents, indexed positionally via ``.iloc``
            (column 1 is the URL, column 2 the ``user:password`` credentials).

    Returns:
        The raw bytes of the first response that has status 200 and starts
        with the JPEG magic number, or ``None`` if no snapshot path answers.
    """
    # NOTE(review): the bytes are wired to a gr.Image output -- confirm the
    # installed Gradio version accepts raw bytes there.
    try:
        row = table_data.iloc[evt.index[0]]
        # Explicit positional access; plain row[1] depends on a deprecated
        # fallback when the columns carry string labels.
        url, auth = row.iloc[1], row.iloc[2]
    except (AttributeError, IndexError, KeyError, TypeError):
        return None

    # Split on the first colon only, so passwords containing ":" stay intact.
    username, sep, password = str(auth).partition(":")
    credentials = (username, password) if sep else None
    base_url = str(url).rstrip("/")

    paths = (
        "/ISAPI/Streaming/Channels/101/picture",
        "/cgi-bin/snapshot.cgi",
        "/jpg/image.jpg",
        "/onvif/snapshot",
    )

    for p in paths:
        try:
            r = requests.get(
                f"{base_url}{p}",
                auth=credentials,
                timeout=1.5,
                # Certificate verification is deliberately disabled here.
                verify=False,
            )
        except requests.RequestException:
            continue  # unreachable or misbehaving endpoint: try the next path

        # 0xFFD8 is the JPEG start-of-image marker.
        if r.status_code == 200 and r.content[:2] == b"\xff\xd8":
            return r.content

    return None
|
|
|
|
| |
| |
| |
# UI layout: a CIDR input with a scan button, a status line, then the results
# table (left) next to a snapshot preview (right).
with gr.Blocks(title="Rust Scanner") as app:
    gr.Markdown("# Rust Network Scanner")

    with gr.Row():
        cidr = gr.Textbox(value="192.168.1.0/24", label="Target CIDR", scale=4)
        scan_btn = gr.Button(value="Start Scan", variant="primary", scale=1)

    status_label = gr.Textbox(interactive=False, label="Status")

    with gr.Row():
        with gr.Column(scale=3):
            table = gr.Dataframe(
                interactive=False,
                headers=["IP Address", "URL", "Credentials", "Status", "Port"],
            )

        with gr.Column(scale=2):
            preview = gr.Image(label="Snapshot Preview")

    # The scan streams (rows, status) updates into the table and status line.
    scan_btn.click(run_discovery, inputs=[cidr], outputs=[table, status_label])

    # Selecting a table cell loads a snapshot for that row into the preview.
    table.select(get_snapshot, inputs=[table], outputs=[preview])
|
|
|
|
| |
| |
| |
if __name__ == "__main__":
    # Listen on the port given by the PORT environment variable (default 7860).
    port = int(os.environ.get("PORT", 7860))

    launch_options = {
        "server_name": "0.0.0.0",  # bind on all interfaces
        "server_port": port,
        "share": True,
        "show_error": True,
        "prevent_thread_lock": False,
        "inbrowser": False,
    }

    # Enable the request queue before launching the server.
    app.queue()
    app.launch(**launch_options)