Spaces:
Paused
Paused
| """ | |
| 股票列表缓存模块 | |
| 实现股票列表的元数据缓存机制,减少频繁获取的开销 | |
| """ | |
| import json | |
| import hashlib | |
| import os | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Optional, Dict, Any | |
| import logging | |
| import pandas as pd | |
| import akshare as ak | |
| from huggingface_hub import hf_hub_download, upload_file, login | |
| logger = logging.getLogger(__name__) | |
| # Dataset配置 | |
| DATASET_REPO_ID = os.getenv("DATASET_REPO_ID") | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| # 缓存配置 | |
| CACHE_VALID_DAYS = 90 # 缓存有效期90天 | |
| COUNT_CHANGE_THRESHOLD = 0.05 # 数量变化阈值5% | |
| def _load_stock_list_meta() -> Optional[Dict[str, Any]]: | |
| """加载股票列表元数据(优先从本地,本地不存在则从云端下载)""" | |
| meta_path = Path("/tmp/data/stock_list.meta.json") | |
| # 1. 优先从本地加载 | |
| if meta_path.exists(): | |
| try: | |
| with open(meta_path, 'r', encoding='utf-8') as f: | |
| return json.load(f) | |
| except Exception as e: | |
| logger.warning(f"Failed to load local stock list meta: {e}") | |
| # 2. 本地不存在,尝试从云端下载 | |
| if HF_TOKEN and DATASET_REPO_ID: | |
| try: | |
| downloaded_path = hf_hub_download( | |
| repo_id=DATASET_REPO_ID, | |
| filename="data/stock_list.meta.json", | |
| repo_type="dataset", | |
| local_dir="/tmp/data", | |
| local_dir_use_symlinks=False | |
| ) | |
| with open(downloaded_path, 'r', encoding='utf-8') as f: | |
| meta = json.load(f) | |
| logger.info("Downloaded stock list meta from cloud") | |
| return meta | |
| except Exception as e: | |
| logger.debug(f"No stock list meta found in cloud: {e}") | |
| return None | |
| def _save_stock_list_meta(meta: Dict[str, Any]) -> None: | |
| """保存股票列表元数据""" | |
| meta_path = Path("/tmp/data/stock_list.meta.json") | |
| meta_path.parent.mkdir(parents=True, exist_ok=True) | |
| try: | |
| with open(meta_path, 'w', encoding='utf-8') as f: | |
| json.dump(meta, f, ensure_ascii=False, indent=2) | |
| except Exception as e: | |
| logger.warning(f"Failed to save stock list meta: {e}") | |
| def _calculate_data_hash(df: pd.DataFrame) -> str: | |
| """计算数据的MD5哈希(用于校验完整性)""" | |
| try: | |
| # 使用code列的排序后字符串计算哈希 | |
| codes_str = ','.join(sorted(df['code'].tolist())) | |
| return hashlib.md5(codes_str.encode()).hexdigest()[:8] | |
| except Exception: | |
| return "" | |
| def _is_cache_expired(meta: Optional[Dict[str, Any]]) -> bool: | |
| """检查缓存是否过期(仅检查时间,不触发网络请求)""" | |
| if meta is None: | |
| return True | |
| # 检查必要字段 | |
| if 'expire_time' not in meta: | |
| return True | |
| # 检查是否过期 | |
| try: | |
| expire_time = datetime.fromisoformat(meta['expire_time']) | |
| if datetime.now() > expire_time: | |
| logger.info(f"Stock list cache expired (expired at {meta['expire_time']})") | |
| return True | |
| except Exception as e: | |
| logger.warning(f"Failed to parse expire_time: {e}") | |
| return True | |
| return False | |
def get_cached_stock_list() -> Optional[pd.DataFrame]:
    """Return the cached stock list, or None when the cache is unusable.

    The cache counts as usable only when: the metadata loads and is not
    expired, the parquet file exists and reads successfully, and its
    content hash matches the hash recorded in the metadata.

    Returns:
        pd.DataFrame: the cached stock list, or None if the cache is invalid.
    """
    parquet_path = Path("/tmp/data/stock_list.parquet")
    meta = _load_stock_list_meta()

    # Expiry is decided from metadata alone — no network round-trip.
    if _is_cache_expired(meta):
        return None

    if not parquet_path.exists():
        logger.info("Cache meta valid but parquet file missing")
        return None

    try:
        df = pd.read_parquet(parquet_path)
        # Guard against a parquet file that drifted from its metadata.
        cached_hash = meta.get('data_hash', '')
        current_hash = _calculate_data_hash(df)
        if cached_hash and cached_hash != current_hash:
            logger.warning(f"Data hash mismatch: cached={cached_hash}, current={current_hash}")
            return None
        logger.info(f"Using cached stock list: {len(df)} records "
                    f"(fetched at {meta.get('fetch_time', 'unknown')}, "
                    f"expires at {meta.get('expire_time', 'unknown')})")
        return df
    except Exception as e:
        logger.warning(f"Failed to load cached stock list: {e}")
    return None
