""" 数据同步脚本 - Sync Space 专用版 从 AkShare 抓取数据并同步到 Hugging Face Dataset """ import os import sys import logging import time import threading import gc from datetime import datetime, timedelta from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Optional, Dict, Any from pathlib import Path import pandas as pd import akshare as ak from huggingface_hub import hf_hub_download, upload_file, list_repo_files # Tushare 适配器(优先使用) from app.tushare_adapter import ( get_stock_list_tushare, get_stock_daily_tushare, get_dividend_tushare, TUSHARE_AVAILABLE ) # 混合数据适配器(yfinance + efinance,作为回退) from app.hybrid_adapter import ( get_stock_daily_yfinance, get_index_daily_yfinance, get_fund_flow_efinance, YFINANCE_AVAILABLE ) # 添加当前目录到路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.database import get_db from app.database_user import get_beijing_time from app.sync_status import get_sync_status from app.stock_list_cache import get_cached_stock_list, save_stock_list_cache logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 配置 YEARS_OF_DATA = 10 def _safe_int_env(var_name: str, default: int) -> int: """安全地读取环境变量并转换为整数""" try: value = os.getenv(var_name) if value is None: return default return int(value) except (ValueError, TypeError): logger.warning(f"Invalid value for {var_name}, using default: {default}") return default # 动态线程数配置(延迟计算,避免导入时触发 multiprocessing) def get_thread_config(): """获取线程池配置(延迟计算)""" import multiprocessing cpu_count = multiprocessing.cpu_count() # 分层并发策略(降低并发,避免触发服务端限流) config = { 'daily': _safe_int_env("MAX_WORKERS_DAILY", min(4, cpu_count)), 'fund': _safe_int_env("MAX_WORKERS_FUND", min(3, cpu_count)), 'valuation': _safe_int_env("MAX_WORKERS_VALUATION", min(3, cpu_count)), 'margin': _safe_int_env("MAX_WORKERS_MARGIN", min(3, cpu_count)), 'financial': _safe_int_env("MAX_WORKERS_FINANCIAL", min(2, cpu_count)), 'dividend': _safe_int_env("MAX_WORKERS_DIVIDEND", min(2, cpu_count)), } # 向后兼容 legacy = _safe_int_env("MAX_WORKERS", 0) if legacy > 0: config = {k: legacy for k in config} return cpu_count, config # 延迟初始化线程配置(在 main 中调用) _CPU_COUNT = None _THREAD_CONFIG = None _thread_config_lock = threading.Lock() def get_thread_config_safe(): """安全获取线程配置(自动初始化,线程安全)""" global _CPU_COUNT, _THREAD_CONFIG if _THREAD_CONFIG is None: with _thread_config_lock: # 双重检查锁定模式 if _THREAD_CONFIG is None: _CPU_COUNT, _THREAD_CONFIG = get_thread_config() logger.info(f"Thread pool config: CPU={_CPU_COUNT}, " f"Daily={_THREAD_CONFIG['daily']}, Fund={_THREAD_CONFIG['fund']}, " f"Valuation={_THREAD_CONFIG['valuation']}, Margin={_THREAD_CONFIG['margin']}, " f"Financial={_THREAD_CONFIG['financial']}, Dividend={_THREAD_CONFIG['dividend']}") return _THREAD_CONFIG def init_thread_config(): """初始化线程配置(在 main 中调用)""" get_thread_config_safe() def get_stock_list() -> pd.DataFrame: """获取全市场标的列表(带缓存机制)""" # 1. 尝试使用缓存 cached_df = get_cached_stock_list() if cached_df is not None: return cached_df # 2. 缓存无效,重新获取 logger.info("Fetching all-market target list...") all_lists = [] # A股列表获取(优先使用Tushare,失败则回退到AkShare) max_retries = 5 base_delay = 2.0 # 尝试使用Tushare获取A股列表(更稳定,字段完整) if TUSHARE_AVAILABLE: try: df_a = get_stock_list_tushare() if df_a is not None and len(df_a) > 0: all_lists.append(df_a) logger.info(f"A-stock list fetched from Tushare: {len(df_a)} stocks") else: raise Exception("Tushare returned empty data") except Exception as e: logger.warning(f"Tushare get_stock_list failed: {e}, falling back to AkShare") # 如果Tushare失败或不可用,使用AkShare if not all_lists: for attempt in range(max_retries): try: # 使用 stock_info_a_code_name() 替代 stock_zh_a_spot_em(),更稳定 df_a = ak.stock_info_a_code_name() df_a.columns = ['code', 'name'] df_a['market'] = df_a['code'].apply(lambda x: '主板' if x.startswith(('60', '00')) else ('创业板' if x.startswith('30') else ('科创板' if x.startswith('68') else ('北交所' if x.startswith(('8', '4', '920')) else '其他')))) all_lists.append(df_a) logger.info(f"A-stock list fetched from AkShare: {len(df_a)} stocks") break # 成功则退出重试循环 except Exception as e: if attempt == max_retries - 1: logger.error(f"Failed to fetch A-stock list after {max_retries} attempts: {e}") else: delay = base_delay * (2 ** attempt) logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s... Error: {e}") time.sleep(delay) # ETF (增加容错) try: df_etf = ak.fund_etf_spot_em() # 检查实际列名 code_cols = ['代码', 'code', 'fund_code', 'ETF代码'] name_cols = ['名称', 'name', 'fund_name', 'ETF名称'] c_code = None c_name = None for col in code_cols: if col in df_etf.columns: c_code = col break for col in name_cols: if col in df_etf.columns: c_name = col break if c_code and c_name: df_etf = df_etf[[c_code, c_name]] df_etf.columns = ['code', 'name'] df_etf['market'] = 'ETF' all_lists.append(df_etf) logger.info(f"ETF list fetched: {len(df_etf)} funds") else: logger.warning(f"Could not find code/name columns in ETF data. Available columns: {df_etf.columns.tolist()}") except Exception as e: logger.warning(f"ETF list fetch failed: {e}") # LOF try: df_lof = ak.fund_lof_spot_em() # 检查实际列名 code_cols = ['代码', 'code', 'fund_code', 'LOF代码'] name_cols = ['名称', 'name', 'fund_name', 'LOF名称'] c_code = None c_name = None for col in code_cols: if col in df_lof.columns: c_code = col break for col in name_cols: if col in df_lof.columns: c_name = col break if c_code and c_name: df_lof = df_lof[[c_code, c_name]] df_lof.columns = ['code', 'name'] df_lof['market'] = 'LOF' all_lists.append(df_lof) logger.info(f"LOF list fetched: {len(df_lof)} funds") else: logger.warning(f"Could not find code/name columns in LOF data. Available columns: {df_lof.columns.tolist()}") except Exception as e: logger.warning(f"LOF list fetch failed: {e}") # REITs try: df_reits = ak.reits_realtime_em() # 检查实际列名 code_cols = ['代码', 'code', 'REITs代码'] name_cols = ['名称', 'name', 'REITs名称'] c_code = None c_name = None for col in code_cols: if col in df_reits.columns: c_code = col break for col in name_cols: if col in df_reits.columns: c_name = col break if c_code and c_name: df_reits = df_reits[[c_code, c_name]] df_reits.columns = ['code', 'name'] df_reits['market'] = 'REITs' all_lists.append(df_reits) logger.info(f"REITs list fetched: {len(df_reits)} products") else: logger.warning(f"Could not find code/name columns in REITs data. Available columns: {df_reits.columns.tolist()}") except Exception as e: logger.warning(f"REITs list fetch failed: {e}") # 可转债 try: df_cb = ak.bond_zh_hs_cov_spot() # 检查实际列名 logger.info(f"Convertible bond columns: {df_cb.columns.tolist()}") # 尝试找到正确的代码列名 code_cols = ['代码', 'symbol', 'bond_code', '代码', '转债代码', '代码'] name_cols = ['名称', 'name', 'bond_name', '转债名称', '名称'] c_code = None c_name = None for col in code_cols: if col in df_cb.columns: c_code = col break for col in name_cols: if col in df_cb.columns: c_name = col break if c_code and c_name: # 过滤掉未上市可转债(成交额=0 或 最新价=0) # 检查是否有成交额列 amount_col = None for col in ['amount', '成交额', 'Amount']: if col in df_cb.columns: amount_col = col break if amount_col: before_count = len(df_cb) df_cb = df_cb[df_cb[amount_col] > 0] filtered_count = before_count - len(df_cb) if filtered_count > 0: logger.info(f"Filtered {filtered_count} unlisted convertible bonds (amount=0)") df_cb = df_cb[[c_code, c_name]] df_cb.columns = ['code', 'name'] df_cb['market'] = '可转债' all_lists.append(df_cb) logger.info(f"Convertible bond list fetched: {len(df_cb)} bonds") else: logger.warning(f"Could not find code/name columns in convertible bond data. Available columns: {df_cb.columns.tolist()}") except Exception as e: logger.warning(f"Convertible bond list fetch failed: {e}") if not all_lists: db = get_db() return db.conn.execute("SELECT code, name, market FROM stock_list").df() df = pd.concat(all_lists).drop_duplicates(subset=['code']) df['list_date'] = None # 3. 保存到缓存 save_stock_list_cache(df) return df def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.DataFrame]: """抓取单只标的数据""" max_retries = 5 base_delay = 1.0 # 寂础延迟秒 for attempt in range(max_retries): try: # 指数退避:每次重试增加延迟 if attempt > 0: delay = base_delay * (2 ** attempt) # 1s, 2s, 4s, 8s time.sleep(delay) end_date = get_beijing_time().strftime('%Y%m%d') fetch_start = start_date.replace('-', '') df = None if market == 'INDEX': # 指数:优先使用 yfinance,失败则回退 AkShare if YFINANCE_AVAILABLE: df = get_index_daily_yfinance(code, fetch_start, end_date) if df is not None: logger.debug(f"Got index data from yfinance for {code}") if df is None: df = ak.stock_zh_index_daily_em(symbol=f"sh{code}" if code.startswith('000') else f"sz{code}") elif market == 'ETF': df = ak.fund_etf_hist_em(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq") elif market == 'LOF': df = ak.fund_lof_hist_em(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq") elif market == '可转债': # bond_zh_hs_cov_daily 需要带交易所前缀的代码(如 sh110048, sz123015) # 北交所可转债(bj开头)不支持 if code.startswith('bj'): return None # 北交所可转债 API 不支持 df = ak.bond_zh_hs_cov_daily(symbol=code) elif market == 'REITs': df = ak.reits_hist_em(symbol=code) else: # A股个股:优先使用 Tushare,失败则回退 yfinance/AkShare if TUSHARE_AVAILABLE: df = get_stock_daily_tushare(code, fetch_start, end_date, adj='qfq') if df is not None: logger.debug(f"Got data from Tushare for {code}") if df is None and YFINANCE_AVAILABLE: df = get_stock_daily_yfinance(code, fetch_start, end_date) if df is not None: logger.debug(f"Got data from yfinance for {code}") if df is None: logger.debug(f"Tushare/yfinance failed, falling back to AkShare for {code}") df = ak.stock_zh_a_hist(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq") if df is not None and not df.empty: # 标准化列名 rename_map = { '日期': 'trade_date', 'date': 'trade_date', 'Date': 'trade_date', '开盘': 'open', '今开': 'open', 'Open': 'open', '最高': 'high', 'High': 'high', '最低': 'low', 'Low': 'low', '收盘': 'close', '最新价': 'close', 'Close': 'close', '成交量': 'volume', 'Volume': 'volume', '成交额': 'amount', 'Amount': 'amount', '涨跌幅': 'pct_chg', '换手率': 'turnover_rate', '换手': 'turnover_rate' } df = df.rename(columns=rename_map) if 'trade_date' not in df.columns: df = df.reset_index().rename(columns={'index': 'trade_date', 'date': 'trade_date'}) df['trade_date'] = pd.to_datetime(df['trade_date']) df = df[df['trade_date'] >= pd.to_datetime(start_date)] if 'amount' not in df.columns: df['amount'] = 0 if 'pct_chg' not in df.columns: df['pct_chg'] = df['close'].pct_change() * 100 if 'turnover_rate' not in df.columns: df['turnover_rate'] = 0 df['code'] = code return df[['code', 'trade_date', 'open', 'high', 'low', 'close', 'volume', 'amount', 'pct_chg', 'turnover_rate']] except Exception as e: if attempt == max_retries - 1: logger.warning(f"Failed to fetch {code} ({market}): {str(e)}") time.sleep(1) return None def check_today_data_available() -> bool: """ 探测当日数据是否已更新可用 通过尝试获取一只活跃股票(平安银行 000001)的当日数据来判断 Returns: True: 当日数据已可用 False: 当日数据尚未更新 """ try: today = get_beijing_time().strftime('%Y%m%d') # 优先使用 Tushare 探测 if TUSHARE_AVAILABLE: try: import tushare as ts ts.set_token(TUSHARE_TOKENS[0]) pro = ts.pro_api() df = pro.daily(ts_code='000001.SZ', trade_date=today) if df is not None and not df.empty: logger.debug(f"Tushare check: today data available") return True except Exception: pass # 回退到 AkShare 探测 try: df = ak.stock_zh_a_spot_em() if df is not None and not df.empty: # 检查是否有今日数据 logger.debug(f"AkShare check: today data available") return True except Exception: pass except Exception as e: logger.debug(f"Check today data failed: {e}") return False def get_last_trading_day() -> str: """获取最近一个交易日(通过探测数据可用性) 逻辑: 1. 先尝试获取当日数据(探测 000001.SZ) 2. 如果能获取到,说明当日数据已更新,返回今日 3. 如果获取不到,说明当日数据未更新,返回上一个交易日 """ now = get_beijing_time() today = now.date() today_str = today.strftime('%Y-%m-%d') # 探测当日数据是否可用 logger.info(f"Checking if today ({today_str}) data is available...") if check_today_data_available(): logger.info(f"Today data is available, using today: {today_str}") return today_str else: # 获取上一个交易日 try: # 使用交易日历获取上一个交易日 df = ak.tool_trade_date_hist_sina() if df is not None and not df.empty: df['trade_date'] = pd.to_datetime(df['trade_date']).dt.date prev_trading_days = df[df['trade_date'] < today]['trade_date'] if not prev_trading_days.empty: last_day = prev_trading_days.iloc[-1] logger.info(f"Today data not available yet, using previous trading day: {last_day}") return last_day.strftime('%Y-%m-%d') except Exception as e: logger.warning(f"Failed to get previous trading day: {e}") # 回退:昨天 yesterday = (today - timedelta(days=1)).strftime('%Y-%m-%d') logger.info(f"Using yesterday as fallback: {yesterday}") return yesterday # 备用:使用指数行情获取 try: df = ak.stock_zh_index_daily_em(symbol="sh000300") if df is not None and not df.empty: date_col = 'date' if 'date' in df.columns else ('日期' if '日期' in df.columns else None) if date_col: last_date = pd.to_datetime(df[date_col].iloc[-1]).date() # 同样检查时间条件 if last_date == today and current_hour < 20: # 返回前一天的数据 return pd.to_datetime(df[date_col].iloc[-2]).strftime('%Y-%m-%d') return last_date.strftime('%Y-%m-%d') except Exception: pass # 最终回退:按工作日估算 d = get_beijing_time() # 如果当前时间 < 20:00,从昨天开始查找 if current_hour < 20: d -= timedelta(days=1) while d.weekday() >= 5: d -= timedelta(days=1) return d.strftime('%Y-%m-%d') def get_index_daily(code: str) -> Optional[pd.DataFrame]: """抓取指数日线""" try: symbol = f"sh{code}" if code.startswith('000') else f"sz{code}" df = ak.stock_zh_index_daily_em(symbol=symbol) if df is None or df.empty: return None rename_map = { 'date': 'trade_date', '日期': 'trade_date', 'open': 'open', '开盘': 'open', 'high': 'high', '最高': 'high', 'low': 'low', '最低': 'low', 'close': 'close', '收盘': 'close', 'volume': 'volume', '成交量': 'volume', 'amount': 'amount', '成交额': 'amount', 'pct_chg': 'pct_chg', '涨跌幅': 'pct_chg' } df = df.rename(columns=rename_map) if 'trade_date' not in df.columns: return None df['trade_date'] = pd.to_datetime(df['trade_date']) if 'amount' not in df.columns: df['amount'] = 0 if 'pct_chg' not in df.columns: df['pct_chg'] = df['close'].pct_change() * 100 if 'volume' not in df.columns: df['volume'] = 0 df['turnover_rate'] = 0 df['code'] = code return df[['code', 'trade_date', 'open', 'high', 'low', 'close', 'volume', 'amount', 'pct_chg', 'turnover_rate']] except Exception as e: logger.warning(f"Failed to fetch index {code}: {e}") return None def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]: """增量同步逻辑,返回详细结果(采用全局水位线机制)""" logger.info("Syncing daily data...") # 1. 扫描本地 parquet 文件获取全局最新日期(类似其他指标) parquet_dir = Path("/tmp/data/parquet") parquet_dir.mkdir(parents=True, exist_ok=True) global_latest_date = "2000-01-01" existing_codes = set() for f in parquet_dir.glob("*.parquet"): if f.name.startswith('index_'): # 跳过指数文件 continue try: df = pd.read_parquet(f) if not df.empty and 'trade_date' in df.columns: max_date = df['trade_date'].max() if isinstance(max_date, pd.Timestamp): max_date = max_date.strftime('%Y-%m-%d') if max_date > global_latest_date: global_latest_date = max_date existing_codes.update(df['code'].unique()) except Exception: pass # 2. 如果本地没有数据,尝试从云端下载最近3个月作为基准 if global_latest_date == "2000-01-01": repo_id = os.getenv("DATASET_REPO_ID") if repo_id: try: files = list_repo_files(repo_id=repo_id, repo_type="dataset") parquet_files = sorted([f for f in files if f.startswith("data/parquet/") and f.endswith(".parquet")]) # 下载最近3个月的数据作为基准 for pf in parquet_files[-3:]: try: local_file = hf_hub_download(repo_id=repo_id, filename=pf, repo_type="dataset") df = pd.read_parquet(local_file) if not df.empty and 'trade_date' in df.columns: max_date = df['trade_date'].max() if isinstance(max_date, pd.Timestamp): max_date = max_date.strftime('%Y-%m-%d') if max_date > global_latest_date: global_latest_date = max_date existing_codes.update(df['code'].unique()) except Exception: pass logger.info(f"Downloaded daily data from cloud, latest date: {global_latest_date}") except Exception as e: logger.info(f"No existing daily data in cloud: {e}") # 3. 区分新股和存量股票 new_codes = [t for t in targets if t['code'] not in existing_codes] # 4. 全局水位线拦截 if global_latest_date >= last_trade_day and not new_codes: logger.info(f"Daily data is already up to date ({global_latest_date}) and no new stocks. Skip.") return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'message': f'Already up to date ({global_latest_date})'} # 5. 确定同步策略 if global_latest_date >= last_trade_day: logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.") sync_targets = new_codes # 新股只获取最近1年数据(而非10年) start_dt = (get_beijing_time() - timedelta(days=365)).strftime('%Y-%m-%d') else: logger.info(f"Syncing daily data from {global_latest_date} to {last_trade_day}...") sync_targets = targets start_dt = (pd.to_datetime(global_latest_date) + timedelta(days=1)).strftime('%Y-%m-%d') # 设置每只股票的start_dt pending = [] for t in sync_targets: t['start_dt'] = start_dt pending.append(t) # 应用 SYNC_LIMIT 限制 sync_limit = int(os.getenv("SYNC_LIMIT", -1)) if sync_limit > 0 and len(pending) > sync_limit: logger.info(f"Limiting sync to first {sync_limit} targets (out of {len(pending)})") pending = pending[:sync_limit] if not pending: return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'message': 'No pending targets'} logger.info(f"Syncing {len(pending)} targets...") all_new_data = [] failed_codes = [] success_codes = [] with ThreadPoolExecutor(max_workers=get_thread_config_safe()['daily']) as executor: futures = {executor.submit(get_target_daily, t['code'], t['start_dt'], t['market']): t['code'] for t in pending} for i, future in enumerate(as_completed(futures), 1): code = futures[future] res = future.result() if res is not None: all_new_data.append(res) success_codes.append(code) else: failed_codes.append(code) if i % 500 == 0: logger.info(f"Progress: {i}/{len(pending)}") changed_files = [] # 记录变更的文件名 if all_new_data: inc_df = pd.concat(all_new_data, ignore_index=True) # 方案3:及时释放内存 del all_new_data total_records = len(inc_df) # 识别变动月份 changed = inc_df.assign(yr=inc_df['trade_date'].dt.year, mo=inc_df['trade_date'].dt.month)[['yr', 'mo']].drop_duplicates().values for yr, mo in changed: yr, mo = int(yr), int(mo) filename = f"{yr}-{mo:02d}.parquet" local_path = Path(f"/tmp/data/parquet/{filename}") # Sync Space 使用 /tmp local_path.parent.mkdir(parents=True, exist_ok=True) # 增量核心:先检查本地是否有,没有再从云端拉取 old_df = None if local_path.exists(): try: old_df = pd.read_parquet(local_path) logger.info(f"Using local cache for {filename}") except Exception: pass if old_df is None: repo_id = os.getenv("DATASET_REPO_ID") if repo_id: try: old_file = hf_hub_download(repo_id=repo_id, filename=f"data/parquet/{filename}", repo_type="dataset") old_df = pd.read_parquet(old_file) logger.info(f"Downloaded {filename} from cloud") except Exception: pass # 合并新数据 month_inc = inc_df[(inc_df['trade_date'].dt.year == yr) & (inc_df['trade_date'].dt.month == mo)] if old_df is not None: final_month_df = pd.concat([old_df, month_inc]).drop_duplicates(subset=['code', 'trade_date']) # 方案3:释放旧数据内存 del old_df, month_inc else: final_month_df = month_inc final_month_df.to_parquet(local_path) changed_files.append(filename) # 记录变更的文件 logger.info(f"Saved updated data for {filename}") # 方案3:释放最终数据内存 del final_month_df # 方案3:循环结束后释放inc_df并触发GC del inc_df gc.collect() else: total_records = 0 return { 'count': len(success_codes), 'failed_codes': failed_codes, 'status': 'success' if not failed_codes else 'partial_fail', 'record_count': total_records, 'success_rate': len(success_codes) / len(pending) if pending else 0, 'changed_files': changed_files # 返回变更文件列表 } # ==================== 新增:资金流向数据同步 ==================== def get_stock_fund_flow(code: str, market: str) -> Optional[pd.DataFrame]: """获取单只股票资金流向数据(优先使用 efinance,失败则回退 AkShare)""" # 标准化列名映射 standard_cols = ['code', 'trade_date', 'close', 'pct_chg', 'main_net_inflow', 'main_net_inflow_pct', 'huge_net_inflow', 'huge_net_inflow_pct', 'large_net_inflow', 'large_net_inflow_pct', 'medium_net_inflow', 'medium_net_inflow_pct', 'small_net_inflow', 'small_net_inflow_pct'] # 1. 优先尝试 efinance try: df = get_fund_flow_efinance(code, '20000101', '20991231') if df is not None and not df.empty: # efinance 字段映射 rename_map = { '日期': 'trade_date', '收盘价': 'close', '涨跌幅': 'pct_chg', '主力净流入': 'main_net_inflow', '主力净流入占比': 'main_net_inflow_pct', '超大单净流入': 'huge_net_inflow', '超大单流入净占比': 'huge_net_inflow_pct', '大单净流入': 'large_net_inflow', '大单流入净占比': 'large_net_inflow_pct', '中单净流入': 'medium_net_inflow', '中单流入净占比': 'medium_net_inflow_pct', '小单净流入': 'small_net_inflow', '小单流入净占比': 'small_net_inflow_pct', } df = df.rename(columns=rename_map) if 'trade_date' in df.columns: df['trade_date'] = pd.to_datetime(df['trade_date']) df['code'] = code result_cols = [c for c in standard_cols if c in df.columns] logger.debug(f"Got fund flow from efinance for {code}") return df[result_cols] except Exception as e: logger.debug(f"efinance fund flow failed for {code}: {e}") # 2. 回退到 AkShare max_retries = 3 base_delay = 1.0 for attempt in range(max_retries): try: # 确定 market 参数 if market == '北交所' or code.startswith(('8', '4', '920')): mk = 'bj' elif code.startswith(('6', '9')): mk = 'sh' else: mk = 'sz' df = ak.stock_individual_fund_flow(stock=code, market=mk) if df is not None and not df.empty: # AkShare 字段映射 rename_map = { '日期': 'trade_date', '收盘价': 'close', '涨跌幅': 'pct_chg', '主力净流入-净额': 'main_net_inflow', '主力净流入-净占比': 'main_net_inflow_pct', '超大单净流入-净额': 'huge_net_inflow', '超大单净流入-净占比': 'huge_net_inflow_pct', '大单净流入-净额': 'large_net_inflow', '大单净流入-净占比': 'large_net_inflow_pct', '中单净流入-净额': 'medium_net_inflow', '中单净流入-净占比': 'medium_net_inflow_pct', '小单净流入-净额': 'small_net_inflow', '小单净流入-净占比': 'small_net_inflow_pct', } df = df.rename(columns=rename_map) df['trade_date'] = pd.to_datetime(df['trade_date']) df['code'] = code result_cols = [c for c in standard_cols if c in df.columns] logger.debug(f"Got fund flow from AkShare for {code}") return df[result_cols] except Exception as e: if attempt > 0: delay = base_delay * (2 ** attempt) time.sleep(delay) return None def sync_fund_flow(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]: """同步资金流向数据(按月分表版),返回详细结果""" logger.info("Syncing fund flow data...") # 1. 从所有本地 parquet 文件计算全局最新日期 flow_dir = Path("/tmp/data/fund_flow") flow_dir.mkdir(parents=True, exist_ok=True) global_latest_date = "2000-01-01" existing_codes = set() # 扫描本地已有的月度文件 for f in flow_dir.glob("*.parquet"): try: df = pd.read_parquet(f) if not df.empty: max_date = df['trade_date'].max().strftime('%Y-%m-%d') if max_date > global_latest_date: global_latest_date = max_date existing_codes.update(df['code'].unique()) except Exception: pass # 如果本地没有数据,尝试从云端下载 if global_latest_date == "2000-01-01": repo_id = os.getenv("DATASET_REPO_ID") if repo_id: try: # 获取云端文件列表 files = list_repo_files(repo_id=repo_id, repo_type="dataset") flow_files = sorted([f for f in files if f.startswith("data/fund_flow/") and f.endswith(".parquet")]) for ff in flow_files[-3:]: # 先下载最近3个月的数据作为基准 try: local_file = hf_hub_download(repo_id=repo_id, filename=ff, repo_type="dataset") df = pd.read_parquet(local_file) if not df.empty: # 提取月份 filename = Path(ff).stem # 如 "2026-02" local_path = flow_dir / f"{filename}.parquet" df.to_parquet(local_path) max_date = df['trade_date'].max().strftime('%Y-%m-%d') if max_date > global_latest_date: global_latest_date = max_date existing_codes.update(df['code'].unique()) except Exception: pass logger.info(f"Downloaded fund flow from cloud, latest date: {global_latest_date}") except Exception as e: logger.info(f"No existing fund flow data in cloud: {e}") # 2. 过滤目标(排除 ETF/LOF/REITs/可转债) stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']] new_codes = [t for t in stock_targets if t['code'] not in existing_codes] # 3. 全局水位线拦截 if global_latest_date >= last_trade_day and not new_codes: logger.info(f"Fund flow data is already up to date ({global_latest_date}) and no new stocks. Skip.") return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'message': f'Already up to date ({global_latest_date})'} # 4. 增量获取 if global_latest_date >= last_trade_day: logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.") sync_targets = new_codes else: logger.info(f"Syncing fund flow from {global_latest_date} to {last_trade_day}...") sync_targets = stock_targets all_data = [] success_codes = [] failed_codes = [] with ThreadPoolExecutor(max_workers=get_thread_config_safe()['fund']) as executor: futures = {executor.submit(get_stock_fund_flow, t['code'], t['market']): t['code'] for t in sync_targets} for i, future in enumerate(as_completed(futures), 1): code = futures[future] res = future.result() if res is not None and not res.empty: if code in existing_codes: res = res[res['trade_date'] > pd.to_datetime(global_latest_date)] if not res.empty: all_data.append(res) success_codes.append(code) else: failed_codes.append(code) if i % 500 == 0: logger.info(f"Fund flow progress: {i}/{len(sync_targets)}, success: {len(success_codes)}") total_records = 0 changed_files = [] # 记录变更的文件名 # 5. 按月分表保存 if all_data: new_df = pd.concat(all_data, ignore_index=True) # 方案3:及时释放内存 del all_data total_records = len(new_df) # 确定需要更新的月份 if not new_df.empty: min_date = new_df['trade_date'].min() max_date = new_df['trade_date'].max() current = min_date.to_period('M') end_period = max_date.to_period('M') while current <= end_period: yr, mo = current.year, current.month month_data = new_df[(new_df['trade_date'].dt.year == yr) & (new_df['trade_date'].dt.month == mo)] if not month_data.empty: filename = f"{yr}-{mo:02d}.parquet" local_path = flow_dir / filename # 合并已有数据 old_month_df = None if local_path.exists(): try: old_month_df = pd.read_parquet(local_path) except Exception: pass if old_month_df is not None: final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date']) # 方案3:释放旧数据内存 del old_month_df, month_data else: final_month_df = month_data final_month_df.to_parquet(local_path) changed_files.append(filename) # 记录变更的文件 logger.info(f"Saved fund flow data for {filename}") # 方案3:释放最终数据内存 del final_month_df current += 1 logger.info(f"Fund flow updated: {len(new_df)} new records") # 方案3:释放new_df并触发GC del new_df gc.collect() return { 'count': len(success_codes), 'failed_codes': failed_codes, 'status': 'success' if not failed_codes else 'partial_fail', 'record_count': total_records, 'success_rate': len(success_codes) / len(sync_targets) if sync_targets else 0, 'changed_files': changed_files # 返回变更文件列表 } # ==================== 新增:估值指标数据同步 ==================== def get_stock_valuation(code: str) -> Optional[pd.DataFrame]: """获取单只股票估值指标数据""" max_retries = 5 base_delay = 1.0 for attempt in range(max_retries): try: df = ak.stock_a_lg_indicator(symbol=code) if df is not None and not df.empty: # 标准化列名 rename_map = { '日期': 'trade_date', '市盈率': 'pe_ttm', '市盈率TTM': 'pe_ttm', '静态市盈率': 'pe_static', '市净率': 'pb', '市销率': 'ps_ttm', '股息率': 'dv_ratio', '总市值': 'total_mv', '流通市值': 'circ_mv', } df = df.rename(columns=rename_map) df['trade_date'] = pd.to_datetime(df['trade_date']) df['code'] = code cols = ['code', 'trade_date', 'pe_ttm', 'pe_static', 'pb', 'ps_ttm', 'dv_ratio', 'total_mv', 'circ_mv'] available_cols = [c for c in cols if c in df.columns] return df[available_cols] except Exception: if attempt > 0: delay = base_delay * (2 ** attempt) time.sleep(delay) return None def sync_valuation(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]: """同步估值指标数据(按月分表版)""" logger.info("Syncing valuation data...") # 1. 从所有本地 parquet 文件计算全局最新日期 val_dir = Path("/tmp/data/valuation") val_dir.mkdir(parents=True, exist_ok=True) global_latest_date = "2000-01-01" existing_codes = set() # 扫描本地已有的月度文件 for f in val_dir.glob("*.parquet"): try: df = pd.read_parquet(f) if not df.empty: max_date = df['trade_date'].max().strftime('%Y-%m-%d') if max_date > global_latest_date: global_latest_date = max_date existing_codes.update(df['code'].unique()) except Exception: pass # 如果本地没有数据,尝试从云端下载 if global_latest_date == "2000-01-01": repo_id = os.getenv("DATASET_REPO_ID") if repo_id: try: files = list_repo_files(repo_id=repo_id, repo_type="dataset") val_files = sorted([f for f in files if f.startswith("data/valuation/") and f.endswith(".parquet")]) for vf in val_files[-3:]: # 先下载最近3个月 try: local_file = hf_hub_download(repo_id=repo_id, filename=vf, repo_type="dataset") df = pd.read_parquet(local_file) if not df.empty: filename = Path(vf).stem local_path = val_dir / f"{filename}.parquet" df.to_parquet(local_path) max_date = df['trade_date'].max().strftime('%Y-%m-%d') if max_date > global_latest_date: global_latest_date = max_date existing_codes.update(df['code'].unique()) except Exception: pass logger.info(f"Downloaded valuation from cloud, latest date: {global_latest_date}") except Exception as e: logger.info(f"No existing valuation data in cloud: {e}") # 2. 过滤目标(排除 ETF/LOF/REITs/可转债) stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']] new_codes = [t for t in stock_targets if t['code'] not in existing_codes] # 3. 全局水位线拦截 if global_latest_date >= last_trade_day and not new_codes: logger.info(f"Valuation data is already up to date ({global_latest_date}) and no new stocks. Skip.") return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'record_count': 0, 'changed_files': [], 'message': f'Already up to date ({global_latest_date})'} # 4. 增量获取 if global_latest_date >= last_trade_day: logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.") sync_targets = new_codes else: logger.info(f"Syncing valuation from {global_latest_date} to {last_trade_day}...") sync_targets = stock_targets all_data = [] success_count = 0 with ThreadPoolExecutor(max_workers=get_thread_config_safe()['valuation']) as executor: futures = {executor.submit(get_stock_valuation, t['code']): t['code'] for t in sync_targets} for i, future in enumerate(as_completed(futures), 1): res = future.result() if res is not None and not res.empty: code = futures[future] if code in existing_codes: res = res[res['trade_date'] > pd.to_datetime(global_latest_date)] if not res.empty: all_data.append(res) success_count += 1 if i % 500 == 0: logger.info(f"Valuation progress: {i}/{len(sync_targets)}, success: {success_count}") total_records = 0 changed_files = [] # 记录变更的文件名 # 5. 按月分表保存 if all_data: new_df = pd.concat(all_data, ignore_index=True) # 方案3:及时释放内存 del all_data total_records = len(new_df) if not new_df.empty: min_date = new_df['trade_date'].min() max_date = new_df['trade_date'].max() current = min_date.to_period('M') end_period = max_date.to_period('M') while current <= end_period: yr, mo = current.year, current.month month_data = new_df[(new_df['trade_date'].dt.year == yr) & (new_df['trade_date'].dt.month == mo)] if not month_data.empty: filename = f"{yr}-{mo:02d}.parquet" local_path = val_dir / filename old_month_df = None if local_path.exists(): try: old_month_df = pd.read_parquet(local_path) except Exception: pass if old_month_df is not None: final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date']) # 方案3:释放旧数据内存 del old_month_df, month_data else: final_month_df = month_data final_month_df.to_parquet(local_path) changed_files.append(filename) # 记录变更的文件 logger.info(f"Saved valuation data for {filename}") # 方案3:释放最终数据内存 del final_month_df current += 1 logger.info(f"Valuation updated: {len(new_df)} new records") # 方案3:释放new_df并触发GC del new_df gc.collect() return { 'count': success_count, 'failed_codes': [], 'status': 'success', 'record_count': total_records, 'changed_files': changed_files # 返回变更文件列表 } # ==================== 新增:融资融券数据同步 ==================== def get_stock_margin(code: str) -> Optional[pd.DataFrame]: """获取单只股票融资融券数据""" max_retries = 5 base_delay = 1.0 for attempt in range(max_retries): try: # 尝试上交所 if code.startswith('6'): df = ak.stock_margin_detail_sh(symbol=code) else: df = ak.stock_margin_detail_sz(symbol=code) if df is not None and not df.empty: # 标准化列名 rename_map = { '日期': 'trade_date', '融资余额': 'rzye', '融资买入额': 'rzmre', '融资偿还额': 'rzche', '融券余额': 'rqye', '融券卖出量': 'rqmcl', '融资融券余额': 'rzrqye', } df = df.rename(columns=rename_map) if 'trade_date' in df.columns: df['trade_date'] = pd.to_datetime(df['trade_date']) df['code'] = code cols = ['code', 'trade_date', 'rzye', 'rzmre', 'rzche', 'rqye', 'rqmcl', 'rzrqye'] available_cols = [c for c in cols if c in df.columns] return df[available_cols] except Exception as e: if attempt == max_retries - 1: pass if attempt > 0: delay = base_delay * (2 ** attempt) time.sleep(delay) return None def sync_margin(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]: """同步融资融券数据(按月分表版)""" logger.info("Syncing margin trading data...") # 1. 从所有本地 parquet 文件计算全局最新日期 margin_dir = Path("/tmp/data/margin") margin_dir.mkdir(parents=True, exist_ok=True) global_latest_date = "2000-01-01" existing_codes = set() # 扫描本地已有的月度文件 for f in margin_dir.glob("*.parquet"): try: df = pd.read_parquet(f) if not df.empty: max_date = df['trade_date'].max().strftime('%Y-%m-%d') if max_date > global_latest_date: global_latest_date = max_date existing_codes.update(df['code'].unique()) except Exception: pass # 如果本地没有数据,尝试从云端下载 if global_latest_date == "2000-01-01": repo_id = os.getenv("DATASET_REPO_ID") if repo_id: try: files = list_repo_files(repo_id=repo_id, repo_type="dataset") margin_files = sorted([f for f in files if f.startswith("data/margin/") and f.endswith(".parquet")]) for mf in margin_files[-3:]: # 先下载最近3个月 try: local_file = hf_hub_download(repo_id=repo_id, filename=mf, repo_type="dataset") df = pd.read_parquet(local_file) if not df.empty: filename = Path(mf).stem local_path = margin_dir / f"{filename}.parquet" df.to_parquet(local_path) max_date = df['trade_date'].max().strftime('%Y-%m-%d') if max_date > global_latest_date: global_latest_date = max_date existing_codes.update(df['code'].unique()) except Exception: pass logger.info(f"Downloaded margin from cloud, latest date: {global_latest_date}") except Exception as e: logger.info(f"No existing margin data in cloud: {e}") # 2. 过滤目标(只保留主板、创业板、科创板) stock_targets = [t for t in targets if t['market'] in ['主板', '创业板', '科创板']] new_codes = [t for t in stock_targets if t['code'] not in existing_codes] # 3. 全局水位线拦截 if global_latest_date >= last_trade_day and not new_codes: logger.info(f"Margin data is already up to date ({global_latest_date}) and no new stocks. Skip.") return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'record_count': 0, 'changed_files': [], 'message': f'Already up to date ({global_latest_date})'} # 4. 增量获取 if global_latest_date >= last_trade_day: logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.") sync_targets = new_codes else: logger.info(f"Syncing margin data from {global_latest_date} to {last_trade_day}...") sync_targets = stock_targets all_data = [] success_count = 0 with ThreadPoolExecutor(max_workers=get_thread_config_safe()['margin']) as executor: futures = {executor.submit(get_stock_margin, t['code']): t['code'] for t in sync_targets} for i, future in enumerate(as_completed(futures), 1): res = future.result() if res is not None and not res.empty: code = futures[future] if code in existing_codes: res = res[res['trade_date'] > pd.to_datetime(global_latest_date)] if not res.empty: all_data.append(res) success_count += 1 if i % 500 == 0: logger.info(f"Margin progress: {i}/{len(sync_targets)}, success: {success_count}") total_records = 0 changed_files = [] # 记录变更的文件名 # 5. 按月分表保存 if all_data: new_df = pd.concat(all_data, ignore_index=True) total_records = len(new_df) if not new_df.empty: min_date = new_df['trade_date'].min() max_date = new_df['trade_date'].max() current = min_date.to_period('M') end_period = max_date.to_period('M') while current <= end_period: yr, mo = current.year, current.month month_data = new_df[(new_df['trade_date'].dt.year == yr) & (new_df['trade_date'].dt.month == mo)] if not month_data.empty: filename = f"{yr}-{mo:02d}.parquet" local_path = margin_dir / filename old_month_df = None if local_path.exists(): try: old_month_df = pd.read_parquet(local_path) except Exception: pass if old_month_df is not None: final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date']) else: final_month_df = month_data final_month_df.to_parquet(local_path) changed_files.append(filename) # 记录变更的文件 logger.info(f"Saved margin data for {filename}") current += 1 logger.info(f"Margin updated: {len(new_df)} new records") return { 'count': success_count, 'failed_codes': [], 'status': 'success', 'record_count': total_records, 'changed_files': changed_files # 返回变更文件列表 } # ==================== 新增:财务指标数据同步 ==================== def get_stock_financial_indicator(code: str) -> Optional[pd.DataFrame]: """获取单只股票财务指标数据""" max_retries = 5 base_delay = 1.0 for attempt in range(max_retries): try: df = ak.stock_financial_analysis_indicator(symbol=code) if df is not None and not df.empty: # 标准化列名 rename_map = { '日期': 'trade_date', '净资产收益率': 'roe', '总资产净利率': 'roa', '销售毛利率': 'gross_margin', '销售净利率': 'net_margin', '资产负债率': 'debt_ratio', '流动比率': 'current_ratio', '速动比率': 'quick_ratio', '存货周转率': 'inventory_turnover', '应收账款周转率': 'receivable_turnover', '总资产周转率': 'total_asset_turnover', } df = df.rename(columns=rename_map) if 'trade_date' in df.columns: df['trade_date'] = pd.to_datetime(df['trade_date']) df['code'] = code cols = ['code', 'trade_date', 'roe', 'roa', 'gross_margin', 'net_margin', 'debt_ratio', 'current_ratio', 'quick_ratio', 'inventory_turnover', 'receivable_turnover', 'total_asset_turnover'] available_cols = [c for c in cols if c in df.columns] return df[available_cols] except Exception: if attempt > 0: delay = base_delay * (2 ** attempt) time.sleep(delay) return None def sync_financial_indicator(targets: List[Dict[str, str]]) -> Dict[str, Any]: """同步财务指标数据(极致增量版),返回详细结果""" logger.info("Syncing financial indicator data...") fi_path = Path("/tmp/data/financial_indicator.parquet") fi_path.parent.mkdir(parents=True, exist_ok=True) old_df = None old_count = 0 global_latest_date = "2000-01-01" existing_codes = set() # 1. 优先读取本地缓存 if fi_path.exists(): try: old_df = pd.read_parquet(fi_path) old_count = len(old_df) global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d') existing_codes = set(old_df['code'].unique()) logger.info(f"Local financial cache found, latest date: {global_latest_date}, records: {old_count}") except Exception as e: logger.warning(f"Failed to read local financial cache: {e}") # 2. 本地无缓存,尝试从云端拉取 if old_df is None: repo_id = os.getenv("DATASET_REPO_ID") if repo_id: try: old_file = hf_hub_download(repo_id=repo_id, filename="data/financial_indicator.parquet", repo_type="dataset") old_df = pd.read_parquet(old_file) old_count = len(old_df) global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d') existing_codes = set(old_df['code'].unique()) old_df.to_parquet(fi_path) logger.info(f"Downloaded financial from cloud, latest date: {global_latest_date}, records: {old_count}") except Exception: logger.info("No existing financial data found in cloud.") # 3. 财务指标特殊拦截 + 新股检测 stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']] new_codes = [t for t in stock_targets if t['code'] not in existing_codes] today = get_beijing_time() is_recent = False if global_latest_date != "2000-01-01": days_diff = (today - pd.to_datetime(global_latest_date)).days if days_diff < 90: is_recent = True if is_recent and not new_codes: logger.info(f"Financial data is recent ({global_latest_date}) and no new stocks. Skip.") return {'count': 0, 'status': 'skipped', 'record_count': old_count, 'new_records': 0, 'message': f'Already up to date ({global_latest_date})'} # 4. 增量获取 if is_recent: logger.info(f"Financial data is recent, but found {len(new_codes)} new stocks. Syncing new stocks only.") sync_targets = new_codes else: logger.info(f"Syncing financial indicators (last update: {global_latest_date})...") sync_targets = stock_targets all_data = [] success_count = 0 with ThreadPoolExecutor(max_workers=get_thread_config_safe()['financial']) as executor: futures = {executor.submit(get_stock_financial_indicator, t['code']): t['code'] for t in sync_targets} for i, future in enumerate(as_completed(futures), 1): res = future.result() if res is not None and not res.empty: code = futures[future] if code in existing_codes: res = res[res['trade_date'] > pd.to_datetime(global_latest_date)] if not res.empty: all_data.append(res) success_count += 1 if i % 500 == 0: logger.info(f"Financial indicator progress: {i}/{len(sync_targets)}, success: {success_count}") # 5. 合并保存 new_records = 0 final_count = old_count if all_data: new_df = pd.concat(all_data, ignore_index=True) new_records = len(new_df) final_df = pd.concat([old_df, new_df]) if old_df is not None else new_df final_df = final_df.drop_duplicates(subset=['code', 'trade_date']) final_count = len(final_df) final_df.to_parquet(fi_path) logger.info(f"Financial updated: {final_count} total records ({new_records} new)") return { 'count': success_count, 'status': 'success' if success_count > 0 or old_count > 0 else 'fail', 'record_count': final_count, 'new_records': new_records, 'previous_count': old_count } # ==================== 新增:股东户数数据同步 ==================== def sync_holder_num() -> Dict[str, Any]: """同步股东户数数据(批量获取),返回详细结果""" logger.info("Syncing holder number data...") hn_path = Path("/tmp/data/holder_num.parquet") old_count = 0 # 读取现有数据 if hn_path.exists(): try: old_df = pd.read_parquet(hn_path) old_count = len(old_df) except Exception: pass try: # 获取最近报告期的股东户数数据 df = ak.stock_zh_a_gdhs(symbol="全部") if df is not None and not df.empty: # 标准化列名 rename_map = { '代码': 'code', '股东户数': 'holder_num', '户均持股数量': 'avg_share', '户均持股金额': 'avg_value', '总股本': 'total_share', '总市值': 'total_value', '日期': 'trade_date', } df = df.rename(columns=rename_map) if 'trade_date' in df.columns: df['trade_date'] = pd.to_datetime(df['trade_date']) new_count = len(df) # 保存到 Parquet hn_path.parent.mkdir(parents=True, exist_ok=True) df.to_parquet(hn_path) # 判断是否有变化 is_changed = new_count != old_count logger.info(f"Holder number data saved: {new_count} records (previous: {old_count}, changed: {is_changed})") return { 'count': new_count, 'status': 'success', 'record_count': new_count, 'previous_count': old_count, 'is_changed': is_changed } except Exception as e: logger.warning(f"Failed to sync holder number data: {e}") return { 'count': 0, 'status': 'fail', 'record_count': old_count, 'previous_count': old_count, 'is_changed': False } # ==================== 新增:分红数据同步 ==================== def get_stock_dividend(code: str) -> Optional[pd.DataFrame]: """获取单只股票分红数据""" max_retries = 5 base_delay = 1.0 for attempt in range(max_retries): try: df = ak.stock_history_dividend(symbol=code) if df is not None and not df.empty: # 标准化列名 rename_map = { '公告日期': 'trade_date', '分红方案': 'dividend_type', '分红金额': 'dividend_amount', '股权登记日': 'record_date', '除权除息日': 'ex_date', '派息日': 'pay_date', } df = df.rename(columns=rename_map) df['trade_date'] = pd.to_datetime(df['trade_date']) df['code'] = code cols = ['code', 'trade_date', 'dividend_type', 'dividend_amount', 'record_date', 'ex_date', 'pay_date'] available_cols = [c for c in cols if c in df.columns] return df[available_cols] except Exception: if attempt > 0: delay = base_delay * (2 ** attempt) time.sleep(delay) return None def sync_dividend(targets: List[Dict[str, str]]) -> Dict[str, Any]: """同步分红数据(极致增量版),返回详细结果""" logger.info("Syncing dividend data...") div_path = Path("/tmp/data/dividend.parquet") div_path.parent.mkdir(parents=True, exist_ok=True) old_df = None old_count = 0 global_latest_date = "2000-01-01" existing_codes = set() if div_path.exists(): try: old_df = pd.read_parquet(div_path) old_count = len(old_df) global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d') existing_codes = set(old_df['code'].unique()) except Exception: pass if old_df is None: repo_id = os.getenv("DATASET_REPO_ID") if repo_id: try: old_file = hf_hub_download(repo_id=repo_id, filename="data/dividend.parquet", repo_type="dataset") old_df = pd.read_parquet(old_file) old_count = len(old_df) global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d') existing_codes = set(old_df['code'].unique()) old_df.to_parquet(div_path) except Exception: pass # 90天检查一次 + 新股检测 stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']] new_codes = [t for t in stock_targets if t['code'] not in existing_codes] today = get_beijing_time() is_recent = False if global_latest_date != "2000-01-01": if (today - pd.to_datetime(global_latest_date)).days < 90: is_recent = True if is_recent and not new_codes: logger.info(f"Dividend data is recent and no new stocks. Skip.") return {'count': 0, 'status': 'skipped', 'record_count': old_count, 'new_records': 0, 'message': f'Already up to date ({global_latest_date})'} # 4. 增量获取 if is_recent: logger.info(f"Dividend data is recent, but found {len(new_codes)} new stocks. Syncing new stocks only.") sync_targets = new_codes else: logger.info(f"Syncing dividend data (last update: {global_latest_date})...") sync_targets = stock_targets all_data = [] success_count = 0 with ThreadPoolExecutor(max_workers=get_thread_config_safe()['dividend']) as executor: futures = {executor.submit(get_stock_dividend, t['code']): t['code'] for t in sync_targets} for i, future in enumerate(as_completed(futures), 1): res = future.result() if res is not None and not res.empty: code = futures[future] if code in existing_codes: res = res[res['trade_date'] > pd.to_datetime(global_latest_date)] if not res.empty: all_data.append(res) success_count += 1 if i % 500 == 0: logger.info(f"Dividend progress: {i}/{len(sync_targets)}, success: {success_count}") new_records = 0 final_count = old_count if all_data: new_df = pd.concat(all_data, ignore_index=True) new_records = len(new_df) final_df = pd.concat([old_df, new_df]) if old_df is not None else new_df final_df = final_df.drop_duplicates(subset=['code', 'trade_date', 'dividend_type']) final_count = len(final_df) final_df.to_parquet(div_path) logger.info(f"Dividend updated: {final_count} total records ({new_records} new)") return { 'count': success_count, 'status': 'success' if success_count > 0 or old_count > 0 else 'fail', 'record_count': final_count, 'new_records': new_records, 'previous_count': old_count } # ==================== 新增:十大股东数据同步 ==================== def sync_top_holders() -> Dict[str, Any]: """同步十大股东数据(批量获取),返回详细结果""" logger.info("Syncing top holders data...") path = Path("/tmp/data/top_holders.parquet") old_count = 0 # 读取现有数据 if path.exists(): try: old_df = pd.read_parquet(path) old_count = len(old_df) except Exception: pass try: today = get_beijing_time() df = ak.stock_gdfx_holding_analyse_em(date=today.strftime('%Y%m%d')) if df is not None and not df.empty: rename_map = { '股票代码': 'code', '公告日期': 'trade_date', '股东名称': 'holder_name', '持股数量': 'hold_num', '持股比例': 'hold_ratio', '持股变动': 'hold_change', } df = df.rename(columns=rename_map) df['trade_date'] = pd.to_datetime(df['trade_date']) new_count = len(df) path.parent.mkdir(parents=True, exist_ok=True) df.to_parquet(path) # 判断是否有变化 is_changed = new_count != old_count logger.info(f"Top holders data saved: {new_count} records (previous: {old_count}, changed: {is_changed})") return { 'count': new_count, 'status': 'success', 'record_count': new_count, 'previous_count': old_count, 'is_changed': is_changed } except Exception as e: logger.warning(f"Failed to sync top holders: {e}") return { 'count': 0, 'status': 'fail', 'record_count': old_count, 'previous_count': old_count, 'is_changed': False } # ==================== 新增:限售解禁数据同步 ==================== def sync_restricted_unlock() -> Dict[str, Any]: """同步限售解禁数据(批量获取),返回详细结果""" logger.info("Syncing restricted unlock data...") path = Path("/tmp/data/restricted_unlock.parquet") path.parent.mkdir(parents=True, exist_ok=True) old_count = 0 # 读取现有数据 if path.exists(): try: old_df = pd.read_parquet(path) old_count = len(old_df) except Exception: pass try: # 获取全市场限售解禁数据 df = ak.stock_restricted_shares(stock="all") if df is not None and not df.empty: rename_map = { '代码': 'code', '名称': 'name', '解禁日期': 'unlock_date', '解禁数量': 'unlock_num', '解禁股本占总股本比例': 'unlock_ratio', } df = df.rename(columns=rename_map) df['unlock_date'] = pd.to_datetime(df['unlock_date']) df['trade_date'] = get_beijing_time() # 记录同步日期 new_count = len(df) df.to_parquet(path) # 判断是否有变化 is_changed = new_count != old_count logger.info(f"Restricted unlock data saved: {new_count} records (previous: {old_count}, changed: {is_changed})") return { 'count': new_count, 'status': 'success', 'record_count': new_count, 'previous_count': old_count, 'is_changed': is_changed } except Exception as e: logger.warning(f"Failed to sync restricted unlock: {e}") return { 'count': 0, 'status': 'fail', 'record_count': old_count, 'previous_count': old_count, 'is_changed': False } def main() -> int: """ 主函数 - 执行完整的数据同步流程(每类指标完成后即时上传,并记录状态) Returns: int: 退出码,0 表示成功,1 表示失败 """ logger.info("=" * 60) logger.info("Stock Data Sync Started") logger.info("=" * 60) try: # 初始化线程配置 init_thread_config() db = get_db() db.init_db() # 获取状态管理器 status = get_sync_status() # 获取最后交易日 last_day = get_last_trading_day() logger.info(f"Last trading day: {last_day}") # 1. 列表同步 logger.info("-" * 40) logger.info("Syncing stock list...") target_list = get_stock_list() list_parquet = Path("/tmp/data/stock_list.parquet") list_parquet.parent.mkdir(parents=True, exist_ok=True) target_list.to_parquet(list_parquet) db.upload_indicator("Stock List", list_parquet, "data") status.update('stock_list', last_trade_date=last_day, record_count=len(target_list), status='success') # 2. 行情同步 logger.info("-" * 40) logger.info("Syncing daily data...") daily_result = sync_stock_daily(target_list.to_dict('records'), last_day) # 智能上传日K行情数据(根据变更文件数量选择策略) parquet_dir = Path("/tmp/data/parquet") if parquet_dir.exists() and any(parquet_dir.glob("*.parquet")): db.upload_indicator_smart("Daily Data", parquet_dir, "data/parquet", daily_result.get('changed_files', [])) status.update('daily', last_trade_date=last_day, record_count=daily_result.get('record_count', 0), status=daily_result.get('status', 'unknown'), failed_codes=daily_result.get('failed_codes', []), success_rate=daily_result.get('success_rate', 0), message=daily_result.get('message', '')) # 3. 指数同步 logger.info("-" * 40) logger.info("Syncing index data...") idx_df = get_index_daily('000300') idx_count = 0 if idx_df is not None: idx_path = Path("/tmp/data/parquet/index_000300.parquet") idx_path.parent.mkdir(parents=True, exist_ok=True) idx_df.to_parquet(idx_path) db.upload_indicator("Index Data", idx_path, "data/parquet") idx_count = len(idx_df) status.update('index', last_trade_date=last_day, record_count=idx_count, status='success' if idx_count > 0 else 'fail') # 4. 资金流向同步 logger.info("-" * 40) logger.info("Syncing fund flow...") fund_flow_result = sync_fund_flow(target_list.to_dict('records'), last_day) # 智能上传资金流向数据(根据变更文件数量选择策略) fund_flow_dir = Path("/tmp/data/fund_flow") if fund_flow_dir.exists() and any(fund_flow_dir.glob("*.parquet")): db.upload_indicator_smart("Fund Flow", fund_flow_dir, "data/fund_flow", fund_flow_result.get('changed_files', [])) status.update('fund_flow', last_trade_date=last_day, record_count=fund_flow_result.get('record_count', 0), status=fund_flow_result.get('status', 'unknown'), failed_codes=fund_flow_result.get('failed_codes', []), success_rate=fund_flow_result.get('success_rate', 0), message=fund_flow_result.get('message', '')) # 5. 估值指标同步 logger.info("-" * 40) logger.info("Syncing valuation...") valuation_result = sync_valuation(target_list.to_dict('records'), last_day) # 智能上传估值指标数据(根据变更文件数量选择策略) valuation_dir = Path("/tmp/data/valuation") if valuation_dir.exists() and any(valuation_dir.glob("*.parquet")): db.upload_indicator_smart("Valuation", valuation_dir, "data/valuation", valuation_result.get('changed_files', [])) status.update('valuation', last_trade_date=last_day, record_count=valuation_result.get('record_count', 0), status=valuation_result.get('status', 'success')) # 6. 融资融券同步 logger.info("-" * 40) logger.info("Syncing margin...") margin_result = sync_margin(target_list.to_dict('records'), last_day) # 智能上传融资融券数据(根据变更文件数量选择策略) margin_dir = Path("/tmp/data/margin") if margin_dir.exists() and any(margin_dir.glob("*.parquet")): db.upload_indicator_smart("Margin", margin_dir, "data/margin", margin_result.get('changed_files', [])) status.update('margin', last_trade_date=last_day, record_count=margin_result.get('record_count', 0), status=margin_result.get('status', 'success')) # 7. 财务指标同步 logger.info("-" * 40) logger.info("Syncing financial indicator...") financial_result = sync_financial_indicator(target_list.to_dict('records')) fi_path = Path("/tmp/data/financial_indicator.parquet") if fi_path.exists() and financial_result.get('new_records', 0) > 0: upload_success = db.upload_indicator("Financial Indicator", fi_path, "data") financial_status = 'success' if upload_success else 'upload_fail' else: financial_status = financial_result.get('status', 'skipped') status.update('financial', last_trade_date=last_day, record_count=financial_result.get('record_count', 0), status=financial_status, new_records=financial_result.get('new_records', 0)) # 8. 股东户数同步 logger.info("-" * 40) logger.info("Syncing holder num...") holder_result = sync_holder_num() holder_path = Path("/tmp/data/holder_num.parquet") if holder_result.get('is_changed', False) and holder_path.exists(): upload_success = db.upload_indicator("Holder Num", holder_path, "data") holder_status = 'success' if upload_success else 'upload_fail' else: holder_status = 'skipped' if not holder_result.get('is_changed', False) else holder_result.get('status', 'fail') status.update('holder_num', last_trade_date=last_day, record_count=holder_result.get('record_count', 0), status=holder_status, is_changed=holder_result.get('is_changed', False)) # 9. 分红数据同步 logger.info("-" * 40) logger.info("Syncing dividend...") dividend_result = sync_dividend(target_list.to_dict('records')) div_path = Path("/tmp/data/dividend.parquet") if div_path.exists() and dividend_result.get('new_records', 0) > 0: upload_success = db.upload_indicator("Dividend", div_path, "data") dividend_status = 'success' if upload_success else 'upload_fail' else: dividend_status = dividend_result.get('status', 'skipped') status.update('dividend', last_trade_date=last_day, record_count=dividend_result.get('record_count', 0), status=dividend_status, new_records=dividend_result.get('new_records', 0)) # 10. 十大股东同步 logger.info("-" * 40) logger.info("Syncing top holders...") top_holders_result = sync_top_holders() top_holders_path = Path("/tmp/data/top_holders.parquet") if top_holders_result.get('is_changed', False) and top_holders_path.exists(): upload_success = db.upload_indicator("Top Holders", top_holders_path, "data") top_holders_status = 'success' if upload_success else 'upload_fail' else: top_holders_status = 'skipped' if not top_holders_result.get('is_changed', False) else top_holders_result.get('status', 'fail') status.update('top_holders', last_trade_date=last_day, record_count=top_holders_result.get('record_count', 0), status=top_holders_status, is_changed=top_holders_result.get('is_changed', False)) # 11. 限售解禁同步 logger.info("-" * 40) logger.info("Syncing restricted unlock...") restricted_result = sync_restricted_unlock() restricted_path = Path("/tmp/data/restricted_unlock.parquet") if restricted_result.get('is_changed', False) and restricted_path.exists(): upload_success = db.upload_indicator("Restricted Unlock", restricted_path, "data") restricted_status = 'success' if upload_success else 'upload_fail' else: restricted_status = 'skipped' if not restricted_result.get('is_changed', False) else restricted_result.get('status', 'fail') status.update('restricted_unlock', last_trade_date=last_day, record_count=restricted_result.get('record_count', 0), status=restricted_status, is_changed=restricted_result.get('is_changed', False)) # 12. 上传状态文件 logger.info("-" * 40) logger.info("Uploading sync status...") status_path = Path("/tmp/data/sync_status.json") status_upload_success = db.upload_indicator("Sync Status", status_path, "data") if not status_upload_success: logger.warning("Failed to upload sync status file") logger.info("=" * 60) logger.info("Sync Completed Successfully!") summary = (f"Daily={daily_result.get('count', 0)}, FundFlow={fund_flow_result.get('count', 0)}, " f"Valuation={valuation_result.get('count', 0)}, Margin={margin_result.get('count', 0)}, Financial={financial_result.get('count', 0)}, " f"Holder={holder_result.get('count', 0)}, Dividend={dividend_result.get('count', 0)}, " f"TopHolders={top_holders_result.get('count', 0)}, Restricted={restricted_result.get('count', 0)}") logger.info(f"Summary: {summary}") logger.info("=" * 60) return 0 except Exception as e: logger.error(f"Sync failed with error: {e}") return 1 if __name__ == "__main__": sys.exit(main())