| use crate::models::{quote_from_record, AppState}; |
| use anyhow::Result; |
| use chrono::Local; |
| use std::fs::{File, OpenOptions}; |
| use std::io::{BufWriter, Write}; |
| use std::path::PathBuf; |
| use std::time::Duration; |
| use tokio::sync::mpsc::Receiver; |
| use tracing::info; |
|
|
| pub struct StorageWriter; |
|
|
| impl StorageWriter { |
| |
| |
| |
| |
| pub async fn run(mut rx: Receiver<String>, output_dir: PathBuf, state: AppState) -> Result<()> { |
| let mut current_date = String::new(); |
| let mut writer: Option<BufWriter<File>> = None; |
| let mut total: u64 = 0; |
| let mut since_last_stat: u64 = 0; |
|
|
| |
| let mut flush_tick = tokio::time::interval(Duration::from_secs(5)); |
| flush_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); |
|
|
| |
| let mut stat_tick = tokio::time::interval(Duration::from_secs(60)); |
| stat_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); |
| |
| stat_tick.tick().await; |
|
|
| loop { |
| tokio::select! { |
| biased; |
|
|
| msg = rx.recv() => { |
| let Some(record) = msg else { |
| break; |
| }; |
|
|
| let now = Local::now(); |
| let date_str = now.format("%Y-%m-%d").to_string(); |
| let timestamp = now.format("%Y-%m-%dT%H:%M:%S%.3f").to_string(); |
|
|
| if let Some(quote) = quote_from_record(&record, timestamp.clone()) { |
| state.latest.write().await.insert(quote.code.clone(), quote); |
| } |
|
|
| |
| if date_str != current_date { |
| if let Some(mut w) = writer.take() { |
| w.flush()?; |
| } |
| writer = Some(open_daily_file(&output_dir, &date_str)?); |
| current_date = date_str; |
| } |
|
|
| if let Some(w) = writer.as_mut() { |
| if let Some((code, fields)) = record.split_once('=') { |
| writeln!(w, "{timestamp},{code},\"{fields}\"")?; |
| } else { |
| writeln!(w, "{timestamp},,\"{record}\"")?; |
| } |
| total += 1; |
| since_last_stat += 1; |
| } |
| } |
|
|
| _ = flush_tick.tick() => { |
| if let Some(w) = writer.as_mut() { |
| w.flush()?; |
| } |
| } |
|
|
| _ = stat_tick.tick() => { |
| let rate = since_last_stat / 60; |
| if since_last_stat > 0 { |
| info!("[stats] {rate} records/sec (total: {total})"); |
| } else { |
| info!("[stats] No data in last 60s (total: {total}) — market may be closed or connections reconnecting"); |
| } |
| since_last_stat = 0; |
| } |
| } |
| } |
|
|
| |
| if let Some(mut w) = writer { |
| w.flush()?; |
| } |
| info!("Storage closed. Total records written: {total}"); |
| Ok(()) |
| } |
| } |
|
|
| fn open_daily_file(dir: &PathBuf, date: &str) -> Result<BufWriter<File>> { |
| let path = dir.join(format!("data_{date}.csv")); |
| let is_new = !path.exists(); |
| let file = OpenOptions::new().create(true).append(true).open(&path)?; |
| let mut writer = BufWriter::with_capacity(512 * 1024, file); |
| if is_new { |
| writeln!(writer, "received_at,code,fields")?; |
| } |
| info!("Opened data file: {:?}", path); |
| Ok(writer) |
| } |
|
|