# sync_stock / sync_data.py
# Commit c334682 — "feat: use efinance as primary fund-flow source"
"""
数据同步脚本 - Sync Space 专用版
从 AkShare 抓取数据并同步到 Hugging Face Dataset
"""
import os
import sys
import logging
import time
import threading
import gc
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional, Dict, Any
from pathlib import Path
import pandas as pd
import akshare as ak
from huggingface_hub import hf_hub_download, upload_file, list_repo_files
# Tushare 适配器(优先使用)
from app.tushare_adapter import (
get_stock_list_tushare,
get_stock_daily_tushare,
get_dividend_tushare,
TUSHARE_AVAILABLE
)
# 混合数据适配器(yfinance + efinance,作为回退)
from app.hybrid_adapter import (
get_stock_daily_yfinance,
get_index_daily_yfinance,
get_fund_flow_efinance,
YFINANCE_AVAILABLE
)
# 添加当前目录到路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.database import get_db
from app.database_user import get_beijing_time
from app.sync_status import get_sync_status
from app.stock_list_cache import get_cached_stock_list, save_stock_list_cache
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 配置
YEARS_OF_DATA = 10
def _safe_int_env(var_name: str, default: int) -> int:
"""安全地读取环境变量并转换为整数"""
try:
value = os.getenv(var_name)
if value is None:
return default
return int(value)
except (ValueError, TypeError):
logger.warning(f"Invalid value for {var_name}, using default: {default}")
return default
# 动态线程数配置(延迟计算,避免导入时触发 multiprocessing)
def get_thread_config():
    """Compute per-task thread-pool sizes (lazily, so importing this module
    never touches multiprocessing).

    Returns a (cpu_count, config) tuple; config maps task name -> worker count.
    """
    import multiprocessing
    cpu_count = multiprocessing.cpu_count()
    # Tiered concurrency: small caps per task to avoid server-side rate limiting.
    caps = {
        'daily': 4,
        'fund': 3,
        'valuation': 3,
        'margin': 3,
        'financial': 2,
        'dividend': 2,
    }
    config = {
        task: _safe_int_env(f"MAX_WORKERS_{task.upper()}", min(cap, cpu_count))
        for task, cap in caps.items()
    }
    # Backward compatibility: a single MAX_WORKERS overrides every pool.
    legacy = _safe_int_env("MAX_WORKERS", 0)
    if legacy > 0:
        config = dict.fromkeys(config, legacy)
    return cpu_count, config
# 延迟初始化线程配置(在 main 中调用)
_CPU_COUNT = None
_THREAD_CONFIG = None
_thread_config_lock = threading.Lock()
def get_thread_config_safe():
    """Return the thread-pool config, initializing it at most once (thread-safe).

    Uses double-checked locking on the module-level ``_THREAD_CONFIG`` so that
    concurrent first callers do not both compute the configuration; the winner
    also logs the chosen pool sizes.
    """
    global _CPU_COUNT, _THREAD_CONFIG
    if _THREAD_CONFIG is None:
        with _thread_config_lock:
            # Double-checked locking: re-test under the lock
            if _THREAD_CONFIG is None:
                _CPU_COUNT, _THREAD_CONFIG = get_thread_config()
                logger.info(f"Thread pool config: CPU={_CPU_COUNT}, "
                    f"Daily={_THREAD_CONFIG['daily']}, Fund={_THREAD_CONFIG['fund']}, "
                    f"Valuation={_THREAD_CONFIG['valuation']}, Margin={_THREAD_CONFIG['margin']}, "
                    f"Financial={_THREAD_CONFIG['financial']}, Dividend={_THREAD_CONFIG['dividend']}")
    return _THREAD_CONFIG
def init_thread_config():
    """Eagerly warm up the thread-pool configuration (intended to be called from main)."""
    get_thread_config_safe()
def _extract_code_name(df: pd.DataFrame, code_cols: List[str], name_cols: List[str],
                       market: str, label: str) -> Optional[pd.DataFrame]:
    """Reduce a raw spot-quote frame to the standard [code, name, market] columns.

    Returns None (after logging a warning) when no known code/name column is found.
    """
    c_code = next((c for c in code_cols if c in df.columns), None)
    c_name = next((c for c in name_cols if c in df.columns), None)
    if not (c_code and c_name):
        logger.warning(f"Could not find code/name columns in {label} data. Available columns: {df.columns.tolist()}")
        return None
    out = df[[c_code, c_name]].copy()
    out.columns = ['code', 'name']
    out['market'] = market
    return out


def get_stock_list() -> pd.DataFrame:
    """Fetch the all-market target list (A-shares, ETF, LOF, REITs, convertible bonds).

    Serves from the on-disk cache when valid; otherwise rebuilds from Tushare
    (preferred for A-shares) and AkShare, falls back to the local DB when every
    source fails, and refreshes the cache on success.
    """
    # 1. Serve from cache when possible
    cached_df = get_cached_stock_list()
    if cached_df is not None:
        return cached_df
    # 2. Cache invalid -- rebuild from upstream sources
    logger.info("Fetching all-market target list...")
    all_lists = []
    max_retries = 5
    base_delay = 2.0
    # A-share list: prefer Tushare (more stable, complete fields)
    if TUSHARE_AVAILABLE:
        try:
            df_a = get_stock_list_tushare()
            if df_a is not None and len(df_a) > 0:
                all_lists.append(df_a)
                logger.info(f"A-stock list fetched from Tushare: {len(df_a)} stocks")
            else:
                raise Exception("Tushare returned empty data")
        except Exception as e:
            logger.warning(f"Tushare get_stock_list failed: {e}, falling back to AkShare")
    # Fall back to AkShare with exponential backoff
    if not all_lists:
        for attempt in range(max_retries):
            try:
                # stock_info_a_code_name() is more stable than stock_zh_a_spot_em()
                df_a = ak.stock_info_a_code_name()
                df_a.columns = ['code', 'name']
                df_a['market'] = df_a['code'].apply(lambda x: '主板' if x.startswith(('60', '00')) else ('创业板' if x.startswith('30') else ('科创板' if x.startswith('68') else ('北交所' if x.startswith(('8', '4', '920')) else '其他'))))
                all_lists.append(df_a)
                logger.info(f"A-stock list fetched from AkShare: {len(df_a)} stocks")
                break  # success: stop retrying
            except Exception as e:
                if attempt == max_retries - 1:
                    logger.error(f"Failed to fetch A-stock list after {max_retries} attempts: {e}")
                else:
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s... Error: {e}")
                    time.sleep(delay)
    # ETF (best effort)
    try:
        df_etf = _extract_code_name(ak.fund_etf_spot_em(),
                                    ['代码', 'code', 'fund_code', 'ETF代码'],
                                    ['名称', 'name', 'fund_name', 'ETF名称'],
                                    'ETF', 'ETF')
        if df_etf is not None:
            all_lists.append(df_etf)
            logger.info(f"ETF list fetched: {len(df_etf)} funds")
    except Exception as e:
        logger.warning(f"ETF list fetch failed: {e}")
    # LOF (best effort)
    try:
        df_lof = _extract_code_name(ak.fund_lof_spot_em(),
                                    ['代码', 'code', 'fund_code', 'LOF代码'],
                                    ['名称', 'name', 'fund_name', 'LOF名称'],
                                    'LOF', 'LOF')
        if df_lof is not None:
            all_lists.append(df_lof)
            logger.info(f"LOF list fetched: {len(df_lof)} funds")
    except Exception as e:
        logger.warning(f"LOF list fetch failed: {e}")
    # REITs (best effort)
    try:
        df_reits = _extract_code_name(ak.reits_realtime_em(),
                                      ['代码', 'code', 'REITs代码'],
                                      ['名称', 'name', 'REITs名称'],
                                      'REITs', 'REITs')
        if df_reits is not None:
            all_lists.append(df_reits)
            logger.info(f"REITs list fetched: {len(df_reits)} products")
    except Exception as e:
        logger.warning(f"REITs list fetch failed: {e}")
    # Convertible bonds (best effort)
    try:
        df_cb = ak.bond_zh_hs_cov_spot()
        logger.info(f"Convertible bond columns: {df_cb.columns.tolist()}")
        # Drop unlisted bonds (turnover amount == 0) when an amount column exists
        amount_col = next((c for c in ['amount', '成交额', 'Amount'] if c in df_cb.columns), None)
        if amount_col:
            before_count = len(df_cb)
            df_cb = df_cb[df_cb[amount_col] > 0]
            filtered_count = before_count - len(df_cb)
            if filtered_count > 0:
                logger.info(f"Filtered {filtered_count} unlisted convertible bonds (amount=0)")
        # fix: the candidate list previously contained duplicate '代码'/'名称' entries
        df_cb = _extract_code_name(df_cb,
                                   ['代码', 'symbol', 'bond_code', '转债代码'],
                                   ['名称', 'name', 'bond_name', '转债名称'],
                                   '可转债', 'convertible bond')
        if df_cb is not None:
            all_lists.append(df_cb)
            logger.info(f"Convertible bond list fetched: {len(df_cb)} bonds")
    except Exception as e:
        logger.warning(f"Convertible bond list fetch failed: {e}")
    # Last resort: serve whatever the local DB already has
    if not all_lists:
        db = get_db()
        return db.conn.execute("SELECT code, name, market FROM stock_list").df()
    df = pd.concat(all_lists).drop_duplicates(subset=['code'])
    df['list_date'] = None
    # 3. Refresh the cache
    save_stock_list_cache(df)
    return df
