// camscan2 / src/main.rs — wuhp, "Update src/main.rs", commit 2ef27e6 (verified)
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;
use tokio::sync::{Semaphore, RwLock};
// Port tables shared by the whole program, declared through `lazy_static!`.
lazy_static::lazy_static! {
// Ports for which `probe_http` builds an `https://` URL; `is_http_like` also
// accepts these ports without reading a banner.
static ref HTTPS_PORTS: HashSet<u16> =
[443, 8443, 9443, 8883].iter().copied().collect();
// Ports fed first by `main`, for every host. `worker` also uses membership in
// this list to pick the shorter retry timeout.
static ref TIER1_PORTS: Vec<u16> = vec![80, 8080, 443];
// Ports fed second by `main`, only for hosts already marked responsive.
static ref TIER2_PORTS: Vec<u16> = vec![8000, 8443, 8888, 554];
// Ports fed last by `main`, only for hosts already marked responsive.
static ref TIER3_PORTS: Vec<u16> = vec![9000, 8081, 37777];
// Ports that `is_http_like` treats as HTTP without waiting for a banner.
static ref SKIP_BANNER_PORTS: HashSet<u16> =
[80, 8080, 8000, 8888, 443, 8443, 9443].iter().copied().collect();
}
/// Atomic counters describing how long successful TCP connects took.
///
/// Every counter is updated with `Ordering::Relaxed`, so a reader may observe
/// the individual counters at slightly different moments.
struct NetworkStats {
    /// Samples that took less than 50 ms.
    fast_responses: AtomicUsize,
    /// Samples that took at least 50 ms but less than 100 ms.
    medium_responses: AtomicUsize,
    /// Samples that took 100 ms or more.
    slow_responses: AtomicUsize,
    /// Sum of every recorded duration, in nanoseconds.
    total_response_time_ns: AtomicU64,
    /// Number of samples recorded.
    successful_connects: AtomicUsize,
}

impl NetworkStats {
    /// Creates a zeroed, reference-counted statistics block.
    fn new() -> Arc<Self> {
        let zeroed = Self {
            fast_responses: AtomicUsize::new(0),
            medium_responses: AtomicUsize::new(0),
            slow_responses: AtomicUsize::new(0),
            total_response_time_ns: AtomicU64::new(0),
            successful_connects: AtomicUsize::new(0),
        };
        Arc::new(zeroed)
    }

    /// Records one sample of `duration_ns` nanoseconds and files it under the
    /// fast / medium / slow counter according to its whole-millisecond value.
    fn record_response(&self, duration_ns: u64) {
        self.successful_connects.fetch_add(1, Ordering::Relaxed);
        self.total_response_time_ns
            .fetch_add(duration_ns, Ordering::Relaxed);

        let bucket = match duration_ns / 1_000_000 {
            0..=49 => &self.fast_responses,
            50..=99 => &self.medium_responses,
            _ => &self.slow_responses,
        };
        bucket.fetch_add(1, Ordering::Relaxed);
    }

    /// Suggests a connect timeout in milliseconds from the samples seen so far.
    ///
    /// Returns 150 until at least ten samples exist. After that: 100 when more
    /// than 70% of samples were fast, 150 when fast plus medium exceed 70%,
    /// and 300 otherwise. Percentages use integer division.
    fn get_adaptive_timeout(&self) -> u64 {
        let total = self.successful_connects.load(Ordering::Relaxed);
        if total < 10 {
            return 150;
        }

        let percent_of_total =
            |counter: &AtomicUsize| counter.load(Ordering::Relaxed) * 100 / total;
        let fast_pct = percent_of_total(&self.fast_responses);
        let medium_pct = percent_of_total(&self.medium_responses);

        match (fast_pct > 70, fast_pct + medium_pct > 70) {
            (true, _) => 100,
            (false, true) => 150,
            (false, false) => 300,
        }
    }
}
/// Shared set of IPv4 addresses that have been flagged as responsive.
struct HostState {
    /// Addresses added through `mark_responsive`, guarded by an async read/write lock.
    responsive_hosts: RwLock<HashSet<Ipv4Addr>>,
}

