camscan2 / app.py
wuhp's picture
Update app.py
94e4226 verified
import gradio as gr
import asyncio
import json
import time
import requests
import os
import subprocess
import shutil
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def ensure_rust_binary():
    """Locate the scanner executable, building it with Cargo when absent.

    Looks for ``rust-scanner`` (``rust-scanner.exe`` on Windows) in the
    current working directory. When it is missing, ``cargo`` is looked up on
    ``PATH`` and then under ``~/.cargo/bin``; as a last resort rustup is
    installed. ``cargo build --release`` is then run in the current directory
    and the artifact is copied from ``target/release`` next to the app.

    Returns:
        The path of the executable, or ``None`` when the toolchain could not
        be obtained or the build failed. Failures are printed, not raised.
    """
    exe_suffix = ".exe" if os.name == "nt" else ""
    binary_name = "rust-scanner" + exe_suffix
    bin_path = os.path.join(".", binary_name)
    if os.path.exists(bin_path):
        return bin_path

    print("Scanner engine not found. Building...")
    cargo_bin_dir = os.path.expanduser(os.path.join("~", ".cargo", "bin"))
    # The executable suffix matters on Windows, where a bare "cargo" never exists.
    default_cargo = os.path.join(cargo_bin_dir, "cargo" + exe_suffix)

    cargo_path = shutil.which("cargo")
    if not cargo_path and os.path.exists(default_cargo):
        cargo_path = default_cargo

    if not cargo_path:
        print("Rust toolchain not found. Installing rustup...")
        # The installer is a curl-to-sh pipeline, so it needs a shell; the
        # command line is a constant and contains no external input.
        status = os.system(
            "curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y"
        )
        if status != 0:
            print(f"rustup installer exited with status {status}.")
        cargo_path = default_cargo
        if not os.path.exists(cargo_path):
            print("Cargo not available after install.")
            return None

    print("Compiling (this takes ~30–60s on first run)...")
    try:
        env = os.environ.copy()
        # os.pathsep keeps this correct on Windows; empty entries are dropped
        # because an empty PATH component means "current directory".
        env["PATH"] = os.pathsep.join(
            entry for entry in (cargo_bin_dir, env.get("PATH", "")) if entry
        )
        subprocess.run([cargo_path, "build", "--release"], check=True, env=env)

        source_bin = os.path.join("target", "release", binary_name)
        if not os.path.exists(source_bin):
            print("Build succeeded but binary missing.")
            return None

        shutil.copy(source_bin, bin_path)
        if os.name != "nt":
            try:
                os.chmod(bin_path, 0o755)
            except OSError as e:
                # Best effort: shutil.copy already carried over the mode bits.
                print(f"Could not mark engine executable: {e} (non-fatal)")

        print("Engine compiled successfully.")
        return bin_path
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Compilation failed: {e}")
        return None
# Raise the soft open-file limit towards 131072 (capped by the hard limit).
# Failure here is never fatal.
try:
    if os.name != "nt":
        import resource

        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        unlimited = resource.RLIM_INFINITY
        # An unlimited hard cap must not go through min(): depending on the
        # platform it may be represented as a negative number.
        target = 131072 if hard == unlimited else min(131072, hard)
        if soft != unlimited and soft < target:
            resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
            print(f"RLIMIT_NOFILE: {soft} -> {target}")
        else:
            # Never lower a soft limit that is already at or above the target.
            print(f"RLIMIT_NOFILE: {soft} (left unchanged)")
except Exception as e:
    print(f"Could not raise RLIMIT_NOFILE: {e} (non-fatal)")

# Resolved once at import time; None means the engine could not be built.
EXECUTABLE_PATH = ensure_rust_binary()
# =========================================================
# Scan driver
# =========================================================
def _progress_label(scanned, total, elapsed, in_pass2, found):
    """Format the status line shown for one ``Progress`` message."""
    pct = (scanned / total * 100) if total > 0 else 0
    rate = scanned / elapsed if elapsed > 0 else 0
    if in_pass2:
        return (
            f"Pass 2 (retry): {scanned:,}/{total:,} ({pct:.1f}%) | "
            f"{rate:,.0f} targets/s | {found} found"
        )
    eta = (total - scanned) / rate if rate > 0 else 0
    return (
        f"Pass 1: {scanned:,}/{total:,} ({pct:.1f}%) | "
        f"{rate:,.0f} targets/s | ETA {eta:.0f}s"
    )


