# File: sync_stock/app/sync_status.py
# Author: superxu520
# Commit: 42c3910 "feat:sync-status-tracking"
"""
同步状态管理模块 - 跟踪每类指标的同步元数据
"""
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
import logging
logger = logging.getLogger(__name__)

# Default location of the persisted sync-status JSON document.
STATUS_FILE = "/tmp/data/sync_status.json"


class SyncStatus:
    """Track per-indicator synchronisation metadata, persisted as JSON.

    State is held in memory as ``{indicator: {field: value}}``. It is read
    from ``status_file`` once at construction and written back on every
    :meth:`update`.
    """

    def __init__(self, status_file: str = STATUS_FILE):
        """Create the manager.

        Args:
            status_file: Path of the JSON file backing the state. Its parent
                directory is created if it does not exist.
        """
        self.status_file = Path(status_file)
        self.status_file.parent.mkdir(parents=True, exist_ok=True)
        self._data = self._load()

    def _load(self) -> Dict[str, Any]:
        """Read the status file and return its contents.

        Returns:
            The stored mapping, or an empty dict when the file is missing,
            unreadable, not valid JSON, or does not hold a JSON object.
            Entries whose value is not a JSON object are dropped, because
            every other method assumes each entry is a dict.
        """
        if not self.status_file.exists():
            return {}

        try:
            with open(self.status_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except Exception as e:
            # Deliberately best-effort: a broken status file must not stop
            # the sync process from starting.
            logger.warning("Failed to load status file: %s", e)
            return {}

        if not isinstance(loaded, dict):
            logger.warning(
                "Failed to load status file: expected a JSON object, got %s",
                type(loaded).__name__,
            )
            return {}

        entries: Dict[str, Any] = {}
        for indicator, entry in loaded.items():
            if isinstance(entry, dict):
                entries[indicator] = entry
            else:
                logger.warning(
                    "Ignoring malformed status entry for %s", indicator
                )
        return entries

    def save(self) -> None:
        """Persist the in-memory state to ``status_file``.

        The JSON is written to a temporary sibling file and then moved over
        the real file with ``os.replace``. A failure part-way through
        (for example a value that is not JSON-serializable) therefore leaves
        the previously saved file untouched instead of truncated.

        Errors are logged, not raised.
        """
        # Per-process name so two processes do not share one temp file.
        tmp_path = self.status_file.with_name(
            f"{self.status_file.name}.{os.getpid()}.tmp"
        )
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self._data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self.status_file)
        except Exception as e:
            logger.error("Failed to save status file: %s", e)
            # Best-effort cleanup of the partial temp file; it may not exist
            # if opening it was what failed.
            try:
                tmp_path.unlink()
            except OSError:
                pass

    def get(self, indicator: str) -> Dict[str, Any]:
        """Return the stored status dict for ``indicator``.

        Returns an empty dict when the indicator is unknown. For a known
        indicator the internal dict itself is returned, not a copy.
        """
        return self._data.get(indicator, {})

    def update(self, indicator: str, **kwargs) -> None:
        """Merge new fields into an indicator's status and save to disk.

        ``last_sync_time`` is set to the current local time in ISO format
        unless it is passed explicitly in ``kwargs``.

        Args:
            indicator: Indicator name (stock_list, daily, fund_flow,
                valuation, margin, etc.).
            **kwargs: Arbitrary fields to store. Fields used elsewhere in
                this class or named by the original documentation:
                last_trade_date: Last trade date covered by the sync.
                record_count: Number of records.
                status: Status (success, partial_fail, fail).
                failed_codes: List of stock codes that failed.
                success_rate: Success rate.
                message: Extra message.
        """
        entry = self._data.setdefault(indicator, {})
        entry.update({
            'last_sync_time': datetime.now().isoformat(),
            **kwargs,
        })

        self.save()
        logger.info(
            "Sync status updated for %s: %s",
            indicator,
            kwargs.get('status', 'unknown'),
        )

    def get_failed_codes(self, indicator: str) -> List[str]:
        """Return the stock codes recorded as failed for ``indicator``.

        Always returns a new list, so callers cannot mutate the stored
        state. The list is empty when the indicator is unknown or when
        ``failed_codes`` is missing or stored as null.
        """
        failed = self.get(indicator).get('failed_codes')
        return list(failed) if failed else []

    def is_recent(self, indicator: str, trade_date: str) -> bool:
        """Report whether ``indicator`` is already synced up to ``trade_date``.

        The stored ``last_trade_date`` is compared to ``trade_date`` with
        ``>=``. For strings that is a lexicographic comparison, so both
        values must use the same fixed-width date format.

        Returns:
            True when a last trade date is stored and is not earlier than
            ``trade_date``; False otherwise.
        """
        last_trade_date = self.get(indicator).get('last_trade_date')
        if not last_trade_date:
            return False
        return bool(last_trade_date >= trade_date)

    def get_summary(self) -> Dict[str, Any]:
        """Return a condensed view of every indicator.

        Each indicator maps to its ``last_sync_time``, ``last_trade_date``,
        ``status`` (None when absent) and ``record_count`` (0 when absent).
        """
        return {
            indicator: {
                'last_sync_time': data.get('last_sync_time'),
                'last_trade_date': data.get('last_trade_date'),
                'status': data.get('status'),
                'record_count': data.get('record_count', 0),
            }
            for indicator, data in self._data.items()
        }
# Module-level SyncStatus instance shared by all callers; built on first use.
_sync_status: Optional[SyncStatus] = None


def get_sync_status() -> SyncStatus:
    """Return the shared SyncStatus instance, creating it on the first call."""
    global _sync_status
    if _sync_status is not None:
        return _sync_status
    _sync_status = SyncStatus()
    return _sync_status