File size: 5,130 Bytes
430d0f8
a0f27fa
 
 
 
 
 
 
 
 
430d0f8
a0f27fa
430d0f8
 
a0f27fa
 
430d0f8
 
a0f27fa
430d0f8
 
 
 
 
a0f27fa
430d0f8
 
 
 
a0f27fa
 
430d0f8
 
 
 
 
 
 
 
 
 
 
a0f27fa
430d0f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a0f27fa
430d0f8
 
 
 
 
 
a0f27fa
430d0f8
a0f27fa
430d0f8
a0f27fa
 
 
430d0f8
 
 
 
 
 
 
 
 
a0f27fa
 
430d0f8
 
 
a0f27fa
 
 
430d0f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""APScheduler β€” configurable pipeline trigger running inside the web process."""

import logging

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Single background scheduler instance shared by start_scheduler() and
# reschedule() below. It is only constructed here, not started.
scheduler = BackgroundScheduler()

# Job id under which weekly_run is registered (and later removed/replaced).
JOB_ID = "weekly_run"

# Default schedule: Sunday 22:00. Used as the fallback when a configured
# cron string cannot be parsed.
# NOTE(review): the original comment said "UTC", but no timezone is passed
# to CronTrigger here — confirm which timezone the trigger actually uses
# in deployment.
_DEFAULT_CRON = CronTrigger(day_of_week="sun", hour=22, minute=0)


def _parse_cron_trigger(cron_str: str) -> CronTrigger | None:
    """Parse a 5-field cron string into a CronTrigger.

    Returns None for empty string (manual-only mode).
    Falls back to default schedule on parse errors.
    """
    if not cron_str or not cron_str.strip():
        return None

    parts = cron_str.strip().split()
    if len(parts) != 5:
        log.warning("Invalid cron string '%s' (expected 5 fields) β€” using default", cron_str)
        return _DEFAULT_CRON

    try:
        return CronTrigger(
            minute=parts[0],
            hour=parts[1],
            day=parts[2],
            month=parts[3],
            day_of_week=parts[4],
        )
    except (ValueError, TypeError) as e:
        log.warning("Failed to parse cron '%s': %s β€” using default", cron_str, e)
        return _DEFAULT_CRON


def _run_scored_stage(key, run_pipeline):
    """Run a pipeline, then score, rescore, enrich and report on its run id.

    ``run_pipeline`` is a zero-argument callable returning the run id;
    ``key`` is the pipeline name passed to every follow-up step.
    """
    from src.scoring import rescore_top, score_run

    run_id = run_pipeline()
    score_run(run_id, key)
    rescore_top(run_id, key)

    # Imported only after scoring has finished, matching the original order,
    # so a failure importing the web app does not prevent pipeline + scoring.
    from src.web.app import _enrich_s2, _generate_report
    _enrich_s2(run_id, key)
    _generate_report(run_id, key)


def _run_aiml_stage():
    """Run the AI/ML pipeline followed by scoring and report generation."""
    from src.pipelines.aiml import run_aiml_pipeline
    _run_scored_stage("aiml", run_aiml_pipeline)


def _run_security_stage():
    """Run the security pipeline followed by scoring and report generation."""
    from src.pipelines.security import run_security_pipeline
    _run_scored_stage("security", run_security_pipeline)


def _run_github_stage():
    """Run the GitHub pipeline."""
    from src.pipelines.github import run_github_pipeline
    run_github_pipeline()


def _run_events_stage():
    """Run the events pipeline."""
    from src.pipelines.events import run_events_pipeline
    run_events_pipeline()


def weekly_run():
    """Run all enabled pipelines in order: aiml → security → github → events.

    Each stage is skipped when ``is_pipeline_enabled`` returns false for its
    key. A failure inside one stage (including a failing import) is logged
    with its traceback and does not stop the remaining stages. Afterwards,
    preferences are recomputed if any signals are recorded; a failure there
    is likewise logged rather than raised.
    """
    from src.config import is_pipeline_enabled

    log.info("Starting scheduled run ...")

    # (config key, label used in log messages, stage runner) in run order.
    stages = (
        ("aiml", "AI/ML", _run_aiml_stage),
        ("security", "Security", _run_security_stage),
        ("github", "GitHub", _run_github_stage),
        ("events", "Events", _run_events_stage),
    )
    for key, label, run_stage in stages:
        if not is_pipeline_enabled(key):
            log.info("%s pipeline disabled — skipping", label)
            continue
        try:
            run_stage()
        except Exception:
            # Job boundary: keep going so one broken pipeline cannot block
            # the others.
            log.exception("%s pipeline failed", label)

    # Recompute preferences after scoring so adjusted rankings reflect new data
    try:
        from src.preferences import compute_preferences
        from src.db import get_signal_counts
        counts = get_signal_counts()
        # Only recompute when the signal counts sum to more than zero.
        if sum(counts.values()) > 0:
            compute_preferences()
            log.info("Preferences recomputed after scheduled run")
    except Exception:
        log.exception("Post-run preference recompute failed")

    log.info("Scheduled run complete")


def start_scheduler():
    """Start the background scheduler with the configured cron job.

    Reads ``SCHEDULE_CRON`` from ``src.config`` and parses it with
    ``_parse_cron_trigger``. When a trigger results, ``weekly_run`` is
    registered under ``JOB_ID`` (replacing any job with that id); in manual
    mode no job is added. The scheduler is started in either case.

    Calling this again while the scheduler is already running no longer
    raises: the start is skipped and only the job registration is refreshed.
    A job registered by an earlier call is not removed here — use
    ``reschedule`` to switch a running scheduler to manual mode.
    """
    from src.config import SCHEDULE_CRON

    trigger = _parse_cron_trigger(SCHEDULE_CRON)
    if trigger is None:
        log.info("Schedule set to manual — no automatic jobs")
    else:
        scheduler.add_job(
            weekly_run,
            trigger=trigger,
            id=JOB_ID,
            name="Scheduled research pipeline",
            replace_existing=True,
        )

    # start() raises if the scheduler is already running (e.g. app
    # initialisation executed twice in one process), so guard it.
    if scheduler.running:
        log.info("Scheduler already running — not starting it again")
    else:
        scheduler.start()

    if trigger is not None:
        log.info("Scheduler started with cron: %s", SCHEDULE_CRON)


def reschedule(cron_str: str | None = None):
    """Update the scheduler with a new cron string at runtime.

    Any job registered under ``JOB_ID`` is removed first. The cron string is
    then parsed with ``_parse_cron_trigger``; if it yields a trigger,
    ``weekly_run`` is registered again under ``JOB_ID``, otherwise the
    scheduler is left without the automatic job (manual mode).

    Args:
        cron_str: New 5-field cron expression. If None, ``SCHEDULE_CRON`` is
            read from ``src.config``. An empty string selects manual mode.

    Raises:
        Errors from ``scheduler.remove_job`` other than "job not found" now
        propagate instead of being silently discarded.
    """
    from apscheduler.jobstores.base import JobLookupError

    if cron_str is None:
        from src.config import SCHEDULE_CRON
        cron_str = SCHEDULE_CRON

    # Remove the existing job if present. A missing job is the only expected
    # failure, so only that case is suppressed.
    try:
        scheduler.remove_job(JOB_ID)
    except JobLookupError:
        log.debug("No existing job '%s' to remove", JOB_ID)

    trigger = _parse_cron_trigger(cron_str)
    if trigger is None:
        log.info("Rescheduled to manual — removed automatic job")
        return

    scheduler.add_job(
        weekly_run,
        trigger=trigger,
        id=JOB_ID,
        name="Scheduled research pipeline",
        replace_existing=True,
    )
    log.info("Rescheduled with cron: %s", cron_str)