def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.DataFrame]:
    """Fetch daily bars for a single target and normalize them to a standard schema.

    Routing by *market*: INDEX -> yfinance then AkShare; ETF/LOF/REITs/
    convertible bonds -> dedicated AkShare endpoints; anything else is treated
    as an A-share and tries Tushare, then yfinance, then AkShare.
    Retries up to ``max_retries`` times with exponential backoff.

    Returns a DataFrame with columns [code, trade_date, open, high, low, close,
    volume, amount, pct_chg, turnover_rate], or None on failure.
    """
    max_retries = 5
    base_delay = 1.0  # base delay in seconds
    for attempt in range(max_retries):
        try:
            # Exponential backoff before every retry (no delay on the first attempt)
            if attempt > 0:
                delay = base_delay * (2 ** attempt)  # 2s, 4s, 8s, 16s
                time.sleep(delay)
            end_date = get_beijing_time().strftime('%Y%m%d')
            fetch_start = start_date.replace('-', '')
            df = None
            if market == 'INDEX':
                # Index: prefer yfinance, fall back to AkShare
                if YFINANCE_AVAILABLE:
                    df = get_index_daily_yfinance(code, fetch_start, end_date)
                    if df is not None:
                        logger.debug(f"Got index data from yfinance for {code}")
                if df is None:
                    df = ak.stock_zh_index_daily_em(symbol=f"sh{code}" if code.startswith('000') else f"sz{code}")
            elif market == 'ETF':
                df = ak.fund_etf_hist_em(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq")
            elif market == 'LOF':
                df = ak.fund_lof_hist_em(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq")
            elif market == '可转债':
                # bond_zh_hs_cov_daily expects exchange-prefixed codes (e.g. sh110048, sz123015);
                # Beijing Stock Exchange bonds (bj prefix) are not supported
                if code.startswith('bj'):
                    return None  # BSE convertible bonds unsupported by this API
                df = ak.bond_zh_hs_cov_daily(symbol=code)
            elif market == 'REITs':
                df = ak.reits_hist_em(symbol=code)
            else:
                # A-share stock: prefer Tushare, then yfinance, then AkShare.
                # NOTE(review): Tushare is asked for adj='qfq' while the AkShare
                # fallback uses adjust="hfq" -- mixed adjustment bases; confirm intent.
                if TUSHARE_AVAILABLE:
                    df = get_stock_daily_tushare(code, fetch_start, end_date, adj='qfq')
                    if df is not None:
                        logger.debug(f"Got data from Tushare for {code}")
                if df is None and YFINANCE_AVAILABLE:
                    df = get_stock_daily_yfinance(code, fetch_start, end_date)
                    if df is not None:
                        logger.debug(f"Got data from yfinance for {code}")
                if df is None:
                    logger.debug(f"Tushare/yfinance failed, falling back to AkShare for {code}")
                    df = ak.stock_zh_a_hist(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq")
            if df is not None and not df.empty:
                # Normalize column names across all sources
                rename_map = {
                    '日期': 'trade_date', 'date': 'trade_date', 'Date': 'trade_date',
                    '开盘': 'open', '今开': 'open', 'Open': 'open',
                    '最高': 'high', 'High': 'high',
                    '最低': 'low', 'Low': 'low',
                    '收盘': 'close', '最新价': 'close', 'Close': 'close',
                    '成交量': 'volume', 'Volume': 'volume',
                    '成交额': 'amount', 'Amount': 'amount',
                    '涨跌幅': 'pct_chg',
                    '换手率': 'turnover_rate', '换手': 'turnover_rate'
                }
                df = df.rename(columns=rename_map)
                if 'trade_date' not in df.columns:
                    # Some sources keep the date in the index
                    df = df.reset_index().rename(columns={'index': 'trade_date', 'date': 'trade_date'})
                df['trade_date'] = pd.to_datetime(df['trade_date'])
                df = df[df['trade_date'] >= pd.to_datetime(start_date)]
                if 'amount' not in df.columns: df['amount'] = 0
                if 'pct_chg' not in df.columns: df['pct_chg'] = df['close'].pct_change() * 100
                if 'turnover_rate' not in df.columns: df['turnover_rate'] = 0
                df['code'] = code
                return df[['code', 'trade_date', 'open', 'high', 'low', 'close', 'volume', 'amount', 'pct_chg', 'turnover_rate']]
        except Exception as e:
            if attempt == max_retries - 1:
                logger.warning(f"Failed to fetch {code} ({market}): {str(e)}")
            time.sleep(1)
    return None
def check_today_data_available() -> bool:
    """Probe whether today's daily data has been published upstream.

    Tries a Tushare daily-bar lookup for an always-active stock (000001.SZ,
    Ping An Bank), then falls back to AkShare's spot snapshot. Any failure is
    treated as "not yet available".

    Returns:
        True: today's data appears available
        False: today's data not yet published (or every probe failed)
    """
    try:
        today = get_beijing_time().strftime('%Y%m%d')
        # Prefer a Tushare probe.
        # NOTE(review): TUSHARE_TOKENS is neither imported nor defined in this
        # module, so this branch raises NameError which the except below
        # swallows -- the probe effectively always falls through to AkShare.
        # Confirm where the token list should come from.
        if TUSHARE_AVAILABLE:
            try:
                import tushare as ts
                ts.set_token(TUSHARE_TOKENS[0])
                pro = ts.pro_api()
                df = pro.daily(ts_code='000001.SZ', trade_date=today)
                if df is not None and not df.empty:
                    logger.debug(f"Tushare check: today data available")
                    return True
            except Exception:
                pass
        # Fall back to an AkShare probe
        try:
            df = ak.stock_zh_a_spot_em()
            if df is not None and not df.empty:
                # A non-empty spot snapshot is taken as "today's data exists"
                logger.debug(f"AkShare check: today data available")
                return True
        except Exception:
            pass
    except Exception as e:
        logger.debug(f"Check today data failed: {e}")
    return False
def get_last_trading_day() -> str:
    """Return the most recent trading day (as 'YYYY-MM-DD') by probing data availability.

    Strategy:
      1. Probe whether today's data is already published (see
         check_today_data_available); if so, return today.
      2. Otherwise look up the previous trading day in the exchange calendar;
         on calendar failure, fall back to plain yesterday.
      3. Backup: infer the date from CSI 300 index history.
      4. Final fallback: weekday arithmetic.
    """
    now = get_beijing_time()
    today = now.date()
    today_str = today.strftime('%Y-%m-%d')
    # fix: current_hour was referenced below but never defined (NameError on
    # the backup/fallback paths); derive it from the Beijing clock.
    current_hour = now.hour
    # Probe whether today's data is available
    logger.info(f"Checking if today ({today_str}) data is available...")
    if check_today_data_available():
        logger.info(f"Today data is available, using today: {today_str}")
        return today_str
    # Preferred: the exchange trading calendar
    try:
        df = ak.tool_trade_date_hist_sina()
        if df is not None and not df.empty:
            df['trade_date'] = pd.to_datetime(df['trade_date']).dt.date
            prev_trading_days = df[df['trade_date'] < today]['trade_date']
            if not prev_trading_days.empty:
                last_day = prev_trading_days.iloc[-1]
                logger.info(f"Today data not available yet, using previous trading day: {last_day}")
                return last_day.strftime('%Y-%m-%d')
    except Exception as e:
        logger.warning(f"Failed to get previous trading day: {e}")
        # Calendar failed entirely: fall back to yesterday
        yesterday = (today - timedelta(days=1)).strftime('%Y-%m-%d')
        logger.info(f"Using yesterday as fallback: {yesterday}")
        return yesterday
    # Backup: infer from CSI 300 index history
    try:
        df = ak.stock_zh_index_daily_em(symbol="sh000300")
        if df is not None and not df.empty:
            date_col = 'date' if 'date' in df.columns else ('日期' if '日期' in df.columns else None)
            if date_col:
                last_date = pd.to_datetime(df[date_col].iloc[-1]).date()
                # Before 20:00 today's bar may still be partial; use the prior row
                if last_date == today and current_hour < 20:
                    return pd.to_datetime(df[date_col].iloc[-2]).strftime('%Y-%m-%d')
                return last_date.strftime('%Y-%m-%d')
    except Exception: pass
    # Final fallback: estimate by weekday
    d = get_beijing_time()
    # Before 20:00, start the search from yesterday
    if current_hour < 20:
        d -= timedelta(days=1)
    while d.weekday() >= 5:  # skip Saturday/Sunday
        d -= timedelta(days=1)
    return d.strftime('%Y-%m-%d')
def get_index_daily(code: str) -> Optional[pd.DataFrame]:
    """Fetch an index's daily bars via AkShare and normalize them to the
    standard [code, trade_date, OHLCV, amount, pct_chg, turnover_rate] schema.

    Returns None when the upstream call fails or yields no rows.
    """
    try:
        prefix = 'sh' if code.startswith('000') else 'sz'
        raw = ak.stock_zh_index_daily_em(symbol=f"{prefix}{code}")
        if raw is None or raw.empty:
            return None
        column_map = {
            'date': 'trade_date', '日期': 'trade_date',
            'open': 'open', '开盘': 'open',
            'high': 'high', '最高': 'high',
            'low': 'low', '最低': 'low',
            'close': 'close', '收盘': 'close',
            'volume': 'volume', '成交量': 'volume',
            'amount': 'amount', '成交额': 'amount',
            'pct_chg': 'pct_chg', '涨跌幅': 'pct_chg'
        }
        out = raw.rename(columns=column_map)
        if 'trade_date' not in out.columns:
            return None
        out['trade_date'] = pd.to_datetime(out['trade_date'])
        # Backfill optional columns that some sources omit
        for missing, filler in (('amount', 0), ('volume', 0)):
            if missing not in out.columns:
                out[missing] = filler
        if 'pct_chg' not in out.columns:
            out['pct_chg'] = out['close'].pct_change() * 100
        out['turnover_rate'] = 0  # not meaningful for indices
        out['code'] = code
        return out[['code', 'trade_date', 'open', 'high', 'low', 'close',
                    'volume', 'amount', 'pct_chg', 'turnover_rate']]
    except Exception as e:
        logger.warning(f"Failed to fetch index {code}: {e}")
        return None
def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]:
    """Incrementally sync daily bars, gated by a global high-water-mark date.

    Scans local monthly parquet partitions (bootstrapping from the cloud
    dataset when the local cache is empty) to find the latest stored trade
    date, then fetches only the missing range; brand-new codes get ~1 year of
    history. Returns a dict with counts, failed codes, status, record_count,
    success_rate and the list of changed monthly files.
    """
    logger.info("Syncing daily data...")
    # 1. Scan local parquet files for the global latest date (same scheme as
    #    the other indicator syncs)
    parquet_dir = Path("/tmp/data/parquet")
    parquet_dir.mkdir(parents=True, exist_ok=True)
    global_latest_date = "2000-01-01"
    existing_codes = set()
    for f in parquet_dir.glob("*.parquet"):
        if f.name.startswith('index_'):  # skip index files
            continue
        try:
            df = pd.read_parquet(f)
            if not df.empty and 'trade_date' in df.columns:
                max_date = df['trade_date'].max()
                if isinstance(max_date, pd.Timestamp):
                    max_date = max_date.strftime('%Y-%m-%d')
                if max_date > global_latest_date:
                    global_latest_date = max_date
                existing_codes.update(df['code'].unique())
        except Exception:
            pass
    # 2. No local data: download the most recent 3 monthly files from the
    #    cloud dataset as a baseline
    if global_latest_date == "2000-01-01":
        repo_id = os.getenv("DATASET_REPO_ID")
        if repo_id:
            try:
                files = list_repo_files(repo_id=repo_id, repo_type="dataset")
                parquet_files = sorted([f for f in files if f.startswith("data/parquet/") and f.endswith(".parquet")])
                for pf in parquet_files[-3:]:
                    try:
                        local_file = hf_hub_download(repo_id=repo_id, filename=pf, repo_type="dataset")
                        df = pd.read_parquet(local_file)
                        if not df.empty and 'trade_date' in df.columns:
                            max_date = df['trade_date'].max()
                            if isinstance(max_date, pd.Timestamp):
                                max_date = max_date.strftime('%Y-%m-%d')
                            if max_date > global_latest_date:
                                global_latest_date = max_date
                            existing_codes.update(df['code'].unique())
                    except Exception:
                        pass
                logger.info(f"Downloaded daily data from cloud, latest date: {global_latest_date}")
            except Exception as e:
                logger.info(f"No existing daily data in cloud: {e}")
    # 3. Split brand-new codes from already-tracked ones
    new_codes = [t for t in targets if t['code'] not in existing_codes]
    # 4. Global high-water-mark gate
    if global_latest_date >= last_trade_day and not new_codes:
        logger.info(f"Daily data is already up to date ({global_latest_date}) and no new stocks. Skip.")
        return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'message': f'Already up to date ({global_latest_date})'}
    # 5. Decide the sync strategy
    if global_latest_date >= last_trade_day:
        logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
        # New listings only need ~1 year of history (not the full 10 years)
        start_dt = (get_beijing_time() - timedelta(days=365)).strftime('%Y-%m-%d')
    else:
        logger.info(f"Syncing daily data from {global_latest_date} to {last_trade_day}...")
        sync_targets = targets
        start_dt = (pd.to_datetime(global_latest_date) + timedelta(days=1)).strftime('%Y-%m-%d')
    # Stamp each target with its fetch start date
    pending = []
    for t in sync_targets:
        t['start_dt'] = start_dt
        pending.append(t)
    # Honor the SYNC_LIMIT cap (useful for smoke tests)
    sync_limit = int(os.getenv("SYNC_LIMIT", -1))
    if sync_limit > 0 and len(pending) > sync_limit:
        logger.info(f"Limiting sync to first {sync_limit} targets (out of {len(pending)})")
        pending = pending[:sync_limit]
    if not pending:
        return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'message': 'No pending targets'}
    logger.info(f"Syncing {len(pending)} targets...")
    all_new_data = []
    failed_codes = []
    success_codes = []
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['daily']) as executor:
        futures = {executor.submit(get_target_daily, t['code'], t['start_dt'], t['market']): t['code'] for t in pending}
        for i, future in enumerate(as_completed(futures), 1):
            code = futures[future]
            res = future.result()
            if res is not None:
                all_new_data.append(res)
                success_codes.append(code)
            else:
                failed_codes.append(code)
            if i % 500 == 0: logger.info(f"Progress: {i}/{len(pending)}")
    changed_files = []  # monthly files touched in this run
    if all_new_data:
        inc_df = pd.concat(all_new_data, ignore_index=True)
        # Release memory as early as possible
        del all_new_data
        total_records = len(inc_df)
        # Identify the (year, month) partitions that changed
        changed = inc_df.assign(yr=inc_df['trade_date'].dt.year, mo=inc_df['trade_date'].dt.month)[['yr', 'mo']].drop_duplicates().values
        for yr, mo in changed:
            yr, mo = int(yr), int(mo)
            filename = f"{yr}-{mo:02d}.parquet"
            # fix: the path/filename placeholders below were corrupted to a
            # literal "(unknown)"; restored to the monthly partition name
            local_path = parquet_dir / filename  # Sync Space uses /tmp
            local_path.parent.mkdir(parents=True, exist_ok=True)
            # Incremental core: use the local copy if present, else pull from cloud
            old_df = None
            if local_path.exists():
                try:
                    old_df = pd.read_parquet(local_path)
                    logger.info(f"Using local cache for {filename}")
                except Exception: pass
            if old_df is None:
                repo_id = os.getenv("DATASET_REPO_ID")
                if repo_id:
                    try:
                        old_file = hf_hub_download(repo_id=repo_id, filename=f"data/parquet/{filename}", repo_type="dataset")
                        old_df = pd.read_parquet(old_file)
                        logger.info(f"Downloaded {filename} from cloud")
                    except Exception:
                        pass
            # Merge new rows into the month, de-duplicating on (code, trade_date)
            month_inc = inc_df[(inc_df['trade_date'].dt.year == yr) & (inc_df['trade_date'].dt.month == mo)]
            if old_df is not None:
                final_month_df = pd.concat([old_df, month_inc]).drop_duplicates(subset=['code', 'trade_date'])
                del old_df, month_inc
            else:
                final_month_df = month_inc
            final_month_df.to_parquet(local_path)
            changed_files.append(filename)
            logger.info(f"Saved updated data for {filename}")
            del final_month_df
        # Free the concatenated frame and force a GC pass
        del inc_df
        gc.collect()
    else:
        total_records = 0
    return {
        'count': len(success_codes),
        'failed_codes': failed_codes,
        'status': 'success' if not failed_codes else 'partial_fail',
        'record_count': total_records,
        'success_rate': len(success_codes) / len(pending) if pending else 0,
        'changed_files': changed_files
    }
