""" 定时任务调度器 使用 APScheduler 实现定时更新汇率数据 """ from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from app.config import Settings from app.services.exchange_service import ExchangeRateService from app.utils.logger import logger class RateScheduler: """汇率更新定时调度器""" def __init__(self, exchange_service: ExchangeRateService, settings: Settings): self.exchange_service = exchange_service self.settings = settings self.scheduler = AsyncIOScheduler() self._is_running = False async def _update_job(self): """定时更新任务""" logger.info("定时汇率更新开始") try: success = await self.exchange_service.update_cache() if success: cached = self.exchange_service.get_cached_rates() if cached: logger.info( f"定时汇率更新完成。" f"货币数量: {len(cached.rates)}, " f"数据源更新时间: {cached.last_update_utc}" ) else: logger.warning("定时汇率更新失败,将在下一个时间间隔重试") except Exception as e: logger.error(f"定时更新任务出错: {e}") async def start(self): """启动调度器""" if self._is_running: logger.warning("调度器已在运行中") return logger.info("正在启动汇率调度器...") # 立即执行一次更新 logger.info("正在执行初始汇率获取...") await self._update_job() # 添加定时任务 self.scheduler.add_job( self._update_job, trigger=IntervalTrigger(seconds=self.settings.CACHE_UPDATE_INTERVAL), id='rate_update', name='定时更新汇率', replace_existing=True, max_instances=1 # 确保同一时间只有一个任务实例运行 ) self.scheduler.start() self._is_running = True logger.info( f"调度器启动成功。" f"更新间隔: {self.settings.CACHE_UPDATE_INTERVAL} 秒 " f"({self.settings.CACHE_UPDATE_INTERVAL / 60:.1f} 分钟)" ) def stop(self): """停止调度器""" if not self._is_running: logger.warning("调度器未在运行") return self.scheduler.shutdown(wait=False) self._is_running = False logger.info("调度器已停止") def is_running(self) -> bool: """检查调度器是否正在运行""" return self._is_running def get_next_run_time(self): """获取下次运行时间""" job = self.scheduler.get_job('rate_update') if job: return job.next_run_time return None