Spaces:
Sleeping
Sleeping
File size: 5,130 Bytes
"""APScheduler — configurable pipeline trigger running inside the web process."""
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
log = logging.getLogger(__name__)

# Process-wide scheduler; BackgroundScheduler runs in a background thread of
# the current (web) process.
scheduler = BackgroundScheduler()

# Fixed id of the single automatic job, so it can be replaced or removed later.
JOB_ID = "weekly_run"

# Default: Sunday 22:00 UTC. The timezone is pinned explicitly because an
# APScheduler 3.x CronTrigger built without one uses the host's local zone,
# which would silently move this schedule on any non-UTC host.
_DEFAULT_CRON = CronTrigger(day_of_week="sun", hour=22, minute=0, timezone="UTC")
def _parse_cron_trigger(cron_str: str) -> CronTrigger | None:
"""Parse a 5-field cron string into a CronTrigger.
Returns None for empty string (manual-only mode).
Falls back to default schedule on parse errors.
"""
if not cron_str or not cron_str.strip():
return None
parts = cron_str.strip().split()
if len(parts) != 5:
log.warning("Invalid cron string '%s' (expected 5 fields) β using default", cron_str)
return _DEFAULT_CRON
try:
return CronTrigger(
minute=parts[0],
hour=parts[1],
day=parts[2],
month=parts[3],
day_of_week=parts[4],
)
except (ValueError, TypeError) as e:
log.warning("Failed to parse cron '%s': %s β using default", cron_str, e)
return _DEFAULT_CRON
def _run_scored_pipeline(run_pipeline, kind: str) -> None:
    """Run a paper pipeline, then score, rescore, enrich and report on its run.

    Args:
        run_pipeline: Zero-argument callable that runs the pipeline; its return
            value is passed as the run id to every follow-up step.
        kind: Pipeline key (``"aiml"`` or ``"security"``) passed to each step.
    """
    from src.scoring import rescore_top, score_run

    run_id = run_pipeline()
    score_run(run_id, kind)
    rescore_top(run_id, kind)
    # Imported lazily and only after scoring has succeeded; presumably the
    # late import avoids a circular import with the web app (TODO confirm).
    from src.web.app import _enrich_s2, _generate_report

    _enrich_s2(run_id, kind)
    _generate_report(run_id, kind)


def _run_aiml() -> None:
    """Run the AI/ML pipeline followed by its scoring and report steps."""
    from src.pipelines.aiml import run_aiml_pipeline

    _run_scored_pipeline(run_aiml_pipeline, "aiml")


def _run_security() -> None:
    """Run the security pipeline followed by its scoring and report steps."""
    from src.pipelines.security import run_security_pipeline

    _run_scored_pipeline(run_security_pipeline, "security")


def _run_github() -> None:
    """Run the GitHub pipeline (no scoring/report steps)."""
    from src.pipelines.github import run_github_pipeline

    run_github_pipeline()


def _run_events() -> None:
    """Run the events pipeline (no scoring/report steps)."""
    from src.pipelines.events import run_events_pipeline

    run_events_pipeline()


def _recompute_preferences() -> None:
    """Best-effort: recompute preferences when any signals have been recorded."""
    try:
        from src.preferences import compute_preferences
        from src.db import get_signal_counts

        counts = get_signal_counts()
        if sum(counts.values()) > 0:
            compute_preferences()
            log.info("Preferences recomputed after scheduled run")
    except Exception:
        log.exception("Post-run preference recompute failed")


def weekly_run():
    """Run all enabled pipelines: aiml → security → github → events.

    The aiml and security pipelines are also scored, enriched and reported on.
    Each pipeline is isolated: a failure is logged and the next one still runs.
    Afterwards preferences are recomputed so adjusted rankings reflect the new
    data.
    """
    from src.config import is_pipeline_enabled

    log.info("Starting scheduled run ...")
    # (config key, log label, runner) in execution order.
    steps = (
        ("aiml", "AI/ML", _run_aiml),
        ("security", "Security", _run_security),
        ("github", "GitHub", _run_github),
        ("events", "Events", _run_events),
    )
    for key, label, run_step in steps:
        if not is_pipeline_enabled(key):
            log.info("%s pipeline disabled β skipping", label)
            continue
        try:
            run_step()
        except Exception:
            # Top-level job boundary: log and move on to the next pipeline.
            log.exception("%s pipeline failed", label)
    # Recompute preferences after scoring so adjusted rankings reflect new data.
    _recompute_preferences()
    log.info("Scheduled run complete")
def start_scheduler():
    """Register the configured pipeline job (if any) and start the scheduler.

    Reads ``SCHEDULE_CRON`` from ``src.config``. When it parses to no trigger
    (empty string = manual-only mode), the scheduler is still started but no
    job is added.
    """
    from src.config import SCHEDULE_CRON

    cron_trigger = _parse_cron_trigger(SCHEDULE_CRON)
    manual_only = cron_trigger is None

    if manual_only:
        log.info("Schedule set to manual β no automatic jobs")
    else:
        scheduler.add_job(
            weekly_run,
            trigger=cron_trigger,
            id=JOB_ID,
            name="Scheduled research pipeline",
            replace_existing=True,
        )

    scheduler.start()

    if not manual_only:
        log.info("Scheduler started with cron: %s", SCHEDULE_CRON)
def reschedule(cron_str: str | None = None):
    """Update the scheduled pipeline job at runtime.

    Args:
        cron_str: 5-field cron expression; an empty string switches to
            manual-only mode (the automatic job is removed). ``None`` (the
            default) re-reads ``SCHEDULE_CRON`` from ``src.config``.
    """
    from apscheduler.jobstores.base import JobLookupError

    if cron_str is None:
        from src.config import SCHEDULE_CRON

        cron_str = SCHEDULE_CRON

    trigger = _parse_cron_trigger(cron_str)
    if trigger is None:
        try:
            scheduler.remove_job(JOB_ID)
        except JobLookupError:
            pass  # No automatic job was scheduled; nothing to remove.
        log.info("Rescheduled to manual β removed automatic job")
        return

    # replace_existing=True swaps the job in place, so the previous schedule
    # stays active until the new one is installed (no remove-then-add gap).
    scheduler.add_job(
        weekly_run,
        trigger=trigger,
        id=JOB_ID,
        name="Scheduled research pipeline",
        replace_existing=True,
    )
    log.info("Rescheduled with cron: %s", cron_str)
|