# ==================== 新增:资金流向数据同步 ====================
def get_stock_fund_flow(code: str, market: str) -> Optional[pd.DataFrame]:
    """Fetch the fund-flow history for one stock.

    Prefers efinance; on failure falls back to AkShare's
    stock_individual_fund_flow with up to 3 retries. Returns a DataFrame
    restricted to the standard columns below, or None when both sources fail.
    """
    # Target normalized column schema (only the columns actually present are returned)
    standard_cols = ['code', 'trade_date', 'close', 'pct_chg',
                     'main_net_inflow', 'main_net_inflow_pct',
                     'huge_net_inflow', 'huge_net_inflow_pct',
                     'large_net_inflow', 'large_net_inflow_pct',
                     'medium_net_inflow', 'medium_net_inflow_pct',
                     'small_net_inflow', 'small_net_inflow_pct']
    # 1. Prefer efinance (very wide date range; the source returns what it has)
    try:
        df = get_fund_flow_efinance(code, '20000101', '20991231')
        if df is not None and not df.empty:
            # efinance column mapping
            rename_map = {
                '日期': 'trade_date', '收盘价': 'close', '涨跌幅': 'pct_chg',
                '主力净流入': 'main_net_inflow',
                '主力净流入占比': 'main_net_inflow_pct',
                '超大单净流入': 'huge_net_inflow',
                '超大单流入净占比': 'huge_net_inflow_pct',
                '大单净流入': 'large_net_inflow',
                '大单流入净占比': 'large_net_inflow_pct',
                '中单净流入': 'medium_net_inflow',
                '中单流入净占比': 'medium_net_inflow_pct',
                '小单净流入': 'small_net_inflow',
                '小单流入净占比': 'small_net_inflow_pct',
            }
            df = df.rename(columns=rename_map)
            if 'trade_date' in df.columns:
                df['trade_date'] = pd.to_datetime(df['trade_date'])
            df['code'] = code
            result_cols = [c for c in standard_cols if c in df.columns]
            logger.debug(f"Got fund flow from efinance for {code}")
            return df[result_cols]
    except Exception as e:
        logger.debug(f"efinance fund flow failed for {code}: {e}")
    # 2. Fall back to AkShare
    max_retries = 3
    base_delay = 1.0
    for attempt in range(max_retries):
        try:
            # Determine the AkShare `market` argument from the code prefix
            if market == '北交所' or code.startswith(('8', '4', '920')):
                mk = 'bj'
            elif code.startswith(('6', '9')):
                mk = 'sh'
            else:
                mk = 'sz'
            df = ak.stock_individual_fund_flow(stock=code, market=mk)
            if df is not None and not df.empty:
                # AkShare column mapping
                rename_map = {
                    '日期': 'trade_date', '收盘价': 'close', '涨跌幅': 'pct_chg',
                    '主力净流入-净额': 'main_net_inflow',
                    '主力净流入-净占比': 'main_net_inflow_pct',
                    '超大单净流入-净额': 'huge_net_inflow',
                    '超大单净流入-净占比': 'huge_net_inflow_pct',
                    '大单净流入-净额': 'large_net_inflow',
                    '大单净流入-净占比': 'large_net_inflow_pct',
                    '中单净流入-净额': 'medium_net_inflow',
                    '中单净流入-净占比': 'medium_net_inflow_pct',
                    '小单净流入-净额': 'small_net_inflow',
                    '小单净流入-净占比': 'small_net_inflow_pct',
                }
                df = df.rename(columns=rename_map)
                df['trade_date'] = pd.to_datetime(df['trade_date'])
                df['code'] = code
                result_cols = [c for c in standard_cols if c in df.columns]
                logger.debug(f"Got fund flow from AkShare for {code}")
                return df[result_cols]
        except Exception as e:
            # NOTE(review): backoff only sleeps from the second failure on, so
            # the first retry happens immediately -- confirm that is intended
            if attempt > 0:
                delay = base_delay * (2 ** attempt)
                time.sleep(delay)
    return None
