use crate::config::Config; use crate::models::{StockInfo, MarketData}; use anyhow::Result; use clickhouse::Row; use serde::{Deserialize, Serialize}; use tracing::info; /// ClickHouse客户端封装 pub struct ClickHouseClient { client: clickhouse::Client, batch_size: usize, } #[derive(Debug, Clone, Row, Serialize, Deserialize)] struct StockInfoRow { code: String, name: String, industry_l1_code: String, industry_l1_name: String, industry_l2_code: String, industry_l2_name: String, industry_l3_code: String, industry_l3_name: String, industry_l4_code: String, industry_l4_name: String, } impl From for StockInfoRow { fn from(s: StockInfo) -> Self { Self { code: s.code, name: s.name, industry_l1_code: s.industry_l1_code, industry_l1_name: s.industry_l1_name, industry_l2_code: s.industry_l2_code, industry_l2_name: s.industry_l2_name, industry_l3_code: s.industry_l3_code, industry_l3_name: s.industry_l3_name, industry_l4_code: s.industry_l4_code, industry_l4_name: s.industry_l4_name, } } } /// ClickHouse 原生行类型:trade_date 使用 time::Date 直接映射 Date 列 #[derive(Row, Serialize)] struct MarketDataRow { code: String, #[serde(with = "clickhouse::serde::time::date")] trade_date: time::Date, time_sec: u32, avg_sell_price: Option, cum_volume: Option, cum_amount: Option, cum_trades: Option, high_price: Option, low_price: Option, sell5_price: Option, sell5_volume: Option, sell4_price: Option, sell4_volume: Option, sell3_price: Option, sell3_volume: Option, sell2_price: Option, sell2_volume: Option, sell1_price: Option, sell1_volume: Option, buy1_price: Option, buy1_volume: Option, buy2_price: Option, buy2_volume: Option, buy3_price: Option, buy3_volume: Option, buy4_price: Option, buy4_volume: Option, buy5_price: Option, buy5_volume: Option, } /// YYYYMMDD u32 → time::Date fn u32_to_date(d: u32) -> Result { let year = (d / 10000) as i32; let month = time::Month::try_from(((d / 100) % 100) as u8) .map_err(|_| anyhow::anyhow!("invalid month in date {}", d))?; let day = (d % 100) as u8; time::Date::from_calendar_date(year, month, day) .map_err(|e| anyhow::anyhow!("invalid date {}: {}", d, e)) } impl ClickHouseClient { pub fn new(config: &Config) -> Self { let client = clickhouse::Client::default() .with_url(&config.clickhouse.url) .with_database(&config.clickhouse.database) .with_user(&config.clickhouse.username) .with_password(&config.clickhouse.password); Self { client, batch_size: config.clickhouse.batch_size, } } /// 批量插入股票信息 pub async fn insert_stock_info(&self, stocks: &[StockInfo]) -> Result<()> { let mut insert = self.client.insert("stock_info")?; for stock in stocks { let row: StockInfoRow = stock.clone().into(); insert.write(&row).await?; } insert.end().await?; Ok(()) } /// 批量插入市场数据(ClickHouse 原生二进制协议,无 SQL 字符串拼接) pub async fn insert_market_data(&self, data: &[MarketData]) -> Result<()> { if data.is_empty() { return Ok(()); } let mut insert = self.client.insert("market_data")?; for item in data { let row = MarketDataRow { code: item.code.clone(), trade_date: u32_to_date(item.trade_date)?, time_sec: item.time_sec, avg_sell_price: item.avg_sell_price, cum_volume: item.cum_volume, cum_amount: item.cum_amount, cum_trades: item.cum_trades, high_price: item.high_price, low_price: item.low_price, sell5_price: item.sell5_price, sell5_volume: item.sell5_volume, sell4_price: item.sell4_price, sell4_volume: item.sell4_volume, sell3_price: item.sell3_price, sell3_volume: item.sell3_volume, sell2_price: item.sell2_price, sell2_volume: item.sell2_volume, sell1_price: item.sell1_price, sell1_volume: item.sell1_volume, buy1_price: item.buy1_price, buy1_volume: item.buy1_volume, buy2_price: item.buy2_price, buy2_volume: item.buy2_volume, buy3_price: item.buy3_price, buy3_volume: item.buy3_volume, buy4_price: item.buy4_price, buy4_volume: item.buy4_volume, buy5_price: item.buy5_price, buy5_volume: item.buy5_volume, }; insert.write(&row).await?; } insert.end().await?; Ok(()) } /// 检查数据是否已存在 pub async fn data_exists(&self, code: &str, date: u32) -> Result { let year = date / 10000; let month = (date / 100) % 100; let day = date % 100; let date_str = format!("{:04}-{:02}-{:02}", year, month, day); let sql = format!( "SELECT COUNT(*) FROM market_data WHERE code = '{}' AND trade_date = '{}'", code.replace("'", "''"), date_str ); let count: u64 = self.client .query(&sql) .fetch_one::() .await .unwrap_or(0); Ok(count > 0) } } /// 导入股票信息到数据库 pub async fn import_stock_info(config: &Config, stocks: &[StockInfo]) -> Result<()> { let client = ClickHouseClient::new(config); info!("开始导入股票信息,共 {} 条", stocks.len()); // 分批插入 for chunk in stocks.chunks(config.clickhouse.batch_size) { client.insert_stock_info(chunk).await?; } info!("✓ 股票信息导入完成"); Ok(()) }