| use reqwest::{Client, StatusCode}; |
| use serde::{Deserialize, Serialize}; |
| use std::collections::{HashMap, HashSet}; |
| use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; |
| use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; |
| use std::sync::Arc; |
| use std::time::{Duration, Instant}; |
| use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; |
| use tokio::net::TcpStream; |
| use tokio::sync::{Semaphore, RwLock}; |
|
|
| lazy_static::lazy_static! { |
| static ref HTTPS_PORTS: HashSet<u16> = |
| [443, 8443, 9443, 8883].iter().copied().collect(); |
|
|
| static ref TIER1_PORTS: Vec<u16> = vec![80, 8080, 443]; |
| |
| static ref TIER2_PORTS: Vec<u16> = vec![8000, 8443, 8888, 554]; |
| |
| static ref TIER3_PORTS: Vec<u16> = vec![9000, 8081, 37777]; |
| |
| static ref SKIP_BANNER_PORTS: HashSet<u16> = |
| [80, 8080, 8000, 8888, 443, 8443, 9443].iter().copied().collect(); |
| } |
|
|
| struct NetworkStats { |
| fast_responses: AtomicUsize, |
| medium_responses: AtomicUsize, |
| slow_responses: AtomicUsize, |
| total_response_time_ns: AtomicU64, |
| successful_connects: AtomicUsize, |
| } |
|
|
| impl NetworkStats { |
| fn new() -> Arc<Self> { |
| Arc::new(Self { |
| fast_responses: AtomicUsize::new(0), |
| medium_responses: AtomicUsize::new(0), |
| slow_responses: AtomicUsize::new(0), |
| total_response_time_ns: AtomicU64::new(0), |
| successful_connects: AtomicUsize::new(0), |
| }) |
| } |
|
|
| fn record_response(&self, duration_ns: u64) { |
| self.successful_connects.fetch_add(1, Ordering::Relaxed); |
| self.total_response_time_ns.fetch_add(duration_ns, Ordering::Relaxed); |
| |
| let ms = duration_ns / 1_000_000; |
| if ms < 50 { |
| self.fast_responses.fetch_add(1, Ordering::Relaxed); |
| } else if ms < 100 { |
| self.medium_responses.fetch_add(1, Ordering::Relaxed); |
| } else { |
| self.slow_responses.fetch_add(1, Ordering::Relaxed); |
| } |
| } |
|
|
| fn get_adaptive_timeout(&self) -> u64 { |
| let total = self.successful_connects.load(Ordering::Relaxed); |
| if total < 10 { return 150; } |
|
|
| let fast = self.fast_responses.load(Ordering::Relaxed); |
| let medium = self.medium_responses.load(Ordering::Relaxed); |
| |
| |
| let fast_pct = (fast * 100) / total; |
| let medium_pct = (medium * 100) / total; |
| |
| |
| if fast_pct > 70 { |
| 100 |
| } else if fast_pct + medium_pct > 70 { |
| 150 |
| } else { |
| 300 |
| } |
| } |
| } |
|
|
| struct HostState { |
| responsive_hosts: RwLock<HashSet<Ipv4Addr>>, |
| } |
|
|
| impl HostState { |
| fn new() -> Arc<Self> { |
| Arc::new(Self { |
| responsive_hosts: RwLock::new(HashSet::new()), |
| }) |
| } |
|
|
| async fn mark_responsive(&self, ip: Ipv4Addr) { |
| self.responsive_hosts.write().await.insert(ip); |
| } |
|
|
| async fn is_responsive(&self, ip: Ipv4Addr) -> bool { |
| self.responsive_hosts.read().await.contains(&ip) |
| } |
| } |
|
|
| #[derive(Clone)] |
| struct Config { |
| cidr: String, |
| combos_file: String, |
| tcp_workers: usize, |
| http_concurrency: usize, |
| rate_per_sec: u64, |
| pass1_ms: u64, |
| banner_ms: u64, |
| http_ms: u64, |
| client_shards: usize, |
| } |
|
|
| impl Config { |
| fn from_args() -> Self { |
| let args: Vec<String> = std::env::args().collect(); |
| if args.len() < 2 { |
| eprintln!( |
| "Usage: {} <CIDR> [combos] [tcp_workers] [http_conc] [rate/sec]", |
| args[0] |
| ); |
| eprintln!("Defaults: combos.txt 6000 1500 8000"); |
| std::process::exit(1); |
| } |
| let cpus = num_cpus::get(); |
| Config { |
| cidr: args[1].clone(), |
| combos_file: args.get(2).cloned().unwrap_or_else(|| "combos.txt".into()), |
| tcp_workers: args.get(3).and_then(|s| s.parse().ok()).unwrap_or(6000), |
| http_concurrency: args.get(4).and_then(|s| s.parse().ok()).unwrap_or(1500), |
| rate_per_sec: args.get(5).and_then(|s| s.parse().ok()).unwrap_or(8000), |
| pass1_ms: 80, |
| banner_ms: 40, |
| http_ms: 1500, |
| client_shards: cpus.min(12).max(4), |
| } |
| } |
| } |
|
|
| #[derive(Serialize, Deserialize, Debug, Clone)] |
| #[serde(tag = "type")] |
| enum OutputMsg { |
| Progress { scanned: usize, total: usize }, |
| Found { |
| ip: String, |
| port: u16, |
| url: String, |
| auth: String, |
| status: String, |
| }, |
| } |
|
|
| struct TokenBucket { |
| interval_ns: u64, |
| burst_size: usize, |
| next_slot_ns: std::sync::atomic::AtomicU64, |
| epoch: Instant, |
| } |
|
|
| impl TokenBucket { |
| fn new(per_second: u64) -> Arc<Self> { |
| Arc::new(Self { |
| interval_ns: if per_second == 0 { 0 } else { 1_000_000_000 / per_second }, |
| burst_size: (per_second / 10).max(100) as usize, |
| next_slot_ns: std::sync::atomic::AtomicU64::new(0), |
| epoch: Instant::now(), |
| }) |
| } |
| |
| async fn acquire(&self) { |
| if self.interval_ns == 0 { return; } |
| loop { |
| let now = self.epoch.elapsed().as_nanos() as u64; |
| let prev = self.next_slot_ns.load(Ordering::Relaxed); |
| let slot = prev.max(now) + self.interval_ns; |
| if self.next_slot_ns |
| .compare_exchange_weak(prev, slot, Ordering::AcqRel, Ordering::Relaxed) |
| .is_ok() |
| { |
| let wait = slot.saturating_sub(now + self.interval_ns); |
| if wait > 0 { tokio::time::sleep(Duration::from_nanos(wait)).await; } |
| return; |
| } |
| tokio::task::yield_now().await; |
| } |
| } |
| } |
|
|
| async fn is_http_like(stream: &mut TcpStream, port: u16, timeout_ms: u64) -> bool { |
| |
| if SKIP_BANNER_PORTS.contains(&port) { |
| return true; |
| } |
| |
| |
| if HTTPS_PORTS.contains(&port) { return true; } |
| |
| |
| if port == 37777 { return false; } |
|
|
| let mut buf = [0u8; 12]; |
| match tokio::time::timeout( |
| Duration::from_millis(timeout_ms), |
| stream.read(&mut buf), |
| ).await { |
| Err(_) => true, |
| Ok(Err(_)) => false, |
| Ok(Ok(0)) => false, |
| Ok(Ok(n)) => classify_banner(&buf[..n]), |
| } |
| } |
|
|
| #[inline] |
| fn classify_banner(b: &[u8]) -> bool { |
| let len = b.len(); |
| if len < 2 { return false; } |
| |
| |
| let first_two = u16::from_be_bytes([b[0], b[1]]); |
| match first_two { |
| 0x1603 => return true, |
| 0x4854 => return true, |
| 0x5353 => return false, |
| 0x3232 => return false, |
| 0x5254 => return false, |
| 0xFFFD => return false, |
| 0x2000 => return false, |
| _ => {} |
| } |
| |
| |
| let threshold = len / 2; |
| let mut non_printable = 0; |
| for &byte in b.iter() { |
| if byte < 0x20 && byte != b'\r' && byte != b'\n' && byte != b'\t' { |
| non_printable += 1; |
| if non_printable > threshold { return false; } |
| } |
| } |
| true |
| } |
|
|
| fn load_credentials(path: &str) -> Arc<Vec<(String, String)>> { |
| let content = std::fs::read_to_string(path).unwrap_or_default(); |
| let combos: Vec<_> = content.lines() |
| .filter_map(|l| { |
| let t = l.trim(); |
| if t.is_empty() || t.starts_with('#') { return None; } |
| t.split_once(':').map(|(u, p)| (u.to_string(), p.to_string())) |
| }) |
| .collect(); |
| if combos.is_empty() { |
| Arc::new(vec![ |
| ("admin".into(), "admin".into()), |
| ("admin".into(), "12345".into()), |
| ("admin".into(), "password".into()), |
| ("root".into(), "root".into()), |
| ("admin".into(), "".into()), |
| ]) |
| } else { |
| eprintln!("Loaded {} credential pairs", combos.len()); |
| Arc::new(combos) |
| } |
| } |
|
|
| async fn probe_http( |
| ip: Ipv4Addr, |
| port: u16, |
| client: &Client, |
| combos: &[(String, String)], |
| ) -> Option<OutputMsg> { |
| let scheme = if HTTPS_PORTS.contains(&port) { "https" } else { "http" }; |
| let base_url = format!("{}://{}:{}", scheme, ip, port); |
|
|
| |
| let resp = match tokio::time::timeout( |
| Duration::from_millis(1500), |
| client.get(&base_url).send() |
| ).await { |
| Ok(Ok(r)) => r, |
| _ => return None, |
| }; |
| |
| let st = resp.status(); |
| if st != StatusCode::OK && st != StatusCode::UNAUTHORIZED { return None; } |
|
|
| let mut found_auth = "None".to_string(); |
| let mut verified = st == StatusCode::OK; |
|
|
| if !verified { |
| |
| let priority_combos = [ |
| ("admin", "admin"), |
| ("admin", ""), |
| ("admin", "12345"), |
| ("admin", "password"), |
| ]; |
| |
| |
| for (u, p) in &priority_combos { |
| if let Ok(Ok(r)) = tokio::time::timeout( |
| Duration::from_millis(1000), |
| client.get(&base_url).basic_auth(u, Some(p)).send() |
| ).await { |
| if r.status() == StatusCode::OK { |
| found_auth = format!("{}:{}", u, p); |
| verified = true; |
| return Some(OutputMsg::Found { |
| ip: ip.to_string(), port, url: base_url, auth: found_auth, |
| status: "Verified".into(), |
| }); |
| } |
| } |
| } |
| |
| |
| if !verified { |
| for (u, p) in combos.iter().take(10) { |
| if priority_combos.iter().any(|(pu, pp)| pu == u && pp == p) { |
| continue; |
| } |
| if let Ok(Ok(r)) = tokio::time::timeout( |
| Duration::from_millis(800), |
| client.get(&base_url).basic_auth(u, Some(p)).send() |
| ).await { |
| if r.status() == StatusCode::OK { |
| found_auth = format!("{}:{}", u, p); |
| verified = true; |
| break; |
| } |
| } |
| } |
| } |
| } |
|
|
| Some(OutputMsg::Found { |
| ip: ip.to_string(), port, url: base_url, auth: found_auth, |
| status: if verified { "Verified".into() } else { "Locked".into() }, |
| }) |
| } |
|
|
| async fn tcp_connect_fast( |
| addr: SocketAddr, |
| timeout_ms: u64, |
| stats: Option<&NetworkStats>, |
| ) -> Option<(TcpStream, u64)> { |
| use socket2::{Domain, Protocol, Socket, Type}; |
|
|
| let start = Instant::now(); |
| |
| let sock = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)).ok()?; |
| sock.set_reuse_address(true).ok()?; |
| sock.set_nonblocking(true).ok()?; |
| sock.set_nodelay(true).ok()?; |
| |
| |
| #[cfg(target_os = "linux")] |
| { |
| use std::os::unix::io::AsRawFd; |
| let fd = sock.as_raw_fd(); |
| unsafe { |
| libc::setsockopt( |
| fd, |
| libc::IPPROTO_TCP, |
| libc::TCP_QUICKACK, |
| &1i32 as *const _ as *const libc::c_void, |
| std::mem::size_of::<i32>() as libc::socklen_t, |
| ); |
| } |
| } |
|
|
| let addr2 = socket2::SockAddr::from(addr); |
| match sock.connect(&addr2) { |
| Ok(_) => {} |
| Err(e) => { |
| let err_code = e.raw_os_error(); |
| #[cfg(unix)] |
| let is_in_progress = err_code == Some(libc::EINPROGRESS) || err_code == Some(libc::EWOULDBLOCK); |
| #[cfg(windows)] |
| let is_in_progress = err_code == Some(10035); |
| |
| if !is_in_progress { return None; } |
| } |
| } |
|
|
| let std_stream: std::net::TcpStream = sock.into(); |
| let mut stream = TcpStream::from_std(std_stream).ok()?; |
|
|
| tokio::time::timeout( |
| Duration::from_millis(timeout_ms), |
| stream.writable(), |
| ).await.ok()?.ok()?; |
|
|
| match stream.peer_addr() { |
| Ok(_) => { |
| let elapsed_ns = start.elapsed().as_nanos() as u64; |
| if let Some(s) = stats { |
| s.record_response(elapsed_ns); |
| } |
| Some((stream, elapsed_ns)) |
| } |
| Err(_) => None, |
| } |
| } |
|
|
| struct SharedState { |
| scanned: AtomicUsize, |
| found: AtomicUsize, |
| total: usize, |
| } |
|
|
| #[derive(Clone, Copy)] |
| struct RetryEntry { |
| ip: Ipv4Addr, |
| port: u16, |
| timeout_ms: u64, |
| } |
|
|
| async fn worker( |
| _worker_id: usize, |
| work_rx: flume::Receiver<(Ipv4Addr, u16, u8)>, |
| retry_tx: flume::Sender<RetryEntry>, |
| out_tx: tokio::sync::mpsc::Sender<OutputMsg>, |
| client: Arc<Client>, |
| combos: Arc<Vec<(String, String)>>, |
| http_sem: Arc<Semaphore>, |
| bucket: Arc<TokenBucket>, |
| state: Arc<SharedState>, |
| stats: Arc<NetworkStats>, |
| host_state: Arc<HostState>, |
| pass1_ms: u64, |
| banner_ms: u64, |
| ) { |
| while let Ok((ip, port, pass)) = work_rx.recv_async().await { |
| bucket.acquire().await; |
|
|
| let timeout_ms = if pass == 1 { pass1_ms } else { pass1_ms * 2 }; |
| let addr = SocketAddr::V4(SocketAddrV4::new(ip, port)); |
|
|
| match tcp_connect_fast(addr, timeout_ms, Some(&stats)).await { |
| Some((mut stream, response_ns)) => { |
| |
| if pass == 1 { |
| host_state.mark_responsive(ip).await; |
| } |
| |
| if is_http_like(&mut stream, port, banner_ms).await { |
| let _http = http_sem.acquire().await.expect("http sem"); |
| if let Some(msg) = probe_http(ip, port, &client, &combos).await { |
| state.found.fetch_add(1, Ordering::Relaxed); |
| let _ = out_tx.send(msg).await; |
| } |
| } |
| } |
| None => { |
| if pass == 1 { |
| |
| let adaptive_timeout = stats.get_adaptive_timeout(); |
| let retry_timeout = if TIER1_PORTS.contains(&port) { |
| adaptive_timeout |
| } else { |
| (adaptive_timeout * 3).min(500) |
| }; |
| |
| let _ = retry_tx.send(RetryEntry { |
| ip, |
| port, |
| timeout_ms: retry_timeout, |
| }); |
| } |
| } |
| } |
|
|
| state.scanned.fetch_add(1, Ordering::Relaxed); |
| } |
| } |
|
|
| async fn progressive_scan_phase( |
| hosts: &[Ipv4Addr], |
| ports: &[u16], |
| work_tx: &flume::Sender<(Ipv4Addr, u16, u8)>, |
| host_state: &Arc<HostState>, |
| is_tier1: bool, |
| ) { |
| for &port in ports { |
| for &ip in hosts { |
| |
| if !is_tier1 && !host_state.is_responsive(ip).await { |
| continue; |
| } |
| |
| if work_tx.send_async((ip, port, 1)).await.is_err() { |
| return; |
| } |
| } |
| } |
| } |
|
|
| #[tokio::main(flavor = "multi_thread")] |
| async fn main() { |
| let cfg = Config::from_args(); |
|
|
| let net: ipnet::Ipv4Net = match cfg.cidr.parse() { |
| Ok(n) => n, |
| Err(e) => { eprintln!("Invalid CIDR: {}", e); std::process::exit(1); } |
| }; |
|
|
| let combos = load_credentials(&cfg.combos_file); |
| let hosts: Vec<Ipv4Addr> = net.hosts().collect(); |
| let all_ports: Vec<u16> = TIER1_PORTS.iter() |
| .chain(TIER2_PORTS.iter()) |
| .chain(TIER3_PORTS.iter()) |
| .copied() |
| .collect(); |
| let pass1_total = hosts.len() * all_ports.len(); |
|
|
| eprintln!( |
| "Scanner v6 ULTRA | {} hosts × {} ports = {} targets", |
| hosts.len(), all_ports.len(), pass1_total, |
| ); |
| eprintln!( |
| "Config: workers={} http={} rate={}/s shards={} adaptive-retry=ON", |
| cfg.tcp_workers, cfg.http_concurrency, cfg.rate_per_sec, cfg.client_shards, |
| ); |
|
|
| |
| let stats = NetworkStats::new(); |
| let host_state = HostState::new(); |
|
|
| |
| let clients: Vec<Arc<Client>> = (0..cfg.client_shards) |
| .map(|_| { |
| Arc::new( |
| Client::builder() |
| .use_rustls_tls() |
| .danger_accept_invalid_certs(true) |
| .danger_accept_invalid_hostnames(true) |
| .pool_max_idle_per_host(12) |
| .pool_idle_timeout(Duration::from_secs(4)) |
| .tcp_keepalive(None) |
| .tcp_nodelay(true) |
| .timeout(Duration::from_millis(cfg.http_ms)) |
| .redirect(reqwest::redirect::Policy::none()) |
| .http2_prior_knowledge() |
| .build() |
| .unwrap(), |
| ) |
| }) |
| .collect(); |
|
|
| let http_sem = Arc::new(Semaphore::new(cfg.http_concurrency)); |
| let bucket = TokenBucket::new(cfg.rate_per_sec); |
|
|
| let (work_tx, work_rx) = flume::bounded::<(Ipv4Addr, u16, u8)>(cfg.tcp_workers * 2); |
| let (retry_tx, retry_rx) = flume::unbounded::<RetryEntry>(); |
| let (out_tx, mut out_rx) = tokio::sync::mpsc::channel::<OutputMsg>(65536); |
|
|
| let state = Arc::new(SharedState { |
| scanned: AtomicUsize::new(0), |
| found: AtomicUsize::new(0), |
| total: pass1_total, |
| }); |
|
|
| |
| let writer = tokio::spawn(async move { |
| let mut out = BufWriter::with_capacity(2 * 1024 * 1024, tokio::io::stdout()); |
| while let Some(msg) = out_rx.recv().await { |
| let is_progress = matches!(msg, OutputMsg::Progress { .. }); |
| if let Ok(json) = serde_json::to_string(&msg) { |
| let _ = out.write_all(json.as_bytes()).await; |
| let _ = out.write_all(b"\n").await; |
| } |
| if is_progress { let _ = out.flush().await; } |
| } |
| let _ = out.flush().await; |
| }); |
|
|
| |
| let prog_state = Arc::clone(&state); |
| let prog_tx = out_tx.clone(); |
| let ticker = tokio::spawn(async move { |
| let mut interval = tokio::time::interval(Duration::from_millis(150)); |
| loop { |
| interval.tick().await; |
| let s = prog_state.scanned.load(Ordering::Relaxed); |
| let t = prog_state.total; |
| let _ = prog_tx.send(OutputMsg::Progress { scanned: s, total: t }).await; |
| if s >= t { break; } |
| } |
| }); |
|
|
| |
| let mut worker_handles = Vec::with_capacity(cfg.tcp_workers); |
| for id in 0..cfg.tcp_workers { |
| let client = Arc::clone(&clients[id % cfg.client_shards]); |
| let combos = Arc::clone(&combos); |
| let http_sem = Arc::clone(&http_sem); |
| let bucket = Arc::clone(&bucket); |
| let state = Arc::clone(&state); |
| let stats = Arc::clone(&stats); |
| let host_state = Arc::clone(&host_state); |
| let work_rx = work_rx.clone(); |
| let retry_tx = retry_tx.clone(); |
| let out_tx = out_tx.clone(); |
| let p1ms = cfg.pass1_ms; |
| let bms = cfg.banner_ms; |
|
|
| worker_handles.push(tokio::spawn(async move { |
| worker(id, work_rx, retry_tx, out_tx, client, combos, |
| http_sem, bucket, state, stats, host_state, p1ms, bms).await; |
| })); |
| } |
| drop(retry_tx); |
|
|
| |
| eprintln!("Pass 1a: Scanning Tier 1 ports (high probability)..."); |
| progressive_scan_phase(&hosts, &TIER1_PORTS, &work_tx, &host_state, true).await; |
| |
| eprintln!("Pass 1b: Scanning Tier 2 ports (medium probability)..."); |
| progressive_scan_phase(&hosts, &TIER2_PORTS, &work_tx, &host_state, false).await; |
| |
| eprintln!("Pass 1c: Scanning Tier 3 ports (low probability)..."); |
| progressive_scan_phase(&hosts, &TIER3_PORTS, &work_tx, &host_state, false).await; |
| |
| drop(work_tx); |
|
|
| |
| for h in worker_handles { let _ = h.await; } |
| ticker.abort(); |
|
|
| |
| let retry_targets: Vec<RetryEntry> = retry_rx.drain().collect(); |
|
|
| if !retry_targets.is_empty() { |
| let avg_timeout = retry_targets.iter() |
| .map(|r| r.timeout_ms) |
| .sum::<u64>() / retry_targets.len() as u64; |
| |
| eprintln!( |
| "Pass 2: {} retries with adaptive timeout (avg {}ms)", |
| retry_targets.len(), |
| avg_timeout |
| ); |
|
|
| let p2_state = Arc::new(SharedState { |
| scanned: AtomicUsize::new(0), |
| found: AtomicUsize::new(0), |
| total: retry_targets.len(), |
| }); |
|
|
| let p2_workers = (cfg.tcp_workers / 2).max(1000); |
| let (w2_tx, w2_rx) = flume::bounded::<(Ipv4Addr, u16, u8)>(p2_workers * 2); |
|
|
| let p2_ticker_state = Arc::clone(&p2_state); |
| let p2_ticker_tx = out_tx.clone(); |
| let p2_ticker = tokio::spawn(async move { |
| let mut interval = tokio::time::interval(Duration::from_millis(150)); |
| loop { |
| interval.tick().await; |
| let s = p2_ticker_state.scanned.load(Ordering::Relaxed); |
| let t = p2_ticker_state.total; |
| let _ = p2_ticker_tx.send(OutputMsg::Progress { scanned: s, total: t }).await; |
| if s >= t { break; } |
| } |
| }); |
|
|
| let (dummy_retry_tx, _) = flume::unbounded::<RetryEntry>(); |
| let mut p2_handles = Vec::with_capacity(p2_workers); |
| |
| for id in 0..p2_workers { |
| let client = Arc::clone(&clients[id % cfg.client_shards]); |
| let combos = Arc::clone(&combos); |
| let http_sem = Arc::clone(&http_sem); |
| let bucket = Arc::clone(&bucket); |
| let state2 = Arc::clone(&p2_state); |
| let stats2 = Arc::clone(&stats); |
| let host_state2 = Arc::clone(&host_state); |
| let w2_rx = w2_rx.clone(); |
| let retry_tx2 = dummy_retry_tx.clone(); |
| let out_tx2 = out_tx.clone(); |
| let p1ms = cfg.pass1_ms; |
| let bms = cfg.banner_ms; |
|
|
| p2_handles.push(tokio::spawn(async move { |
| worker(id, w2_rx, retry_tx2, out_tx2, client, combos, |
| http_sem, bucket, state2, stats2, host_state2, p1ms, bms).await; |
| })); |
| } |
|
|
| |
| |
| for entry in retry_targets { |
| if w2_tx.send_async((entry.ip, entry.port, 2)).await.is_err() { |
| break; |
| } |
| } |
| drop(w2_tx); |
| |
| for h in p2_handles { let _ = h.await; } |
| p2_ticker.abort(); |
| } |
|
|
| drop(out_tx); |
| let _ = writer.await; |
| |
| |
| let total_connects = stats.successful_connects.load(Ordering::Relaxed); |
| if total_connects > 0 { |
| let avg_response = stats.total_response_time_ns.load(Ordering::Relaxed) / total_connects as u64; |
| eprintln!( |
| "Network stats: {} successful connects, avg response time {}ms", |
| total_connects, |
| avg_response / 1_000_000 |
| ); |
| } |
| } |