def sync_fund_flow(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]:
    """Sync per-stock fund-flow data into monthly parquet partitions.

    Computes a global high-water-mark date from local files (bootstrapping from
    the cloud dataset when the local cache is empty); only stock targets and
    only rows past the watermark are fetched. Returns a dict with counts,
    failed codes, status, record_count, success_rate and changed files.
    """
    logger.info("Syncing fund flow data...")
    # 1. Compute the global latest date across all local monthly files
    flow_dir = Path("/tmp/data/fund_flow")
    flow_dir.mkdir(parents=True, exist_ok=True)
    global_latest_date = "2000-01-01"
    existing_codes = set()
    # Scan the local monthly partitions
    for f in flow_dir.glob("*.parquet"):
        try:
            df = pd.read_parquet(f)
            if not df.empty:
                max_date = df['trade_date'].max().strftime('%Y-%m-%d')
                if max_date > global_latest_date:
                    global_latest_date = max_date
                existing_codes.update(df['code'].unique())
        except Exception:
            pass
    # No local data: bootstrap from the most recent cloud files
    if global_latest_date == "2000-01-01":
        repo_id = os.getenv("DATASET_REPO_ID")
        if repo_id:
            try:
                files = list_repo_files(repo_id=repo_id, repo_type="dataset")
                flow_files = sorted([f for f in files if f.startswith("data/fund_flow/") and f.endswith(".parquet")])
                for ff in flow_files[-3:]:  # last ~3 months as a baseline
                    try:
                        local_file = hf_hub_download(repo_id=repo_id, filename=ff, repo_type="dataset")
                        df = pd.read_parquet(local_file)
                        if not df.empty:
                            filename = Path(ff).stem  # e.g. "2026-02"
                            # fix: the filename placeholder here was corrupted
                            local_path = flow_dir / f"{filename}.parquet"
                            df.to_parquet(local_path)
                            max_date = df['trade_date'].max().strftime('%Y-%m-%d')
                            if max_date > global_latest_date:
                                global_latest_date = max_date
                            existing_codes.update(df['code'].unique())
                    except Exception:
                        pass
                logger.info(f"Downloaded fund flow from cloud, latest date: {global_latest_date}")
            except Exception as e:
                logger.info(f"No existing fund flow data in cloud: {e}")
    # 2. Fund flow only exists for stocks -- drop funds/bonds
    stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
    new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
    # 3. Global high-water-mark gate
    if global_latest_date >= last_trade_day and not new_codes:
        logger.info(f"Fund flow data is already up to date ({global_latest_date}) and no new stocks. Skip.")
        return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'message': f'Already up to date ({global_latest_date})'}
    # 4. Incremental fetch
    if global_latest_date >= last_trade_day:
        logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
    else:
        logger.info(f"Syncing fund flow from {global_latest_date} to {last_trade_day}...")
        sync_targets = stock_targets
    all_data = []
    success_codes = []
    failed_codes = []
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['fund']) as executor:
        futures = {executor.submit(get_stock_fund_flow, t['code'], t['market']): t['code'] for t in sync_targets}
        for i, future in enumerate(as_completed(futures), 1):
            code = futures[future]
            res = future.result()
            if res is not None and not res.empty:
                if code in existing_codes:
                    # Known stocks: keep only rows newer than the watermark
                    res = res[res['trade_date'] > pd.to_datetime(global_latest_date)]
                if not res.empty:
                    all_data.append(res)
                success_codes.append(code)
            else:
                failed_codes.append(code)
            if i % 500 == 0:
                logger.info(f"Fund flow progress: {i}/{len(sync_targets)}, success: {len(success_codes)}")
    total_records = 0
    changed_files = []  # monthly files touched in this run
    # 5. Persist month by month
    if all_data:
        new_df = pd.concat(all_data, ignore_index=True)
        # Release memory as early as possible
        del all_data
        total_records = len(new_df)
        # Determine which months need updating
        if not new_df.empty:
            min_date = new_df['trade_date'].min()
            max_date = new_df['trade_date'].max()
            current = min_date.to_period('M')
            end_period = max_date.to_period('M')
            while current <= end_period:
                yr, mo = current.year, current.month
                month_data = new_df[(new_df['trade_date'].dt.year == yr) & (new_df['trade_date'].dt.month == mo)]
                if not month_data.empty:
                    filename = f"{yr}-{mo:02d}.parquet"
                    local_path = flow_dir / filename
                    # Merge with the existing monthly file, if any
                    old_month_df = None
                    if local_path.exists():
                        try:
                            old_month_df = pd.read_parquet(local_path)
                        except Exception:
                            pass
                    if old_month_df is not None:
                        final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date'])
                        del old_month_df, month_data
                    else:
                        final_month_df = month_data
                    final_month_df.to_parquet(local_path)
                    changed_files.append(filename)
                    # fix: the log placeholder here was corrupted
                    logger.info(f"Saved fund flow data for {filename}")
                    del final_month_df
                current += 1
            logger.info(f"Fund flow updated: {len(new_df)} new records")
        # Free the concatenated frame and force a GC pass
        del new_df
        gc.collect()
    return {
        'count': len(success_codes),
        'failed_codes': failed_codes,
        'status': 'success' if not failed_codes else 'partial_fail',
        'record_count': total_records,
        'success_rate': len(success_codes) / len(sync_targets) if sync_targets else 0,
        'changed_files': changed_files
    }
# ==================== 新增:估值指标数据同步 ====================
def get_stock_valuation(code: str) -> Optional[pd.DataFrame]:
    """Fetch daily valuation indicators for one stock via AkShare's Legu
    indicator endpoint (PE / PB / PS / dividend yield / market cap).

    Retries up to 5 times; returns only the standard columns that are present,
    or None when every attempt fails.
    """
    max_retries = 5
    base_delay = 1.0
    for attempt in range(max_retries):
        try:
            df = ak.stock_a_lg_indicator(symbol=code)
            if df is not None and not df.empty:
                # Normalize column names
                rename_map = {
                    '日期': 'trade_date',
                    '市盈率': 'pe_ttm',
                    '市盈率TTM': 'pe_ttm',
                    '静态市盈率': 'pe_static',
                    '市净率': 'pb',
                    '市销率': 'ps_ttm',
                    '股息率': 'dv_ratio',
                    '总市值': 'total_mv',
                    '流通市值': 'circ_mv',
                }
                df = df.rename(columns=rename_map)
                df['trade_date'] = pd.to_datetime(df['trade_date'])
                df['code'] = code
                cols = ['code', 'trade_date', 'pe_ttm', 'pe_static', 'pb',
                        'ps_ttm', 'dv_ratio', 'total_mv', 'circ_mv']
                available_cols = [c for c in cols if c in df.columns]
                return df[available_cols]
        except Exception:
            # NOTE(review): backoff sleeps only from the second failure onward,
            # so the first retry happens immediately -- confirm that is intended
            if attempt > 0:
                delay = base_delay * (2 ** attempt)
                time.sleep(delay)
    return None
