Spaces:
Paused
Paused
| """ | |
| 同步状态管理模块 - 跟踪每类指标的同步元数据 | |
| """ | |
| import json | |
| import os | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| STATUS_FILE = "/tmp/data/sync_status.json" | |
| class SyncStatus: | |
| """同步状态管理器""" | |
| def __init__(self, status_file: str = STATUS_FILE): | |
| self.status_file = Path(status_file) | |
| self.status_file.parent.mkdir(parents=True, exist_ok=True) | |
| self._data = self._load() | |
| def _load(self) -> Dict[str, Any]: | |
| """加载状态文件""" | |
| if self.status_file.exists(): | |
| try: | |
| with open(self.status_file, 'r', encoding='utf-8') as f: | |
| return json.load(f) | |
| except Exception as e: | |
| logger.warning(f"Failed to load status file: {e}") | |
| return {} | |
| def save(self) -> None: | |
| """保存状态文件""" | |
| try: | |
| with open(self.status_file, 'w', encoding='utf-8') as f: | |
| json.dump(self._data, f, ensure_ascii=False, indent=2) | |
| except Exception as e: | |
| logger.error(f"Failed to save status file: {e}") | |
| def get(self, indicator: str) -> Dict[str, Any]: | |
| """获取指定指标的状态""" | |
| return self._data.get(indicator, {}) | |
| def update(self, indicator: str, **kwargs) -> None: | |
| """ | |
| 更新指定指标的状态 | |
| Args: | |
| indicator: 指标名称 (stock_list, daily, fund_flow, valuation, margin, etc.) | |
| last_trade_date: 最后交易日 | |
| record_count: 记录数 | |
| status: 状态 (success, partial_fail, fail) | |
| failed_codes: 失败的股票代码列表 | |
| success_rate: 成功率 | |
| message: 额外消息 | |
| """ | |
| if indicator not in self._data: | |
| self._data[indicator] = {} | |
| # 更新字段 | |
| self._data[indicator].update({ | |
| 'last_sync_time': datetime.now().isoformat(), | |
| **kwargs | |
| }) | |
| # 保存到文件 | |
| self.save() | |
| logger.info(f"Sync status updated for {indicator}: {kwargs.get('status', 'unknown')}") | |
| def get_failed_codes(self, indicator: str) -> List[str]: | |
| """获取上次同步失败的股票代码""" | |
| status = self.get(indicator) | |
| return status.get('failed_codes', []) | |
| def is_recent(self, indicator: str, trade_date: str) -> bool: | |
| """检查指定指标是否已是最新""" | |
| status = self.get(indicator) | |
| last_trade_date = status.get('last_trade_date') | |
| if last_trade_date and last_trade_date >= trade_date: | |
| return True | |
| return False | |
| def get_summary(self) -> Dict[str, Any]: | |
| """获取所有指标的摘要""" | |
| summary = {} | |
| for indicator, data in self._data.items(): | |
| summary[indicator] = { | |
| 'last_sync_time': data.get('last_sync_time'), | |
| 'last_trade_date': data.get('last_trade_date'), | |
| 'status': data.get('status'), | |
| 'record_count': data.get('record_count', 0), | |
| } | |
| return summary | |
| # 全局状态管理器实例 | |
| _sync_status: Optional[SyncStatus] = None | |
| def get_sync_status() -> SyncStatus: | |
| """获取全局同步状态管理器""" | |
| global _sync_status | |
| if _sync_status is None: | |
| _sync_status = SyncStatus() | |
| return _sync_status | |