exchangeRates / app /services /scheduler.py
Hugo-Jiang's picture
Add application file
954be92
"""
定时任务调度器
使用 APScheduler 实现定时更新汇率数据
"""
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from app.config import Settings
from app.services.exchange_service import ExchangeRateService
from app.utils.logger import logger
class RateScheduler:
"""汇率更新定时调度器"""
def __init__(self, exchange_service: ExchangeRateService, settings: Settings):
self.exchange_service = exchange_service
self.settings = settings
self.scheduler = AsyncIOScheduler()
self._is_running = False
async def _update_job(self):
"""定时更新任务"""
logger.info("定时汇率更新开始")
try:
success = await self.exchange_service.update_cache()
if success:
cached = self.exchange_service.get_cached_rates()
if cached:
logger.info(
f"定时汇率更新完成。"
f"货币数量: {len(cached.rates)}, "
f"数据源更新时间: {cached.last_update_utc}"
)
else:
logger.warning("定时汇率更新失败,将在下一个时间间隔重试")
except Exception as e:
logger.error(f"定时更新任务出错: {e}")
async def start(self):
"""启动调度器"""
if self._is_running:
logger.warning("调度器已在运行中")
return
logger.info("正在启动汇率调度器...")
# 立即执行一次更新
logger.info("正在执行初始汇率获取...")
await self._update_job()
# 添加定时任务
self.scheduler.add_job(
self._update_job,
trigger=IntervalTrigger(seconds=self.settings.CACHE_UPDATE_INTERVAL),
id='rate_update',
name='定时更新汇率',
replace_existing=True,
max_instances=1 # 确保同一时间只有一个任务实例运行
)
self.scheduler.start()
self._is_running = True
logger.info(
f"调度器启动成功。"
f"更新间隔: {self.settings.CACHE_UPDATE_INTERVAL} 秒 "
f"({self.settings.CACHE_UPDATE_INTERVAL / 60:.1f} 分钟)"
)
def stop(self):
"""停止调度器"""
if not self._is_running:
logger.warning("调度器未在运行")
return
self.scheduler.shutdown(wait=False)
self._is_running = False
logger.info("调度器已停止")
def is_running(self) -> bool:
"""检查调度器是否正在运行"""
return self._is_running
def get_next_run_time(self):
"""获取下次运行时间"""
job = self.scheduler.get_job('rate_update')
if job:
return job.next_run_time
return None