def sync_valuation(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]:
    """Sync valuation indicators into monthly parquet partitions.

    Computes a global high-water-mark date from local files (bootstrapping from
    the cloud dataset when the local cache is empty); only stock targets and
    only rows past the watermark are fetched. Returns a dict with count,
    status, record_count and the list of changed monthly files.
    """
    logger.info("Syncing valuation data...")
    # 1. Compute the global latest date across all local monthly files
    val_dir = Path("/tmp/data/valuation")
    val_dir.mkdir(parents=True, exist_ok=True)
    global_latest_date = "2000-01-01"
    existing_codes = set()
    # Scan the local monthly partitions
    for f in val_dir.glob("*.parquet"):
        try:
            df = pd.read_parquet(f)
            if not df.empty:
                max_date = df['trade_date'].max().strftime('%Y-%m-%d')
                if max_date > global_latest_date:
                    global_latest_date = max_date
                existing_codes.update(df['code'].unique())
        except Exception:
            pass
    # No local data: bootstrap from the most recent cloud files
    if global_latest_date == "2000-01-01":
        repo_id = os.getenv("DATASET_REPO_ID")
        if repo_id:
            try:
                files = list_repo_files(repo_id=repo_id, repo_type="dataset")
                val_files = sorted([f for f in files if f.startswith("data/valuation/") and f.endswith(".parquet")])
                for vf in val_files[-3:]:  # last ~3 months as a baseline
                    try:
                        local_file = hf_hub_download(repo_id=repo_id, filename=vf, repo_type="dataset")
                        df = pd.read_parquet(local_file)
                        if not df.empty:
                            filename = Path(vf).stem
                            # fix: the filename placeholder here was corrupted
                            local_path = val_dir / f"{filename}.parquet"
                            df.to_parquet(local_path)
                            max_date = df['trade_date'].max().strftime('%Y-%m-%d')
                            if max_date > global_latest_date:
                                global_latest_date = max_date
                            existing_codes.update(df['code'].unique())
                    except Exception:
                        pass
                logger.info(f"Downloaded valuation from cloud, latest date: {global_latest_date}")
            except Exception as e:
                logger.info(f"No existing valuation data in cloud: {e}")
    # 2. Valuation only exists for stocks -- drop funds/bonds
    stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
    new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
    # 3. Global high-water-mark gate
    if global_latest_date >= last_trade_day and not new_codes:
        logger.info(f"Valuation data is already up to date ({global_latest_date}) and no new stocks. Skip.")
        return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'record_count': 0, 'changed_files': [], 'message': f'Already up to date ({global_latest_date})'}
    # 4. Incremental fetch
    if global_latest_date >= last_trade_day:
        logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
    else:
        logger.info(f"Syncing valuation from {global_latest_date} to {last_trade_day}...")
        sync_targets = stock_targets
    all_data = []
    success_count = 0
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['valuation']) as executor:
        futures = {executor.submit(get_stock_valuation, t['code']): t['code'] for t in sync_targets}
        for i, future in enumerate(as_completed(futures), 1):
            res = future.result()
            if res is not None and not res.empty:
                code = futures[future]
                if code in existing_codes:
                    # Known stocks: keep only rows newer than the watermark
                    res = res[res['trade_date'] > pd.to_datetime(global_latest_date)]
                if not res.empty:
                    all_data.append(res)
                    success_count += 1
            if i % 500 == 0:
                logger.info(f"Valuation progress: {i}/{len(sync_targets)}, success: {success_count}")
    total_records = 0
    changed_files = []  # monthly files touched in this run
    # 5. Persist month by month
    if all_data:
        new_df = pd.concat(all_data, ignore_index=True)
        # Release memory as early as possible
        del all_data
        total_records = len(new_df)
        if not new_df.empty:
            min_date = new_df['trade_date'].min()
            max_date = new_df['trade_date'].max()
            current = min_date.to_period('M')
            end_period = max_date.to_period('M')
            while current <= end_period:
                yr, mo = current.year, current.month
                month_data = new_df[(new_df['trade_date'].dt.year == yr) & (new_df['trade_date'].dt.month == mo)]
                if not month_data.empty:
                    filename = f"{yr}-{mo:02d}.parquet"
                    local_path = val_dir / filename
                    # Merge with the existing monthly file, if any
                    old_month_df = None
                    if local_path.exists():
                        try:
                            old_month_df = pd.read_parquet(local_path)
                        except Exception:
                            pass
                    if old_month_df is not None:
                        final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date'])
                        del old_month_df, month_data
                    else:
                        final_month_df = month_data
                    final_month_df.to_parquet(local_path)
                    changed_files.append(filename)
                    # fix: the log placeholder here was corrupted
                    logger.info(f"Saved valuation data for {filename}")
                    del final_month_df
                current += 1
            logger.info(f"Valuation updated: {len(new_df)} new records")
        # Free the concatenated frame and force a GC pass
        del new_df
        gc.collect()
    return {
        'count': success_count,
        'failed_codes': [],
        'status': 'success',
        'record_count': total_records,
        'changed_files': changed_files
    }
# ==================== 新增:融资融券数据同步 ====================
def get_stock_margin(code: str) -> Optional[pd.DataFrame]:
    """Fetch margin-trading (融资融券) detail for a single stock.

    Retries up to 5 times with exponential backoff (the first retry is
    immediate). Codes starting with '6' are routed to the SSE endpoint,
    all others to the SZSE endpoint.

    Args:
        code: 6-digit A-share stock code.

    Returns:
        DataFrame with standardized columns ('code', 'trade_date', 'rzye',
        ...), or None when all attempts fail or the API returns no data.
    """
    max_retries = 5
    base_delay = 1.0
    for attempt in range(max_retries):
        try:
            # Route to the correct exchange endpoint by code prefix.
            if code.startswith('6'):
                df = ak.stock_margin_detail_sh(symbol=code)
            else:
                df = ak.stock_margin_detail_sz(symbol=code)
            if df is not None and not df.empty:
                # Normalize Chinese column names to English identifiers.
                rename_map = {
                    '日期': 'trade_date',
                    '融资余额': 'rzye',
                    '融资买入额': 'rzmre',
                    '融资偿还额': 'rzche',
                    '融券余额': 'rqye',
                    '融券卖出量': 'rqmcl',
                    '融资融券余额': 'rzrqye',
                }
                df = df.rename(columns=rename_map)
                if 'trade_date' in df.columns:
                    df['trade_date'] = pd.to_datetime(df['trade_date'])
                df['code'] = code
                cols = ['code', 'trade_date', 'rzye', 'rzmre', 'rzche', 'rqye', 'rqmcl', 'rzrqye']
                available_cols = [c for c in cols if c in df.columns]
                return df[available_cols]
        except Exception as e:
            # BUGFIX: the last-attempt branch was a bare `pass`, silently
            # swallowing the error. Log it (debug level keeps output quiet
            # for the common best-effort case).
            if attempt == max_retries - 1:
                logger.debug(f"get_stock_margin failed for {code} after {max_retries} attempts: {e}")
        if attempt > 0:
            # Exponential backoff; the first retry is intentionally immediate.
            delay = base_delay * (2 ** attempt)
            time.sleep(delay)
    return None
def sync_margin(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]:
    """Sync margin-trading data, partitioned into monthly parquet files.

    Uses a global high-water mark computed across all local monthly files
    (falling back to the 3 most recent months from the cloud dataset) to
    skip work that is already done, then fetches per-stock data in a
    thread pool and appends new rows month by month.

    Args:
        targets: Stock list records; only 主板/创业板/科创板 stocks are synced.
        last_trade_day: Latest trading day ('YYYY-MM-DD') acting as the watermark.

    Returns:
        Dict with 'count', 'failed_codes', 'status', 'record_count',
        'changed_files' (and 'message' when skipped).
    """
    logger.info("Syncing margin trading data...")
    # 1. Compute the global latest date from all local monthly parquet files
    margin_dir = Path("/tmp/data/margin")
    margin_dir.mkdir(parents=True, exist_ok=True)
    global_latest_date = "2000-01-01"
    existing_codes = set()
    # Scan monthly files already present locally (best-effort per file)
    for f in margin_dir.glob("*.parquet"):
        try:
            df = pd.read_parquet(f)
            if not df.empty:
                max_date = df['trade_date'].max().strftime('%Y-%m-%d')
                if max_date > global_latest_date:
                    global_latest_date = max_date
                existing_codes.update(df['code'].unique())
        except Exception:
            pass
    # 2. Nothing local: try downloading recent months from the cloud dataset
    if global_latest_date == "2000-01-01":
        repo_id = os.getenv("DATASET_REPO_ID")
        if repo_id:
            try:
                files = list_repo_files(repo_id=repo_id, repo_type="dataset")
                margin_files = sorted([f for f in files if f.startswith("data/margin/") and f.endswith(".parquet")])
                for mf in margin_files[-3:]:  # only the most recent 3 months
                    try:
                        local_file = hf_hub_download(repo_id=repo_id, filename=mf, repo_type="dataset")
                        df = pd.read_parquet(local_file)
                        if not df.empty:
                            filename = Path(mf).stem
                            # BUGFIX: use the month stem in the local file name;
                            # the previous literal name made every downloaded
                            # month overwrite the same file.
                            local_path = margin_dir / f"{filename}.parquet"
                            df.to_parquet(local_path)
                            max_date = df['trade_date'].max().strftime('%Y-%m-%d')
                            if max_date > global_latest_date:
                                global_latest_date = max_date
                            existing_codes.update(df['code'].unique())
                    except Exception:
                        pass
                logger.info(f"Downloaded margin from cloud, latest date: {global_latest_date}")
            except Exception as e:
                logger.info(f"No existing margin data in cloud: {e}")
    # 3. Filter targets (main board / ChiNext / STAR only — margin-eligible boards)
    stock_targets = [t for t in targets if t['market'] in ['主板', '创业板', '科创板']]
    new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
    # Global high-water-mark short-circuit
    if global_latest_date >= last_trade_day and not new_codes:
        logger.info(f"Margin data is already up to date ({global_latest_date}) and no new stocks. Skip.")
        return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'record_count': 0, 'changed_files': [], 'message': f'Already up to date ({global_latest_date})'}
    # 4. Incremental fetch: new stocks only, or the full eligible universe
    if global_latest_date >= last_trade_day:
        logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
    else:
        logger.info(f"Syncing margin data from {global_latest_date} to {last_trade_day}...")
        sync_targets = stock_targets
    all_data = []
    success_count = 0
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['margin']) as executor:
        futures = {executor.submit(get_stock_margin, t['code']): t['code'] for t in sync_targets}
        for i, future in enumerate(as_completed(futures), 1):
            res = future.result()
            if res is not None and not res.empty:
                code = futures[future]
                if code in existing_codes:
                    # Known stock: keep only rows newer than the watermark
                    res = res[res['trade_date'] > pd.to_datetime(global_latest_date)]
                if not res.empty:
                    all_data.append(res)
                    success_count += 1
            if i % 500 == 0:
                logger.info(f"Margin progress: {i}/{len(sync_targets)}, success: {success_count}")
    total_records = 0
    changed_files = []  # names of the monthly files actually modified
    # 5. Save partitioned by month, merging with any existing month file
    if all_data:
        new_df = pd.concat(all_data, ignore_index=True)
        total_records = len(new_df)
        if not new_df.empty:
            min_date = new_df['trade_date'].min()
            max_date = new_df['trade_date'].max()
            current = min_date.to_period('M')
            end_period = max_date.to_period('M')
            while current <= end_period:
                yr, mo = current.year, current.month
                month_data = new_df[(new_df['trade_date'].dt.year == yr) & (new_df['trade_date'].dt.month == mo)]
                if not month_data.empty:
                    filename = f"{yr}-{mo:02d}.parquet"
                    local_path = margin_dir / filename
                    old_month_df = None
                    if local_path.exists():
                        try:
                            old_month_df = pd.read_parquet(local_path)
                        except Exception:
                            pass
                    if old_month_df is not None:
                        final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date'])
                    else:
                        final_month_df = month_data
                    final_month_df.to_parquet(local_path)
                    changed_files.append(filename)  # record for smart upload
                    # BUGFIX: log the actual month file instead of a broken placeholder
                    logger.info(f"Saved margin data for {filename}")
                current += 1
        logger.info(f"Margin updated: {len(new_df)} new records")
    return {
        'count': success_count,
        'failed_codes': [],
        'status': 'success',
        'record_count': total_records,
        'changed_files': changed_files  # changed file list for smart upload
    }