async def run_discovery(cidr: str, tcp_concurrency: int, http_concurrency: int, rate_limit: int):
    """Run the scanner executable and stream its findings to the UI.

    The executable is started as
    ``EXECUTABLE_PATH cidr combos.txt tcp_concurrency http_concurrency rate_limit``
    and is expected to print one JSON object per line on stdout:

    * ``{"type": "Progress", "scanned": ..., "total": ...}``
    * ``{"type": "Found", "ip", "url", "auth", "status", "port"}``

    Lines that are not valid JSON or lack the expected keys are skipped.
    ``combos.txt`` is created with a small default list when it does not exist.

    Args:
        cidr: Target passed through to the scanner unchanged.
        tcp_concurrency: Third scanner argument.
        http_concurrency: Fourth scanner argument.
        rate_limit: Fifth scanner argument.

    Yields:
        ``(rows, status)`` tuples, where ``rows`` is the growing list of
        ``[ip, url, auth, status, port]`` rows and ``status`` is a
        human-readable progress or result message.
    """
    if not EXECUTABLE_PATH:
        yield [], "Error: Rust engine failed to compile. Check server logs."
        return

    start_time = time.time()
    results = []

    combos_path = "combos.txt"
    if not os.path.exists(combos_path):
        with open(combos_path, "w", encoding="utf-8") as f:
            f.write(
                "admin:admin\nadmin:12345\nadmin:password\n"
                "root:root\nadmin:\nadmin:1234\nadmin:admin123\n"
            )

    cmd = [
        EXECUTABLE_PATH,
        cidr,
        combos_path,
        # int() guards against a slider delivering e.g. 3000.0, which would
        # otherwise reach the scanner as the string "3000.0".
        str(int(tcp_concurrency)),
        str(int(http_concurrency)),
        str(int(rate_limit)),
    ]

    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            # stderr is inherited, not piped: a pipe nobody reads fills up and
            # blocks the child. Scanner diagnostics land in the server log.
            stderr=None,
        )
    except Exception as e:
        yield results, f"Error launching scanner: {e}"
        return

    last_ui_update = 0.0
    pass1_total = None  # set on first Progress message
    in_pass2 = False
    pass_start = start_time  # start of the pass currently being reported

    try:
        while True:
            try:
                line = await process.stdout.readline()
            except ValueError:
                # Line longer than the stream limit; asyncio has already
                # dropped it from the buffer, so just move on.
                continue
            if not line:
                break

            status = None
            try:
                msg = json.loads(line.decode().strip())
                msg_type = msg["type"]
                if msg_type == "Progress":
                    scanned = msg["scanned"]
                    total = msg["total"]
                    now = time.time()
                    # Detect pass boundary: total resets to a smaller number
                    if pass1_total is None:
                        pass1_total = total
                    elif total < pass1_total and not in_pass2:
                        in_pass2 = True
                        # Counters restart in pass 2, so throughput has to be
                        # measured from here, not from the start of the scan.
                        pass_start = now
                    # Throttle UI refreshes to at most four per second.
                    if now - last_ui_update > 0.25:
                        status = _progress_label(
                            scanned, total, now - pass_start, in_pass2, len(results)
                        )
                        last_ui_update = now
                elif msg_type == "Found":
                    results.append([
                        msg["ip"],
                        msg["url"],
                        msg["auth"],
                        msg["status"],
                        str(msg["port"]),
                    ])
                    status = f"Found {len(results)} device(s)…"
            except (ValueError, KeyError, TypeError):
                # Not JSON, or not shaped like a known message: ignore the line.
                continue

            if status is not None:
                yield results, status

        returncode = await process.wait()
    finally:
        # Reached with the child still alive when the generator is closed or
        # cancelled mid-scan; do not leave the scanner running in that case.
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass

    elapsed = time.time() - start_time
    if returncode:
        yield results, (
            f"Scanner exited with code {returncode} after {elapsed:.1f}s — "
            f"{len(results)} found. Check server logs."
        )
    else:
        yield results, f"✓ Complete — {len(results)} found in {elapsed:.1f}s"
