| """Background worker process for AI summarization.""" |
|
|
| import os |
| import time |
| import logging |
| import signal |
| import sqlite3 |
| from typing import List, Tuple |
|
|
| from utils.llm_summarizer import OpenAICompatSummarizer |
| from utils.ai_summary_store import ( |
| init_storage, |
| fetch_ready_batches, |
| store_summaries, |
| BATCH_MAX_CHARS, |
| BUFFER_SECONDS, |
| ) |
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Runtime settings; each one can be overridden through the environment.
# PID_FILE: path of the file holding the running worker's process id.
PID_FILE = os.environ.get("AI_SUMMARY_WORKER_PID", "/tmp/ai_summary_worker.pid")
# POLL_SECONDS: pause between two polls for ready batches.
POLL_SECONDS = int(os.environ.get("AI_SUMMARY_POLL_SECONDS", "5"))
# MAX_RETRIES: number of summarization attempts made per batch.
MAX_RETRIES = int(os.environ.get("LLM_SUMMARY_RETRIES", "3"))
|
|
|
|
class Worker:
    """Polling loop that summarizes ready batches and stores the results.

    ``run()`` initialises storage, installs SIGTERM/SIGINT handlers that
    request a stop, and then repeatedly fetches ready batches, summarizes
    each one and stores the non-empty summaries until a stop is requested.
    """

    # Longest single uninterrupted sleep. ``time.sleep`` resumes after a
    # signal handler returns, so waits are sliced to notice ``_stop`` quickly.
    _SLEEP_SLICE_SECONDS = 0.5

    def __init__(self):
        self._stop = False
        self.summarizer = OpenAICompatSummarizer()

    def stop(self, *_args):
        """Request shutdown.

        Accepts and ignores any positional arguments so it can be installed
        directly as a signal handler (which is called with signum and frame).
        """
        self._stop = True

    def _sleep(self, seconds):
        """Sleep for up to *seconds*, returning early once stop is requested."""
        deadline = time.monotonic() + seconds
        while not self._stop:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            time.sleep(min(remaining, self._SLEEP_SLICE_SECONDS))

    def run(self):
        """Run the poll/summarize/store loop until ``stop()`` is called.

        Errors raised while fetching or processing batches are logged as
        warnings and the loop continues with the next poll.
        """
        init_storage()
        signal.signal(signal.SIGTERM, self.stop)
        signal.signal(signal.SIGINT, self.stop)

        while not self._stop:
            try:
                batches = fetch_ready_batches(BATCH_MAX_CHARS, BUFFER_SECONDS)
                for batch in batches:
                    # Do not start new work once shutdown was requested.
                    if self._stop:
                        break
                    self._process_batch(batch)
            except sqlite3.Error as exc:
                logger.warning("AI worker DB error: %s", exc)
            except Exception as exc:
                logger.warning("AI worker error: %s", exc)

            self._sleep(POLL_SECONDS)

    def _process_batch(self, batch: List[Tuple[str, str, str]]):
        """Summarize one batch and store every non-empty summary.

        Args:
            batch: ``(item_key, title, source)`` tuples. ``source`` may be
                empty, in which case only the title is sent for summarizing.

        Returns:
            None. Nothing is stored when the batch is empty, the summarizer
            is disabled, shutdown is requested during a retry wait, or no
            attempt yields exactly one summary per batch item.
        """
        if not batch or not self.summarizer.enabled:
            return

        texts = [
            f"Source: {source}\nTitle: {title}" if source else f"Title: {title}"
            for _, title, source in batch
        ]

        summaries = None
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                # NOTE(review): this calls a private method of the summarizer;
                # presumably it returns one summary per input text - confirm.
                summaries = self.summarizer._summarize_chunk(texts, source="dashboard")
            except Exception as exc:
                # A raised error counts as a failed attempt so the retry and
                # backoff logic still applies instead of being bypassed.
                logger.warning(
                    "AI worker summarize attempt %d failed: %s", attempt, exc
                )
                summaries = None
            if summaries and len(summaries) == len(batch):
                break
            if attempt < MAX_RETRIES:
                # Exponential backoff that can be cut short by a stop request.
                self._sleep(2 ** attempt)
                if self._stop:
                    return
        else:
            # Reached when every attempt failed (or MAX_RETRIES is < 1).
            logger.warning("AI worker failed to summarize batch after retries")
            return

        to_store = [
            (item_key, title, source, summary)
            for (item_key, title, source), summary in zip(batch, summaries)
            if summary
        ]
        if to_store:
            store_summaries(to_store)
|
|
|
|
| def _pid_running(pid: int) -> bool: |
| try: |
| os.kill(pid, 0) |
| return True |
| except Exception: |
| return False |
|
|
|
|
def start_worker_if_needed():
    """Fork a detached summarization worker unless one is already running.

    Reads ``PID_FILE``; if it names a live process, nothing happens.
    Otherwise the process forks: the parent returns immediately, while the
    child starts a new session, records its pid in ``PID_FILE`` and runs the
    worker loop. The child never returns to the caller - it always ends with
    ``os._exit`` so it cannot continue executing the parent's code path.
    """
    try:
        with open(PID_FILE, "r", encoding="utf-8") as f:
            pid = int(f.read().strip() or 0)
    except (OSError, ValueError):
        # Missing, unreadable or malformed PID file: assume no worker.
        pid = 0
    if pid and _pid_running(pid):
        return

    pid = os.fork()
    if pid != 0:
        # Parent: the worker lives on in the child process.
        return

    # Child process from here on.
    exit_code = 0
    try:
        os.setsid()
        with open(PID_FILE, "w", encoding="utf-8") as f:
            f.write(str(os.getpid()))

        worker = Worker()
        worker.run()
    except Exception:
        logger.exception("AI worker crashed")
        exit_code = 1
    finally:
        # Best-effort cleanup: remove the PID file only if it is still ours,
        # so a newer worker's file is never deleted.
        try:
            with open(PID_FILE, "r", encoding="utf-8") as f:
                owner = f.read().strip()
            if owner == str(os.getpid()):
                os.remove(PID_FILE)
        except (OSError, ValueError):
            pass
        # Leave without unwinding into the caller's (copied) stack and
        # without running the parent's exit handlers.
        os._exit(exit_code)
|
|