File size: 1,963 Bytes
3979178
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import pytz
from core.config import get_settings

class SchedulerManager:
    def __init__(self):
        self.scheduler = AsyncIOScheduler(timezone=pytz.timezone(get_settings().TIMEZONE))
        self._configure_jobs()

    def _configure_jobs(self):
        """配置所有定时任务"""
        from task.scheduled_tasks import (
            daily_task,
            weekly_report_task,
            data_cleanup_task
        )

        # 添加定时任务,注意这些任务都是异步的
        self.scheduler.add_job(
            daily_task,
            trigger=CronTrigger(hour='8', minute='13', timezone=pytz.timezone(get_settings().TIMEZONE)),
            id='daily_task'
        )

        self.scheduler.add_job(
            daily_task,
            trigger=CronTrigger(hour='9', minute='13', timezone=pytz.timezone(get_settings().TIMEZONE)),
            id='daily_task_1'
        )

        self.scheduler.add_job(
            weekly_report_task,
            trigger=CronTrigger(day_of_week='mon', hour='9', timezone=pytz.timezone(get_settings().TIMEZONE)),
            id='weekly_report'
        )

        self.scheduler.add_job(
            data_cleanup_task,
            trigger=CronTrigger(hour='2', timezone=pytz.timezone(get_settings().TIMEZONE)),
            id='data_cleanup'
        )

    async def start(self):
        """异步启动调度器"""
        self.scheduler.start()

    async def shutdown(self):
        """异步关闭调度器"""
        self.scheduler.shutdown()

    def get_jobs(self):
        """获取所有任务"""
        return self.scheduler.get_jobs()

    def add_job(self, func, trigger, **kwargs):
        """添加新任务"""
        return self.scheduler.add_job(func, trigger, **kwargs)

    def remove_job(self, job_id):
        """移除任务"""
        self.scheduler.remove_job(job_id)