# =========================================================
# Snapshot capture
# =========================================================
def get_snapshot(evt: gr.SelectData, table_data):
    """Fetch a JPEG snapshot from the device in the selected results row.

    The URL and credentials are read from the second and third columns of the
    selected row, then a list of common snapshot endpoints is tried in order
    until one answers 200 with a body that starts with the JPEG magic bytes.

    Args:
        evt: Selection event; ``evt.index[0]`` is used as the row position.
        table_data: Current table contents; must support ``.iloc`` row access.

    Returns:
        The path of a temporary ``.jpg`` file holding the snapshot, or ``None``
        when no endpoint produced a JPEG. A file path is returned because an
        image output takes an array, a PIL image or a file path, not raw
        bytes. The temporary file is not deleted by this function.
    """
    import tempfile  # local import keeps the module-level import block untouched

    max_bytes = 8 * 1024 * 1024  # upper bound for one snapshot download
    paths = [
        "/ISAPI/Streaming/Channels/101/picture",
        "/cgi-bin/snapshot.cgi",
        "/jpg/image.jpg",
        "/onvif/snapshot",
        "/snapshot.jpg",
        "/image/jpeg.cgi",
        "/cgi-bin/mjpg/video.cgi",
    ]

    try:
        row = table_data.iloc[evt.index[0]]
        # Explicit positional access: integer keys on a label-indexed row are
        # deprecated in pandas.
        url = str(row.iloc[1]).rstrip("/")
        auth = row.iloc[2]
    except Exception as e:
        print(f"Snapshot: could not read the selected row: {e}")
        return None

    # Split on the first colon only, so passwords may themselves contain ":".
    creds = None
    if isinstance(auth, str) and ":" in auth:
        user, _, password = auth.partition(":")
        creds = (user, password)

    for p in paths:
        try:
            # stream=True plus a size cap: an endpoint that serves a continuous
            # stream would otherwise be read without end.
            with requests.get(
                f"{url}{p}", auth=creds, timeout=2.0, verify=False, stream=True
            ) as r:
                if r.status_code != 200:
                    continue
                image = bytearray()
                for chunk in r.iter_content(chunk_size=8192):
                    image += chunk
                    if len(image) >= 2 and not image.startswith(b"\xff\xd8"):
                        break  # not a JPEG, stop downloading
                    if len(image) > max_bytes:
                        break
            if not image.startswith(b"\xff\xd8") or len(image) > max_bytes:
                continue
            with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
                tmp.write(image)
            return tmp.name
        except (requests.RequestException, OSError, ValueError):
            # Unreachable host, bad URL, timeout, ...: try the next endpoint.
            continue
    return None
# =========================================================
# UI
# =========================================================
# Page layout and event wiring. Components are created in display order, and
# the listeners are registered inside the Blocks context.
with gr.Blocks(title="Rust Network Scanner") as app:
    gr.Markdown("# Rust Network Scanner")

    # Target and start button.
    with gr.Row():
        cidr = gr.Textbox(value="192.168.1.0/24", label="Target CIDR", scale=5)
        scan_btn = gr.Button("▶ Start Scan", scale=1, variant="primary")

    # Tuning sliders passed to the scan as-is.
    with gr.Row():
        tcp_concurrency = gr.Slider(
            minimum=100,
            maximum=10000,
            step=100,
            value=3000,
            label="TCP Concurrency",
            info="Phase 1 simultaneous probes. 2000–5000 suits most systems.",
        )
        http_concurrency = gr.Slider(
            minimum=10,
            maximum=2000,
            step=10,
            value=500,
            label="HTTP Concurrency",
            info="Phase 2 HTTP slots. Keep ≤ 500 on typical hardware.",
        )
        rate_limit = gr.Slider(
            minimum=0,
            maximum=10000,
            step=100,
            value=2000,
            label="Rate Limit (SYNs/sec)",
            info="0 = unlimited. Lower if your router drops packets.",
        )

    status_label = gr.Textbox(interactive=False, label="Status")

    # Results table on the left, snapshot preview on the right.
    with gr.Row():
        with gr.Column(scale=3):
            table = gr.Dataframe(
                interactive=False,
                headers=["IP Address", "URL", "Credentials", "Status", "Port"],
            )
        with gr.Column(scale=2):
            preview = gr.Image(label="Snapshot Preview")

    # Start button streams scan output into the table and the status box.
    scan_btn.click(
        run_discovery,
        inputs=[cidr, tcp_concurrency, http_concurrency, rate_limit],
        outputs=[table, status_label],
    )
    # Selecting a table cell loads a snapshot for that row.
    table.select(get_snapshot, inputs=[table], outputs=[preview])

app.queue()
# =========================================================
# Launch
# =========================================================
if __name__ == "__main__":
    # Port comes from the PORT environment variable, defaulting to 7860.
    port = int(os.environ.get("PORT", 7860))
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": port,
        "share": True,
        "show_error": True,
        "prevent_thread_lock": False,
        "inbrowser": False,
    }
    app.launch(**launch_options)