"""Background worker process for AI summarization.""" import os import time import logging import signal import sqlite3 from typing import List, Tuple from utils.llm_summarizer import OpenAICompatSummarizer from utils.ai_summary_store import ( init_storage, fetch_ready_batches, store_summaries, BATCH_MAX_CHARS, BUFFER_SECONDS, ) logger = logging.getLogger(__name__) PID_FILE = os.getenv("AI_SUMMARY_WORKER_PID", "/tmp/ai_summary_worker.pid") POLL_SECONDS = int(os.getenv("AI_SUMMARY_POLL_SECONDS", "5")) MAX_RETRIES = int(os.getenv("LLM_SUMMARY_RETRIES", "3")) class Worker: def __init__(self): self._stop = False self.summarizer = OpenAICompatSummarizer() def stop(self, *_args): self._stop = True def run(self): init_storage() signal.signal(signal.SIGTERM, self.stop) signal.signal(signal.SIGINT, self.stop) while not self._stop: try: batches = fetch_ready_batches(BATCH_MAX_CHARS, BUFFER_SECONDS) for batch in batches: self._process_batch(batch) except sqlite3.Error as exc: logger.warning(f"AI worker DB error: {exc}") except Exception as exc: logger.warning(f"AI worker error: {exc}") time.sleep(POLL_SECONDS) def _process_batch(self, batch: List[Tuple[str, str, str]]): if not batch or not self.summarizer.enabled: return texts = [] for _, title, source in batch: if source: texts.append(f"Source: {source}\nTitle: {title}") else: texts.append(f"Title: {title}") for attempt in range(1, MAX_RETRIES + 1): summaries = self.summarizer._summarize_chunk(texts, source="dashboard") if summaries and len(summaries) == len(batch): break if attempt < MAX_RETRIES: time.sleep(2 ** attempt) else: logger.warning("AI worker failed to summarize batch after retries") return to_store = [] for (item_key, title, source), summary in zip(batch, summaries): if not summary: continue to_store.append((item_key, title, source, summary)) if to_store: store_summaries(to_store) def _pid_running(pid: int) -> bool: try: os.kill(pid, 0) return True except Exception: return False def start_worker_if_needed(): if os.path.exists(PID_FILE): try: with open(PID_FILE, "r", encoding="utf-8") as f: pid = int(f.read().strip() or 0) if pid and _pid_running(pid): return except Exception: pass pid = os.fork() if pid != 0: return os.setsid() with open(PID_FILE, "w", encoding="utf-8") as f: f.write(str(os.getpid())) worker = Worker() worker.run()