Spaces:
Sleeping
Sleeping
| """ | |
| 定时任务调度器 | |
| 使用 APScheduler 实现定时更新汇率数据 | |
| """ | |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
| from apscheduler.triggers.interval import IntervalTrigger | |
| from app.config import Settings | |
| from app.services.exchange_service import ExchangeRateService | |
| from app.utils.logger import logger | |
| class RateScheduler: | |
| """汇率更新定时调度器""" | |
| def __init__(self, exchange_service: ExchangeRateService, settings: Settings): | |
| self.exchange_service = exchange_service | |
| self.settings = settings | |
| self.scheduler = AsyncIOScheduler() | |
| self._is_running = False | |
| async def _update_job(self): | |
| """定时更新任务""" | |
| logger.info("定时汇率更新开始") | |
| try: | |
| success = await self.exchange_service.update_cache() | |
| if success: | |
| cached = self.exchange_service.get_cached_rates() | |
| if cached: | |
| logger.info( | |
| f"定时汇率更新完成。" | |
| f"货币数量: {len(cached.rates)}, " | |
| f"数据源更新时间: {cached.last_update_utc}" | |
| ) | |
| else: | |
| logger.warning("定时汇率更新失败,将在下一个时间间隔重试") | |
| except Exception as e: | |
| logger.error(f"定时更新任务出错: {e}") | |
| async def start(self): | |
| """启动调度器""" | |
| if self._is_running: | |
| logger.warning("调度器已在运行中") | |
| return | |
| logger.info("正在启动汇率调度器...") | |
| # 立即执行一次更新 | |
| logger.info("正在执行初始汇率获取...") | |
| await self._update_job() | |
| # 添加定时任务 | |
| self.scheduler.add_job( | |
| self._update_job, | |
| trigger=IntervalTrigger(seconds=self.settings.CACHE_UPDATE_INTERVAL), | |
| id='rate_update', | |
| name='定时更新汇率', | |
| replace_existing=True, | |
| max_instances=1 # 确保同一时间只有一个任务实例运行 | |
| ) | |
| self.scheduler.start() | |
| self._is_running = True | |
| logger.info( | |
| f"调度器启动成功。" | |
| f"更新间隔: {self.settings.CACHE_UPDATE_INTERVAL} 秒 " | |
| f"({self.settings.CACHE_UPDATE_INTERVAL / 60:.1f} 分钟)" | |
| ) | |
| def stop(self): | |
| """停止调度器""" | |
| if not self._is_running: | |
| logger.warning("调度器未在运行") | |
| return | |
| self.scheduler.shutdown(wait=False) | |
| self._is_running = False | |
| logger.info("调度器已停止") | |
| def is_running(self) -> bool: | |
| """检查调度器是否正在运行""" | |
| return self._is_running | |
| def get_next_run_time(self): | |
| """获取下次运行时间""" | |
| job = self.scheduler.get_job('rate_update') | |
| if job: | |
| return job.next_run_time | |
| return None | |