def save_stock_list_cache(df: pd.DataFrame) -> None:
    """Save the stock list to the local cache and upload its metadata to the cloud.

    Writes the DataFrame to /tmp/data/stock_list.parquet, regenerates the
    metadata (fetch time, 90-day expiry, record count, per-market counts,
    data hash) and saves it locally, then — when HF credentials are
    configured — uploads the metadata JSON to the dataset repo.

    NOTE(review): only the metadata file is uploaded, not the parquet data
    itself, so a fresh machine can download the meta but never the list —
    confirm this is intentional.

    Args:
        df: Stock list DataFrame; must contain 'code' and 'market' columns.
    """
    list_path = Path("/tmp/data/stock_list.parquet")
    meta_path = Path("/tmp/data/stock_list.meta.json")
    list_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        # Read the previous metadata so large count changes can be detected.
        old_meta = _load_stock_list_meta()
        old_count = old_meta.get('record_count', 0) if old_meta else 0
        # Persist the data.
        df.to_parquet(list_path)
        # Record count per distinct 'market' value.
        market_stats = df['market'].value_counts().to_dict()
        # Build the metadata record.
        now = datetime.now()
        meta = {
            'fetch_time': now.isoformat(),
            'valid_days': CACHE_VALID_DAYS,
            'expire_time': (now + timedelta(days=CACHE_VALID_DAYS)).isoformat(),
            'record_count': len(df),
            'markets': market_stats,
            'data_hash': _calculate_data_hash(df),
            'version': '1.0'
        }
        _save_stock_list_meta(meta)
        # Flag count drift above the threshold (log only; never blocks the save).
        if old_count > 0:
            change_ratio = abs(len(df) - old_count) / old_count
            if change_ratio > COUNT_CHANGE_THRESHOLD:
                logger.warning(f"Significant stock count change detected: {old_count} -> {len(df)} ({change_ratio:.1%})")
        logger.info(f"Stock list cache saved locally: {len(df)} records, expires at {meta['expire_time']}")
        # Upload the metadata to the cloud (best-effort; failures only warn).
        if HF_TOKEN and DATASET_REPO_ID:
            try:
                login(token=HF_TOKEN)
                upload_file(
                    path_or_fileobj=str(meta_path),
                    path_in_repo="data/stock_list.meta.json",
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    commit_message=f"Update stock list meta: {len(df)} records"
                )
                logger.info("Stock list meta uploaded to cloud")
            except Exception as e:
                logger.warning(f"Failed to upload stock list meta to cloud: {e}")
    except Exception as e:
        logger.error(f"Failed to save stock list cache: {e}")
def invalidate_cache() -> None:
    """Delete the cached stock list files, forcing a refresh on next access.

    Best-effort: deletion failures are logged as warnings, never raised.
    """
    meta_path = Path("/tmp/data/stock_list.meta.json")
    list_path = Path("/tmp/data/stock_list.parquet")
    try:
        # Remove metadata first, then the data file, when present.
        for cached_file in (meta_path, list_path):
            if cached_file.exists():
                cached_file.unlink()
        logger.info("Stock list cache invalidated")
    except Exception as e:
        logger.warning(f"Failed to invalidate cache: {e}")
def get_cache_info() -> Optional[Dict[str, Any]]:
    """Return cache metadata enriched with file stats (debugging helper).

    Returns:
        The metadata dict augmented with 'file_exists', 'file_size_mb'
        (when the parquet file exists) and 'remaining_days', or None
        when no metadata is available.
    """
    meta = _load_stock_list_meta()
    if meta is None:
        return None

    parquet_path = Path("/tmp/data/stock_list.parquet")
    if parquet_path.exists():
        meta['file_exists'] = True
        size_bytes = parquet_path.stat().st_size
        meta['file_size_mb'] = round(size_bytes / (1024 * 1024), 2)
    else:
        meta['file_exists'] = False

    # Remaining validity in whole days; -1 when expire_time is absent or bad.
    try:
        expires_at = datetime.fromisoformat(meta['expire_time'])
        meta['remaining_days'] = (expires_at - datetime.now()).days
    except Exception:
        meta['remaining_days'] = -1
    return meta