videoNote / backend /app /services /scheduler.py
zhoujiaangyao
deploy videomemo backend to HF Space
6cfe55f
Raw
History Blame Contribute Delete
3.77 kB
from __future__ import annotations
import logging
import os
import threading
from typing import Any
logger = logging.getLogger(__name__)
class TrendScheduler:
"""Simple background scheduler for periodic trend matching and notification.
Uses threading.Timer for simplicity. Configured via environment variables:
- TREND_CHECK_INTERVAL_MINUTES: how often to run (default 30)
- TREND_SCHEDULER_ENABLED: set to "false" to disable (default true)
"""
def __init__(self):
self._timer: threading.Timer | None = None
self._running = False
self._interval_minutes = int(os.getenv("TREND_CHECK_INTERVAL_MINUTES", "30"))
self._enabled = os.getenv("TREND_SCHEDULER_ENABLED", "true").lower() != "false"
@property
def interval_seconds(self) -> int:
return max(60, self._interval_minutes * 60) # minimum 1 minute
def start(self) -> None:
if not self._enabled:
logger.info("TrendScheduler 已禁用 (TREND_SCHEDULER_ENABLED=false)")
return
if self._running:
return
self._running = True
logger.info(f"TrendScheduler 已启动,间隔 {self._interval_minutes} 分钟")
self._schedule_next()
def stop(self) -> None:
self._running = False
if self._timer is not None:
self._timer.cancel()
self._timer = None
logger.info("TrendScheduler 已停止")
def run_now(self) -> dict[str, Any]:
"""Manually trigger a full matching cycle. Returns summary."""
logger.info("TrendScheduler 手动触发匹配…")
try:
from app.services.trend_subscription import TrendSubscriptionService
from app.services.notification import NotificationService
svc = TrendSubscriptionService()
summary = svc.match_all_subscriptions()
# Send notifications for subscriptions with new matches
if summary["total_new_matches"] > 0:
notifier = NotificationService()
for sub_result in summary["by_subscription"]:
sub = svc.get_subscription(sub_result["subscription_id"])
if not sub or not sub.get("push_enabled"):
continue
channel_ids = sub.get("push_channel_ids", [])
if not channel_ids:
continue
match_titles = [m["title"] for m in sub_result["matches"]]
title = f"🔥 VideoMemo: {sub['name']}{len(match_titles)} 条新热点"
body = "\n\n".join(f"• {t}" for t in match_titles[:10])
if len(match_titles) > 10:
body += f"\n\n…共 {len(match_titles)} 条"
notifier.send_batch(channel_ids, title, body)
logger.info(
f"TrendScheduler 匹配完成: "
f"{summary['total_subscriptions']} 订阅, "
f"{summary['total_new_matches']} 新匹配"
)
return summary
except Exception:
logger.exception("TrendScheduler 匹配出错")
return {"error": "匹配过程出错,详见日志"}
def _schedule_next(self) -> None:
if not self._running:
return
def _tick():
self.run_now()
self._schedule_next()
self._timer = threading.Timer(self.interval_seconds, _tick)
self._timer.daemon = True
self._timer.start()
# Module-level singleton
_scheduler: TrendScheduler | None = None
def get_scheduler() -> TrendScheduler:
global _scheduler
if _scheduler is None:
_scheduler = TrendScheduler()
return _scheduler