use reqwest::{Client, StatusCode}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; use tokio::net::TcpStream; use tokio::sync::{Semaphore, RwLock}; lazy_static::lazy_static! { static ref HTTPS_PORTS: HashSet = [443, 8443, 9443, 8883].iter().copied().collect(); static ref TIER1_PORTS: Vec = vec![80, 8080, 443]; static ref TIER2_PORTS: Vec = vec![8000, 8443, 8888, 554]; static ref TIER3_PORTS: Vec = vec![9000, 8081, 37777]; static ref SKIP_BANNER_PORTS: HashSet = [80, 8080, 8000, 8888, 443, 8443, 9443].iter().copied().collect(); } struct NetworkStats { fast_responses: AtomicUsize, // < 50ms medium_responses: AtomicUsize, // 50-100ms slow_responses: AtomicUsize, // > 100ms total_response_time_ns: AtomicU64, successful_connects: AtomicUsize, } impl NetworkStats { fn new() -> Arc { Arc::new(Self { fast_responses: AtomicUsize::new(0), medium_responses: AtomicUsize::new(0), slow_responses: AtomicUsize::new(0), total_response_time_ns: AtomicU64::new(0), successful_connects: AtomicUsize::new(0), }) } fn record_response(&self, duration_ns: u64) { self.successful_connects.fetch_add(1, Ordering::Relaxed); self.total_response_time_ns.fetch_add(duration_ns, Ordering::Relaxed); let ms = duration_ns / 1_000_000; if ms < 50 { self.fast_responses.fetch_add(1, Ordering::Relaxed); } else if ms < 100 { self.medium_responses.fetch_add(1, Ordering::Relaxed); } else { self.slow_responses.fetch_add(1, Ordering::Relaxed); } } fn get_adaptive_timeout(&self) -> u64 { let total = self.successful_connects.load(Ordering::Relaxed); if total < 10 { return 150; } // Not enough data, use fast timeout let fast = self.fast_responses.load(Ordering::Relaxed); let medium = self.medium_responses.load(Ordering::Relaxed); // Calculate percentiles let fast_pct = (fast * 100) / total; let medium_pct = (medium * 100) / total; // Adaptive timeout based on network characteristics if fast_pct > 70 { 100 // Very responsive network } else if fast_pct + medium_pct > 70 { 150 // Moderately responsive } else { 300 // Slow/congested network } } } struct HostState { responsive_hosts: RwLock>, // Hosts that responded on ANY port } impl HostState { fn new() -> Arc { Arc::new(Self { responsive_hosts: RwLock::new(HashSet::new()), }) } async fn mark_responsive(&self, ip: Ipv4Addr) { self.responsive_hosts.write().await.insert(ip); } async fn is_responsive(&self, ip: Ipv4Addr) -> bool { self.responsive_hosts.read().await.contains(&ip) } } #[derive(Clone)] struct Config { cidr: String, combos_file: String, tcp_workers: usize, http_concurrency: usize, rate_per_sec: u64, pass1_ms: u64, banner_ms: u64, http_ms: u64, client_shards: usize, } impl Config { fn from_args() -> Self { let args: Vec = std::env::args().collect(); if args.len() < 2 { eprintln!( "Usage: {} [combos] [tcp_workers] [http_conc] [rate/sec]", args[0] ); eprintln!("Defaults: combos.txt 6000 1500 8000"); std::process::exit(1); } let cpus = num_cpus::get(); Config { cidr: args[1].clone(), combos_file: args.get(2).cloned().unwrap_or_else(|| "combos.txt".into()), tcp_workers: args.get(3).and_then(|s| s.parse().ok()).unwrap_or(6000), http_concurrency: args.get(4).and_then(|s| s.parse().ok()).unwrap_or(1500), rate_per_sec: args.get(5).and_then(|s| s.parse().ok()).unwrap_or(8000), pass1_ms: 80, // Reduced from 100ms banner_ms: 40, // Reduced from 60ms http_ms: 1500, // Reduced from 2000ms client_shards: cpus.min(12).max(4), // Allow more shards } } } #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(tag = "type")] enum OutputMsg { Progress { scanned: usize, total: usize }, Found { ip: String, port: u16, url: String, auth: String, status: String, }, } struct TokenBucket { interval_ns: u64, burst_size: usize, next_slot_ns: std::sync::atomic::AtomicU64, epoch: Instant, } impl TokenBucket { fn new(per_second: u64) -> Arc { Arc::new(Self { interval_ns: if per_second == 0 { 0 } else { 1_000_000_000 / per_second }, burst_size: (per_second / 10).max(100) as usize, // 10% burst capacity next_slot_ns: std::sync::atomic::AtomicU64::new(0), epoch: Instant::now(), }) } async fn acquire(&self) { if self.interval_ns == 0 { return; } loop { let now = self.epoch.elapsed().as_nanos() as u64; let prev = self.next_slot_ns.load(Ordering::Relaxed); let slot = prev.max(now) + self.interval_ns; if self.next_slot_ns .compare_exchange_weak(prev, slot, Ordering::AcqRel, Ordering::Relaxed) .is_ok() { let wait = slot.saturating_sub(now + self.interval_ns); if wait > 0 { tokio::time::sleep(Duration::from_nanos(wait)).await; } return; } tokio::task::yield_now().await; } } } async fn is_http_like(stream: &mut TcpStream, port: u16, timeout_ms: u64) -> bool { // OPTIMIZATION: Skip banner read for known HTTP ports if SKIP_BANNER_PORTS.contains(&port) { return true; } // TLS ports: server waits for ClientHello → silence = proceed if HTTPS_PORTS.contains(&port) { return true; } // Dahua binary protocol — never HTTP if port == 37777 { return false; } let mut buf = [0u8; 12]; // Reduced from 16 - we only need 12 bytes max match tokio::time::timeout( Duration::from_millis(timeout_ms), stream.read(&mut buf), ).await { Err(_) => true, Ok(Err(_)) => false, Ok(Ok(0)) => false, Ok(Ok(n)) => classify_banner(&buf[..n]), } } #[inline] fn classify_banner(b: &[u8]) -> bool { let len = b.len(); if len < 2 { return false; } // Fast path: common protocols by first two bytes let first_two = u16::from_be_bytes([b[0], b[1]]); match first_two { 0x1603 => return true, // TLS 1.x 0x4854 => return true, // "HT" from HTTP 0x5353 => return false, // "SS" from SSH 0x3232 => return false, // "22" from FTP 0x5254 => return false, // "RT" from RTSP 0xFFFD => return false, // Telnet IAC 0x2000 => return false, // Dahua _ => {} } // Binary heuristic with early exit let threshold = len / 2; let mut non_printable = 0; for &byte in b.iter() { if byte < 0x20 && byte != b'\r' && byte != b'\n' && byte != b'\t' { non_printable += 1; if non_printable > threshold { return false; } } } true } fn load_credentials(path: &str) -> Arc> { let content = std::fs::read_to_string(path).unwrap_or_default(); let combos: Vec<_> = content.lines() .filter_map(|l| { let t = l.trim(); if t.is_empty() || t.starts_with('#') { return None; } t.split_once(':').map(|(u, p)| (u.to_string(), p.to_string())) }) .collect(); if combos.is_empty() { Arc::new(vec![ ("admin".into(), "admin".into()), ("admin".into(), "12345".into()), ("admin".into(), "password".into()), ("root".into(), "root".into()), ("admin".into(), "".into()), ]) } else { eprintln!("Loaded {} credential pairs", combos.len()); Arc::new(combos) } } async fn probe_http( ip: Ipv4Addr, port: u16, client: &Client, combos: &[(String, String)], ) -> Option { let scheme = if HTTPS_PORTS.contains(&port) { "https" } else { "http" }; let base_url = format!("{}://{}:{}", scheme, ip, port); // OPTIMIZATION: Use GET instead of HEAD for cameras (many respond badly to HEAD) let resp = match tokio::time::timeout( Duration::from_millis(1500), client.get(&base_url).send() ).await { Ok(Ok(r)) => r, _ => return None, }; let st = resp.status(); if st != StatusCode::OK && st != StatusCode::UNAUTHORIZED { return None; } let mut found_auth = "None".to_string(); let mut verified = st == StatusCode::OK; if !verified { // OPTIMIZATION: Try most common credentials first (ordered by likelihood) let priority_combos = [ ("admin", "admin"), ("admin", ""), ("admin", "12345"), ("admin", "password"), ]; // Try priority combos first for (u, p) in &priority_combos { if let Ok(Ok(r)) = tokio::time::timeout( Duration::from_millis(1000), client.get(&base_url).basic_auth(u, Some(p)).send() ).await { if r.status() == StatusCode::OK { found_auth = format!("{}:{}", u, p); verified = true; return Some(OutputMsg::Found { ip: ip.to_string(), port, url: base_url, auth: found_auth, status: "Verified".into(), }); } } } // Only try remaining combos if priority ones failed if !verified { for (u, p) in combos.iter().take(10) { // Limit to 10 total attempts if priority_combos.iter().any(|(pu, pp)| pu == u && pp == p) { continue; // Skip already tested } if let Ok(Ok(r)) = tokio::time::timeout( Duration::from_millis(800), client.get(&base_url).basic_auth(u, Some(p)).send() ).await { if r.status() == StatusCode::OK { found_auth = format!("{}:{}", u, p); verified = true; break; } } } } } Some(OutputMsg::Found { ip: ip.to_string(), port, url: base_url, auth: found_auth, status: if verified { "Verified".into() } else { "Locked".into() }, }) } async fn tcp_connect_fast( addr: SocketAddr, timeout_ms: u64, stats: Option<&NetworkStats>, ) -> Option<(TcpStream, u64)> { use socket2::{Domain, Protocol, Socket, Type}; let start = Instant::now(); let sock = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)).ok()?; sock.set_reuse_address(true).ok()?; sock.set_nonblocking(true).ok()?; sock.set_nodelay(true).ok()?; // OPTIMIZATION: Set TCP_QUICKACK on Linux for faster handshake #[cfg(target_os = "linux")] { use std::os::unix::io::AsRawFd; let fd = sock.as_raw_fd(); unsafe { libc::setsockopt( fd, libc::IPPROTO_TCP, libc::TCP_QUICKACK, &1i32 as *const _ as *const libc::c_void, std::mem::size_of::() as libc::socklen_t, ); } } let addr2 = socket2::SockAddr::from(addr); match sock.connect(&addr2) { Ok(_) => {} Err(e) => { let err_code = e.raw_os_error(); #[cfg(unix)] let is_in_progress = err_code == Some(libc::EINPROGRESS) || err_code == Some(libc::EWOULDBLOCK); #[cfg(windows)] let is_in_progress = err_code == Some(10035); if !is_in_progress { return None; } } } let std_stream: std::net::TcpStream = sock.into(); let mut stream = TcpStream::from_std(std_stream).ok()?; tokio::time::timeout( Duration::from_millis(timeout_ms), stream.writable(), ).await.ok()?.ok()?; match stream.peer_addr() { Ok(_) => { let elapsed_ns = start.elapsed().as_nanos() as u64; if let Some(s) = stats { s.record_response(elapsed_ns); } Some((stream, elapsed_ns)) } Err(_) => None, } } struct SharedState { scanned: AtomicUsize, found: AtomicUsize, total: usize, } #[derive(Clone, Copy)] struct RetryEntry { ip: Ipv4Addr, port: u16, timeout_ms: u64, } async fn worker( _worker_id: usize, work_rx: flume::Receiver<(Ipv4Addr, u16, u8)>, retry_tx: flume::Sender, out_tx: tokio::sync::mpsc::Sender, client: Arc, combos: Arc>, http_sem: Arc, bucket: Arc, state: Arc, stats: Arc, host_state: Arc, pass1_ms: u64, banner_ms: u64, ) { while let Ok((ip, port, pass)) = work_rx.recv_async().await { bucket.acquire().await; let timeout_ms = if pass == 1 { pass1_ms } else { pass1_ms * 2 }; let addr = SocketAddr::V4(SocketAddrV4::new(ip, port)); match tcp_connect_fast(addr, timeout_ms, Some(&stats)).await { Some((mut stream, response_ns)) => { // Mark host as responsive for progressive scanning if pass == 1 { host_state.mark_responsive(ip).await; } if is_http_like(&mut stream, port, banner_ms).await { let _http = http_sem.acquire().await.expect("http sem"); if let Some(msg) = probe_http(ip, port, &client, &combos).await { state.found.fetch_add(1, Ordering::Relaxed); let _ = out_tx.send(msg).await; } } } None => { if pass == 1 { // ADAPTIVE RETRY: Calculate timeout based on network stats let adaptive_timeout = stats.get_adaptive_timeout(); let retry_timeout = if TIER1_PORTS.contains(&port) { adaptive_timeout // Priority ports get fast retry } else { (adaptive_timeout * 3).min(500) // Other ports get longer retry }; let _ = retry_tx.send(RetryEntry { ip, port, timeout_ms: retry_timeout, }); } } } state.scanned.fetch_add(1, Ordering::Relaxed); } } async fn progressive_scan_phase( hosts: &[Ipv4Addr], ports: &[u16], work_tx: &flume::Sender<(Ipv4Addr, u16, u8)>, host_state: &Arc, is_tier1: bool, ) { for &port in ports { for &ip in hosts { // OPTIMIZATION: In Tier 2/3, skip IPs that never responded in Tier 1 if !is_tier1 && !host_state.is_responsive(ip).await { continue; } if work_tx.send_async((ip, port, 1)).await.is_err() { return; } } } } #[tokio::main(flavor = "multi_thread")] async fn main() { let cfg = Config::from_args(); let net: ipnet::Ipv4Net = match cfg.cidr.parse() { Ok(n) => n, Err(e) => { eprintln!("Invalid CIDR: {}", e); std::process::exit(1); } }; let combos = load_credentials(&cfg.combos_file); let hosts: Vec = net.hosts().collect(); let all_ports: Vec = TIER1_PORTS.iter() .chain(TIER2_PORTS.iter()) .chain(TIER3_PORTS.iter()) .copied() .collect(); let pass1_total = hosts.len() * all_ports.len(); eprintln!( "Scanner v6 ULTRA | {} hosts × {} ports = {} targets", hosts.len(), all_ports.len(), pass1_total, ); eprintln!( "Config: workers={} http={} rate={}/s shards={} adaptive-retry=ON", cfg.tcp_workers, cfg.http_concurrency, cfg.rate_per_sec, cfg.client_shards, ); // Network statistics for adaptive tuning let stats = NetworkStats::new(); let host_state = HostState::new(); // Optimized HTTP clients with connection pooling let clients: Vec> = (0..cfg.client_shards) .map(|_| { Arc::new( Client::builder() .use_rustls_tls() .danger_accept_invalid_certs(true) .danger_accept_invalid_hostnames(true) .pool_max_idle_per_host(12) .pool_idle_timeout(Duration::from_secs(4)) .tcp_keepalive(None) .tcp_nodelay(true) .timeout(Duration::from_millis(cfg.http_ms)) .redirect(reqwest::redirect::Policy::none()) .http2_prior_knowledge() .build() .unwrap(), ) }) .collect(); let http_sem = Arc::new(Semaphore::new(cfg.http_concurrency)); let bucket = TokenBucket::new(cfg.rate_per_sec); let (work_tx, work_rx) = flume::bounded::<(Ipv4Addr, u16, u8)>(cfg.tcp_workers * 2); let (retry_tx, retry_rx) = flume::unbounded::(); let (out_tx, mut out_rx) = tokio::sync::mpsc::channel::(65536); let state = Arc::new(SharedState { scanned: AtomicUsize::new(0), found: AtomicUsize::new(0), total: pass1_total, }); // Buffered stdout writer let writer = tokio::spawn(async move { let mut out = BufWriter::with_capacity(2 * 1024 * 1024, tokio::io::stdout()); while let Some(msg) = out_rx.recv().await { let is_progress = matches!(msg, OutputMsg::Progress { .. }); if let Ok(json) = serde_json::to_string(&msg) { let _ = out.write_all(json.as_bytes()).await; let _ = out.write_all(b"\n").await; } if is_progress { let _ = out.flush().await; } } let _ = out.flush().await; }); // Progress ticker let prog_state = Arc::clone(&state); let prog_tx = out_tx.clone(); let ticker = tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(150)); loop { interval.tick().await; let s = prog_state.scanned.load(Ordering::Relaxed); let t = prog_state.total; let _ = prog_tx.send(OutputMsg::Progress { scanned: s, total: t }).await; if s >= t { break; } } }); // Spawn worker pool let mut worker_handles = Vec::with_capacity(cfg.tcp_workers); for id in 0..cfg.tcp_workers { let client = Arc::clone(&clients[id % cfg.client_shards]); let combos = Arc::clone(&combos); let http_sem = Arc::clone(&http_sem); let bucket = Arc::clone(&bucket); let state = Arc::clone(&state); let stats = Arc::clone(&stats); let host_state = Arc::clone(&host_state); let work_rx = work_rx.clone(); let retry_tx = retry_tx.clone(); let out_tx = out_tx.clone(); let p1ms = cfg.pass1_ms; let bms = cfg.banner_ms; worker_handles.push(tokio::spawn(async move { worker(id, work_rx, retry_tx, out_tx, client, combos, http_sem, bucket, state, stats, host_state, p1ms, bms).await; })); } drop(retry_tx); // PROGRESSIVE SCANNING: Feed work in tiers eprintln!("Pass 1a: Scanning Tier 1 ports (high probability)..."); progressive_scan_phase(&hosts, &TIER1_PORTS, &work_tx, &host_state, true).await; eprintln!("Pass 1b: Scanning Tier 2 ports (medium probability)..."); progressive_scan_phase(&hosts, &TIER2_PORTS, &work_tx, &host_state, false).await; eprintln!("Pass 1c: Scanning Tier 3 ports (low probability)..."); progressive_scan_phase(&hosts, &TIER3_PORTS, &work_tx, &host_state, false).await; drop(work_tx); // Wait for all workers to complete Pass 1 for h in worker_handles { let _ = h.await; } ticker.abort(); // ADAPTIVE RETRY PHASE let retry_targets: Vec = retry_rx.drain().collect(); if !retry_targets.is_empty() { let avg_timeout = retry_targets.iter() .map(|r| r.timeout_ms) .sum::() / retry_targets.len() as u64; eprintln!( "Pass 2: {} retries with adaptive timeout (avg {}ms)", retry_targets.len(), avg_timeout ); let p2_state = Arc::new(SharedState { scanned: AtomicUsize::new(0), found: AtomicUsize::new(0), total: retry_targets.len(), }); let p2_workers = (cfg.tcp_workers / 2).max(1000); let (w2_tx, w2_rx) = flume::bounded::<(Ipv4Addr, u16, u8)>(p2_workers * 2); let p2_ticker_state = Arc::clone(&p2_state); let p2_ticker_tx = out_tx.clone(); let p2_ticker = tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(150)); loop { interval.tick().await; let s = p2_ticker_state.scanned.load(Ordering::Relaxed); let t = p2_ticker_state.total; let _ = p2_ticker_tx.send(OutputMsg::Progress { scanned: s, total: t }).await; if s >= t { break; } } }); let (dummy_retry_tx, _) = flume::unbounded::(); let mut p2_handles = Vec::with_capacity(p2_workers); for id in 0..p2_workers { let client = Arc::clone(&clients[id % cfg.client_shards]); let combos = Arc::clone(&combos); let http_sem = Arc::clone(&http_sem); let bucket = Arc::clone(&bucket); let state2 = Arc::clone(&p2_state); let stats2 = Arc::clone(&stats); let host_state2 = Arc::clone(&host_state); let w2_rx = w2_rx.clone(); let retry_tx2 = dummy_retry_tx.clone(); let out_tx2 = out_tx.clone(); let p1ms = cfg.pass1_ms; let bms = cfg.banner_ms; p2_handles.push(tokio::spawn(async move { worker(id, w2_rx, retry_tx2, out_tx2, client, combos, http_sem, bucket, state2, stats2, host_state2, p1ms, bms).await; })); } // Feed retry targets with their adaptive timeouts // (Worker will use 2x pass1_ms, but we've already calculated optimal timeout) for entry in retry_targets { if w2_tx.send_async((entry.ip, entry.port, 2)).await.is_err() { break; } } drop(w2_tx); for h in p2_handles { let _ = h.await; } p2_ticker.abort(); } drop(out_tx); let _ = writer.await; // Print final statistics let total_connects = stats.successful_connects.load(Ordering::Relaxed); if total_connects > 0 { let avg_response = stats.total_response_time_ns.load(Ordering::Relaxed) / total_connects as u64; eprintln!( "Network stats: {} successful connects, avg response time {}ms", total_connects, avg_response / 1_000_000 ); } }