UnifiedFinancialPlatform / app /utils /ai_summary_worker.py
Dmitry Beresnev
init project
e189a31
"""Background worker process for AI summarization."""
import os
import time
import logging
import signal
import sqlite3
from typing import List, Tuple
from utils.llm_summarizer import OpenAICompatSummarizer
from utils.ai_summary_store import (
init_storage,
fetch_ready_batches,
store_summaries,
BATCH_MAX_CHARS,
BUFFER_SECONDS,
)
# Module-level logger shared by the worker class and the launcher below.
logger = logging.getLogger(__name__)

# Runtime settings; each one can be overridden through its environment variable.
# Path of the file holding the PID of the forked worker process.
PID_FILE = os.environ.get("AI_SUMMARY_WORKER_PID", "/tmp/ai_summary_worker.pid")
# Seconds the worker sleeps between two polling cycles.
POLL_SECONDS = int(os.environ.get("AI_SUMMARY_POLL_SECONDS", "5"))
# Maximum number of summarization attempts per batch.
MAX_RETRIES = int(os.environ.get("LLM_SUMMARY_RETRIES", "3"))
class Worker:
    """Polling loop that summarizes ready batches and stores the results.

    ``run`` repeatedly fetches batches via ``fetch_ready_batches``, asks the
    summarizer for one summary per item and persists the non-empty ones with
    ``store_summaries``. The loop ends once ``stop`` has been called; ``run``
    installs ``stop`` as the SIGTERM/SIGINT handler.
    """

    # Longest single sleep while waiting. ``time.sleep`` resumes after a signal
    # handler returns, so waits are sliced to notice a stop request promptly.
    _WAIT_SLICE_SECONDS = 0.5

    def __init__(self):
        self._stop = False
        self.summarizer = OpenAICompatSummarizer()

    def stop(self, *_args):
        """Request shutdown; accepts (and ignores) signal-handler arguments."""
        self._stop = True

    def run(self):
        """Initialize storage, install signal handlers and poll until stopped.

        Errors raised during a polling cycle are logged and the loop carries
        on with the next cycle after the usual pause.
        """
        init_storage()
        signal.signal(signal.SIGTERM, self.stop)
        signal.signal(signal.SIGINT, self.stop)
        while not self._stop:
            try:
                batches = fetch_ready_batches(BATCH_MAX_CHARS, BUFFER_SECONDS)
                for batch in batches:
                    if self._stop:
                        # Do not start new (possibly slow) work after a stop request.
                        break
                    self._process_batch(batch)
            except sqlite3.Error as exc:
                logger.warning("AI worker DB error: %s", exc)
            except Exception as exc:
                logger.warning("AI worker error: %s", exc)
            self._wait(POLL_SECONDS)

    def _wait(self, seconds: float) -> bool:
        """Sleep up to ``seconds``, returning early when a stop is requested.

        Returns:
            True if the full duration elapsed, False if the wait was cut short
            (or skipped) because ``stop`` had been called.
        """
        deadline = time.monotonic() + seconds
        while not self._stop:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return True
            time.sleep(min(remaining, self._WAIT_SLICE_SECONDS))
        return False

    def _summarize_with_retries(self, texts: List[str], expected: int):
        """Return ``expected`` summaries for ``texts``, or None on failure.

        Makes up to ``MAX_RETRIES`` attempts, backing off ``2 ** attempt``
        seconds between them. A result counts as a success only when it is
        non-empty and has exactly ``expected`` entries. Also returns None when
        ``MAX_RETRIES`` is not positive or a stop is requested during backoff.
        """
        for attempt in range(1, MAX_RETRIES + 1):
            summaries = self.summarizer._summarize_chunk(texts, source="dashboard")
            if summaries and len(summaries) == expected:
                return summaries
            if attempt < MAX_RETRIES and not self._wait(2 ** attempt):
                # Shutdown requested while backing off; give up quietly.
                return None
        return None

    def _process_batch(self, batch: List[Tuple[str, str, str]]):
        """Summarize one batch of ``(item_key, title, source)`` tuples and store it.

        Does nothing for an empty batch or when the summarizer is disabled.
        Items whose summary is empty are skipped; if no summary survives,
        nothing is stored.
        """
        if not batch or not self.summarizer.enabled:
            return

        # One prompt per item; the source line is included only when present.
        texts = [
            f"Source: {source}\nTitle: {title}" if source else f"Title: {title}"
            for _, title, source in batch
        ]

        summaries = self._summarize_with_retries(texts, expected=len(batch))
        if summaries is None:
            if not self._stop:
                logger.warning("AI worker failed to summarize batch after retries")
            return

        to_store = [
            (item_key, title, source, summary)
            for (item_key, title, source), summary in zip(batch, summaries)
            if summary
        ]
        if to_store:
            store_summaries(to_store)
def _pid_running(pid: int) -> bool:
    """Return True when a single process with the given PID currently exists.

    Probes the process with signal 0, which performs the existence and
    permission checks without delivering a signal.

    Args:
        pid: Process ID to probe. Values <= 0 are rejected because ``kill``
            interprets them as process groups / "all processes", not as one
            process.

    Returns:
        True if the process exists (including when it belongs to another user
        and may not be signalled), False otherwise. Never raises for bad input.
    """
    try:
        if pid <= 0:
            return False
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # EPERM: the process exists but we are not allowed to signal it.
        return True
    except (OSError, OverflowError, TypeError):
        return False
    return True
def start_worker_if_needed():
    """Fork a detached worker process unless one is already running.

    Reads the PID recorded in ``PID_FILE``; when that process is still alive
    the call is a no-op. Otherwise the process forks: the parent returns
    immediately, while the child starts a new session, records its own PID in
    ``PID_FILE`` and runs the ``Worker`` loop. The child never returns to the
    caller; it terminates with ``os._exit`` once the loop ends.
    """
    # EAFP: a missing, unreadable or malformed PID file means "no known worker".
    try:
        with open(PID_FILE, "r", encoding="utf-8") as f:
            pid = int(f.read().strip() or 0)
    except (OSError, ValueError):
        pid = 0
    if pid and _pid_running(pid):
        return

    if os.fork() != 0:
        # Parent: the worker lives in the child process.
        return

    # Child process from here on.
    exit_code = 0
    try:
        os.setsid()
        with open(PID_FILE, "w", encoding="utf-8") as f:
            f.write(str(os.getpid()))
        Worker().run()
    except Exception:
        logger.exception("AI summary worker crashed")
        exit_code = 1
    finally:
        # Drop the PID file so a later call does not trust a stale (possibly
        # reused) PID.
        try:
            os.remove(PID_FILE)
        except OSError:
            pass
        # Leave without unwinding into the caller: returning here would let the
        # child continue executing a copy of the parent application.
        os._exit(exit_code)