sinastock / src /main.rs
Spooker's picture
Upload 16 files
7c3e988 verified
use anyhow::{Context, Result};
use clap::Parser;
use std::net::SocketAddr;
use std::path::PathBuf;
use tokio::sync::mpsc;
use tracing::info;
mod api;
mod models;
mod storage;
mod ws_client;
use api::build_router;
use models::AppState;
use storage::StorageWriter;
use ws_client::spawn_connection;
// Command-line options, parsed with clap's derive API.
// NOTE: clap turns the `///` comments in this struct into `--help` / about text,
// so editing them changes user-visible output. Implementation notes use `//`.
/// Sina Finance WebSocket real-time data API.
///
/// Connects to wss://hq.sinajs.cn/wskt, keeps latest stock tick data in memory,
/// exposes a REST API, and also persists raw records to daily CSV files.
#[derive(Parser)]
#[command(name = "sina-realtime-api", version)]
struct Cli {
// Read fully at startup; each line is trimmed before the blank/`#` filtering.
/// Stock list file — one code per line, e.g. sz300394 (lines starting with # are ignored)
#[arg(short, long, default_value = "stocks_100.txt")]
stocks: PathBuf,
// `main` creates this directory (and any missing parents) before writing.
/// Directory for output CSV files
#[arg(short, long, default_value = "data")]
output: PathBuf,
// Must be > 0: the stock list is split with `slice::chunks`, which panics on 0.
/// Max stocks per WebSocket connection (tune based on URL length / server limits)
#[arg(long, default_value_t = 500)]
chunk_size: usize,
// Must be > 0: used as the capacity of a bounded `tokio::sync::mpsc` channel,
// which panics on a zero capacity.
/// Internal channel buffer (records in flight between WS tasks and storage)
#[arg(long, default_value_t = 131_072)]
buffer: usize,
/// API host. Hugging Face Spaces should use 0.0.0.0.
#[arg(long, default_value = "0.0.0.0")]
api_host: String,
// Resolution order in `main`: this flag, then `PORT` if it parses as u16, then 7860.
// An unparsable `PORT` value is ignored without a warning.
/// API port. If omitted, reads PORT env var, then falls back to 7860.
#[arg(long)]
api_port: Option<u16>,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "sina_realtime_collector=info,tower_http=warn".into()),
)
.init();
let cli = Cli::parse();
// Load and validate stock list
let content = std::fs::read_to_string(&cli.stocks)
.with_context(|| format!("Cannot read stock list: {:?}", cli.stocks))?;
let stocks: Vec<String> = content
.lines()
.map(|l| l.trim().to_string())
.filter(|l| !l.is_empty() && !l.starts_with('#'))
.collect();
anyhow::ensure!(!stocks.is_empty(), "Stock list is empty");
info!("Loaded {} stocks from {:?}", stocks.len(), cli.stocks);
std::fs::create_dir_all(&cli.output)
.with_context(|| format!("Cannot create output dir: {:?}", cli.output))?;
let started_at = chrono::Local::now()
.format("%Y-%m-%dT%H:%M:%S%.3f")
.to_string();
let state = AppState::new(stocks.clone(), started_at);
// Shared channel: WS tasks -> storage/API-state task
let (tx, rx) = mpsc::channel::<String>(cli.buffer);
// Storage also updates the latest-quote in-memory map used by the API.
let storage_state = state.clone();
let output_dir = cli.output.clone();
let storage_handle = tokio::spawn(async move {
if let Err(e) = StorageWriter::run(rx, output_dir, storage_state).await {
tracing::error!("Storage error: {e:#}");
}
});
// Split stocks into chunks, one WebSocket connection per chunk.
let chunks: Vec<Vec<String>> = stocks
.chunks(cli.chunk_size)
.map(|c| c.to_vec())
.collect();
info!(
"Starting {} connection(s) (~{} stocks each)",
chunks.len(),
cli.chunk_size
);
let mut conn_handles = Vec::with_capacity(chunks.len());
for (i, chunk) in chunks.into_iter().enumerate() {
let tx = tx.clone();
conn_handles.push(tokio::spawn(spawn_connection(chunk, tx, i)));
}
drop(tx);
let port = cli
.api_port
.or_else(|| std::env::var("PORT").ok().and_then(|v| v.parse::<u16>().ok()))
.unwrap_or(7860);
let addr: SocketAddr = format!("{}:{}", cli.api_host, port)
.parse()
.with_context(|| format!("Invalid API bind address: {}:{}", cli.api_host, port))?;
let app = build_router(state);
let listener = tokio::net::TcpListener::bind(addr).await?;
info!("API listening on http://{addr}");
let server_result = axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await;
for h in conn_handles {
h.abort();
}
storage_handle.abort();
server_result?;
Ok(())
}
/// Resolves when the process is asked to stop. It listens for Ctrl-C (SIGINT) on
/// every platform and for SIGTERM on Unix, which is what container runtimes send
/// when stopping a service.
///
/// If a signal listener cannot be installed, the failure is logged and that source
/// is treated as never firing. The server keeps running rather than shutting down
/// immediately, which is what happened when this function returned on the error.
async fn shutdown_signal() {
    let ctrl_c = async {
        if let Err(e) = tokio::signal::ctrl_c().await {
            tracing::warn!("Failed to listen for shutdown signal: {e}");
            std::future::pending::<()>().await;
        }
    };

    #[cfg(unix)]
    let terminate = async {
        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
            Ok(mut sig) => {
                sig.recv().await;
            }
            Err(e) => {
                tracing::warn!("Failed to listen for SIGTERM: {e}");
                std::future::pending::<()>().await;
            }
        }
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }
    info!("Shutdown signal received");
}