impl HostState {
    /// Creates an empty, reference-counted host set.
    fn new() -> Arc<Self> {
        let responsive_hosts = RwLock::new(HashSet::new());
        Arc::new(Self { responsive_hosts })
    }

    /// Adds `ip` to the set, waiting for the write lock.
    async fn mark_responsive(&self, ip: Ipv4Addr) {
        let mut hosts = self.responsive_hosts.write().await;
        hosts.insert(ip);
    }

    /// Reports whether `ip` is in the set, waiting for a read lock.
    async fn is_responsive(&self, ip: Ipv4Addr) -> bool {
        let hosts = self.responsive_hosts.read().await;
        hosts.contains(&ip)
    }
}
/// Runtime settings: positional command-line arguments plus fixed timing defaults.
#[derive(Clone)]
struct Config {
    /// Target range in CIDR notation (argument 1, required).
    cidr: String,
    /// Path of the `user:password` list (argument 2, default `combos.txt`).
    combos_file: String,
    /// Number of worker tasks to spawn (argument 3, default 6000, never 0).
    tcp_workers: usize,
    /// Maximum concurrent HTTP probes (argument 4, default 1500, never 0).
    http_concurrency: usize,
    /// Connection attempts per second (argument 5, default 8000); 0 disables pacing.
    rate_per_sec: u64,
    /// Pass-1 TCP connect timeout in milliseconds.
    pass1_ms: u64,
    /// Banner read timeout in milliseconds.
    banner_ms: u64,
    /// Overall HTTP client timeout in milliseconds.
    http_ms: u64,
    /// Number of HTTP clients the workers are spread across.
    client_shards: usize,
}

impl Config {
    /// Builds the configuration from `std::env::args`.
    ///
    /// Prints usage and exits with status 1 when the CIDR argument is missing.
    /// Numeric arguments that are absent fall back to their defaults.
    fn from_args() -> Self {
        let args: Vec<String> = std::env::args().collect();
        if args.len() < 2 {
            // The argument list may be empty, so the program name is read
            // without indexing `args[0]`.
            let program = args.first().map(String::as_str).unwrap_or("scanner");
            eprintln!(
                "Usage: {} <CIDR> [combos] [tcp_workers] [http_conc] [rate/sec]",
                program
            );
            eprintln!("Defaults: combos.txt 6000 1500 8000");
            std::process::exit(1);
        }
        let cpus = num_cpus::get();
        Config {
            cidr: args[1].clone(),
            combos_file: args.get(2).cloned().unwrap_or_else(|| "combos.txt".into()),
            // A worker count of 0 would create a zero-capacity work channel
            // with no consumers, and an HTTP limit of 0 would create a
            // semaphore that never grants a permit; both would hang the run.
            tcp_workers: Self::positive_arg(&args, 3, "tcp_workers", 6000),
            http_concurrency: Self::positive_arg(&args, 4, "http_conc", 1500),
            rate_per_sec: args.get(5).and_then(|s| s.parse().ok()).unwrap_or(8000),
            pass1_ms: 80,
            banner_ms: 40,
            http_ms: 1500,
            client_shards: cpus.clamp(4, 12),
        }
    }