# ==================== 新增:财务指标数据同步 ====================
def get_stock_financial_indicator(code: str) -> Optional[pd.DataFrame]:
    """Fetch financial-analysis indicators for a single stock.

    Retries up to 5 times with exponential backoff (first retry immediate).

    Args:
        code: 6-digit A-share stock code.

    Returns:
        DataFrame with standardized columns ('code', 'trade_date', 'roe',
        ...), or None when all attempts fail or the API returns no data.
    """
    max_retries = 5
    base_delay = 1.0
    for attempt in range(max_retries):
        try:
            df = ak.stock_financial_analysis_indicator(symbol=code)
            if df is not None and not df.empty:
                # Normalize Chinese column names to English identifiers.
                rename_map = {
                    '日期': 'trade_date',
                    '净资产收益率': 'roe',
                    '总资产净利率': 'roa',
                    '销售毛利率': 'gross_margin',
                    '销售净利率': 'net_margin',
                    '资产负债率': 'debt_ratio',
                    '流动比率': 'current_ratio',
                    '速动比率': 'quick_ratio',
                    '存货周转率': 'inventory_turnover',
                    '应收账款周转率': 'receivable_turnover',
                    '总资产周转率': 'total_asset_turnover',
                }
                df = df.rename(columns=rename_map)
                if 'trade_date' in df.columns:
                    df['trade_date'] = pd.to_datetime(df['trade_date'])
                df['code'] = code
                cols = ['code', 'trade_date', 'roe', 'roa', 'gross_margin', 'net_margin',
                        'debt_ratio', 'current_ratio', 'quick_ratio',
                        'inventory_turnover', 'receivable_turnover', 'total_asset_turnover']
                available_cols = [c for c in cols if c in df.columns]
                return df[available_cols]
        except Exception as e:
            # Log (instead of silently swallowing) once retries are exhausted;
            # consistent with the other per-stock fetchers in this module.
            if attempt == max_retries - 1:
                logger.debug(f"get_stock_financial_indicator failed for {code} after {max_retries} attempts: {e}")
        if attempt > 0:
            # Exponential backoff; the first retry is intentionally immediate.
            delay = base_delay * (2 ** attempt)
            time.sleep(delay)
    return None
def sync_financial_indicator(targets: List[Dict[str, str]]) -> Dict[str, Any]:
    """Sync financial indicator data (aggressive incremental mode).

    Skips the fetch entirely when cached data is less than 90 days old and
    no new stocks appeared (financial indicators are reported quarterly);
    otherwise fetches per-stock indicators in a thread pool and merges the
    result into /tmp/data/financial_indicator.parquet.

    Args:
        targets: Stock list records; fund-like markets (ETF/LOF/REITs/可转债)
            are excluded.

    Returns:
        Dict with 'count', 'status', 'record_count', 'new_records', plus
        'previous_count' on a full run or 'message' when skipped.
    """
    logger.info("Syncing financial indicator data...")
    fi_path = Path("/tmp/data/financial_indicator.parquet")
    fi_path.parent.mkdir(parents=True, exist_ok=True)
    old_df = None
    old_count = 0
    global_latest_date = "2000-01-01"
    existing_codes = set()
    # 1. Prefer the local cache for watermark and known codes
    if fi_path.exists():
        try:
            old_df = pd.read_parquet(fi_path)
            old_count = len(old_df)
            global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
            existing_codes = set(old_df['code'].unique())
            logger.info(f"Local financial cache found, latest date: {global_latest_date}, records: {old_count}")
        except Exception as e:
            logger.warning(f"Failed to read local financial cache: {e}")
    # 2. No local cache: try pulling the snapshot from the cloud dataset
    if old_df is None:
        repo_id = os.getenv("DATASET_REPO_ID")
        if repo_id:
            try:
                old_file = hf_hub_download(repo_id=repo_id, filename="data/financial_indicator.parquet", repo_type="dataset")
                old_df = pd.read_parquet(old_file)
                old_count = len(old_df)
                global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
                existing_codes = set(old_df['code'].unique())
                # Cache the snapshot locally for subsequent runs
                old_df.to_parquet(fi_path)
                logger.info(f"Downloaded financial from cloud, latest date: {global_latest_date}, records: {old_count}")
            except Exception:
                logger.info("No existing financial data found in cloud.")
    # 3. 90-day freshness gate + new-stock detection
    stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
    new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
    today = get_beijing_time()
    is_recent = False
    if global_latest_date != "2000-01-01":
        days_diff = (today - pd.to_datetime(global_latest_date)).days
        if days_diff < 90:
            is_recent = True
    if is_recent and not new_codes:
        logger.info(f"Financial data is recent ({global_latest_date}) and no new stocks. Skip.")
        return {'count': 0, 'status': 'skipped', 'record_count': old_count, 'new_records': 0, 'message': f'Already up to date ({global_latest_date})'}
    # 4. Incremental fetch: new stocks only, or the full universe
    if is_recent:
        logger.info(f"Financial data is recent, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
    else:
        logger.info(f"Syncing financial indicators (last update: {global_latest_date})...")
        sync_targets = stock_targets
    all_data = []
    success_count = 0
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['financial']) as executor:
        futures = {executor.submit(get_stock_financial_indicator, t['code']): t['code'] for t in sync_targets}
        for i, future in enumerate(as_completed(futures), 1):
            res = future.result()
            if res is not None and not res.empty:
                code = futures[future]
                if code in existing_codes:
                    # Known stock: keep only rows newer than the watermark
                    res = res[res['trade_date'] > pd.to_datetime(global_latest_date)]
                if not res.empty:
                    all_data.append(res)
                    success_count += 1
            if i % 500 == 0:
                logger.info(f"Financial indicator progress: {i}/{len(sync_targets)}, success: {success_count}")
    # 5. Merge with the old snapshot, dedupe, and persist
    new_records = 0
    final_count = old_count
    if all_data:
        new_df = pd.concat(all_data, ignore_index=True)
        new_records = len(new_df)
        final_df = pd.concat([old_df, new_df]) if old_df is not None else new_df
        final_df = final_df.drop_duplicates(subset=['code', 'trade_date'])
        final_count = len(final_df)
        final_df.to_parquet(fi_path)
        logger.info(f"Financial updated: {final_count} total records ({new_records} new)")
    return {
        'count': success_count,
        'status': 'success' if success_count > 0 or old_count > 0 else 'fail',
        'record_count': final_count,
        'new_records': new_records,
        'previous_count': old_count
    }
# ==================== 新增:股东户数数据同步 ====================
def sync_holder_num() -> Dict[str, Any]:
    """Sync shareholder-count data with a single bulk API call.

    Change detection compares the new record count against the previous
    local snapshot's count.

    Returns:
        Dict with 'count', 'status', 'record_count', 'previous_count',
        'is_changed'. Always returns a dict, even on failure or empty data.
    """
    logger.info("Syncing holder number data...")
    hn_path = Path("/tmp/data/holder_num.parquet")
    old_count = 0
    # Read the previous snapshot (best-effort) so change detection works
    if hn_path.exists():
        try:
            old_df = pd.read_parquet(hn_path)
            old_count = len(old_df)
        except Exception:
            pass
    try:
        # Bulk fetch: shareholder counts for the latest report period
        df = ak.stock_zh_a_gdhs(symbol="全部")
        if df is not None and not df.empty:
            # Normalize Chinese column names to English identifiers.
            rename_map = {
                '代码': 'code',
                '股东户数': 'holder_num',
                '户均持股数量': 'avg_share',
                '户均持股金额': 'avg_value',
                '总股本': 'total_share',
                '总市值': 'total_value',
                '日期': 'trade_date',
            }
            df = df.rename(columns=rename_map)
            if 'trade_date' in df.columns:
                df['trade_date'] = pd.to_datetime(df['trade_date'])
            new_count = len(df)
            # Persist the new snapshot
            hn_path.parent.mkdir(parents=True, exist_ok=True)
            df.to_parquet(hn_path)
            # Changed iff the record count differs from the previous snapshot
            is_changed = new_count != old_count
            logger.info(f"Holder number data saved: {new_count} records (previous: {old_count}, changed: {is_changed})")
            return {
                'count': new_count,
                'status': 'success',
                'record_count': new_count,
                'previous_count': old_count,
                'is_changed': is_changed
            }
        # BUGFIX: the empty/None-response path previously fell off the end of
        # the function and implicitly returned None, breaking callers that
        # expect a result dict. Fall through to the shared failure result.
        logger.warning("Holder number API returned no data")
    except Exception as e:
        logger.warning(f"Failed to sync holder number data: {e}")
    return {
        'count': 0,
        'status': 'fail',
        'record_count': old_count,
        'previous_count': old_count,
        'is_changed': False
    }
