ml / core /scheduler.py
devin15's picture
Upload 31 files
3979178 verified
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import pytz
from core.config import get_settings
class SchedulerManager:
    """Wrap an ``AsyncIOScheduler`` and register the application's cron jobs.

    The scheduler and every cron trigger use the timezone named by
    ``get_settings().TIMEZONE``. Jobs are registered at construction time;
    nothing runs until :meth:`start` is called.
    """

    def __init__(self):
        """Create the underlying scheduler and register the built-in jobs."""
        self.scheduler = AsyncIOScheduler(
            timezone=pytz.timezone(get_settings().TIMEZONE)
        )
        self._configure_jobs()

    def _configure_jobs(self) -> None:
        """Register all built-in scheduled jobs on ``self.scheduler``."""
        # Imported lazily rather than at module top level.
        # NOTE(review): presumably this avoids a circular import between the
        # task module and this one - confirm before moving it.
        from task.scheduled_tasks import (
            daily_task,
            weekly_report_task,
            data_cleanup_task,
        )

        # Resolve the configured timezone once and share it across triggers.
        # It must be passed explicitly: the triggers are built here, not by
        # the scheduler, so they do not inherit the scheduler's timezone.
        tz = pytz.timezone(get_settings().TIMEZONE)

        # (job id, callable, cron fields). The original author notes that
        # these task callables are all asynchronous.
        job_specs = (
            ("daily_task", daily_task, {"hour": "8", "minute": "13"}),
            ("daily_task_1", daily_task, {"hour": "9", "minute": "13"}),
            ("weekly_report", weekly_report_task, {"day_of_week": "mon", "hour": "9"}),
            ("data_cleanup", data_cleanup_task, {"hour": "2"}),
        )
        for job_id, func, cron_fields in job_specs:
            self.scheduler.add_job(
                func,
                trigger=CronTrigger(timezone=tz, **cron_fields),
                id=job_id,
            )

    async def start(self) -> None:
        """Start the scheduler; do nothing if it is already running.

        The guard makes repeated startup calls safe instead of letting the
        underlying scheduler raise on a second ``start()``.
        """
        if not self.scheduler.running:
            self.scheduler.start()

    async def shutdown(self) -> None:
        """Shut the scheduler down; do nothing if it is not running.

        The guard lets a shutdown hook run safely even when startup never
        happened or failed part-way.
        """
        if self.scheduler.running:
            self.scheduler.shutdown()

    def get_jobs(self):
        """Return the jobs currently registered on the scheduler."""
        return self.scheduler.get_jobs()

    def add_job(self, func, trigger, **kwargs):
        """Register a new job and return what the scheduler returns.

        Args:
            func: Callable to schedule.
            trigger: Trigger passed straight through to the scheduler.
            **kwargs: Extra keyword arguments forwarded to
                ``scheduler.add_job`` (for example ``id``).
        """
        return self.scheduler.add_job(func, trigger, **kwargs)

    def remove_job(self, job_id) -> None:
        """Remove the job identified by ``job_id`` from the scheduler."""
        self.scheduler.remove_job(job_id)