| use anyhow::{Context, Result}; |
| use clap::Parser; |
| use std::net::SocketAddr; |
| use std::path::PathBuf; |
| use tokio::sync::mpsc; |
| use tracing::info; |
|
|
| mod api; |
| mod models; |
| mod storage; |
| mod ws_client; |
|
|
| use api::build_router; |
| use models::AppState; |
| use storage::StorageWriter; |
| use ws_client::spawn_connection; |
|
|
| |
| |
| |
| |
| #[derive(Parser)] |
| #[command(name = "sina-realtime-api", version)] |
| struct Cli { |
| |
| #[arg(short, long, default_value = "stocks_100.txt")] |
| stocks: PathBuf, |
|
|
| |
| #[arg(short, long, default_value = "data")] |
| output: PathBuf, |
|
|
| |
| #[arg(long, default_value_t = 500)] |
| chunk_size: usize, |
|
|
| |
| #[arg(long, default_value_t = 131_072)] |
| buffer: usize, |
|
|
| |
| #[arg(long, default_value = "0.0.0.0")] |
| api_host: String, |
|
|
| |
| #[arg(long)] |
| api_port: Option<u16>, |
| } |
|
|
| #[tokio::main] |
| async fn main() -> Result<()> { |
| tracing_subscriber::fmt() |
| .with_env_filter( |
| tracing_subscriber::EnvFilter::try_from_default_env() |
| .unwrap_or_else(|_| "sina_realtime_collector=info,tower_http=warn".into()), |
| ) |
| .init(); |
|
|
| let cli = Cli::parse(); |
|
|
| |
| let content = std::fs::read_to_string(&cli.stocks) |
| .with_context(|| format!("Cannot read stock list: {:?}", cli.stocks))?; |
|
|
| let stocks: Vec<String> = content |
| .lines() |
| .map(|l| l.trim().to_string()) |
| .filter(|l| !l.is_empty() && !l.starts_with('#')) |
| .collect(); |
|
|
| anyhow::ensure!(!stocks.is_empty(), "Stock list is empty"); |
| info!("Loaded {} stocks from {:?}", stocks.len(), cli.stocks); |
|
|
| std::fs::create_dir_all(&cli.output) |
| .with_context(|| format!("Cannot create output dir: {:?}", cli.output))?; |
|
|
| let started_at = chrono::Local::now() |
| .format("%Y-%m-%dT%H:%M:%S%.3f") |
| .to_string(); |
| let state = AppState::new(stocks.clone(), started_at); |
|
|
| |
| let (tx, rx) = mpsc::channel::<String>(cli.buffer); |
|
|
| |
| let storage_state = state.clone(); |
| let output_dir = cli.output.clone(); |
| let storage_handle = tokio::spawn(async move { |
| if let Err(e) = StorageWriter::run(rx, output_dir, storage_state).await { |
| tracing::error!("Storage error: {e:#}"); |
| } |
| }); |
|
|
| |
| let chunks: Vec<Vec<String>> = stocks |
| .chunks(cli.chunk_size) |
| .map(|c| c.to_vec()) |
| .collect(); |
|
|
| info!( |
| "Starting {} connection(s) (~{} stocks each)", |
| chunks.len(), |
| cli.chunk_size |
| ); |
|
|
| let mut conn_handles = Vec::with_capacity(chunks.len()); |
| for (i, chunk) in chunks.into_iter().enumerate() { |
| let tx = tx.clone(); |
| conn_handles.push(tokio::spawn(spawn_connection(chunk, tx, i))); |
| } |
| drop(tx); |
|
|
| let port = cli |
| .api_port |
| .or_else(|| std::env::var("PORT").ok().and_then(|v| v.parse::<u16>().ok())) |
| .unwrap_or(7860); |
| let addr: SocketAddr = format!("{}:{}", cli.api_host, port) |
| .parse() |
| .with_context(|| format!("Invalid API bind address: {}:{}", cli.api_host, port))?; |
|
|
| let app = build_router(state); |
| let listener = tokio::net::TcpListener::bind(addr).await?; |
| info!("API listening on http://{addr}"); |
|
|
| let server_result = axum::serve(listener, app) |
| .with_graceful_shutdown(shutdown_signal()) |
| .await; |
|
|
| for h in conn_handles { |
| h.abort(); |
| } |
| storage_handle.abort(); |
|
|
| server_result?; |
| Ok(()) |
| } |
|
|
| async fn shutdown_signal() { |
| if let Err(e) = tokio::signal::ctrl_c().await { |
| tracing::warn!("Failed to listen for shutdown signal: {e}"); |
| } |
| info!("Shutdown signal received"); |
| } |
|
|