# ==================== 新增:分红数据同步 ====================
def get_stock_dividend(code: str) -> Optional[pd.DataFrame]:
    """Fetch historical dividend records for a single stock.

    Retries up to 5 times with exponential backoff (first retry immediate).

    Args:
        code: 6-digit A-share stock code.

    Returns:
        DataFrame with standardized columns ('code', 'trade_date',
        'dividend_type', ...), or None when all attempts fail or the API
        returns no data.
    """
    max_retries = 5
    base_delay = 1.0
    for attempt in range(max_retries):
        try:
            df = ak.stock_history_dividend(symbol=code)
            if df is not None and not df.empty:
                # Normalize Chinese column names to English identifiers.
                rename_map = {
                    '公告日期': 'trade_date',
                    '分红方案': 'dividend_type',
                    '分红金额': 'dividend_amount',
                    '股权登记日': 'record_date',
                    '除权除息日': 'ex_date',
                    '派息日': 'pay_date',
                }
                df = df.rename(columns=rename_map)
                df['trade_date'] = pd.to_datetime(df['trade_date'])
                df['code'] = code
                cols = ['code', 'trade_date', 'dividend_type', 'dividend_amount', 'record_date', 'ex_date', 'pay_date']
                available_cols = [c for c in cols if c in df.columns]
                return df[available_cols]
        except Exception as e:
            # Log (instead of silently swallowing) once retries are exhausted;
            # consistent with the other per-stock fetchers in this module.
            if attempt == max_retries - 1:
                logger.debug(f"get_stock_dividend failed for {code} after {max_retries} attempts: {e}")
        if attempt > 0:
            # Exponential backoff; the first retry is intentionally immediate.
            delay = base_delay * (2 ** attempt)
            time.sleep(delay)
    return None
def sync_dividend(targets: List[Dict[str, str]]) -> Dict[str, Any]:
    """Sync dividend data (aggressive incremental mode).

    Skips the fetch entirely when cached data is less than 90 days old and
    no new stocks appeared; otherwise fetches per-stock dividend history in
    a thread pool and merges the result into /tmp/data/dividend.parquet.

    Args:
        targets: Stock list records; fund-like markets (ETF/LOF/REITs/可转债)
            are excluded.

    Returns:
        Dict with 'count', 'status', 'record_count', 'new_records', plus
        'previous_count' on a full run or 'message' when skipped.
    """
    logger.info("Syncing dividend data...")
    div_path = Path("/tmp/data/dividend.parquet")
    div_path.parent.mkdir(parents=True, exist_ok=True)
    old_df = None
    old_count = 0
    global_latest_date = "2000-01-01"
    existing_codes = set()
    # 1. Prefer the local cache for watermark and known codes (best-effort)
    if div_path.exists():
        try:
            old_df = pd.read_parquet(div_path)
            old_count = len(old_df)
            global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
            existing_codes = set(old_df['code'].unique())
        except Exception: pass
    # 2. No local cache: try pulling the snapshot from the cloud dataset
    if old_df is None:
        repo_id = os.getenv("DATASET_REPO_ID")
        if repo_id:
            try:
                old_file = hf_hub_download(repo_id=repo_id, filename="data/dividend.parquet", repo_type="dataset")
                old_df = pd.read_parquet(old_file)
                old_count = len(old_df)
                global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
                existing_codes = set(old_df['code'].unique())
                old_df.to_parquet(div_path)
            except Exception: pass
    # 3. 90-day freshness gate + new-stock detection
    stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
    new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
    today = get_beijing_time()
    is_recent = False
    if global_latest_date != "2000-01-01":
        if (today - pd.to_datetime(global_latest_date)).days < 90:
            is_recent = True
    if is_recent and not new_codes:
        logger.info(f"Dividend data is recent and no new stocks. Skip.")
        return {'count': 0, 'status': 'skipped', 'record_count': old_count, 'new_records': 0, 'message': f'Already up to date ({global_latest_date})'}
    # 4. Incremental fetch: new stocks only, or the full universe
    if is_recent:
        logger.info(f"Dividend data is recent, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
    else:
        logger.info(f"Syncing dividend data (last update: {global_latest_date})...")
        sync_targets = stock_targets
    all_data = []
    success_count = 0
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['dividend']) as executor:
        futures = {executor.submit(get_stock_dividend, t['code']): t['code'] for t in sync_targets}
        for i, future in enumerate(as_completed(futures), 1):
            res = future.result()
            if res is not None and not res.empty:
                code = futures[future]
                if code in existing_codes:
                    # Known stock: keep only rows newer than the watermark
                    res = res[res['trade_date'] > pd.to_datetime(global_latest_date)]
                if not res.empty:
                    all_data.append(res)
                    success_count += 1
            if i % 500 == 0:
                logger.info(f"Dividend progress: {i}/{len(sync_targets)}, success: {success_count}")
    # 5. Merge with the old snapshot, dedupe, and persist
    new_records = 0
    final_count = old_count
    if all_data:
        new_df = pd.concat(all_data, ignore_index=True)
        new_records = len(new_df)
        final_df = pd.concat([old_df, new_df]) if old_df is not None else new_df
        # Dedupe includes dividend_type: a stock may announce several plans
        # on the same date.
        final_df = final_df.drop_duplicates(subset=['code', 'trade_date', 'dividend_type'])
        final_count = len(final_df)
        final_df.to_parquet(div_path)
        logger.info(f"Dividend updated: {final_count} total records ({new_records} new)")
    return {
        'count': success_count,
        'status': 'success' if success_count > 0 or old_count > 0 else 'fail',
        'record_count': final_count,
        'new_records': new_records,
        'previous_count': old_count
    }
# ==================== 新增:十大股东数据同步 ====================
def sync_top_holders() -> Dict[str, Any]:
    """Sync top-ten-shareholder analysis data with a single bulk API call.

    Change detection compares the new record count against the previous
    local snapshot's count.

    Returns:
        Dict with 'count', 'status', 'record_count', 'previous_count',
        'is_changed'. Always returns a dict, even on failure or empty data.
    """
    logger.info("Syncing top holders data...")
    path = Path("/tmp/data/top_holders.parquet")
    old_count = 0
    # Read the previous snapshot (best-effort) so change detection works
    if path.exists():
        try:
            old_df = pd.read_parquet(path)
            old_count = len(old_df)
        except Exception:
            pass
    try:
        today = get_beijing_time()
        df = ak.stock_gdfx_holding_analyse_em(date=today.strftime('%Y%m%d'))
        if df is not None and not df.empty:
            # Normalize Chinese column names to English identifiers.
            rename_map = {
                '股票代码': 'code',
                '公告日期': 'trade_date',
                '股东名称': 'holder_name',
                '持股数量': 'hold_num',
                '持股比例': 'hold_ratio',
                '持股变动': 'hold_change',
            }
            df = df.rename(columns=rename_map)
            df['trade_date'] = pd.to_datetime(df['trade_date'])
            new_count = len(df)
            path.parent.mkdir(parents=True, exist_ok=True)
            df.to_parquet(path)
            # Changed iff the record count differs from the previous snapshot
            is_changed = new_count != old_count
            logger.info(f"Top holders data saved: {new_count} records (previous: {old_count}, changed: {is_changed})")
            return {
                'count': new_count,
                'status': 'success',
                'record_count': new_count,
                'previous_count': old_count,
                'is_changed': is_changed
            }
        # BUGFIX: the empty/None-response path previously fell off the end of
        # the function and implicitly returned None, breaking callers that
        # expect a result dict. Fall through to the shared failure result.
        logger.warning("Top holders API returned no data")
    except Exception as e:
        logger.warning(f"Failed to sync top holders: {e}")
    return {
        'count': 0,
        'status': 'fail',
        'record_count': old_count,
        'previous_count': old_count,
        'is_changed': False
    }
# ==================== 新增:限售解禁数据同步 ====================
def sync_restricted_unlock() -> Dict[str, Any]:
    """Sync restricted-share unlock data with a single bulk API call.

    Change detection compares the new record count against the previous
    local snapshot's count.

    Returns:
        Dict with 'count', 'status', 'record_count', 'previous_count',
        'is_changed'. Always returns a dict, even on failure or empty data.
    """
    logger.info("Syncing restricted unlock data...")
    path = Path("/tmp/data/restricted_unlock.parquet")
    path.parent.mkdir(parents=True, exist_ok=True)
    old_count = 0
    # Read the previous snapshot (best-effort) so change detection works
    if path.exists():
        try:
            old_df = pd.read_parquet(path)
            old_count = len(old_df)
        except Exception:
            pass
    try:
        # Bulk fetch: market-wide restricted-share unlock schedule
        df = ak.stock_restricted_shares(stock="all")
        if df is not None and not df.empty:
            # Normalize Chinese column names to English identifiers.
            rename_map = {
                '代码': 'code',
                '名称': 'name',
                '解禁日期': 'unlock_date',
                '解禁数量': 'unlock_num',
                '解禁股本占总股本比例': 'unlock_ratio',
            }
            df = df.rename(columns=rename_map)
            df['unlock_date'] = pd.to_datetime(df['unlock_date'])
            df['trade_date'] = get_beijing_time()  # stamp the sync time
            new_count = len(df)
            df.to_parquet(path)
            # Changed iff the record count differs from the previous snapshot
            is_changed = new_count != old_count
            logger.info(f"Restricted unlock data saved: {new_count} records (previous: {old_count}, changed: {is_changed})")
            return {
                'count': new_count,
                'status': 'success',
                'record_count': new_count,
                'previous_count': old_count,
                'is_changed': is_changed
            }
        # BUGFIX: the empty/None-response path previously fell off the end of
        # the function and implicitly returned None, breaking callers that
        # expect a result dict. Fall through to the shared failure result.
        logger.warning("Restricted unlock API returned no data")
    except Exception as e:
        logger.warning(f"Failed to sync restricted unlock: {e}")
    return {
        'count': 0,
        'status': 'fail',
        'record_count': old_count,
        'previous_count': old_count,
        'is_changed': False
    }
