from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger import pytz from core.config import get_settings class SchedulerManager: def __init__(self): self.scheduler = AsyncIOScheduler(timezone=pytz.timezone(get_settings().TIMEZONE)) self._configure_jobs() def _configure_jobs(self): """配置所有定时任务""" from task.scheduled_tasks import ( daily_task, weekly_report_task, data_cleanup_task ) # 添加定时任务,注意这些任务都是异步的 self.scheduler.add_job( daily_task, trigger=CronTrigger(hour='8', minute='13', timezone=pytz.timezone(get_settings().TIMEZONE)), id='daily_task' ) self.scheduler.add_job( daily_task, trigger=CronTrigger(hour='9', minute='13', timezone=pytz.timezone(get_settings().TIMEZONE)), id='daily_task_1' ) self.scheduler.add_job( weekly_report_task, trigger=CronTrigger(day_of_week='mon', hour='9', timezone=pytz.timezone(get_settings().TIMEZONE)), id='weekly_report' ) self.scheduler.add_job( data_cleanup_task, trigger=CronTrigger(hour='2', timezone=pytz.timezone(get_settings().TIMEZONE)), id='data_cleanup' ) async def start(self): """异步启动调度器""" self.scheduler.start() async def shutdown(self): """异步关闭调度器""" self.scheduler.shutdown() def get_jobs(self): """获取所有任务""" return self.scheduler.get_jobs() def add_job(self, func, trigger, **kwargs): """添加新任务""" return self.scheduler.add_job(func, trigger, **kwargs) def remove_job(self, job_id): """移除任务""" self.scheduler.remove_job(job_id)