| from __future__ import annotations |
|
|
| import logging |
| import os |
| import threading |
| from typing import Any |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class TrendScheduler: |
| """Simple background scheduler for periodic trend matching and notification. |
| |
| Uses threading.Timer for simplicity. Configured via environment variables: |
| - TREND_CHECK_INTERVAL_MINUTES: how often to run (default 30) |
| - TREND_SCHEDULER_ENABLED: set to "false" to disable (default true) |
| """ |
|
|
| def __init__(self): |
| self._timer: threading.Timer | None = None |
| self._running = False |
| self._interval_minutes = int(os.getenv("TREND_CHECK_INTERVAL_MINUTES", "30")) |
| self._enabled = os.getenv("TREND_SCHEDULER_ENABLED", "true").lower() != "false" |
|
|
| @property |
| def interval_seconds(self) -> int: |
| return max(60, self._interval_minutes * 60) |
|
|
| def start(self) -> None: |
| if not self._enabled: |
| logger.info("TrendScheduler 已禁用 (TREND_SCHEDULER_ENABLED=false)") |
| return |
| if self._running: |
| return |
| self._running = True |
| logger.info(f"TrendScheduler 已启动,间隔 {self._interval_minutes} 分钟") |
| self._schedule_next() |
|
|
| def stop(self) -> None: |
| self._running = False |
| if self._timer is not None: |
| self._timer.cancel() |
| self._timer = None |
| logger.info("TrendScheduler 已停止") |
|
|
| def run_now(self) -> dict[str, Any]: |
| """Manually trigger a full matching cycle. Returns summary.""" |
| logger.info("TrendScheduler 手动触发匹配…") |
| try: |
| from app.services.trend_subscription import TrendSubscriptionService |
| from app.services.notification import NotificationService |
|
|
| svc = TrendSubscriptionService() |
| summary = svc.match_all_subscriptions() |
|
|
| |
| if summary["total_new_matches"] > 0: |
| notifier = NotificationService() |
| for sub_result in summary["by_subscription"]: |
| sub = svc.get_subscription(sub_result["subscription_id"]) |
| if not sub or not sub.get("push_enabled"): |
| continue |
| channel_ids = sub.get("push_channel_ids", []) |
| if not channel_ids: |
| continue |
|
|
| match_titles = [m["title"] for m in sub_result["matches"]] |
| title = f"🔥 VideoMemo: {sub['name']} — {len(match_titles)} 条新热点" |
| body = "\n\n".join(f"• {t}" for t in match_titles[:10]) |
| if len(match_titles) > 10: |
| body += f"\n\n…共 {len(match_titles)} 条" |
|
|
| notifier.send_batch(channel_ids, title, body) |
|
|
| logger.info( |
| f"TrendScheduler 匹配完成: " |
| f"{summary['total_subscriptions']} 订阅, " |
| f"{summary['total_new_matches']} 新匹配" |
| ) |
| return summary |
| except Exception: |
| logger.exception("TrendScheduler 匹配出错") |
| return {"error": "匹配过程出错,详见日志"} |
|
|
| def _schedule_next(self) -> None: |
| if not self._running: |
| return |
|
|
| def _tick(): |
| self.run_now() |
| self._schedule_next() |
|
|
| self._timer = threading.Timer(self.interval_seconds, _tick) |
| self._timer.daemon = True |
| self._timer.start() |
|
|
|
|
| |
| _scheduler: TrendScheduler | None = None |
|
|
|
|
| def get_scheduler() -> TrendScheduler: |
| global _scheduler |
| if _scheduler is None: |
| _scheduler = TrendScheduler() |
| return _scheduler |
|
|