def main() -> int:
    """Run the full data-sync pipeline.

    Each indicator category is synced and uploaded immediately after it
    completes, and its outcome is recorded in the shared sync-status file,
    so a crash mid-run loses at most the current category.

    Returns:
        int: Exit code, 0 on success, 1 on failure.
    """
    logger.info("=" * 60)
    logger.info("Stock Data Sync Started")
    logger.info("=" * 60)
    try:
        # Initialize the thread-pool configuration before any executor is built
        init_thread_config()
        db = get_db()
        db.init_db()
        # Status manager: records per-category results for the dashboard
        status = get_sync_status()
        # The last trading day acts as the high-water mark for all categories
        last_day = get_last_trading_day()
        logger.info(f"Last trading day: {last_day}")
        # 1. Stock list sync
        logger.info("-" * 40)
        logger.info("Syncing stock list...")
        target_list = get_stock_list()
        list_parquet = Path("/tmp/data/stock_list.parquet")
        list_parquet.parent.mkdir(parents=True, exist_ok=True)
        target_list.to_parquet(list_parquet)
        db.upload_indicator("Stock List", list_parquet, "data")
        status.update('stock_list',
                      last_trade_date=last_day,
                      record_count=len(target_list),
                      status='success')
        # 2. Daily quotes sync
        logger.info("-" * 40)
        logger.info("Syncing daily data...")
        daily_result = sync_stock_daily(target_list.to_dict('records'), last_day)
        # Smart upload: strategy depends on how many files changed
        parquet_dir = Path("/tmp/data/parquet")
        if parquet_dir.exists() and any(parquet_dir.glob("*.parquet")):
            db.upload_indicator_smart("Daily Data", parquet_dir, "data/parquet",
                                      daily_result.get('changed_files', []))
        status.update('daily',
                      last_trade_date=last_day,
                      record_count=daily_result.get('record_count', 0),
                      status=daily_result.get('status', 'unknown'),
                      failed_codes=daily_result.get('failed_codes', []),
                      success_rate=daily_result.get('success_rate', 0),
                      message=daily_result.get('message', ''))
        # 3. Index sync (CSI 300)
        logger.info("-" * 40)
        logger.info("Syncing index data...")
        idx_df = get_index_daily('000300')
        idx_count = 0
        if idx_df is not None:
            idx_path = Path("/tmp/data/parquet/index_000300.parquet")
            idx_path.parent.mkdir(parents=True, exist_ok=True)
            idx_df.to_parquet(idx_path)
            db.upload_indicator("Index Data", idx_path, "data/parquet")
            idx_count = len(idx_df)
        status.update('index',
                      last_trade_date=last_day,
                      record_count=idx_count,
                      status='success' if idx_count > 0 else 'fail')
        # 4. Fund flow sync
        logger.info("-" * 40)
        logger.info("Syncing fund flow...")
        fund_flow_result = sync_fund_flow(target_list.to_dict('records'), last_day)
        # Smart upload: strategy depends on how many files changed
        fund_flow_dir = Path("/tmp/data/fund_flow")
        if fund_flow_dir.exists() and any(fund_flow_dir.glob("*.parquet")):
            db.upload_indicator_smart("Fund Flow", fund_flow_dir, "data/fund_flow",
                                      fund_flow_result.get('changed_files', []))
        status.update('fund_flow',
                      last_trade_date=last_day,
                      record_count=fund_flow_result.get('record_count', 0),
                      status=fund_flow_result.get('status', 'unknown'),
                      failed_codes=fund_flow_result.get('failed_codes', []),
                      success_rate=fund_flow_result.get('success_rate', 0),
                      message=fund_flow_result.get('message', ''))
        # 5. Valuation sync
        logger.info("-" * 40)
        logger.info("Syncing valuation...")
        valuation_result = sync_valuation(target_list.to_dict('records'), last_day)
        # Smart upload: strategy depends on how many files changed
        valuation_dir = Path("/tmp/data/valuation")
        if valuation_dir.exists() and any(valuation_dir.glob("*.parquet")):
            db.upload_indicator_smart("Valuation", valuation_dir, "data/valuation",
                                      valuation_result.get('changed_files', []))
        status.update('valuation',
                      last_trade_date=last_day,
                      record_count=valuation_result.get('record_count', 0),
                      status=valuation_result.get('status', 'success'))
        # 6. Margin trading sync
        logger.info("-" * 40)
        logger.info("Syncing margin...")
        margin_result = sync_margin(target_list.to_dict('records'), last_day)
        # Smart upload: strategy depends on how many files changed
        margin_dir = Path("/tmp/data/margin")
        if margin_dir.exists() and any(margin_dir.glob("*.parquet")):
            db.upload_indicator_smart("Margin", margin_dir, "data/margin",
                                      margin_result.get('changed_files', []))
        status.update('margin',
                      last_trade_date=last_day,
                      record_count=margin_result.get('record_count', 0),
                      status=margin_result.get('status', 'success'))
        # 7. Financial indicator sync (upload only when new records exist)
        logger.info("-" * 40)
        logger.info("Syncing financial indicator...")
        financial_result = sync_financial_indicator(target_list.to_dict('records'))
        fi_path = Path("/tmp/data/financial_indicator.parquet")
        if fi_path.exists() and financial_result.get('new_records', 0) > 0:
            upload_success = db.upload_indicator("Financial Indicator", fi_path, "data")
            financial_status = 'success' if upload_success else 'upload_fail'
        else:
            financial_status = financial_result.get('status', 'skipped')
        status.update('financial',
                      last_trade_date=last_day,
                      record_count=financial_result.get('record_count', 0),
                      status=financial_status,
                      new_records=financial_result.get('new_records', 0))
        # 8. Holder-count sync (upload only when the snapshot changed)
        logger.info("-" * 40)
        logger.info("Syncing holder num...")
        holder_result = sync_holder_num()
        holder_path = Path("/tmp/data/holder_num.parquet")
        if holder_result.get('is_changed', False) and holder_path.exists():
            upload_success = db.upload_indicator("Holder Num", holder_path, "data")
            holder_status = 'success' if upload_success else 'upload_fail'
        else:
            holder_status = 'skipped' if not holder_result.get('is_changed', False) else holder_result.get('status', 'fail')
        status.update('holder_num',
                      last_trade_date=last_day,
                      record_count=holder_result.get('record_count', 0),
                      status=holder_status,
                      is_changed=holder_result.get('is_changed', False))
        # 9. Dividend sync (upload only when new records exist)
        logger.info("-" * 40)
        logger.info("Syncing dividend...")
        dividend_result = sync_dividend(target_list.to_dict('records'))
        div_path = Path("/tmp/data/dividend.parquet")
        if div_path.exists() and dividend_result.get('new_records', 0) > 0:
            upload_success = db.upload_indicator("Dividend", div_path, "data")
            dividend_status = 'success' if upload_success else 'upload_fail'
        else:
            dividend_status = dividend_result.get('status', 'skipped')
        status.update('dividend',
                      last_trade_date=last_day,
                      record_count=dividend_result.get('record_count', 0),
                      status=dividend_status,
                      new_records=dividend_result.get('new_records', 0))
        # 10. Top-holders sync (upload only when the snapshot changed)
        logger.info("-" * 40)
        logger.info("Syncing top holders...")
        top_holders_result = sync_top_holders()
        top_holders_path = Path("/tmp/data/top_holders.parquet")
        if top_holders_result.get('is_changed', False) and top_holders_path.exists():
            upload_success = db.upload_indicator("Top Holders", top_holders_path, "data")
            top_holders_status = 'success' if upload_success else 'upload_fail'
        else:
            top_holders_status = 'skipped' if not top_holders_result.get('is_changed', False) else top_holders_result.get('status', 'fail')
        status.update('top_holders',
                      last_trade_date=last_day,
                      record_count=top_holders_result.get('record_count', 0),
                      status=top_holders_status,
                      is_changed=top_holders_result.get('is_changed', False))
        # 11. Restricted-unlock sync (upload only when the snapshot changed)
        logger.info("-" * 40)
        logger.info("Syncing restricted unlock...")
        restricted_result = sync_restricted_unlock()
        restricted_path = Path("/tmp/data/restricted_unlock.parquet")
        if restricted_result.get('is_changed', False) and restricted_path.exists():
            upload_success = db.upload_indicator("Restricted Unlock", restricted_path, "data")
            restricted_status = 'success' if upload_success else 'upload_fail'
        else:
            restricted_status = 'skipped' if not restricted_result.get('is_changed', False) else restricted_result.get('status', 'fail')
        status.update('restricted_unlock',
                      last_trade_date=last_day,
                      record_count=restricted_result.get('record_count', 0),
                      status=restricted_status,
                      is_changed=restricted_result.get('is_changed', False))
        # 12. Upload the accumulated sync-status file
        logger.info("-" * 40)
        logger.info("Uploading sync status...")
        status_path = Path("/tmp/data/sync_status.json")
        status_upload_success = db.upload_indicator("Sync Status", status_path, "data")
        if not status_upload_success:
            logger.warning("Failed to upload sync status file")
        logger.info("=" * 60)
        logger.info("Sync Completed Successfully!")
        summary = (f"Daily={daily_result.get('count', 0)}, FundFlow={fund_flow_result.get('count', 0)}, "
                   f"Valuation={valuation_result.get('count', 0)}, Margin={margin_result.get('count', 0)}, Financial={financial_result.get('count', 0)}, "
                   f"Holder={holder_result.get('count', 0)}, Dividend={dividend_result.get('count', 0)}, "
                   f"TopHolders={top_holders_result.get('count', 0)}, Restricted={restricted_result.get('count', 0)}")
        logger.info(f"Summary: {summary}")
        logger.info("=" * 60)
        return 0
    except Exception as e:
        # BUGFIX: logger.error dropped the traceback; logger.exception keeps
        # the same message but records the full stack for diagnosis.
        logger.exception(f"Sync failed with error: {e}")
        return 1
if __name__ == "__main__":
    # Script entry point: propagate main()'s exit code to the shell.
    sys.exit(main())