    /// Reads `args[index]` as a strictly positive integer.
    ///
    /// Returns `default` when the argument is absent. When it is present but
    /// unparseable or zero, a warning naming `label` is printed and `default`
    /// is returned.
    fn positive_arg(args: &[String], index: usize, label: &str, default: usize) -> usize {
        match args.get(index) {
            None => default,
            Some(raw) => match raw.parse::<usize>() {
                Ok(n) if n > 0 => n,
                _ => {
                    eprintln!(
                        "Ignoring invalid {} value '{}'; using {}",
                        label, raw, default
                    );
                    default
                }
            },
        }
    }
}
/// Messages sent to the output writer task.
///
/// Serialized by serde with an internal `type` tag naming the variant.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
enum OutputMsg {
/// Periodic progress report: targets processed so far out of the expected total.
Progress { scanned: usize, total: usize },
/// One result produced by `probe_http`.
Found {
/// Target address rendered as text.
ip: String,
/// Target TCP port.
port: u16,
/// Base URL that was requested.
url: String,
/// `user:password` that returned 200, or the literal text "None".
auth: String,
/// "Verified" or "Locked", as set by `probe_http`.
status: String,
},
}
struct TokenBucket {
interval_ns: u64,
burst_size: usize,
next_slot_ns: std::sync::atomic::AtomicU64,
epoch: Instant,
}
impl TokenBucket {
fn new(per_second: u64) -> Arc<Self> {
Arc::new(Self {
interval_ns: if per_second == 0 { 0 } else { 1_000_000_000 / per_second },
burst_size: (per_second / 10).max(100) as usize, // 10% burst capacity
next_slot_ns: std::sync::atomic::AtomicU64::new(0),
epoch: Instant::now(),
})
}
async fn acquire(&self) {
if self.interval_ns == 0 { return; }
loop {
let now = self.epoch.elapsed().as_nanos() as u64;
let prev = self.next_slot_ns.load(Ordering::Relaxed);
let slot = prev.max(now) + self.interval_ns;
if self.next_slot_ns
.compare_exchange_weak(prev, slot, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
let wait = slot.saturating_sub(now + self.interval_ns);
if wait > 0 { tokio::time::sleep(Duration::from_nanos(wait)).await; }
return;
}
tokio::task::yield_now().await;
}
}
}
/// Decides whether an open connection should be followed by an HTTP probe.
///
/// Ports in `SKIP_BANNER_PORTS` or `HTTPS_PORTS` are accepted without reading,
/// and port 37777 is always rejected. For any other port, up to 12 bytes are
/// read with a `timeout_ms` deadline: a timeout counts as HTTP-like, a read
/// error or end-of-stream does not, and received bytes go to `classify_banner`.
async fn is_http_like(stream: &mut TcpStream, port: u16, timeout_ms: u64) -> bool {
    if SKIP_BANNER_PORTS.contains(&port) || HTTPS_PORTS.contains(&port) {
        return true;
    }
    if port == 37777 {
        return false;
    }

    let mut banner = [0u8; 12];
    let deadline = Duration::from_millis(timeout_ms);
    let outcome = tokio::time::timeout(deadline, stream.read(&mut banner)).await;

    match outcome {
        // Nothing arrived before the deadline.
        Err(_) => true,
        Ok(Ok(n)) if n > 0 => classify_banner(&banner[..n]),
        // Read error, or the peer closed without sending anything.
        Ok(_) => false,
    }
}
/// Guesses from the first bytes of a banner whether the peer could be HTTP or TLS.
///
/// Fewer than two bytes yields `false`. The leading byte pair is checked
/// against fixed signatures first. Otherwise the banner is accepted unless
/// more than half of its bytes are control characters other than CR, LF or TAB.
#[inline]
fn classify_banner(b: &[u8]) -> bool {
    let lead = match b {
        [first, second, ..] => (*first, *second),
        _ => return false,
    };

    match lead {
        // TLS record header, or "HT" as in "HTTP".
        (0x16, 0x03) | (b'H', b'T') => return true,
        // "SS", "22", "RT", 0xFF 0xFD and 0x20 0x00 are all rejected.
        (b'S', b'S') | (b'2', b'2') | (b'R', b'T') | (0xFF, 0xFD) | (0x20, 0x00) => {
            return false
        }
        _ => {}
    }

    let allowed = b.len() / 2;
    let control_bytes = b
        .iter()
        .filter(|&&c| c < 0x20 && !matches!(c, b'\r' | b'\n' | b'\t'))
        .count();
    control_bytes <= allowed
}
/// Reads `user:password` pairs from the text file at `path`.
///
/// Each line is trimmed; blank lines, lines starting with `#` and lines
/// without a `:` are skipped. The split is at the first `:`. When the file
/// cannot be read or holds no usable pair, a built-in list of five pairs is
/// returned instead.
fn load_credentials(path: &str) -> Arc<Vec<(String, String)>> {
    let text = std::fs::read_to_string(path).unwrap_or_default();

    let mut pairs: Vec<(String, String)> = Vec::new();
    for raw_line in text.lines() {
        let line = raw_line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        if let Some((user, pass)) = line.split_once(':') {
            pairs.push((user.to_string(), pass.to_string()));
        }
    }

    if !pairs.is_empty() {
        eprintln!("Loaded {} credential pairs", pairs.len());
        return Arc::new(pairs);
    }

    let builtin = [
        ("admin", "admin"),
        ("admin", "12345"),
        ("admin", "password"),
        ("root", "root"),
        ("admin", ""),
    ];
    Arc::new(
        builtin
            .iter()
            .map(|&(user, pass)| (user.to_string(), pass.to_string()))
            .collect(),
    )
}
/// Requests the base URL of `ip:port` and, when it answers 401, retries with HTTP basic auth.
///
/// The scheme is `https` when `port` is in `HTTPS_PORTS`, otherwise `http`.
///
/// Returns `None` when the first GET fails, exceeds 1500 ms, or answers with a
/// status other than 200 or 401. Otherwise returns `OutputMsg::Found` whose
/// `status` is "Verified" (the first GET returned 200, or an authenticated GET
/// returned 200) or "Locked" (no attempt returned 200). `auth` stays "None"
/// unless a credential pair produced the 200.
///
/// NOTE(review): this function performs login attempts against the target. It
/// must only be pointed at systems the operator is authorized to test.
async fn probe_http(
ip: Ipv4Addr,
port: u16,
client: &Client,
combos: &[(String, String)],
) -> Option<OutputMsg> {
let scheme = if HTTPS_PORTS.contains(&port) { "https" } else { "http" };
let base_url = format!("{}://{}:{}", scheme, ip, port);
// Unauthenticated GET, bounded by a 1500 ms timeout.
let resp = match tokio::time::timeout(
Duration::from_millis(1500),
client.get(&base_url).send()
).await {
Ok(Ok(r)) => r,
_ => return None,
};
let st = resp.status();
// Only 200 and 401 are reported; every other status is dropped.
if st != StatusCode::OK && st != StatusCode::UNAUTHORIZED { return None; }
let mut found_auth = "None".to_string();
let mut verified = st == StatusCode::OK;
if !verified {
// Fixed list tried before the caller-supplied `combos`.
let priority_combos = [
("admin", "admin"),
("admin", ""),
("admin", "12345"),
("admin", "password"),
];
// Each attempt has a 1000 ms timeout; the first 200 returns immediately.
for (u, p) in &priority_combos {
if let Ok(Ok(r)) = tokio::time::timeout(
Duration::from_millis(1000),
client.get(&base_url).basic_auth(u, Some(p)).send()
).await {
if r.status() == StatusCode::OK {
found_auth = format!("{}:{}", u, p);
// NOTE(review): this assignment is never read, because the function returns on the next statement.
verified = true;
return Some(OutputMsg::Found {
ip: ip.to_string(), port, url: base_url, auth: found_auth,
status: "Verified".into(),
});
}
}
}
// NOTE(review): `verified` is always false here, since a success above returns early.
if !verified {
// `take(10)` bounds how many entries of `combos` are examined, including
// entries skipped below as duplicates of the fixed list.
for (u, p) in combos.iter().take(10) {
if priority_combos.iter().any(|(pu, pp)| pu == u && pp == p) {
continue; // Already tried in the fixed list
}
// Each attempt has an 800 ms timeout.
if let Ok(Ok(r)) = tokio::time::timeout(
Duration::from_millis(800),
client.get(&base_url).basic_auth(u, Some(p)).send()
).await {
if r.status() == StatusCode::OK {
found_auth = format!("{}:{}", u, p);
verified = true;
break;
}
}
}
}
}
Some(OutputMsg::Found {
ip: ip.to_string(), port, url: base_url, auth: found_auth,
status: if verified { "Verified".into() } else { "Locked".into() },
})
}
/// Opens a non-blocking IPv4 TCP connection to `addr`, waiting at most `timeout_ms` milliseconds.
///
/// Returns the connected Tokio stream and the elapsed time in nanoseconds, or
/// `None` when socket setup fails, the connect is refused outright, the wait
/// times out, or the socket has no peer address afterwards. When `stats` is
/// given, the elapsed time of a successful connect is recorded in it.
///
/// The elapsed time is measured from before socket creation, so it includes
/// socket setup as well as the handshake.
async fn tcp_connect_fast(
addr: SocketAddr,
timeout_ms: u64,
stats: Option<&NetworkStats>,
) -> Option<(TcpStream, u64)> {
use socket2::{Domain, Protocol, Socket, Type};
let start = Instant::now();
let sock = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)).ok()?;
sock.set_reuse_address(true).ok()?;
// Must be non-blocking before `connect` so the call returns immediately.
sock.set_nonblocking(true).ok()?;
sock.set_nodelay(true).ok()?;
// Linux only: request TCP_QUICKACK on the socket.
#[cfg(target_os = "linux")]
{
use std::os::unix::io::AsRawFd;
let fd = sock.as_raw_fd();
// SAFETY: `fd` belongs to `sock`, which is still alive here; the option value
// points at an `i32` and the length passed is `size_of::<i32>()`.
// NOTE(review): the return value of `setsockopt` is ignored, so a failure is silent.
unsafe {
libc::setsockopt(
fd,
libc::IPPROTO_TCP,
libc::TCP_QUICKACK,
&1i32 as *const _ as *const libc::c_void,
std::mem::size_of::<i32>() as libc::socklen_t,
);
}
}
let addr2 = socket2::SockAddr::from(addr);
match sock.connect(&addr2) {
Ok(_) => {}
Err(e) => {
// A non-blocking connect normally reports "in progress"; any other error is a failure.
let err_code = e.raw_os_error();
#[cfg(unix)]
let is_in_progress = err_code == Some(libc::EINPROGRESS) || err_code == Some(libc::EWOULDBLOCK);
// 10035 — presumably WSAEWOULDBLOCK; confirm against the Winsock error table.
#[cfg(windows)]
let is_in_progress = err_code == Some(10035);
// NOTE(review): `is_in_progress` is only defined for unix and windows targets.
if !is_in_progress { return None; }
}
}
// Hand the socket to Tokio so readiness can be awaited.
let std_stream: std::net::TcpStream = sock.into();
let mut stream = TcpStream::from_std(std_stream).ok()?;
// Wait until the socket is writable, or give up after `timeout_ms`.
tokio::time::timeout(
Duration::from_millis(timeout_ms),
stream.writable(),
).await.ok()?.ok()?;
// A socket with a peer address is treated as connected.
match stream.peer_addr() {
Ok(_) => {
let elapsed_ns = start.elapsed().as_nanos() as u64;
if let Some(s) = stats {
s.record_response(elapsed_ns);
}
Some((stream, elapsed_ns))
}
Err(_) => None,
}
}
/// Progress counters shared between the workers and the progress ticker of one pass.
struct SharedState {
// Targets fully processed; incremented once per work item by `worker`.
scanned: AtomicUsize,
// Results produced by `probe_http`; incremented by `worker`.
found: AtomicUsize,
// Expected number of targets, reported as `total` in progress messages.
total: usize,
}
/// A target whose pass-1 connect failed and that is queued for the retry pass.
#[derive(Clone, Copy)]
struct RetryEntry {
// Target address.
ip: Ipv4Addr,
// Target port.
port: u16,
// Timeout in milliseconds computed by `worker` when the entry was queued.
timeout_ms: u64,
}
/// Worker loop: takes `(ip, port, pass)` items from `work_rx` until the channel closes.
///
/// For each item the worker:
/// 1. waits for a slot from `bucket`;
/// 2. attempts a TCP connect with `pass1_ms` (pass 1) or `pass1_ms * 2` (any other pass);
/// 3. on success during pass 1, marks the host responsive;
/// 4. if `is_http_like` accepts the connection, takes an `http_sem` permit and
///    runs `probe_http`, forwarding any result to `out_tx`;
/// 5. on failure during pass 1, queues a `RetryEntry` on `retry_tx`;
/// 6. increments `state.scanned`.
///
/// Send failures on `out_tx` and `retry_tx` are ignored. Panics if `http_sem` has been closed.
async fn worker(
_worker_id: usize,
work_rx: flume::Receiver<(Ipv4Addr, u16, u8)>,
retry_tx: flume::Sender<RetryEntry>,
out_tx: tokio::sync::mpsc::Sender<OutputMsg>,
client: Arc<Client>,
combos: Arc<Vec<(String, String)>>,
http_sem: Arc<Semaphore>,
bucket: Arc<TokenBucket>,
state: Arc<SharedState>,
stats: Arc<NetworkStats>,
host_state: Arc<HostState>,
pass1_ms: u64,
banner_ms: u64,
) {
while let Ok((ip, port, pass)) = work_rx.recv_async().await {
// Rate limiting happens before every connect attempt.
bucket.acquire().await;
let timeout_ms = if pass == 1 { pass1_ms } else { pass1_ms * 2 };
let addr = SocketAddr::V4(SocketAddrV4::new(ip, port));
match tcp_connect_fast(addr, timeout_ms, Some(&stats)).await {
// NOTE(review): `response_ns` is bound but not used in this arm.
Some((mut stream, response_ns)) => {
// Only pass-1 successes feed the responsive-host set.
if pass == 1 {
host_state.mark_responsive(ip).await;
}
if is_http_like(&mut stream, port, banner_ms).await {
// The permit is held until the end of this block, bounding concurrent probes.
let _http = http_sem.acquire().await.expect("http sem");
if let Some(msg) = probe_http(ip, port, &client, &combos).await {
state.found.fetch_add(1, Ordering::Relaxed);
let _ = out_tx.send(msg).await;
}
}
}
None => {
if pass == 1 {
// The retry timeout is derived from the connect timings recorded so far.
let adaptive_timeout = stats.get_adaptive_timeout();
let retry_timeout = if TIER1_PORTS.contains(&port) {
adaptive_timeout // Tier-1 ports keep the adaptive value as-is
} else {
(adaptive_timeout * 3).min(500) // Other ports get three times the value, capped at 500 ms
};
let _ = retry_tx.send(RetryEntry {
ip,
port,
timeout_ms: retry_timeout,
});
}
}
}
state.scanned.fetch_add(1, Ordering::Relaxed);
}
}
/// Queues one pass-1 work item per `(ip, port)` combination, port by port.
///
/// When `is_tier1` is false, hosts that `host_state` does not yet report as
/// responsive are skipped. Stops early if the work channel has been closed.
async fn progressive_scan_phase(
    hosts: &[Ipv4Addr],
    ports: &[u16],
    work_tx: &flume::Sender<(Ipv4Addr, u16, u8)>,
    host_state: &Arc<HostState>,
    is_tier1: bool,
) {
    for &port in ports {
        for &ip in hosts {
            // The responsive-host lookup only happens for the later tiers.
            let eligible = is_tier1 || host_state.is_responsive(ip).await;
            if !eligible {
                continue;
            }
            let queued = work_tx.send_async((ip, port, 1)).await;
            if queued.is_err() {
                return;
            }
        }
    }
}
/// Program entry point.
///
/// Parses the configuration and CIDR range, loads the credential list, builds
/// the HTTP clients, then runs pass 1 (tiered port feed) followed by a retry
/// pass over targets whose first connect failed. Results and progress are
/// written to stdout as one JSON object per line; diagnostics go to stderr.
///
/// NOTE(review): this program probes every host in the given range and
/// attempts logins. It must only be run against networks the operator is
/// authorized to test.
#[tokio::main(flavor = "multi_thread")]
async fn main() {
let cfg = Config::from_args();
let net: ipnet::Ipv4Net = match cfg.cidr.parse() {
Ok(n) => n,
Err(e) => { eprintln!("Invalid CIDR: {}", e); std::process::exit(1); }
};
let combos = load_credentials(&cfg.combos_file);
let hosts: Vec<Ipv4Addr> = net.hosts().collect();
let all_ports: Vec<u16> = TIER1_PORTS.iter()
.chain(TIER2_PORTS.iter())
.chain(TIER3_PORTS.iter())
.copied()
.collect();
// Upper bound on pass-1 work: tiers 2 and 3 skip hosts not marked responsive,
// so fewer items than this may actually be queued.
let pass1_total = hosts.len() * all_ports.len();
eprintln!(
"Scanner v6 ULTRA | {} hosts × {} ports = {} targets",
hosts.len(), all_ports.len(), pass1_total,
);
eprintln!(
"Config: workers={} http={} rate={}/s shards={} adaptive-retry=ON",
cfg.tcp_workers, cfg.http_concurrency, cfg.rate_per_sec, cfg.client_shards,
);
// Connect-timing statistics and the responsive-host set, shared by all workers.
let stats = NetworkStats::new();
let host_state = HostState::new();
// One HTTP client per shard; workers are assigned a shard by index.
let clients: Vec<Arc<Client>> = (0..cfg.client_shards)
.map(|_| {
Arc::new(
Client::builder()
.use_rustls_tls()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.pool_max_idle_per_host(12)
.pool_idle_timeout(Duration::from_secs(4))
.tcp_keepalive(None)
.tcp_nodelay(true)
.timeout(Duration::from_millis(cfg.http_ms))
.redirect(reqwest::redirect::Policy::none())
.http2_prior_knowledge()
.build()
.unwrap(),
)
})
.collect();
let http_sem = Arc::new(Semaphore::new(cfg.http_concurrency));
let bucket = TokenBucket::new(cfg.rate_per_sec);
// Bounded work queue, unbounded retry queue, bounded output queue.
let (work_tx, work_rx) = flume::bounded::<(Ipv4Addr, u16, u8)>(cfg.tcp_workers * 2);
let (retry_tx, retry_rx) = flume::unbounded::<RetryEntry>();
let (out_tx, mut out_rx) = tokio::sync::mpsc::channel::<OutputMsg>(65536);
let state = Arc::new(SharedState {
scanned: AtomicUsize::new(0),
found: AtomicUsize::new(0),
total: pass1_total,
});
// Writer task: serializes each message as one JSON line on buffered stdout
// and flushes after every progress message and at shutdown.
let writer = tokio::spawn(async move {
let mut out = BufWriter::with_capacity(2 * 1024 * 1024, tokio::io::stdout());
while let Some(msg) = out_rx.recv().await {
let is_progress = matches!(msg, OutputMsg::Progress { .. });
if let Ok(json) = serde_json::to_string(&msg) {
let _ = out.write_all(json.as_bytes()).await;
let _ = out.write_all(b"\n").await;
}
if is_progress { let _ = out.flush().await; }
}
let _ = out.flush().await;
});
// Progress ticker: emits a progress message every 150 ms until `scanned`
// reaches `total` or the task is aborted.
let prog_state = Arc::clone(&state);
let prog_tx = out_tx.clone();
let ticker = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(150));
loop {
interval.tick().await;
let s = prog_state.scanned.load(Ordering::Relaxed);
let t = prog_state.total;
let _ = prog_tx.send(OutputMsg::Progress { scanned: s, total: t }).await;
if s >= t { break; }
}
});
// Spawn the pass-1 worker pool; every worker gets its own handles to the shared state.
let mut worker_handles = Vec::with_capacity(cfg.tcp_workers);
for id in 0..cfg.tcp_workers {
let client = Arc::clone(&clients[id % cfg.client_shards]);
let combos = Arc::clone(&combos);
let http_sem = Arc::clone(&http_sem);
let bucket = Arc::clone(&bucket);
let state = Arc::clone(&state);
let stats = Arc::clone(&stats);
let host_state = Arc::clone(&host_state);
let work_rx = work_rx.clone();
let retry_tx = retry_tx.clone();
let out_tx = out_tx.clone();
let p1ms = cfg.pass1_ms;
let bms = cfg.banner_ms;
worker_handles.push(tokio::spawn(async move {
worker(id, work_rx, retry_tx, out_tx, client, combos,
http_sem, bucket, state, stats, host_state, p1ms, bms).await;
}));
}
// Drop the original sender so only the workers' clones remain.
drop(retry_tx);
// Feed pass-1 work tier by tier; tiers 2 and 3 only include hosts already marked responsive.
eprintln!("Pass 1a: Scanning Tier 1 ports (high probability)...");
progressive_scan_phase(&hosts, &TIER1_PORTS, &work_tx, &host_state, true).await;
eprintln!("Pass 1b: Scanning Tier 2 ports (medium probability)...");
progressive_scan_phase(&hosts, &TIER2_PORTS, &work_tx, &host_state, false).await;
eprintln!("Pass 1c: Scanning Tier 3 ports (low probability)...");
progressive_scan_phase(&hosts, &TIER3_PORTS, &work_tx, &host_state, false).await;
// Dropping the last sender lets the workers' receive loops end.
drop(work_tx);
// Wait for all workers to complete pass 1, then stop the ticker explicitly.
for h in worker_handles { let _ = h.await; }
ticker.abort();
// Retry pass over every target whose pass-1 connect failed.
let retry_targets: Vec<RetryEntry> = retry_rx.drain().collect();
if !retry_targets.is_empty() {
let avg_timeout = retry_targets.iter()
.map(|r| r.timeout_ms)
.sum::<u64>() / retry_targets.len() as u64;
eprintln!(
"Pass 2: {} retries with adaptive timeout (avg {}ms)",
retry_targets.len(),
avg_timeout
);
// Pass 2 has its own counters so its progress restarts from zero.
let p2_state = Arc::new(SharedState {
scanned: AtomicUsize::new(0),
found: AtomicUsize::new(0),
total: retry_targets.len(),
});
// At least 1000 pass-2 workers are spawned regardless of the retry count.
let p2_workers = (cfg.tcp_workers / 2).max(1000);
let (w2_tx, w2_rx) = flume::bounded::<(Ipv4Addr, u16, u8)>(p2_workers * 2);
let p2_ticker_state = Arc::clone(&p2_state);
let p2_ticker_tx = out_tx.clone();
let p2_ticker = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(150));
loop {
interval.tick().await;
let s = p2_ticker_state.scanned.load(Ordering::Relaxed);
let t = p2_ticker_state.total;
let _ = p2_ticker_tx.send(OutputMsg::Progress { scanned: s, total: t }).await;
if s >= t { break; }
}
});
// Pass-2 workers never queue retries; this sender's receiver is dropped immediately.
let (dummy_retry_tx, _) = flume::unbounded::<RetryEntry>();
let mut p2_handles = Vec::with_capacity(p2_workers);
for id in 0..p2_workers {
let client = Arc::clone(&clients[id % cfg.client_shards]);
let combos = Arc::clone(&combos);
let http_sem = Arc::clone(&http_sem);
let bucket = Arc::clone(&bucket);
let state2 = Arc::clone(&p2_state);
let stats2 = Arc::clone(&stats);
let host_state2 = Arc::clone(&host_state);
let w2_rx = w2_rx.clone();
let retry_tx2 = dummy_retry_tx.clone();
let out_tx2 = out_tx.clone();
let p1ms = cfg.pass1_ms;
let bms = cfg.banner_ms;
p2_handles.push(tokio::spawn(async move {
worker(id, w2_rx, retry_tx2, out_tx2, client, combos,
http_sem, bucket, state2, stats2, host_state2, p1ms, bms).await;
}));
}
// Feed the retry targets as pass 2. Only the address and port are sent:
// the per-entry `timeout_ms` is not passed on, and the worker uses 2x pass1_ms.
for entry in retry_targets {
if w2_tx.send_async((entry.ip, entry.port, 2)).await.is_err() {
break;
}
}
drop(w2_tx);
for h in p2_handles { let _ = h.await; }
p2_ticker.abort();
}
// Closing the last output sender lets the writer drain and exit.
drop(out_tx);
let _ = writer.await;
// Print final connect statistics to stderr.
let total_connects = stats.successful_connects.load(Ordering::Relaxed);
if total_connects > 0 {
let avg_response = stats.total_response_time_ns.load(Ordering::Relaxed) / total_connects as u64;
eprintln!(
"Network stats: {} successful connects, avg response time {}ms",
total_connects,
avg_response / 1_000_000
);
}
}