# sync_stock/app/stock_list_cache.py
# superxu520 — "feat:persist-stock-list-meta-to-cloud" (commit 7d0ba65)
"""
股票列表缓存模块
实现股票列表的元数据缓存机制,减少频繁获取的开销
"""
import json
import hashlib
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Any
import logging
import pandas as pd
import akshare as ak
from huggingface_hub import hf_hub_download, upload_file, login
logger = logging.getLogger(__name__)
# Dataset配置
DATASET_REPO_ID = os.getenv("DATASET_REPO_ID")
HF_TOKEN = os.getenv("HF_TOKEN")
# 缓存配置
CACHE_VALID_DAYS = 90 # 缓存有效期90天
COUNT_CHANGE_THRESHOLD = 0.05 # 数量变化阈值5%
def _load_stock_list_meta() -> Optional[Dict[str, Any]]:
    """Load the stock-list metadata dict.

    Order of preference:
      1. the local copy at /tmp/data/stock_list.meta.json;
      2. a download from the HF dataset repo (when HF_TOKEN and
         DATASET_REPO_ID are configured).

    Returns:
        The parsed metadata dict, or None when no metadata is available.
    """
    meta_path = Path("/tmp/data/stock_list.meta.json")
    # 1. Prefer the local copy.
    if meta_path.exists():
        try:
            with open(meta_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.warning(f"Failed to load local stock list meta: {e}")
    # 2. Local copy missing/unreadable: fall back to the cloud copy.
    if HF_TOKEN and DATASET_REPO_ID:
        try:
            downloaded_path = hf_hub_download(
                repo_id=DATASET_REPO_ID,
                filename="data/stock_list.meta.json",
                repo_type="dataset",
                local_dir="/tmp/data",
                local_dir_use_symlinks=False
            )
            with open(downloaded_path, 'r', encoding='utf-8') as f:
                meta = json.load(f)
            # BUG FIX: hf_hub_download keeps the repo-relative path, so the
            # download lands at /tmp/data/data/stock_list.meta.json — NOT at
            # meta_path checked above.  Persist it where the local check
            # looks, so subsequent calls are served locally instead of
            # re-downloading on every call.
            _save_stock_list_meta(meta)
            logger.info("Downloaded stock list meta from cloud")
            return meta
        except Exception as e:
            logger.debug(f"No stock list meta found in cloud: {e}")
    return None
def _save_stock_list_meta(meta: Dict[str, Any]) -> None:
    """Persist the stock-list metadata dict to /tmp/data (best effort).

    Failures are logged and swallowed: metadata is a cache optimization,
    never a hard requirement.
    """
    target = Path("/tmp/data/stock_list.meta.json")
    target.parent.mkdir(parents=True, exist_ok=True)
    try:
        serialized = json.dumps(meta, ensure_ascii=False, indent=2)
        target.write_text(serialized, encoding='utf-8')
    except Exception as e:
        logger.warning(f"Failed to save stock list meta: {e}")
def _calculate_data_hash(df: pd.DataFrame) -> str:
    """Return a short MD5 digest of the sorted 'code' column.

    Used as an integrity check for the cached parquet file.  Returns ""
    when the digest cannot be computed (e.g. no 'code' column).
    """
    try:
        joined = ','.join(sorted(df['code'].tolist()))
        digest = hashlib.md5(joined.encode())
        return digest.hexdigest()[:8]
    except Exception:
        return ""
def _is_cache_expired(meta: Optional[Dict[str, Any]]) -> bool:
    """Return True when *meta* is absent, malformed, or past its expiry.

    This is a pure time check — it never triggers a network request.
    """
    # Missing metadata or missing expiry field: treat as expired.
    if not meta or 'expire_time' not in meta:
        return True
    # An unparseable timestamp also invalidates the cache.
    try:
        expires_at = datetime.fromisoformat(meta['expire_time'])
    except Exception as e:
        logger.warning(f"Failed to parse expire_time: {e}")
        return True
    if datetime.now() <= expires_at:
        return False
    logger.info(f"Stock list cache expired (expired at {meta['expire_time']})")
    return True
def get_cached_stock_list() -> Optional[pd.DataFrame]:
    """Return the cached stock list, or None when the cache is unusable.

    The cache is usable only when the metadata exists and is not expired,
    the parquet file is present, and its content hash matches the one
    recorded in the metadata.

    Returns:
        pd.DataFrame with the cached stock list, or None.
    """
    parquet_path = Path("/tmp/data/stock_list.parquet")
    meta = _load_stock_list_meta()

    # Time-based validity check first (never triggers a network request).
    if _is_cache_expired(meta):
        return None

    # Metadata says "valid" but the data file itself may be missing.
    if not parquet_path.exists():
        logger.info("Cache meta valid but parquet file missing")
        return None

    try:
        df = pd.read_parquet(parquet_path)
    except Exception as e:
        logger.warning(f"Failed to load cached stock list: {e}")
        return None

    # Integrity check: reject the cache when the stored hash disagrees.
    expected_hash = meta.get('data_hash', '')
    actual_hash = _calculate_data_hash(df)
    if expected_hash and expected_hash != actual_hash:
        logger.warning(f"Data hash mismatch: cached={expected_hash}, current={actual_hash}")
        return None

    logger.info(f"Using cached stock list: {len(df)} records "
                f"(fetched at {meta.get('fetch_time', 'unknown')}, "
                f"expires at {meta.get('expire_time', 'unknown')})")
    return df
def save_stock_list_cache(df: pd.DataFrame) -> None:
    """Save the stock list to the local cache and upload the metadata to HF.

    Writes the DataFrame to /tmp/data/stock_list.parquet, regenerates the
    metadata file next to it, and (when HF_TOKEN and DATASET_REPO_ID are
    configured) pushes the metadata to the dataset repo.  Never raises:
    failures are logged and the function returns normally.

    Args:
        df: Stock list DataFrame; expected to contain 'code' and 'market'
            columns (hash and market stats are derived from them).
    """
    list_path = Path("/tmp/data/stock_list.parquet")
    meta_path = Path("/tmp/data/stock_list.meta.json")
    list_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        # Previous metadata is only consulted to detect large count swings.
        old_meta = _load_stock_list_meta()
        old_count = old_meta.get('record_count', 0) if old_meta else 0
        # Persist the data itself.
        df.to_parquet(list_path)
        # FIX: coerce counts to plain ints — value_counts() can yield numpy
        # integers (pandas-version-dependent), which json.dump cannot
        # serialize, making the metadata write fail silently downstream.
        market_stats = {k: int(v) for k, v in df['market'].value_counts().items()}
        # Build and persist fresh metadata.
        now = datetime.now()
        meta = {
            'fetch_time': now.isoformat(),
            'valid_days': CACHE_VALID_DAYS,
            'expire_time': (now + timedelta(days=CACHE_VALID_DAYS)).isoformat(),
            'record_count': len(df),
            'markets': market_stats,
            'data_hash': _calculate_data_hash(df),
            'version': '1.0'
        }
        _save_stock_list_meta(meta)
        # Log (but do not reject) suspiciously large count changes.
        if old_count > 0:
            change_ratio = abs(len(df) - old_count) / old_count
            if change_ratio > COUNT_CHANGE_THRESHOLD:
                logger.warning(f"Significant stock count change detected: {old_count} -> {len(df)} ({change_ratio:.1%})")
        logger.info(f"Stock list cache saved locally: {len(df)} records, expires at {meta['expire_time']}")
        # Best-effort cloud upload of the metadata only.
        if HF_TOKEN and DATASET_REPO_ID:
            try:
                login(token=HF_TOKEN)
                upload_file(
                    path_or_fileobj=str(meta_path),
                    path_in_repo="data/stock_list.meta.json",
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    commit_message=f"Update stock list meta: {len(df)} records"
                )
                logger.info("Stock list meta uploaded to cloud")
            except Exception as e:
                logger.warning(f"Failed to upload stock list meta to cloud: {e}")
    except Exception as e:
        logger.error(f"Failed to save stock list cache: {e}")
def invalidate_cache() -> None:
    """Delete the cached stock list and its metadata to force a refresh."""
    cache_files = (
        Path("/tmp/data/stock_list.meta.json"),
        Path("/tmp/data/stock_list.parquet"),
    )
    try:
        for cache_file in cache_files:
            if cache_file.exists():
                cache_file.unlink()
        logger.info("Stock list cache invalidated")
    except Exception as e:
        logger.warning(f"Failed to invalidate cache: {e}")
def get_cache_info() -> Optional[Dict[str, Any]]:
    """Return the cache metadata augmented with file stats (debug helper).

    Adds 'file_exists', 'file_size_mb' (only when the parquet file exists)
    and 'remaining_days' (-1 when expire_time is missing or unparseable)
    to the loaded metadata dict.  Returns None when no metadata exists.
    """
    meta = _load_stock_list_meta()
    if meta is None:
        return None

    parquet_path = Path("/tmp/data/stock_list.parquet")
    file_present = parquet_path.exists()
    meta['file_exists'] = file_present
    if file_present:
        size_bytes = parquet_path.stat().st_size
        meta['file_size_mb'] = round(size_bytes / (1024 * 1024), 2)

    # Remaining lifetime in whole days; -1 signals missing/bad expire_time.
    try:
        expires_at = datetime.fromisoformat(meta['expire_time'])
        meta['remaining_days'] = (expires_at - datetime.now()).days
    except Exception:
        meta['remaining_days'] = -1
    return meta