Spaces:
Paused
Paused
| """ | |
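| Data sync script - dedicated Sync Space edition. | |
| Fetches market data (Tushare, yfinance and efinance where available, with AkShare as the fallback) and syncs it to a Hugging Face Dataset. | |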
| """ | |
| import os | |
| import sys | |
| import logging | |
| import time | |
| import threading | |
| import gc | |
| from datetime import datetime, timedelta | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from typing import List, Optional, Dict, Any | |
| from pathlib import Path | |
| import pandas as pd | |
| import akshare as ak | |
| from huggingface_hub import hf_hub_download, upload_file, list_repo_files | |
| # Tushare 适配器(优先使用) | |
| from app.tushare_adapter import ( | |
| get_stock_list_tushare, | |
| get_stock_daily_tushare, | |
| get_dividend_tushare, | |
| TUSHARE_AVAILABLE | |
| ) | |
| # 混合数据适配器(yfinance + efinance,作为回退) | |
| from app.hybrid_adapter import ( | |
| get_stock_daily_yfinance, | |
| get_index_daily_yfinance, | |
| get_fund_flow_efinance, | |
| YFINANCE_AVAILABLE | |
| ) | |
| # 添加当前目录到路径 | |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | |
| from app.database import get_db | |
| from app.database_user import get_beijing_time | |
| from app.sync_status import get_sync_status | |
| from app.stock_list_cache import get_cached_stock_list, save_stock_list_cache | |
# Configure logging once at import time: INFO level, records formatted as
# "timestamp - logger name - level - message".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Settings
# NOTE(review): presumably the number of years of history to keep; it is not
# referenced anywhere in the visible part of this file - confirm before relying on it.
YEARS_OF_DATA = 10
def _safe_int_env(var_name: str, default: int) -> int:
    """Read an integer setting from the environment.

    Args:
        var_name: Name of the environment variable.
        default: Value returned when the variable is unset or cannot be
            parsed as an integer.

    Returns:
        The parsed integer, or ``default``. A warning is logged when the
        variable is set but invalid.
    """
    try:
        raw = os.getenv(var_name)
        parsed = default if raw is None else int(raw)
    except (ValueError, TypeError):
        logger.warning(f"Invalid value for {var_name}, using default: {default}")
        return default
    return parsed
| # 动态线程数配置(延迟计算,避免导入时触发 multiprocessing) | |
def get_thread_config():
    """Compute the thread-pool size for each sync task.

    The per-task defaults are deliberately small (to avoid triggering
    server-side rate limiting) and are capped by the CPU count. Each one can
    be overridden with ``MAX_WORKERS_<TASK>``; a positive legacy
    ``MAX_WORKERS`` overrides every task at once.

    Returns:
        tuple: ``(cpu_count, config)`` where ``config`` maps the task names
        ``'daily'``, ``'fund'``, ``'valuation'``, ``'margin'``,
        ``'financial'`` and ``'dividend'`` to a worker count that is always
        at least 1.
    """
    # Imported lazily so that importing this module does not touch multiprocessing.
    import multiprocessing

    cpu_count = multiprocessing.cpu_count()

    # Tiered concurrency: default upper bound per task.
    default_caps = {
        'daily': 4,
        'fund': 3,
        'valuation': 3,
        'margin': 3,
        'financial': 2,
        'dividend': 2,
    }
    config = {}
    for task, cap in default_caps.items():
        requested = _safe_int_env(f"MAX_WORKERS_{task.upper()}", min(cap, cpu_count))
        # ThreadPoolExecutor rejects max_workers <= 0, so a zero/negative
        # environment override is clamped instead of crashing the sync later.
        config[task] = max(1, requested)

    # Backward compatibility: one global override for all tasks.
    legacy = _safe_int_env("MAX_WORKERS", 0)
    if legacy > 0:
        config = {task: legacy for task in config}
    return cpu_count, config
| # 延迟初始化线程配置(在 main 中调用) | |
# Lazily populated cache of the thread configuration; filled on first use.
_CPU_COUNT = None
_THREAD_CONFIG = None
_thread_config_lock = threading.Lock()


def get_thread_config_safe():
    """Return the cached thread-pool configuration, building it on first use.

    The configuration is computed by ``get_thread_config()`` at most once;
    the lock plus the second ``None`` check keep concurrent first callers
    from computing (and logging) it twice.

    Returns:
        dict: Worker count per task name.
    """
    global _CPU_COUNT, _THREAD_CONFIG
    # Fast path: already initialised, no locking needed.
    if _THREAD_CONFIG is not None:
        return _THREAD_CONFIG
    with _thread_config_lock:
        # Re-check under the lock: another thread may have initialised it
        # while this one was waiting.
        if _THREAD_CONFIG is None:
            _CPU_COUNT, _THREAD_CONFIG = get_thread_config()
            logger.info(f"Thread pool config: CPU={_CPU_COUNT}, "
                        f"Daily={_THREAD_CONFIG['daily']}, Fund={_THREAD_CONFIG['fund']}, "
                        f"Valuation={_THREAD_CONFIG['valuation']}, Margin={_THREAD_CONFIG['margin']}, "
                        f"Financial={_THREAD_CONFIG['financial']}, Dividend={_THREAD_CONFIG['dividend']}")
    return _THREAD_CONFIG
def init_thread_config():
    """Eagerly build the thread-pool configuration (meant to be called from main)."""
    # Only the initialisation side effect is wanted; the returned config is ignored.
    _ = get_thread_config_safe()
    return None
def _classify_a_share_board(code: str) -> str:
    """Map an A-share code to its board label using the code prefix."""
    if code.startswith(('60', '00')):
        return '主板'
    if code.startswith('30'):
        return '创业板'
    if code.startswith('68'):
        return '科创板'
    if code.startswith(('8', '4', '920')):
        return '北交所'
    return '其他'


def _first_present(columns, candidates):
    """Return the first candidate that is one of ``columns``, or None."""
    return next((name for name in candidates if name in columns), None)


def _to_code_name_frame(df, code_candidates, name_candidates, market, label):
    """Reduce a raw listing frame to ``code`` / ``name`` / ``market`` columns.

    Returns None (after logging a warning) when no code or name column can be
    found among the candidates.
    """
    c_code = _first_present(df.columns, code_candidates)
    c_name = _first_present(df.columns, name_candidates)
    if not (c_code and c_name):
        logger.warning(f"Could not find code/name columns in {label} data. Available columns: {df.columns.tolist()}")
        return None
    # Copy so that adding 'market' never writes into a view of the raw frame.
    listing = df[[c_code, c_name]].copy()
    listing.columns = ['code', 'name']
    listing['market'] = market
    return listing


def get_stock_list() -> pd.DataFrame:
    """Return the list of all sync targets (with a cache in front).

    Order of operations:
      1. Return the cached list when ``get_cached_stock_list()`` provides one.
      2. A-shares: Tushare when available, otherwise AkShare with up to five
         attempts and exponential backoff.
      3. ETF, LOF, REITs and convertible bonds from AkShare; each source is
         best-effort and a failure only logs a warning.
      4. If nothing at all could be fetched, fall back to the ``stock_list``
         table of the local database.

    Returns:
        DataFrame with at least ``code``, ``name`` and ``market``. The freshly
        fetched list is de-duplicated on ``code``, gets a ``list_date`` column
        set to None and is written to the cache before being returned.
    """
    # 1. Try the cache first.
    cached_df = get_cached_stock_list()
    if cached_df is not None:
        return cached_df

    # 2. Cache miss: fetch everything again.
    logger.info("Fetching all-market target list...")
    all_lists = []

    max_retries = 5
    base_delay = 2.0

    # A-shares, preferred source: Tushare.
    if TUSHARE_AVAILABLE:
        try:
            df_a = get_stock_list_tushare()
        except Exception as e:
            logger.warning(f"Tushare get_stock_list failed: {e}, falling back to AkShare")
        else:
            if df_a is not None and len(df_a) > 0:
                all_lists.append(df_a)
                logger.info(f"A-stock list fetched from Tushare: {len(df_a)} stocks")
            else:
                logger.warning("Tushare get_stock_list failed: Tushare returned empty data, falling back to AkShare")

    # A-shares, fallback source: AkShare.
    if not all_lists:
        for attempt in range(max_retries):
            try:
                df_a = ak.stock_info_a_code_name()
                df_a.columns = ['code', 'name']
                df_a['market'] = df_a['code'].apply(_classify_a_share_board)
                all_lists.append(df_a)
                logger.info(f"A-stock list fetched from AkShare: {len(df_a)} stocks")
                break  # success: leave the retry loop
            except Exception as e:
                if attempt == max_retries - 1:
                    logger.error(f"Failed to fetch A-stock list after {max_retries} attempts: {e}")
                else:
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s... Error: {e}")
                    time.sleep(delay)

    # ETF / LOF / REITs share the same shape: fetch, locate columns, tag market.
    # (akshare function name, market label, unit used in the log, code columns, name columns)
    fund_sources = (
        ('fund_etf_spot_em', 'ETF', 'funds',
         ['代码', 'code', 'fund_code', 'ETF代码'], ['名称', 'name', 'fund_name', 'ETF名称']),
        ('fund_lof_spot_em', 'LOF', 'funds',
         ['代码', 'code', 'fund_code', 'LOF代码'], ['名称', 'name', 'fund_name', 'LOF名称']),
        ('reits_realtime_em', 'REITs', 'products',
         ['代码', 'code', 'REITs代码'], ['名称', 'name', 'REITs名称']),
    )
    for fn_name, market, unit, code_candidates, name_candidates in fund_sources:
        try:
            # The attribute lookup stays inside the try so that a missing
            # AkShare function is handled like any other fetch failure.
            raw = getattr(ak, fn_name)()
            listing = _to_code_name_frame(raw, code_candidates, name_candidates, market, market)
            if listing is not None:
                all_lists.append(listing)
                logger.info(f"{market} list fetched: {len(listing)} {unit}")
        except Exception as e:
            logger.warning(f"{market} list fetch failed: {e}")

    # Convertible bonds: same extraction plus a filter on traded amount.
    try:
        df_cb = ak.bond_zh_hs_cov_spot()
        logger.info(f"Convertible bond columns: {df_cb.columns.tolist()}")
        cb_list = _to_code_name_frame(
            df_cb,
            ['代码', 'symbol', 'bond_code', '转债代码'],
            ['名称', 'name', 'bond_name', '转债名称'],
            '可转债',
            'convertible bond',
        )
        if cb_list is not None:
            # Drop bonds that are not trading yet (amount == 0).
            amount_col = _first_present(df_cb.columns, ['amount', '成交额', 'Amount'])
            if amount_col:
                # Coerce first: a text-typed amount column would make "> 0"
                # raise and cost us the entire convertible-bond list.
                traded = (pd.to_numeric(df_cb[amount_col], errors='coerce') > 0).to_numpy()
                filtered_count = int((~traded).sum())
                if filtered_count > 0:
                    logger.info(f"Filtered {filtered_count} unlisted convertible bonds (amount=0)")
                cb_list = cb_list.loc[traded]
            all_lists.append(cb_list)
            logger.info(f"Convertible bond list fetched: {len(cb_list)} bonds")
    except Exception as e:
        logger.warning(f"Convertible bond list fetch failed: {e}")

    # Nothing fetched at all: serve whatever the local database has.
    if not all_lists:
        db = get_db()
        return db.conn.execute("SELECT code, name, market FROM stock_list").df()

    df = pd.concat(all_lists).drop_duplicates(subset=['code'])
    # NOTE(review): this overwrites any list_date the Tushare list may carry - confirm that is intended.
    df['list_date'] = None

    # 3. Save to the cache.
    save_stock_list_cache(df)
    return df
def _normalize_daily_frame(df: pd.DataFrame, code: str, start_date: str) -> pd.DataFrame:
    """Convert a raw daily-bar frame from any provider to the common schema.

    Renames provider-specific columns, parses ``trade_date``, keeps rows on or
    after ``start_date`` and fills the optional columns that are missing.
    """
    rename_map = {
        '日期': 'trade_date', 'date': 'trade_date', 'Date': 'trade_date',
        '开盘': 'open', '今开': 'open', 'Open': 'open',
        '最高': 'high', 'High': 'high',
        '最低': 'low', 'Low': 'low',
        '收盘': 'close', '最新价': 'close', 'Close': 'close',
        '成交量': 'volume', 'Volume': 'volume',
        '成交额': 'amount', 'Amount': 'amount',
        '涨跌幅': 'pct_chg',
        '换手率': 'turnover_rate', '换手': 'turnover_rate'
    }
    df = df.rename(columns=rename_map)
    if 'trade_date' not in df.columns:
        # The date may live in the index rather than in a column.
        df = df.reset_index().rename(columns={'index': 'trade_date', 'date': 'trade_date'})
    df['trade_date'] = pd.to_datetime(df['trade_date'])

    # Derive pct_chg BEFORE trimming to start_date: the first kept row needs
    # the close of the preceding (trimmed) row, otherwise it is always NaN -
    # which for a one-day incremental sync means every value is NaN.
    if 'pct_chg' not in df.columns:
        df['pct_chg'] = df['close'].pct_change() * 100

    df = df[df['trade_date'] >= pd.to_datetime(start_date)].copy()
    if 'amount' not in df.columns:
        df['amount'] = 0
    if 'turnover_rate' not in df.columns:
        df['turnover_rate'] = 0
    df['code'] = code
    return df[['code', 'trade_date', 'open', 'high', 'low', 'close', 'volume', 'amount', 'pct_chg', 'turnover_rate']]


def get_target_daily(code: str, start_date: str, market: str) -> Optional[pd.DataFrame]:
    """Fetch daily bars for one target, trying up to five times.

    The data source depends on ``market``:
      * ``'INDEX'``: yfinance when available, otherwise AkShare.
      * ``'ETF'`` / ``'LOF'`` / ``'可转债'`` / ``'REITs'``: AkShare.
      * anything else (A-share stocks): Tushare, then yfinance, then AkShare.

    Args:
        code: Target code.
        start_date: First date wanted, ``YYYY-MM-DD``.
        market: Market label of the target.

    Returns:
        DataFrame with columns code, trade_date, open, high, low, close,
        volume, amount, pct_chg, turnover_rate; or None when no data could be
        obtained (including ``bj``-prefixed convertible bonds, which the API
        does not support).
    """
    # NOTE(review): Tushare is queried with adj='qfq' while the AkShare calls use
    # adjust="hfq", so the adjustment basis depends on which source answered - confirm.
    max_retries = 5
    base_delay = 1.0  # base delay in seconds
    for attempt in range(max_retries):
        try:
            # Exponential backoff before each retry: 2s, 4s, 8s, 16s.
            if attempt > 0:
                time.sleep(base_delay * (2 ** attempt))

            end_date = get_beijing_time().strftime('%Y%m%d')
            fetch_start = start_date.replace('-', '')
            df = None

            if market == 'INDEX':
                # Index: prefer yfinance, fall back to AkShare.
                if YFINANCE_AVAILABLE:
                    df = get_index_daily_yfinance(code, fetch_start, end_date)
                    if df is not None:
                        logger.debug(f"Got index data from yfinance for {code}")
                if df is None:
                    df = ak.stock_zh_index_daily_em(symbol=f"sh{code}" if code.startswith('000') else f"sz{code}")
            elif market == 'ETF':
                df = ak.fund_etf_hist_em(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq")
            elif market == 'LOF':
                df = ak.fund_lof_hist_em(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq")
            elif market == '可转债':
                # bond_zh_hs_cov_daily expects an exchange-prefixed code
                # (e.g. sh110048, sz123015); bj-prefixed bonds are unsupported.
                if code.startswith('bj'):
                    return None
                df = ak.bond_zh_hs_cov_daily(symbol=code)
            elif market == 'REITs':
                df = ak.reits_hist_em(symbol=code)
            else:
                # A-share stock: Tushare first, then yfinance, then AkShare.
                if TUSHARE_AVAILABLE:
                    df = get_stock_daily_tushare(code, fetch_start, end_date, adj='qfq')
                    if df is not None:
                        logger.debug(f"Got data from Tushare for {code}")
                if df is None and YFINANCE_AVAILABLE:
                    df = get_stock_daily_yfinance(code, fetch_start, end_date)
                    if df is not None:
                        logger.debug(f"Got data from yfinance for {code}")
                if df is None:
                    logger.debug(f"Tushare/yfinance failed, falling back to AkShare for {code}")
                    df = ak.stock_zh_a_hist(symbol=code, period="daily", start_date=fetch_start, end_date=end_date, adjust="hfq")

            if df is not None and not df.empty:
                return _normalize_daily_frame(df, code, start_date)
            # An empty answer falls through to the next attempt.
        except Exception as e:
            if attempt == max_retries - 1:
                logger.warning(f"Failed to fetch {code} ({market}): {str(e)}")
            else:
                # Short extra pause after an error; pointless once no attempt follows.
                time.sleep(1)
    return None
def check_today_data_available() -> bool:
    """Probe whether today's daily data has been published yet.

    The probe asks for today's daily bar of one actively traded stock
    (Ping An Bank, 000001): first through the Tushare adapter when it is
    available, then through AkShare.

    Returns:
        True when either source returns a non-empty frame for today,
        False otherwise (including when every probe fails).
    """
    try:
        today = get_beijing_time().strftime('%Y%m%d')

        # Preferred probe: the Tushare adapter already used for daily bars.
        # NOTE(review): assumes the adapter accepts the bare 6-digit code, as it
        # does for the codes passed to it in get_target_daily - confirm.
        if TUSHARE_AVAILABLE:
            try:
                df = get_stock_daily_tushare('000001', today, today, adj='qfq')
                if df is not None and not df.empty:
                    logger.debug("Tushare check: today data available")
                    return True
            except Exception as e:
                logger.debug(f"Tushare check failed: {e}")

        # Fallback probe: request only today's bar, so a non-empty answer
        # really means that today's data exists.
        try:
            df = ak.stock_zh_a_hist(symbol='000001', period="daily", start_date=today, end_date=today, adjust="hfq")
            if df is not None and not df.empty:
                logger.debug("AkShare check: today data available")
                return True
        except Exception as e:
            logger.debug(f"AkShare check failed: {e}")
    except Exception as e:
        logger.debug(f"Check today data failed: {e}")
    return False
def get_last_trading_day() -> str:
    """Return the most recent trading day that has data, as ``YYYY-MM-DD``.

    Logic:
      1. Probe whether today's data is available; if so return today.
      2. Otherwise look up the latest date before today in the trading
         calendar from ``ak.tool_trade_date_hist_sina()``.
      3. If the calendar cannot be used, fall back to the most recent
         weekday before today.
    """
    now = get_beijing_time()
    today = now.date()
    today_str = today.strftime('%Y-%m-%d')

    logger.info(f"Checking if today ({today_str}) data is available...")
    if check_today_data_available():
        logger.info(f"Today data is available, using today: {today_str}")
        return today_str

    # Today's data is not published yet: use the previous trading day.
    try:
        df = ak.tool_trade_date_hist_sina()
        if df is not None and not df.empty:
            trade_dates = pd.to_datetime(df['trade_date']).dt.date
            prev_trading_days = trade_dates[trade_dates < today]
            if not prev_trading_days.empty:
                # max() rather than the last row, so the result does not
                # depend on the calendar being sorted.
                last_day = prev_trading_days.max()
                logger.info(f"Today data not available yet, using previous trading day: {last_day}")
                return last_day.strftime('%Y-%m-%d')
    except Exception as e:
        logger.warning(f"Failed to get previous trading day: {e}")

    # Last resort: estimate by weekday. Plain "yesterday" would yield a
    # Saturday/Sunday when run on a Sunday/Monday.
    fallback_day = today - timedelta(days=1)
    while fallback_day.weekday() >= 5:
        fallback_day -= timedelta(days=1)
    fallback_str = fallback_day.strftime('%Y-%m-%d')
    logger.info(f"Using previous weekday as fallback: {fallback_str}")
    return fallback_str
def get_index_daily(code: str) -> Optional[pd.DataFrame]:
    """Fetch the daily bars of one index from AkShare.

    Args:
        code: Index code without exchange prefix; codes starting with
            ``000`` are requested as ``sh``, all others as ``sz``.

    Returns:
        DataFrame with columns code, trade_date, open, high, low, close,
        volume, amount, pct_chg, turnover_rate; or None when the source
        returns nothing, has no date column, or any step raises.
    """
    # Target column -> names it may carry in the raw frame.
    aliases = {
        'trade_date': ('date', '日期'),
        'open': ('open', '开盘'),
        'high': ('high', '最高'),
        'low': ('low', '最低'),
        'close': ('close', '收盘'),
        'volume': ('volume', '成交量'),
        'amount': ('amount', '成交额'),
        'pct_chg': ('pct_chg', '涨跌幅'),
    }
    output_cols = ['code', 'trade_date', 'open', 'high', 'low', 'close',
                   'volume', 'amount', 'pct_chg', 'turnover_rate']
    try:
        exchange = 'sh' if code.startswith('000') else 'sz'
        frame = ak.stock_zh_index_daily_em(symbol=f"{exchange}{code}")
        if frame is None or frame.empty:
            return None

        frame = frame.rename(
            columns={raw: target for target, raws in aliases.items() for raw in raws}
        )
        if 'trade_date' not in frame.columns:
            return None
        frame['trade_date'] = pd.to_datetime(frame['trade_date'])

        # Fill the optional columns the source did not provide.
        present = set(frame.columns)
        if 'amount' not in present:
            frame['amount'] = 0
        if 'pct_chg' not in present:
            frame['pct_chg'] = frame['close'].pct_change() * 100
        if 'volume' not in present:
            frame['volume'] = 0
        frame['turnover_rate'] = 0
        frame['code'] = code
        return frame[output_cols]
    except Exception as e:
        logger.warning(f"Failed to fetch index {code}: {e}")
        return None
def _scan_daily_frame(df: pd.DataFrame, latest_date: str, known_codes: set) -> str:
    """Fold one monthly daily-bar frame into the watermark state.

    Adds the frame's codes to ``known_codes`` and returns the later of
    ``latest_date`` and the frame's maximum trade date (``YYYY-MM-DD``).
    Frames that are empty or have no ``trade_date`` column are ignored.
    """
    if df.empty or 'trade_date' not in df.columns:
        return latest_date
    max_date = df['trade_date'].max()
    if isinstance(max_date, pd.Timestamp):
        max_date = max_date.strftime('%Y-%m-%d')
    codes = df['code'].unique()
    if max_date > latest_date:
        latest_date = max_date
    known_codes.update(codes)
    return latest_date


def sync_stock_daily(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]:
    """Incrementally sync daily bars using a global watermark date.

    Steps:
      1. Scan the local monthly parquet files (index files excluded) for the
         latest stored date and the set of known codes.
      2. If nothing is stored locally, read the three most recent monthly
         files of the dataset repo (``DATASET_REPO_ID``) as the baseline.
      3. Skip when the watermark already reaches ``last_trade_day`` and no
         new codes appeared; sync only the new codes (last 365 days) when the
         watermark is current; otherwise sync every target from the day after
         the watermark.
      4. Merge the fetched rows into the affected ``YYYY-MM.parquet`` files
         under ``/tmp/data/parquet``.

    Args:
        targets: Dicts with at least ``code`` and ``market``. They are not modified.
        last_trade_day: Latest trading day, ``YYYY-MM-DD``.

    Returns:
        Result dict. A skipped run has ``count``, ``failed_codes``, ``status``
        and ``message``; otherwise ``count``, ``failed_codes``, ``status``
        (``'success'`` / ``'partial_fail'``), ``record_count``,
        ``success_rate`` and ``changed_files``.
    """
    logger.info("Syncing daily data...")

    # 1. Watermark from the local monthly files.
    parquet_dir = Path("/tmp/data/parquet")
    parquet_dir.mkdir(parents=True, exist_ok=True)
    repo_id = os.getenv("DATASET_REPO_ID")

    global_latest_date = "2000-01-01"
    existing_codes = set()
    for f in parquet_dir.glob("*.parquet"):
        if f.name.startswith('index_'):  # index files are not part of the stock watermark
            continue
        try:
            global_latest_date = _scan_daily_frame(pd.read_parquet(f), global_latest_date, existing_codes)
        except Exception as e:
            logger.warning(f"Skipping unreadable local daily file {f.name}: {e}")

    # 2. No local data: use the latest three monthly files in the cloud as baseline.
    if global_latest_date == "2000-01-01" and repo_id:
        try:
            files = list_repo_files(repo_id=repo_id, repo_type="dataset")
            # Index files are excluded here as in the local scan; they sort
            # after the YYYY-MM names and would otherwise take the "latest 3" slots.
            parquet_files = sorted(
                name for name in files
                if name.startswith("data/parquet/")
                and name.endswith(".parquet")
                and not name.rsplit('/', 1)[-1].startswith('index_')
            )
            for pf in parquet_files[-3:]:
                try:
                    local_file = hf_hub_download(repo_id=repo_id, filename=pf, repo_type="dataset")
                    global_latest_date = _scan_daily_frame(pd.read_parquet(local_file), global_latest_date, existing_codes)
                except Exception as e:
                    logger.warning(f"Skipping cloud daily file {pf}: {e}")
            logger.info(f"Downloaded daily data from cloud, latest date: {global_latest_date}")
        except Exception as e:
            logger.info(f"No existing daily data in cloud: {e}")

    # 3. Codes that have never been stored.
    new_codes = [t for t in targets if t['code'] not in existing_codes]

    # 4. Watermark short-circuit.
    up_to_date = global_latest_date >= last_trade_day
    if up_to_date and not new_codes:
        logger.info(f"Daily data is already up to date ({global_latest_date}) and no new stocks. Skip.")
        return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'message': f'Already up to date ({global_latest_date})'}

    # 5. Choose what to sync and from when.
    if up_to_date:
        logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
        # New codes only get the most recent year of data.
        start_dt = (get_beijing_time() - timedelta(days=365)).strftime('%Y-%m-%d')
    else:
        logger.info(f"Syncing daily data from {global_latest_date} to {last_trade_day}...")
        sync_targets = targets
        start_dt = (pd.to_datetime(global_latest_date) + timedelta(days=1)).strftime('%Y-%m-%d')

    # Every target shares start_dt, so it is passed directly instead of being
    # written into the caller's dicts.
    pending = list(sync_targets)

    # Optional SYNC_LIMIT cap; a malformed value falls back to "no limit".
    sync_limit = _safe_int_env("SYNC_LIMIT", -1)
    if sync_limit > 0 and len(pending) > sync_limit:
        logger.info(f"Limiting sync to first {sync_limit} targets (out of {len(pending)})")
        pending = pending[:sync_limit]
    if not pending:
        return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'message': 'No pending targets'}

    logger.info(f"Syncing {len(pending)} targets...")
    all_new_data = []
    failed_codes = []
    success_codes = []
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['daily']) as executor:
        futures = {executor.submit(get_target_daily, t['code'], start_dt, t['market']): t['code'] for t in pending}
        for i, future in enumerate(as_completed(futures), 1):
            code = futures[future]
            res = future.result()
            if res is not None:
                all_new_data.append(res)
                success_codes.append(code)
            else:
                failed_codes.append(code)
            if i % 500 == 0:
                logger.info(f"Progress: {i}/{len(pending)}")

    changed_files = []  # names of the monthly files that were rewritten
    total_records = 0
    if all_new_data:
        inc_df = pd.concat(all_new_data, ignore_index=True)
        del all_new_data  # release the per-target frames early
        total_records = len(inc_df)

        # One group per affected month, in order of first appearance.
        month_key = inc_df['trade_date'].dt.strftime('%Y-%m')
        for month, month_inc in inc_df.groupby(month_key, sort=False):
            filename = f"{month}.parquet"
            local_path = parquet_dir / filename

            # Existing rows of the month: local copy first, cloud copy second.
            old_df = None
            if local_path.exists():
                try:
                    old_df = pd.read_parquet(local_path)
                    logger.info(f"Using local cache for {filename}")
                except Exception as e:
                    logger.warning(f"Could not read local {filename}: {e}")
            if old_df is None and repo_id:
                try:
                    old_file = hf_hub_download(repo_id=repo_id, filename=f"data/parquet/{filename}", repo_type="dataset")
                    old_df = pd.read_parquet(old_file)
                    logger.info(f"Downloaded {filename} from cloud")
                except Exception as e:
                    # Expected for a month that does not exist in the repo yet.
                    logger.info(f"No cloud copy of {filename} used: {e}")

            if old_df is not None:
                final_month_df = pd.concat([old_df, month_inc]).drop_duplicates(subset=['code', 'trade_date'])
            else:
                final_month_df = month_inc
            final_month_df.to_parquet(local_path)
            changed_files.append(filename)
            logger.info(f"Saved updated data for {filename}")
            del old_df, final_month_df  # keep peak memory down between months

        del inc_df
        gc.collect()

    return {
        'count': len(success_codes),
        'failed_codes': failed_codes,
        'status': 'success' if not failed_codes else 'partial_fail',
        'record_count': total_records,
        'success_rate': len(success_codes) / len(pending) if pending else 0,
        'changed_files': changed_files
    }
| # ==================== 新增:资金流向数据同步 ==================== | |
def get_stock_fund_flow(code: str, market: str) -> Optional[pd.DataFrame]:
    """Fetch the fund-flow history of one stock.

    efinance is tried first (once); if it fails or yields nothing usable,
    AkShare is tried up to three times with exponential backoff between
    failed attempts.

    Args:
        code: Stock code.
        market: Market label of the stock; ``'北交所'`` selects the ``bj``
            market for the AkShare call, otherwise the code prefix decides.

    Returns:
        DataFrame restricted to the standard fund-flow columns that the
        source provided (``code``, ``trade_date`` plus the inflow columns),
        or None when no source returned data.
    """
    # Unified output schema; only the columns actually present are returned.
    standard_cols = ['code', 'trade_date', 'close', 'pct_chg',
                     'main_net_inflow', 'main_net_inflow_pct',
                     'huge_net_inflow', 'huge_net_inflow_pct',
                     'large_net_inflow', 'large_net_inflow_pct',
                     'medium_net_inflow', 'medium_net_inflow_pct',
                     'small_net_inflow', 'small_net_inflow_pct']

    # 1. Preferred source: efinance.
    try:
        df = get_fund_flow_efinance(code, '20000101', '20991231')
        if df is not None and not df.empty:
            # efinance column names -> standard names
            rename_map = {
                '日期': 'trade_date', '收盘价': 'close', '涨跌幅': 'pct_chg',
                '主力净流入': 'main_net_inflow',
                '主力净流入占比': 'main_net_inflow_pct',
                '超大单净流入': 'huge_net_inflow',
                '超大单流入净占比': 'huge_net_inflow_pct',
                '大单净流入': 'large_net_inflow',
                '大单流入净占比': 'large_net_inflow_pct',
                '中单净流入': 'medium_net_inflow',
                '中单流入净占比': 'medium_net_inflow_pct',
                '小单净流入': 'small_net_inflow',
                '小单流入净占比': 'small_net_inflow_pct',
            }
            df = df.rename(columns=rename_map)
            # Without a date column the frame is unusable: fall through to AkShare.
            if 'trade_date' in df.columns:
                df['trade_date'] = pd.to_datetime(df['trade_date'])
                df['code'] = code
                result_cols = [c for c in standard_cols if c in df.columns]
                logger.debug(f"Got fund flow from efinance for {code}")
                return df[result_cols]
    except Exception as e:
        logger.debug(f"efinance fund flow failed for {code}: {e}")

    # 2. Fallback source: AkShare.
    max_retries = 3
    base_delay = 1.0
    for attempt in range(max_retries):
        try:
            # Pick the market argument expected by AkShare.
            if market == '北交所' or code.startswith(('8', '4', '920')):
                mk = 'bj'
            elif code.startswith(('6', '9')):
                mk = 'sh'
            else:
                mk = 'sz'
            df = ak.stock_individual_fund_flow(stock=code, market=mk)
            if df is not None and not df.empty:
                # AkShare column names -> standard names
                rename_map = {
                    '日期': 'trade_date', '收盘价': 'close', '涨跌幅': 'pct_chg',
                    '主力净流入-净额': 'main_net_inflow',
                    '主力净流入-净占比': 'main_net_inflow_pct',
                    '超大单净流入-净额': 'huge_net_inflow',
                    '超大单净流入-净占比': 'huge_net_inflow_pct',
                    '大单净流入-净额': 'large_net_inflow',
                    '大单净流入-净占比': 'large_net_inflow_pct',
                    '中单净流入-净额': 'medium_net_inflow',
                    '中单净流入-净占比': 'medium_net_inflow_pct',
                    '小单净流入-净额': 'small_net_inflow',
                    '小单净流入-净占比': 'small_net_inflow_pct',
                }
                df = df.rename(columns=rename_map)
                df['trade_date'] = pd.to_datetime(df['trade_date'])
                df['code'] = code
                result_cols = [c for c in standard_cols if c in df.columns]
                logger.debug(f"Got fund flow from AkShare for {code}")
                return df[result_cols]
        except Exception as e:
            logger.debug(f"AkShare fund flow attempt {attempt + 1} failed for {code}: {e}")
            # Back off only when another attempt follows (1s, then 2s);
            # sleeping after the final failure would just delay the caller.
            if attempt < max_retries - 1:
                time.sleep(base_delay * (2 ** attempt))
    return None
def sync_fund_flow(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]:
    """Sync fund-flow data into monthly parquet files.

    Steps:
      1. Determine the global latest stored date and the known codes from the
         local monthly files under ``/tmp/data/fund_flow``; when there are
         none, copy the three most recent monthly files from the dataset repo
         (``DATASET_REPO_ID``) and use them.
      2. Keep only stock targets (ETF / LOF / REITs / convertible bonds are
         excluded).
      3. Skip when the data is current and there are no new codes; sync only
         new codes when the data is current; otherwise sync all stock targets.
      4. For known codes keep only rows after the global latest date, then
         merge the new rows into the affected ``YYYY-MM.parquet`` files.

    Args:
        targets: Dicts with at least ``code`` and ``market``.
        last_trade_day: Latest trading day, ``YYYY-MM-DD``.

    Returns:
        Result dict. A skipped run has ``count``, ``failed_codes``, ``status``
        and ``message``; otherwise ``count``, ``failed_codes``, ``status``
        (``'success'`` / ``'partial_fail'``), ``record_count``,
        ``success_rate`` and ``changed_files``.
    """
    logger.info("Syncing fund flow data...")

    # 1. Watermark from the local monthly files.
    flow_dir = Path("/tmp/data/fund_flow")
    flow_dir.mkdir(parents=True, exist_ok=True)
    repo_id = os.getenv("DATASET_REPO_ID")

    global_latest_date = "2000-01-01"
    existing_codes = set()
    for f in flow_dir.glob("*.parquet"):
        try:
            df = pd.read_parquet(f)
            if not df.empty:
                max_date = df['trade_date'].max().strftime('%Y-%m-%d')
                if max_date > global_latest_date:
                    global_latest_date = max_date
                existing_codes.update(df['code'].unique())
        except Exception as e:
            logger.warning(f"Skipping unreadable local fund flow file {f.name}: {e}")

    # No local data: seed from the three most recent monthly files in the cloud.
    if global_latest_date == "2000-01-01" and repo_id:
        try:
            files = list_repo_files(repo_id=repo_id, repo_type="dataset")
            flow_files = sorted(
                name for name in files
                if name.startswith("data/fund_flow/") and name.endswith(".parquet")
            )
            for ff in flow_files[-3:]:
                try:
                    local_file = hf_hub_download(repo_id=repo_id, filename=ff, repo_type="dataset")
                    df = pd.read_parquet(local_file)
                    if not df.empty:
                        # Keep a local copy under the same month name (e.g. "2026-02").
                        df.to_parquet(flow_dir / f"{Path(ff).stem}.parquet")
                        max_date = df['trade_date'].max().strftime('%Y-%m-%d')
                        if max_date > global_latest_date:
                            global_latest_date = max_date
                        existing_codes.update(df['code'].unique())
                except Exception as e:
                    logger.warning(f"Skipping cloud fund flow file {ff}: {e}")
            logger.info(f"Downloaded fund flow from cloud, latest date: {global_latest_date}")
        except Exception as e:
            logger.info(f"No existing fund flow data in cloud: {e}")

    # 2. Only stocks have fund-flow data here.
    stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
    new_codes = [t for t in stock_targets if t['code'] not in existing_codes]

    # 3. Watermark short-circuit.
    up_to_date = global_latest_date >= last_trade_day
    if up_to_date and not new_codes:
        logger.info(f"Fund flow data is already up to date ({global_latest_date}) and no new stocks. Skip.")
        return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'message': f'Already up to date ({global_latest_date})'}

    # 4. Incremental fetch.
    if up_to_date:
        logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
    else:
        logger.info(f"Syncing fund flow from {global_latest_date} to {last_trade_day}...")
        sync_targets = stock_targets

    watermark = pd.to_datetime(global_latest_date)
    all_data = []
    success_codes = []
    failed_codes = []
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['fund']) as executor:
        futures = {executor.submit(get_stock_fund_flow, t['code'], t['market']): t['code'] for t in sync_targets}
        for i, future in enumerate(as_completed(futures), 1):
            code = futures[future]
            res = future.result()
            if res is not None and not res.empty:
                # Known codes: keep only rows newer than what is stored.
                if code in existing_codes:
                    res = res[res['trade_date'] > watermark]
                if not res.empty:
                    all_data.append(res)
                success_codes.append(code)
            else:
                failed_codes.append(code)
            if i % 500 == 0:
                logger.info(f"Fund flow progress: {i}/{len(sync_targets)}, success: {len(success_codes)}")

    total_records = 0
    changed_files = []  # names of the monthly files that were rewritten

    # 5. Save per month.
    if all_data:
        new_df = pd.concat(all_data, ignore_index=True)
        del all_data  # release the per-stock frames early
        total_records = len(new_df)

        month_key = new_df['trade_date'].dt.strftime('%Y-%m')
        for month, month_data in new_df.groupby(month_key):  # ascending month order
            filename = f"{month}.parquet"
            local_path = flow_dir / filename

            # Existing rows of the month: local copy first, cloud copy second.
            old_month_df = None
            if local_path.exists():
                try:
                    old_month_df = pd.read_parquet(local_path)
                except Exception as e:
                    logger.warning(f"Could not read local fund flow file {filename}: {e}")
            if old_month_df is None and repo_id:
                # Only the latest three months are seeded locally, so an older
                # month must be fetched here; otherwise the file written below
                # would hold just the new rows and replace the full month.
                try:
                    old_file = hf_hub_download(repo_id=repo_id, filename=f"data/fund_flow/{filename}", repo_type="dataset")
                    old_month_df = pd.read_parquet(old_file)
                    logger.info(f"Downloaded fund flow {filename} from cloud")
                except Exception as e:
                    # Expected for a month that does not exist in the repo yet.
                    logger.info(f"No cloud copy of fund flow {filename} used: {e}")

            if old_month_df is not None:
                final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date'])
            else:
                final_month_df = month_data
            final_month_df.to_parquet(local_path)
            changed_files.append(filename)
            logger.info(f"Saved fund flow data for {filename}")
            del old_month_df, final_month_df  # keep peak memory down between months

        logger.info(f"Fund flow updated: {len(new_df)} new records")
        del new_df
        gc.collect()

    return {
        'count': len(success_codes),
        'failed_codes': failed_codes,
        'status': 'success' if not failed_codes else 'partial_fail',
        'record_count': total_records,
        'success_rate': len(success_codes) / len(sync_targets) if sync_targets else 0,
        'changed_files': changed_files
    }
| # ==================== 新增:估值指标数据同步 ==================== | |
def get_stock_valuation(code: str) -> Optional[pd.DataFrame]:
    """Fetch the valuation-indicator history of one stock from AkShare.

    Up to five attempts are made, with exponential backoff (1s, 2s, 4s, 8s)
    between failed attempts.

    Args:
        code: Stock code passed to ``ak.stock_a_lg_indicator``.

    Returns:
        DataFrame with ``code``, ``trade_date`` and whichever of pe_ttm,
        pe_static, pb, ps_ttm, dv_ratio, total_mv, circ_mv the source
        provided; or None when no attempt returned data.
    """
    max_retries = 5
    base_delay = 1.0

    # Source column names -> standard names.
    # NOTE(review): '市盈率' and '市盈率TTM' both map to 'pe_ttm'; if the source ever
    # returns both, the result carries a duplicated column - confirm.
    rename_map = {
        '日期': 'trade_date',
        '市盈率': 'pe_ttm',
        '市盈率TTM': 'pe_ttm',
        '静态市盈率': 'pe_static',
        '市净率': 'pb',
        '市销率': 'ps_ttm',
        '股息率': 'dv_ratio',
        '总市值': 'total_mv',
        '流通市值': 'circ_mv',
    }
    cols = ['code', 'trade_date', 'pe_ttm', 'pe_static', 'pb',
            'ps_ttm', 'dv_ratio', 'total_mv', 'circ_mv']

    for attempt in range(max_retries):
        try:
            df = ak.stock_a_lg_indicator(symbol=code)
            if df is not None and not df.empty:
                df = df.rename(columns=rename_map)
                df['trade_date'] = pd.to_datetime(df['trade_date'])
                df['code'] = code
                available_cols = [c for c in cols if c in df.columns]
                return df[available_cols]
        except Exception as e:
            logger.debug(f"Valuation attempt {attempt + 1} failed for {code}: {e}")
            # Back off only when another attempt follows; sleeping after the
            # final failure would just stall the worker thread.
            if attempt < max_retries - 1:
                time.sleep(base_delay * (2 ** attempt))
    return None
def sync_valuation(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]:
    """Incrementally sync valuation indicators into monthly parquet files.

    Steps:
      1. Scan ``/tmp/data/valuation/*.parquet`` for the newest stored date and
         the set of stored codes; when nothing is stored locally, pull the
         three most recent monthly files from the dataset repo named by the
         ``DATASET_REPO_ID`` environment variable.
      2. Drop ETF / LOF / REITs / convertible-bond targets.
      3. Return early when the stored data reaches ``last_trade_day`` and
         every target code is already stored.
      4. Fetch either only the new codes or all stock targets concurrently.
      5. Merge the new rows into one parquet file per calendar month.

    Rows of an already-stored code are kept when they are newer than that
    code's own latest stored date (not the global latest date), so a stock
    whose fetch failed on an earlier run is backfilled on the next one.

    Args:
        targets: Records with at least ``code`` and ``market`` keys.
        last_trade_day: Latest trading day as ``YYYY-MM-DD``.

    Returns:
        Dict with ``count`` (codes that yielded new rows), ``failed_codes``
        (always empty), ``status``, ``record_count`` and ``changed_files``
        (monthly file names written); the early-exit result also carries
        ``message``.
    """
    logger.info("Syncing valuation data...")
    val_dir = Path("/tmp/data/valuation")
    val_dir.mkdir(parents=True, exist_ok=True)
    global_latest_date = "2000-01-01"
    existing_codes = set()
    latest_by_code: Dict[str, Any] = {}  # code -> newest stored trade_date

    def _absorb(frame: pd.DataFrame) -> None:
        """Fold one stored monthly frame into the watermarks and code set."""
        nonlocal global_latest_date
        if frame.empty:
            return
        newest = frame['trade_date'].max().strftime('%Y-%m-%d')
        if newest > global_latest_date:
            global_latest_date = newest
        existing_codes.update(frame['code'].unique())
        for stock, stamp in frame.groupby('code')['trade_date'].max().dropna().items():
            known = latest_by_code.get(stock)
            if known is None or stamp > known:
                latest_by_code[stock] = stamp

    # 1. Scan the monthly files already on disk.
    for stored in val_dir.glob("*.parquet"):
        try:
            _absorb(pd.read_parquet(stored))
        except Exception as exc:
            logger.warning(f"Skipping unreadable valuation file {stored.name}: {exc}")

    # Nothing usable locally: seed from the dataset repo.
    if global_latest_date == "2000-01-01":
        repo_id = os.getenv("DATASET_REPO_ID")
        if repo_id:
            try:
                files = list_repo_files(repo_id=repo_id, repo_type="dataset")
                val_files = sorted(f for f in files
                                   if f.startswith("data/valuation/") and f.endswith(".parquet"))
                for vf in val_files[-3:]:  # only the three most recent months
                    try:
                        local_file = hf_hub_download(repo_id=repo_id, filename=vf, repo_type="dataset")
                        frame = pd.read_parquet(local_file)
                        if not frame.empty:
                            frame.to_parquet(val_dir / f"{Path(vf).stem}.parquet")
                            _absorb(frame)
                    except Exception as exc:
                        logger.warning(f"Failed to load {vf} from cloud: {exc}")
                logger.info(f"Downloaded valuation from cloud, latest date: {global_latest_date}")
            except Exception as e:
                logger.info(f"No existing valuation data in cloud: {e}")

    # 2. Keep plain stocks only.
    stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
    new_codes = [t for t in stock_targets if t['code'] not in existing_codes]

    # 3. Global watermark gate.
    if global_latest_date >= last_trade_day and not new_codes:
        logger.info(f"Valuation data is already up to date ({global_latest_date}) and no new stocks. Skip.")
        return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'record_count': 0, 'changed_files': [], 'message': f'Already up to date ({global_latest_date})'}

    # 4. Incremental fetch.
    if global_latest_date >= last_trade_day:
        logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
    else:
        logger.info(f"Syncing valuation from {global_latest_date} to {last_trade_day}...")
        sync_targets = stock_targets

    all_data = []
    success_count = 0
    fallback_mark = pd.to_datetime(global_latest_date)
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['valuation']) as executor:
        futures = {executor.submit(get_stock_valuation, t['code']): t['code'] for t in sync_targets}
        for i, future in enumerate(as_completed(futures), 1):
            res = future.result()
            if res is not None and not res.empty:
                code = futures[future]
                if code in existing_codes:
                    # Per-code watermark; the global one would discard the
                    # gap left by an earlier failed fetch of this code.
                    res = res[res['trade_date'] > latest_by_code.get(code, fallback_mark)]
                if not res.empty:
                    all_data.append(res)
                    success_count += 1
            if i % 500 == 0:
                logger.info(f"Valuation progress: {i}/{len(sync_targets)}, success: {success_count}")

    total_records = 0
    changed_files = []  # monthly file names written during this run

    # 5. Persist one file per calendar month.
    if all_data:
        new_df = pd.concat(all_data, ignore_index=True)
        del all_data  # release the per-stock frames early
        total_records = len(new_df)
        dated = new_df.dropna(subset=['trade_date'])
        if len(dated) < total_records:
            logger.warning(f"Dropped {total_records - len(dated)} valuation rows without a trade date")
        # One grouping pass instead of re-filtering the frame for every month.
        # NOTE(review): a month file that is absent locally (only the latest
        # three months are pulled from the hub) is written with the new rows
        # only - confirm the uploader merges rather than overwrites.
        for period, month_data in dated.groupby(dated['trade_date'].dt.to_period('M')):
            filename = f"{period.year}-{period.month:02d}.parquet"
            local_path = val_dir / filename
            old_month_df = None
            if local_path.exists():
                try:
                    old_month_df = pd.read_parquet(local_path)
                except Exception as exc:
                    logger.warning(f"Could not read {filename}, it will be rewritten from new rows only: {exc}")
            if old_month_df is not None:
                final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date'])
                del old_month_df
            else:
                final_month_df = month_data
            final_month_df.to_parquet(local_path)
            changed_files.append(filename)
            logger.info(f"Saved valuation data for {filename}")
            del final_month_df, month_data
        logger.info(f"Valuation updated: {len(new_df)} new records")
        del new_df, dated
        gc.collect()

    return {
        'count': success_count,
        'failed_codes': [],
        'status': 'success',
        'record_count': total_records,
        'changed_files': changed_files,
    }
| # ==================== 新增:融资融券数据同步 ==================== | |
def get_stock_margin(code: str) -> Optional[pd.DataFrame]:
    """Fetch the margin-trading history of one stock.

    Codes starting with ``6`` are requested through
    ``ak.stock_margin_detail_sh``, all others through
    ``ak.stock_margin_detail_sz``. Up to five attempts are made: the first
    failure retries immediately, later ones back off exponentially (2 s, 4 s,
    8 s), and no delay is spent after the final attempt.

    Args:
        code: Stock code, passed to AkShare as ``symbol``.

    Returns:
        A frame holding ``code``, ``trade_date`` and whichever margin columns
        the response contained, or ``None`` when no usable data was obtained
        (all attempts failed, came back empty, or had no date column).
    """
    max_retries = 5
    base_delay = 1.0
    rename_map = {
        '日期': 'trade_date',
        '融资余额': 'rzye',
        '融资买入额': 'rzmre',
        '融资偿还额': 'rzche',
        '融券余额': 'rqye',
        '融券卖出量': 'rqmcl',
        '融资融券余额': 'rzrqye',
    }
    cols = ['code', 'trade_date', 'rzye', 'rzmre', 'rzche', 'rqye', 'rqmcl', 'rzrqye']
    last_error = None
    for attempt in range(max_retries):
        try:
            # NOTE(review): confirm both function names and the `symbol`
            # keyword against the installed akshare release.
            if code.startswith('6'):
                df = ak.stock_margin_detail_sh(symbol=code)
            else:
                df = ak.stock_margin_detail_sz(symbol=code)
            if df is not None and not df.empty:
                df = df.rename(columns=rename_map)
                if 'trade_date' not in df.columns:
                    # The caller filters and partitions on trade_date, so a
                    # frame without it is unusable; retrying cannot help.
                    logger.warning(f"Margin data for {code} has no date column, skipped")
                    return None
                df['trade_date'] = pd.to_datetime(df['trade_date'])
                df['code'] = code
                return df[[c for c in cols if c in df.columns]]
        except Exception as exc:
            last_error = exc
            # Back off between retries only, never after the last attempt.
            if 0 < attempt < max_retries - 1:
                time.sleep(base_delay * (2 ** attempt))
    if last_error is not None:
        logger.warning(f"Margin fetch failed for {code} after {max_retries} attempts: {last_error}")
    return None
def sync_margin(targets: List[Dict[str, str]], last_trade_day: str) -> Dict[str, Any]:
    """Incrementally sync margin-trading data into monthly parquet files.

    Steps:
      1. Scan ``/tmp/data/margin/*.parquet`` for the newest stored date and
         the set of stored codes; when nothing is stored locally, pull the
         three most recent monthly files from the dataset repo named by the
         ``DATASET_REPO_ID`` environment variable.
      2. Keep only main-board, ChiNext and STAR-market targets.
      3. Return early when the stored data reaches ``last_trade_day`` and
         every target code is already stored.
      4. Fetch either only the new codes or all stock targets concurrently.
      5. Merge the new rows into one parquet file per calendar month.

    Rows of an already-stored code are kept when they are newer than that
    code's own latest stored date (not the global latest date), so a stock
    whose fetch failed on an earlier run is backfilled on the next one.

    Args:
        targets: Records with at least ``code`` and ``market`` keys.
        last_trade_day: Latest trading day as ``YYYY-MM-DD``.

    Returns:
        Dict with ``count`` (codes that yielded new rows), ``failed_codes``
        (always empty), ``status``, ``record_count`` and ``changed_files``
        (monthly file names written); the early-exit result also carries
        ``message``.
    """
    logger.info("Syncing margin trading data...")
    margin_dir = Path("/tmp/data/margin")
    margin_dir.mkdir(parents=True, exist_ok=True)
    global_latest_date = "2000-01-01"
    existing_codes = set()
    latest_by_code: Dict[str, Any] = {}  # code -> newest stored trade_date

    def _absorb(frame: pd.DataFrame) -> None:
        """Fold one stored monthly frame into the watermarks and code set."""
        nonlocal global_latest_date
        if frame.empty:
            return
        newest = frame['trade_date'].max().strftime('%Y-%m-%d')
        if newest > global_latest_date:
            global_latest_date = newest
        existing_codes.update(frame['code'].unique())
        for stock, stamp in frame.groupby('code')['trade_date'].max().dropna().items():
            known = latest_by_code.get(stock)
            if known is None or stamp > known:
                latest_by_code[stock] = stamp

    # 1. Scan the monthly files already on disk.
    for stored in margin_dir.glob("*.parquet"):
        try:
            _absorb(pd.read_parquet(stored))
        except Exception as exc:
            logger.warning(f"Skipping unreadable margin file {stored.name}: {exc}")

    # Nothing usable locally: seed from the dataset repo.
    if global_latest_date == "2000-01-01":
        repo_id = os.getenv("DATASET_REPO_ID")
        if repo_id:
            try:
                files = list_repo_files(repo_id=repo_id, repo_type="dataset")
                margin_files = sorted(f for f in files
                                      if f.startswith("data/margin/") and f.endswith(".parquet"))
                for mf in margin_files[-3:]:  # only the three most recent months
                    try:
                        local_file = hf_hub_download(repo_id=repo_id, filename=mf, repo_type="dataset")
                        frame = pd.read_parquet(local_file)
                        if not frame.empty:
                            frame.to_parquet(margin_dir / f"{Path(mf).stem}.parquet")
                            _absorb(frame)
                    except Exception as exc:
                        logger.warning(f"Failed to load {mf} from cloud: {exc}")
                logger.info(f"Downloaded margin from cloud, latest date: {global_latest_date}")
            except Exception as e:
                logger.info(f"No existing margin data in cloud: {e}")

    # 2. Keep main board / ChiNext / STAR market only.
    stock_targets = [t for t in targets if t['market'] in ['主板', '创业板', '科创板']]
    new_codes = [t for t in stock_targets if t['code'] not in existing_codes]

    # 3. Global watermark gate.
    if global_latest_date >= last_trade_day and not new_codes:
        logger.info(f"Margin data is already up to date ({global_latest_date}) and no new stocks. Skip.")
        return {'count': 0, 'failed_codes': [], 'status': 'skipped', 'record_count': 0, 'changed_files': [], 'message': f'Already up to date ({global_latest_date})'}

    # 4. Incremental fetch.
    if global_latest_date >= last_trade_day:
        logger.info(f"Global date is up to date, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
    else:
        logger.info(f"Syncing margin data from {global_latest_date} to {last_trade_day}...")
        sync_targets = stock_targets

    all_data = []
    success_count = 0
    fallback_mark = pd.to_datetime(global_latest_date)
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['margin']) as executor:
        futures = {executor.submit(get_stock_margin, t['code']): t['code'] for t in sync_targets}
        for i, future in enumerate(as_completed(futures), 1):
            res = future.result()
            if res is not None and not res.empty:
                code = futures[future]
                if code in existing_codes:
                    # Per-code watermark; the global one would discard the
                    # gap left by an earlier failed fetch of this code.
                    res = res[res['trade_date'] > latest_by_code.get(code, fallback_mark)]
                if not res.empty:
                    all_data.append(res)
                    success_count += 1
            if i % 500 == 0:
                logger.info(f"Margin progress: {i}/{len(sync_targets)}, success: {success_count}")

    total_records = 0
    changed_files = []  # monthly file names written during this run

    # 5. Persist one file per calendar month.
    if all_data:
        new_df = pd.concat(all_data, ignore_index=True)
        del all_data  # release the per-stock frames early
        total_records = len(new_df)
        dated = new_df.dropna(subset=['trade_date'])
        if len(dated) < total_records:
            logger.warning(f"Dropped {total_records - len(dated)} margin rows without a trade date")
        # One grouping pass instead of re-filtering the frame for every month.
        # NOTE(review): a month file that is absent locally (only the latest
        # three months are pulled from the hub) is written with the new rows
        # only - confirm the uploader merges rather than overwrites.
        for period, month_data in dated.groupby(dated['trade_date'].dt.to_period('M')):
            filename = f"{period.year}-{period.month:02d}.parquet"
            local_path = margin_dir / filename
            old_month_df = None
            if local_path.exists():
                try:
                    old_month_df = pd.read_parquet(local_path)
                except Exception as exc:
                    logger.warning(f"Could not read {filename}, it will be rewritten from new rows only: {exc}")
            if old_month_df is not None:
                final_month_df = pd.concat([old_month_df, month_data]).drop_duplicates(subset=['code', 'trade_date'])
                del old_month_df
            else:
                final_month_df = month_data
            final_month_df.to_parquet(local_path)
            changed_files.append(filename)
            logger.info(f"Saved margin data for {filename}")
            del final_month_df, month_data
        logger.info(f"Margin updated: {len(new_df)} new records")
        del new_df, dated
        gc.collect()

    return {
        'count': success_count,
        'failed_codes': [],
        'status': 'success',
        'record_count': total_records,
        'changed_files': changed_files,
    }
| # ==================== 新增:财务指标数据同步 ==================== | |
def get_stock_financial_indicator(code: str) -> Optional[pd.DataFrame]:
    """Fetch the financial-indicator history of one stock.

    Calls ``ak.stock_financial_analysis_indicator`` up to five times. The
    first failure retries immediately, later ones back off exponentially
    (2 s, 4 s, 8 s), and no delay is spent after the final attempt.

    Args:
        code: Stock code, passed to AkShare as ``symbol``.

    Returns:
        A frame holding ``code``, ``trade_date`` and whichever indicator
        columns the response contained, or ``None`` when no usable data was
        obtained (all attempts failed, came back empty, or had no date column).
    """
    max_retries = 5
    base_delay = 1.0
    rename_map = {
        '日期': 'trade_date',
        '净资产收益率': 'roe',
        '总资产净利率': 'roa',
        '销售毛利率': 'gross_margin',
        '销售净利率': 'net_margin',
        '资产负债率': 'debt_ratio',
        '流动比率': 'current_ratio',
        '速动比率': 'quick_ratio',
        '存货周转率': 'inventory_turnover',
        '应收账款周转率': 'receivable_turnover',
        '总资产周转率': 'total_asset_turnover',
    }
    cols = ['code', 'trade_date', 'roe', 'roa', 'gross_margin', 'net_margin',
            'debt_ratio', 'current_ratio', 'quick_ratio',
            'inventory_turnover', 'receivable_turnover', 'total_asset_turnover']
    last_error = None
    for attempt in range(max_retries):
        try:
            df = ak.stock_financial_analysis_indicator(symbol=code)
            if df is not None and not df.empty:
                # NOTE(review): source headers may carry unit suffixes such
                # as "(%)"; verify these keys match the live response.
                df = df.rename(columns=rename_map)
                if 'trade_date' not in df.columns:
                    # The caller filters and de-duplicates on trade_date, so
                    # a frame without it is unusable; retrying cannot help.
                    logger.warning(f"Financial indicator data for {code} has no date column, skipped")
                    return None
                df['trade_date'] = pd.to_datetime(df['trade_date'])
                df['code'] = code
                return df[[c for c in cols if c in df.columns]]
        except Exception as exc:
            last_error = exc
            # Back off between retries only, never after the last attempt.
            if 0 < attempt < max_retries - 1:
                time.sleep(base_delay * (2 ** attempt))
    if last_error is not None:
        logger.warning(f"Financial indicator fetch failed for {code} after {max_retries} attempts: {last_error}")
    return None
def sync_financial_indicator(targets: List[Dict[str, str]]) -> Dict[str, Any]:
    """Incrementally sync financial indicators into a single parquet file.

    The existing data set is read from
    ``/tmp/data/financial_indicator.parquet`` or, failing that, downloaded
    from the dataset repo named by ``DATASET_REPO_ID``. When its newest date
    is less than 90 days old only codes that are not stored yet are fetched
    (or nothing at all when there are none); otherwise every stock target is
    refreshed.

    Rows of an already-stored code are kept when they are newer than that
    code's own latest stored date. Filtering on the global latest date would
    permanently drop a report period that one company published after
    another company's same-period report had already been stored.

    Args:
        targets: Records with at least ``code`` and ``market`` keys.

    Returns:
        Dict with ``count`` (codes that yielded new rows), ``status``,
        ``record_count`` and ``new_records``; a completed run adds
        ``previous_count`` and the early-exit result adds ``message``.
    """
    logger.info("Syncing financial indicator data...")
    fi_path = Path("/tmp/data/financial_indicator.parquet")
    fi_path.parent.mkdir(parents=True, exist_ok=True)
    old_df = None
    old_count = 0
    global_latest_date = "2000-01-01"
    existing_codes = set()

    # 1. Prefer the local cache.
    if fi_path.exists():
        try:
            old_df = pd.read_parquet(fi_path)
            old_count = len(old_df)
            global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
            existing_codes = set(old_df['code'].unique())
            logger.info(f"Local financial cache found, latest date: {global_latest_date}, records: {old_count}")
        except Exception as e:
            logger.warning(f"Failed to read local financial cache: {e}")

    # 2. No local cache: try the dataset repo.
    if old_df is None:
        repo_id = os.getenv("DATASET_REPO_ID")
        if repo_id:
            try:
                old_file = hf_hub_download(repo_id=repo_id, filename="data/financial_indicator.parquet", repo_type="dataset")
                old_df = pd.read_parquet(old_file)
                old_count = len(old_df)
                global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
                existing_codes = set(old_df['code'].unique())
                old_df.to_parquet(fi_path)
                logger.info(f"Downloaded financial from cloud, latest date: {global_latest_date}, records: {old_count}")
            except Exception:
                logger.info("No existing financial data found in cloud.")

    # code -> newest stored date; existing_codes is only non-empty when both
    # columns were readable above.
    latest_by_code: Dict[str, Any] = {}
    if old_df is not None and existing_codes:
        latest_by_code = old_df.groupby('code')['trade_date'].max().dropna().to_dict()

    # 3. Freshness gate plus new-listing detection.
    stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
    new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
    today = get_beijing_time()
    is_recent = False
    if global_latest_date != "2000-01-01":
        days_diff = (today - pd.to_datetime(global_latest_date)).days
        if days_diff < 90:
            is_recent = True
    if is_recent and not new_codes:
        logger.info(f"Financial data is recent ({global_latest_date}) and no new stocks. Skip.")
        return {'count': 0, 'status': 'skipped', 'record_count': old_count, 'new_records': 0, 'message': f'Already up to date ({global_latest_date})'}

    # 4. Incremental fetch.
    if is_recent:
        logger.info(f"Financial data is recent, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
    else:
        logger.info(f"Syncing financial indicators (last update: {global_latest_date})...")
        sync_targets = stock_targets

    all_data = []
    success_count = 0
    fallback_mark = pd.to_datetime(global_latest_date)
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['financial']) as executor:
        futures = {executor.submit(get_stock_financial_indicator, t['code']): t['code'] for t in sync_targets}
        for i, future in enumerate(as_completed(futures), 1):
            res = future.result()
            if res is not None and not res.empty:
                code = futures[future]
                if code in existing_codes:
                    res = res[res['trade_date'] > latest_by_code.get(code, fallback_mark)]
                if not res.empty:
                    all_data.append(res)
                    success_count += 1
            if i % 500 == 0:
                logger.info(f"Financial indicator progress: {i}/{len(sync_targets)}, success: {success_count}")

    # 5. Merge and persist.
    new_records = 0
    final_count = old_count
    if all_data:
        new_df = pd.concat(all_data, ignore_index=True)
        new_records = len(new_df)
        final_df = pd.concat([old_df, new_df]) if old_df is not None else new_df
        final_df = final_df.drop_duplicates(subset=['code', 'trade_date'])
        final_count = len(final_df)
        final_df.to_parquet(fi_path)
        logger.info(f"Financial updated: {final_count} total records ({new_records} new)")

    return {
        'count': success_count,
        'status': 'success' if success_count > 0 or old_count > 0 else 'fail',
        'record_count': final_count,
        'new_records': new_records,
        'previous_count': old_count,
    }
| # ==================== 新增:股东户数数据同步 ==================== | |
def sync_holder_num() -> Dict[str, Any]:
    """Refresh the shareholder-count snapshot in one bulk request.

    Fetches ``ak.stock_zh_a_gdhs(symbol="全部")``, normalises the column
    names and overwrites ``/tmp/data/holder_num.parquet``.

    ``is_changed`` is true when the previous file was missing or unreadable,
    when the row count differs, or when the contents differ. Comparing row
    counts alone would miss a refresh in which values changed but the number
    of rows did not.

    Returns:
        Dict with ``count``, ``status`` (``'success'`` or ``'fail'``),
        ``record_count``, ``previous_count`` and ``is_changed``. On failure
        or an empty response the previous count is reported and nothing is
        written.
    """
    logger.info("Syncing holder number data...")
    hn_path = Path("/tmp/data/holder_num.parquet")
    old_df = None
    old_count = 0

    # Load the previous snapshot for change detection.
    if hn_path.exists():
        try:
            old_df = pd.read_parquet(hn_path)
            old_count = len(old_df)
        except Exception as exc:
            logger.warning(f"Failed to read existing holder number data: {exc}")
            old_df = None

    try:
        # NOTE(review): confirm "全部" is an accepted `symbol` value for the
        # installed akshare release.
        df = ak.stock_zh_a_gdhs(symbol="全部")
        if df is not None and not df.empty:
            rename_map = {
                '代码': 'code',
                '股东户数': 'holder_num',
                '户均持股数量': 'avg_share',
                '户均持股金额': 'avg_value',
                '总股本': 'total_share',
                '总市值': 'total_value',
                '日期': 'trade_date',
            }
            df = df.rename(columns=rename_map)
            if 'trade_date' in df.columns:
                df['trade_date'] = pd.to_datetime(df['trade_date'])
            new_count = len(df)

            # Same row count is not proof of "unchanged": compare contents.
            # A dtype mismatch after the parquet round trip errs on the side
            # of reporting a change.
            is_changed = (
                old_df is None
                or new_count != old_count
                or not df.reset_index(drop=True).equals(old_df.reset_index(drop=True))
            )

            hn_path.parent.mkdir(parents=True, exist_ok=True)
            df.to_parquet(hn_path)
            logger.info(f"Holder number data saved: {new_count} records (previous: {old_count}, changed: {is_changed})")
            return {
                'count': new_count,
                'status': 'success',
                'record_count': new_count,
                'previous_count': old_count,
                'is_changed': is_changed,
            }
    except Exception as e:
        logger.warning(f"Failed to sync holder number data: {e}")

    # Reached on an exception and on an empty/None response alike, so the
    # caller always receives a dict.
    return {
        'count': 0,
        'status': 'fail',
        'record_count': old_count,
        'previous_count': old_count,
        'is_changed': False,
    }
| # ==================== 新增:分红数据同步 ==================== | |
def get_stock_dividend(code: str) -> Optional[pd.DataFrame]:
    """Fetch the dividend history of one stock.

    Calls ``ak.stock_history_dividend`` up to five times. The first failure
    retries immediately, later ones back off exponentially (2 s, 4 s, 8 s),
    and no delay is spent after the final attempt.

    Args:
        code: Stock code, passed to AkShare as ``symbol``.

    Returns:
        A frame holding ``code``, ``trade_date`` (the announcement date) and
        whichever dividend columns the response contained, or ``None`` when
        no usable data was obtained (all attempts failed, came back empty, or
        had no announcement-date column).
    """
    max_retries = 5
    base_delay = 1.0
    rename_map = {
        '公告日期': 'trade_date',
        '分红方案': 'dividend_type',
        '分红金额': 'dividend_amount',
        '股权登记日': 'record_date',
        '除权除息日': 'ex_date',
        '派息日': 'pay_date',
    }
    cols = ['code', 'trade_date', 'dividend_type', 'dividend_amount', 'record_date', 'ex_date', 'pay_date']
    last_error = None
    for attempt in range(max_retries):
        try:
            # NOTE(review): confirm this function accepts a `symbol` keyword
            # in the installed akshare release.
            df = ak.stock_history_dividend(symbol=code)
            if df is not None and not df.empty:
                df = df.rename(columns=rename_map)
                if 'trade_date' not in df.columns:
                    # Deterministic schema problem: retrying cannot help.
                    logger.warning(f"Dividend data for {code} has no announcement date column, skipped")
                    return None
                df['trade_date'] = pd.to_datetime(df['trade_date'])
                df['code'] = code
                return df[[c for c in cols if c in df.columns]]
        except Exception as exc:
            last_error = exc
            # Back off between retries only, never after the last attempt.
            if 0 < attempt < max_retries - 1:
                time.sleep(base_delay * (2 ** attempt))
    if last_error is not None:
        logger.warning(f"Dividend fetch failed for {code} after {max_retries} attempts: {last_error}")
    return None
def sync_dividend(targets: List[Dict[str, str]]) -> Dict[str, Any]:
    """Incrementally sync dividend records into a single parquet file.

    The existing data set is read from ``/tmp/data/dividend.parquet`` or,
    failing that, downloaded from the dataset repo named by
    ``DATASET_REPO_ID``. When its newest announcement date is less than 90
    days old only codes that are not stored yet are fetched (or nothing at
    all when there are none); otherwise every stock target is refreshed.

    Rows of an already-stored code are kept when they are newer than that
    code's own latest stored date rather than the global latest date, so an
    announcement missed on an earlier run is not discarded for good.

    Args:
        targets: Records with at least ``code`` and ``market`` keys.

    Returns:
        Dict with ``count`` (codes that yielded new rows), ``status``,
        ``record_count`` and ``new_records``; a completed run adds
        ``previous_count`` and the early-exit result adds ``message``.
    """
    logger.info("Syncing dividend data...")
    div_path = Path("/tmp/data/dividend.parquet")
    div_path.parent.mkdir(parents=True, exist_ok=True)
    old_df = None
    old_count = 0
    global_latest_date = "2000-01-01"
    existing_codes = set()

    # 1. Prefer the local cache.
    if div_path.exists():
        try:
            old_df = pd.read_parquet(div_path)
            old_count = len(old_df)
            global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
            existing_codes = set(old_df['code'].unique())
        except Exception as exc:
            logger.warning(f"Failed to read local dividend cache: {exc}")

    # 2. No local cache: try the dataset repo.
    if old_df is None:
        repo_id = os.getenv("DATASET_REPO_ID")
        if repo_id:
            try:
                old_file = hf_hub_download(repo_id=repo_id, filename="data/dividend.parquet", repo_type="dataset")
                old_df = pd.read_parquet(old_file)
                old_count = len(old_df)
                global_latest_date = old_df['trade_date'].max().strftime('%Y-%m-%d')
                existing_codes = set(old_df['code'].unique())
                old_df.to_parquet(div_path)
            except Exception as exc:
                logger.info(f"No existing dividend data found in cloud: {exc}")

    # code -> newest stored date; existing_codes is only non-empty when both
    # columns were readable above.
    latest_by_code: Dict[str, Any] = {}
    if old_df is not None and existing_codes:
        latest_by_code = old_df.groupby('code')['trade_date'].max().dropna().to_dict()

    # 3. Re-check every 90 days, plus new-listing detection.
    stock_targets = [t for t in targets if t['market'] not in ['ETF', 'LOF', 'REITs', '可转债']]
    new_codes = [t for t in stock_targets if t['code'] not in existing_codes]
    today = get_beijing_time()
    is_recent = False
    if global_latest_date != "2000-01-01":
        if (today - pd.to_datetime(global_latest_date)).days < 90:
            is_recent = True
    if is_recent and not new_codes:
        logger.info("Dividend data is recent and no new stocks. Skip.")
        return {'count': 0, 'status': 'skipped', 'record_count': old_count, 'new_records': 0, 'message': f'Already up to date ({global_latest_date})'}

    # 4. Incremental fetch.
    if is_recent:
        logger.info(f"Dividend data is recent, but found {len(new_codes)} new stocks. Syncing new stocks only.")
        sync_targets = new_codes
    else:
        logger.info(f"Syncing dividend data (last update: {global_latest_date})...")
        sync_targets = stock_targets

    all_data = []
    success_count = 0
    fallback_mark = pd.to_datetime(global_latest_date)
    with ThreadPoolExecutor(max_workers=get_thread_config_safe()['dividend']) as executor:
        futures = {executor.submit(get_stock_dividend, t['code']): t['code'] for t in sync_targets}
        for i, future in enumerate(as_completed(futures), 1):
            res = future.result()
            if res is not None and not res.empty:
                code = futures[future]
                if code in existing_codes:
                    res = res[res['trade_date'] > latest_by_code.get(code, fallback_mark)]
                if not res.empty:
                    all_data.append(res)
                    success_count += 1
            if i % 500 == 0:
                logger.info(f"Dividend progress: {i}/{len(sync_targets)}, success: {success_count}")

    # 5. Merge and persist.
    new_records = 0
    final_count = old_count
    if all_data:
        new_df = pd.concat(all_data, ignore_index=True)
        new_records = len(new_df)
        final_df = pd.concat([old_df, new_df]) if old_df is not None else new_df
        # The fetcher keeps only the columns the source returned, so
        # 'dividend_type' may be absent; de-duplicate on the keys that exist.
        dedupe_keys = [c for c in ('code', 'trade_date', 'dividend_type') if c in final_df.columns]
        final_df = final_df.drop_duplicates(subset=dedupe_keys)
        final_count = len(final_df)
        final_df.to_parquet(div_path)
        logger.info(f"Dividend updated: {final_count} total records ({new_records} new)")

    return {
        'count': success_count,
        'status': 'success' if success_count > 0 or old_count > 0 else 'fail',
        'record_count': final_count,
        'new_records': new_records,
        'previous_count': old_count,
    }
| # ==================== 新增:十大股东数据同步 ==================== | |
def sync_top_holders() -> Dict[str, Any]:
    """Refresh the top-shareholder snapshot in one bulk request.

    Fetches ``ak.stock_gdfx_holding_analyse_em`` for today's Beijing date,
    normalises the column names and overwrites
    ``/tmp/data/top_holders.parquet``.

    ``is_changed`` is true when the previous file was missing or unreadable,
    when the row count differs, or when the contents differ. Comparing row
    counts alone would miss a refresh in which values changed but the number
    of rows did not.

    Returns:
        Dict with ``count``, ``status`` (``'success'`` or ``'fail'``),
        ``record_count``, ``previous_count`` and ``is_changed``. On failure
        or an empty response the previous count is reported and nothing is
        written.
    """
    logger.info("Syncing top holders data...")
    path = Path("/tmp/data/top_holders.parquet")
    old_df = None
    old_count = 0

    # Load the previous snapshot for change detection.
    if path.exists():
        try:
            old_df = pd.read_parquet(path)
            old_count = len(old_df)
        except Exception as exc:
            logger.warning(f"Failed to read existing top holders data: {exc}")
            old_df = None

    try:
        today = get_beijing_time()
        # NOTE(review): this endpoint may expect a report-period (quarter-end)
        # date rather than today's date - verify against the akshare docs.
        df = ak.stock_gdfx_holding_analyse_em(date=today.strftime('%Y%m%d'))
        if df is not None and not df.empty:
            rename_map = {
                '股票代码': 'code',
                '公告日期': 'trade_date',
                '股东名称': 'holder_name',
                '持股数量': 'hold_num',
                '持股比例': 'hold_ratio',
                '持股变动': 'hold_change',
            }
            df = df.rename(columns=rename_map)
            df['trade_date'] = pd.to_datetime(df['trade_date'])
            new_count = len(df)

            # Same row count is not proof of "unchanged": compare contents.
            # A dtype mismatch after the parquet round trip errs on the side
            # of reporting a change.
            is_changed = (
                old_df is None
                or new_count != old_count
                or not df.reset_index(drop=True).equals(old_df.reset_index(drop=True))
            )

            path.parent.mkdir(parents=True, exist_ok=True)
            df.to_parquet(path)
            logger.info(f"Top holders data saved: {new_count} records (previous: {old_count}, changed: {is_changed})")
            return {
                'count': new_count,
                'status': 'success',
                'record_count': new_count,
                'previous_count': old_count,
                'is_changed': is_changed,
            }
    except Exception as e:
        logger.warning(f"Failed to sync top holders: {e}")

    # Reached on an exception and on an empty/None response alike, so the
    # caller always receives a dict.
    return {
        'count': 0,
        'status': 'fail',
        'record_count': old_count,
        'previous_count': old_count,
        'is_changed': False,
    }
| # ==================== 新增:限售解禁数据同步 ==================== | |
def sync_restricted_unlock() -> Dict[str, Any]:
    """Refresh the restricted-share unlock schedule in one bulk request.

    Fetches ``ak.stock_restricted_shares(stock="all")``, normalises the
    column names, stamps every row with the sync time in ``trade_date`` and
    overwrites ``/tmp/data/restricted_unlock.parquet``.

    ``is_changed`` is true when the previous file was missing or unreadable,
    when the row count differs, or when the contents differ. The
    ``trade_date`` stamp is excluded from that comparison because it changes
    on every run.

    Returns:
        Dict with ``count``, ``status`` (``'success'`` or ``'fail'``),
        ``record_count``, ``previous_count`` and ``is_changed``. On failure
        or an empty response the previous count is reported and nothing is
        written.
    """
    logger.info("Syncing restricted unlock data...")
    path = Path("/tmp/data/restricted_unlock.parquet")
    path.parent.mkdir(parents=True, exist_ok=True)
    old_df = None
    old_count = 0

    # Load the previous snapshot for change detection.
    if path.exists():
        try:
            old_df = pd.read_parquet(path)
            old_count = len(old_df)
        except Exception as exc:
            logger.warning(f"Failed to read existing restricted unlock data: {exc}")
            old_df = None

    try:
        # NOTE(review): confirm the function name and the `stock="all"`
        # argument against the installed akshare release.
        df = ak.stock_restricted_shares(stock="all")
        if df is not None and not df.empty:
            rename_map = {
                '代码': 'code',
                '名称': 'name',
                '解禁日期': 'unlock_date',
                '解禁数量': 'unlock_num',
                '解禁股本占总股本比例': 'unlock_ratio',
            }
            df = df.rename(columns=rename_map)
            df['unlock_date'] = pd.to_datetime(df['unlock_date'])
            new_count = len(df)

            # Compare payloads before adding the per-run sync stamp; with the
            # stamp included every run would look like a change.
            is_changed = old_df is None or new_count != old_count
            if not is_changed:
                previous = old_df.drop(columns=['trade_date'], errors='ignore').reset_index(drop=True)
                is_changed = not df.reset_index(drop=True).equals(previous)

            df['trade_date'] = get_beijing_time()  # sync timestamp
            df.to_parquet(path)
            logger.info(f"Restricted unlock data saved: {new_count} records (previous: {old_count}, changed: {is_changed})")
            return {
                'count': new_count,
                'status': 'success',
                'record_count': new_count,
                'previous_count': old_count,
                'is_changed': is_changed,
            }
    except Exception as e:
        logger.warning(f"Failed to sync restricted unlock: {e}")

    # Reached on an exception and on an empty/None response alike, so the
    # caller always receives a dict.
    return {
        'count': 0,
        'status': 'fail',
        'record_count': old_count,
        'previous_count': old_count,
        'is_changed': False,
    }
| def main() -> int: | |
| """ | |
| 主函数 - 执行完整的数据同步流程(每类指标完成后即时上传,并记录状态) | |
| Returns: | |
| int: 退出码,0 表示成功,1 表示失败 | |
| """ | |
| logger.info("=" * 60) | |
| logger.info("Stock Data Sync Started") | |
| logger.info("=" * 60) | |
| try: | |
| # 初始化线程配置 | |
| init_thread_config() | |
| db = get_db() | |
| db.init_db() | |
| # 获取状态管理器 | |
| status = get_sync_status() | |
| # 获取最后交易日 | |
| last_day = get_last_trading_day() | |
| logger.info(f"Last trading day: {last_day}") | |
| # 1. 列表同步 | |
| logger.info("-" * 40) | |
| logger.info("Syncing stock list...") | |
| target_list = get_stock_list() | |
| list_parquet = Path("/tmp/data/stock_list.parquet") | |
| list_parquet.parent.mkdir(parents=True, exist_ok=True) | |
| target_list.to_parquet(list_parquet) | |
| db.upload_indicator("Stock List", list_parquet, "data") | |
| status.update('stock_list', | |
| last_trade_date=last_day, | |
| record_count=len(target_list), | |
| status='success') | |
| # 2. 行情同步 | |
| logger.info("-" * 40) | |
| logger.info("Syncing daily data...") | |
| daily_result = sync_stock_daily(target_list.to_dict('records'), last_day) | |
| # 智能上传日K行情数据(根据变更文件数量选择策略) | |
| parquet_dir = Path("/tmp/data/parquet") | |
| if parquet_dir.exists() and any(parquet_dir.glob("*.parquet")): | |
| db.upload_indicator_smart("Daily Data", parquet_dir, "data/parquet", | |
| daily_result.get('changed_files', [])) | |
| status.update('daily', | |
| last_trade_date=last_day, | |
| record_count=daily_result.get('record_count', 0), | |
| status=daily_result.get('status', 'unknown'), | |
| failed_codes=daily_result.get('failed_codes', []), | |
| success_rate=daily_result.get('success_rate', 0), | |
| message=daily_result.get('message', '')) | |
| # 3. 指数同步 | |
| logger.info("-" * 40) | |
| logger.info("Syncing index data...") | |
| idx_df = get_index_daily('000300') | |
| idx_count = 0 | |
| if idx_df is not None: | |
| idx_path = Path("/tmp/data/parquet/index_000300.parquet") | |
| idx_path.parent.mkdir(parents=True, exist_ok=True) | |
| idx_df.to_parquet(idx_path) | |
| db.upload_indicator("Index Data", idx_path, "data/parquet") | |
| idx_count = len(idx_df) | |
| status.update('index', | |
| last_trade_date=last_day, | |
| record_count=idx_count, | |
| status='success' if idx_count > 0 else 'fail') | |
| # 4. 资金流向同步 | |
| logger.info("-" * 40) | |
| logger.info("Syncing fund flow...") | |
| fund_flow_result = sync_fund_flow(target_list.to_dict('records'), last_day) | |
| # 智能上传资金流向数据(根据变更文件数量选择策略) | |
| fund_flow_dir = Path("/tmp/data/fund_flow") | |
| if fund_flow_dir.exists() and any(fund_flow_dir.glob("*.parquet")): | |
| db.upload_indicator_smart("Fund Flow", fund_flow_dir, "data/fund_flow", | |
| fund_flow_result.get('changed_files', [])) | |
| status.update('fund_flow', | |
| last_trade_date=last_day, | |
| record_count=fund_flow_result.get('record_count', 0), | |
| status=fund_flow_result.get('status', 'unknown'), | |
| failed_codes=fund_flow_result.get('failed_codes', []), | |
| success_rate=fund_flow_result.get('success_rate', 0), | |
| message=fund_flow_result.get('message', '')) | |
| # 5. 估值指标同步 | |
| logger.info("-" * 40) | |
| logger.info("Syncing valuation...") | |
| valuation_result = sync_valuation(target_list.to_dict('records'), last_day) | |
| # 智能上传估值指标数据(根据变更文件数量选择策略) | |
| valuation_dir = Path("/tmp/data/valuation") | |
| if valuation_dir.exists() and any(valuation_dir.glob("*.parquet")): | |
| db.upload_indicator_smart("Valuation", valuation_dir, "data/valuation", | |
| valuation_result.get('changed_files', [])) | |
| status.update('valuation', | |
| last_trade_date=last_day, | |
| record_count=valuation_result.get('record_count', 0), | |
| status=valuation_result.get('status', 'success')) | |
| # 6. 融资融券同步 | |
| logger.info("-" * 40) | |
| logger.info("Syncing margin...") | |
| margin_result = sync_margin(target_list.to_dict('records'), last_day) | |
| # 智能上传融资融券数据(根据变更文件数量选择策略) | |
| margin_dir = Path("/tmp/data/margin") | |
| if margin_dir.exists() and any(margin_dir.glob("*.parquet")): | |
| db.upload_indicator_smart("Margin", margin_dir, "data/margin", | |
| margin_result.get('changed_files', [])) | |
| status.update('margin', | |
| last_trade_date=last_day, | |
| record_count=margin_result.get('record_count', 0), | |
| status=margin_result.get('status', 'success')) | |
| # 7. 财务指标同步 | |
| logger.info("-" * 40) | |
| logger.info("Syncing financial indicator...") | |
| financial_result = sync_financial_indicator(target_list.to_dict('records')) | |
| fi_path = Path("/tmp/data/financial_indicator.parquet") | |
| if fi_path.exists() and financial_result.get('new_records', 0) > 0: | |
| upload_success = db.upload_indicator("Financial Indicator", fi_path, "data") | |
| financial_status = 'success' if upload_success else 'upload_fail' | |
| else: | |
| financial_status = financial_result.get('status', 'skipped') | |
| status.update('financial', | |
| last_trade_date=last_day, | |
| record_count=financial_result.get('record_count', 0), | |
| status=financial_status, | |
| new_records=financial_result.get('new_records', 0)) | |
| # 8. 股东户数同步 | |
| logger.info("-" * 40) | |
| logger.info("Syncing holder num...") | |
| holder_result = sync_holder_num() | |
| holder_path = Path("/tmp/data/holder_num.parquet") | |
| if holder_result.get('is_changed', False) and holder_path.exists(): | |
| upload_success = db.upload_indicator("Holder Num", holder_path, "data") | |
| holder_status = 'success' if upload_success else 'upload_fail' | |
| else: | |
| holder_status = 'skipped' if not holder_result.get('is_changed', False) else holder_result.get('status', 'fail') | |
| status.update('holder_num', | |
| last_trade_date=last_day, | |
| record_count=holder_result.get('record_count', 0), | |
| status=holder_status, | |
| is_changed=holder_result.get('is_changed', False)) | |
| # 9. 分红数据同步 | |
| logger.info("-" * 40) | |
| logger.info("Syncing dividend...") | |
| dividend_result = sync_dividend(target_list.to_dict('records')) | |
| div_path = Path("/tmp/data/dividend.parquet") | |
| if div_path.exists() and dividend_result.get('new_records', 0) > 0: | |
| upload_success = db.upload_indicator("Dividend", div_path, "data") | |
| dividend_status = 'success' if upload_success else 'upload_fail' | |
| else: | |
| dividend_status = dividend_result.get('status', 'skipped') | |
| status.update('dividend', | |
| last_trade_date=last_day, | |
| record_count=dividend_result.get('record_count', 0), | |
| status=dividend_status, | |
| new_records=dividend_result.get('new_records', 0)) | |
| # 10. 十大股东同步 | |
| logger.info("-" * 40) | |
| logger.info("Syncing top holders...") | |
| top_holders_result = sync_top_holders() | |
| top_holders_path = Path("/tmp/data/top_holders.parquet") | |
| if top_holders_result.get('is_changed', False) and top_holders_path.exists(): | |
| upload_success = db.upload_indicator("Top Holders", top_holders_path, "data") | |
| top_holders_status = 'success' if upload_success else 'upload_fail' | |
| else: | |
| top_holders_status = 'skipped' if not top_holders_result.get('is_changed', False) else top_holders_result.get('status', 'fail') | |
| status.update('top_holders', | |
| last_trade_date=last_day, | |
| record_count=top_holders_result.get('record_count', 0), | |
| status=top_holders_status, | |
| is_changed=top_holders_result.get('is_changed', False)) | |
| # 11. 限售解禁同步 | |
| logger.info("-" * 40) | |
| logger.info("Syncing restricted unlock...") | |
| restricted_result = sync_restricted_unlock() | |
| restricted_path = Path("/tmp/data/restricted_unlock.parquet") | |
| if restricted_result.get('is_changed', False) and restricted_path.exists(): | |
| upload_success = db.upload_indicator("Restricted Unlock", restricted_path, "data") | |
| restricted_status = 'success' if upload_success else 'upload_fail' | |
| else: | |
| restricted_status = 'skipped' if not restricted_result.get('is_changed', False) else restricted_result.get('status', 'fail') | |
| status.update('restricted_unlock', | |
| last_trade_date=last_day, | |
| record_count=restricted_result.get('record_count', 0), | |
| status=restricted_status, | |
| is_changed=restricted_result.get('is_changed', False)) | |
| # 12. 上传状态文件 | |
| logger.info("-" * 40) | |
| logger.info("Uploading sync status...") | |
| status_path = Path("/tmp/data/sync_status.json") | |
| status_upload_success = db.upload_indicator("Sync Status", status_path, "data") | |
| if not status_upload_success: | |
| logger.warning("Failed to upload sync status file") | |
| logger.info("=" * 60) | |
| logger.info("Sync Completed Successfully!") | |
| summary = (f"Daily={daily_result.get('count', 0)}, FundFlow={fund_flow_result.get('count', 0)}, " | |
| f"Valuation={valuation_result.get('count', 0)}, Margin={margin_result.get('count', 0)}, Financial={financial_result.get('count', 0)}, " | |
| f"Holder={holder_result.get('count', 0)}, Dividend={dividend_result.get('count', 0)}, " | |
| f"TopHolders={top_holders_result.get('count', 0)}, Restricted={restricted_result.get('count', 0)}") | |
| logger.info(f"Summary: {summary}") | |
| logger.info("=" * 60) | |
| return 0 | |
| except Exception as e: | |
| logger.error(f"Sync failed with error: {e}") | |
| return 1 | |
# Script entry point: run the full sync only when this file is executed
# directly (not on import), and hand main()'s integer return value to the
# interpreter as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())