File size: 7,426 Bytes
7c3e988 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 | use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::time::{sleep, interval, Instant};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::{debug, error, info, warn};
const WSS_BASE: &str = "wss://hq.sinajs.cn/wskt";
const ORIGIN: &str = "https://finance.sina.com.cn";
const USER_AGENT: &str =
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36";
/// Send a WebSocket ping at this interval to keep the connection alive.
const PING_INTERVAL: Duration = Duration::from_secs(20);
/// If no message (data or Pong) is received within this duration, treat the connection as dead.
/// Must be greater than PING_INTERVAL so a pong reply can reset the clock first.
const IDLE_TIMEOUT: Duration = Duration::from_secs(45);
/// Runs a WebSocket connection for the given stock chunk, reconnecting on error.
pub async fn spawn_connection(stocks: Vec<String>, tx: Sender<String>, conn_id: usize) {
let list = stocks.join(",");
let url = format!("{}?list={}", WSS_BASE, list);
let mut backoff = Duration::from_secs(1);
let max_backoff = Duration::from_secs(30);
let mut attempt = 0u32;
// Spread out initial reconnects so all 11 connections don't hammer the server at once.
// Each conn waits an extra conn_id * 150ms.
if conn_id > 0 {
sleep(Duration::from_millis(conn_id as u64 * 150)).await;
}
loop {
if attempt > 0 {
// Add per-connection jitter to prevent thundering-herd reconnects.
let jitter = Duration::from_millis(conn_id as u64 * 50);
warn!(
"[conn {conn_id}] Reconnect attempt {attempt}, waiting {:.1}s",
(backoff + jitter).as_secs_f32()
);
sleep(backoff + jitter).await;
backoff = (backoff * 2).min(max_backoff);
}
// Stop if the storage channel is gone (shutdown)
if tx.is_closed() {
info!("[conn {conn_id}] Channel closed, stopping");
return;
}
match build_request(&url) {
Err(e) => {
error!("[conn {conn_id}] Bad request: {e}");
return;
}
Ok(req) => match connect_async(req).await {
Err(e) => {
error!("[conn {conn_id}] Connect failed: {e}");
attempt += 1;
}
Ok((ws_stream, _)) => {
info!("[conn {conn_id}] Connected ({} stocks)", stocks.len());
// Reset backoff only after we've successfully received data.
// If the server immediately drops us, keep backing off.
let received = read_loop(ws_stream, &tx, conn_id).await;
match received {
Ok(n) if n > 0 => {
// Got real data → healthy connection, reset backoff
backoff = Duration::from_secs(1);
attempt = 0;
info!("[conn {conn_id}] Disconnected after {n} records");
}
Ok(_) => {
warn!("[conn {conn_id}] Connected but received no data (idle timeout or immediate close)");
attempt += 1;
}
Err(e) => {
warn!("[conn {conn_id}] Read error: {e}");
// Server hard-closed the TCP connection — reconnect immediately.
// If the new connect also fails, normal backoff will kick in.
let msg = e.to_string();
if msg.contains("Connection reset") || msg.contains("without closing") {
backoff = Duration::from_secs(1);
attempt = 0; // skip backoff wait on next iteration
} else {
attempt += 1;
}
}
}
}
},
}
}
}
fn build_request(url: &str) -> Result<http::Request<()>> {
// tungstenite requires Sec-WebSocket-Key when using a custom http::Request
let key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
Ok(http::Request::builder()
.uri(url)
.header("Host", "hq.sinajs.cn")
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key", key)
.header("Origin", ORIGIN)
.header("User-Agent", USER_AGENT)
.body(())?)
}
/// Returns the number of records successfully forwarded, or an error.
async fn read_loop(
mut stream: WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
tx: &Sender<String>,
conn_id: usize,
) -> Result<u64> {
let mut count: u64 = 0;
// Send a ping every PING_INTERVAL to keep the connection alive.
let mut ping_ticker = interval(PING_INTERVAL);
ping_ticker.tick().await; // consume the first immediate tick
// Resettable idle-timeout: reset on every received message (data or Pong).
let mut idle = Box::pin(sleep(IDLE_TIMEOUT));
loop {
tokio::select! {
_ = ping_ticker.tick() => {
if let Err(e) = stream.send(Message::Ping(vec![])).await {
warn!("[conn {conn_id}] Ping failed: {e}");
return Ok(count);
}
debug!("[conn {conn_id}] Sent keepalive ping");
}
_ = &mut idle => {
warn!(
"[conn {conn_id}] No data for {}s, reconnecting",
IDLE_TIMEOUT.as_secs()
);
return Ok(count);
}
msg_opt = stream.next() => {
// Any incoming message (Pong, data, …) proves the connection is alive.
idle.as_mut().reset(Instant::now() + IDLE_TIMEOUT);
match msg_opt {
None => return Ok(count), // stream closed
Some(Err(e)) => return Err(e.into()),
Some(Ok(msg)) => match msg {
Message::Text(text) => {
let text_str: &str = &text;
for token in text_str.split_ascii_whitespace() {
if token.contains('=') {
if tx.send(token.to_string()).await.is_err() {
return Ok(count); // channel closed = shutdown
}
count += 1;
}
}
}
Message::Close(_) => {
warn!("[conn {conn_id}] Server sent Close");
return Ok(count);
}
// tungstenite auto-replies to server Pings; Pong frames just reset idle.
_ => {}
},
}
}
}
}
}
|