# UnifiedFinancialPlatform / app/utils/llm_summarizer.py
# Author: Dmitry Beresnev — "init project" (commit e189a31)
"""OpenAI-compatible LLM summarizer for news items."""
import json
import logging
import os
import time
from typing import Dict, List, Optional, Tuple
import requests
logger = logging.getLogger(__name__)
class OpenAICompatSummarizer:
    """
    Summarize news items using an OpenAI-compatible chat completions API.

    The summarizer is best-effort: any network or parsing failure is logged
    as a warning and the original items are returned untouched. All settings
    fall back to environment variables so the class can be constructed with
    no arguments.
    """

    def __init__(
        self,
        api_base: Optional[str] = None,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        timeout: Optional[int] = None,
        max_items_per_request: Optional[int] = None,
        max_chars_per_item: Optional[int] = None,
        max_chars_total: Optional[int] = None,
    ):
        # Base URL of the OpenAI-compatible server; trailing slashes are
        # stripped so the "/v1/..." path can be appended safely below.
        self.api_base = (api_base or os.getenv("LLM_API_BASE") or "https://researchengineering-agi.hf.space").rstrip("/")
        # Empty string is a valid explicit key (means "no Authorization header").
        self.api_key = api_key if api_key is not None else os.getenv("LLM_API_KEY", "")
        self.model = model or os.getenv("LLM_MODEL", "gpt-4o-mini")
        self.timeout = timeout or int(os.getenv("LLM_TIMEOUT", "600"))
        # Conservative defaults to avoid large token bursts on slow servers.
        self.max_items_per_request = max_items_per_request or int(os.getenv("LLM_SUMMARY_BATCH", "2"))
        self.max_chars_per_item = max_chars_per_item or int(os.getenv("LLM_SUMMARY_MAX_CHARS", "600"))
        self.max_chars_total = max_chars_total or int(os.getenv("LLM_SUMMARY_MAX_CHARS_TOTAL", "1200"))
        # Feature flag: summarization can be disabled entirely via env.
        self.enabled = os.getenv("ENABLE_AI_SUMMARIZATION", "true").lower() in {"1", "true", "yes"}
        # Optional pause between batches to be gentle with rate limits.
        self.sleep_seconds = float(os.getenv("LLM_SUMMARY_SLEEP_SECONDS", "0"))
        self._chat_url = f"{self.api_base}/v1/chat/completions"

    def summarize_items(self, items: List[Dict], source: Optional[str] = None) -> List[Dict]:
        """Fill in 'summary_ai' (and 'summary') for items that lack one.

        Items are mutated in place and the same list object is returned.
        Items that already carry a non-empty 'summary_ai', or that yield no
        input text, are skipped. A batch that fails to summarize is skipped
        silently (best-effort semantics).
        """
        if not self.enabled or not items:
            return items
        candidates: List[Tuple[Dict, str]] = []
        for item in items:
            if str(item.get("summary_ai", "")).strip():
                continue  # already summarized
            text = self._build_input_text(item)
            if text:
                candidates.append((item, text))
        if not candidates:
            return items
        chunks = self._chunked(candidates, self.max_items_per_request)
        for idx, chunk in enumerate(chunks, start=1):
            texts = [text for _, text in chunk]
            if self.max_chars_total > 0:
                # May shorten or drop only trailing texts, so zip() below
                # keeps each remaining summary aligned with its item.
                texts = self._truncate_to_total(texts, self.max_chars_total)
            summaries = self._summarize_chunk(texts, source=source)
            if not summaries:
                continue
            for (item, _), summary in zip(chunk, summaries):
                if summary:
                    item["summary_ai"] = summary
                    item["summary"] = summary
            # Sleep between batches only (not after the last one).
            if self.sleep_seconds > 0 and idx < len(chunks):
                time.sleep(self.sleep_seconds)
        return items

    def _build_input_text(self, item: Dict) -> str:
        """Build the per-item prompt text from 'source' and 'title'.

        Returns "" when the item has no usable title, which causes the item
        to be skipped by summarize_items(). The title is capped at
        max_chars_per_item characters.
        """
        title = str(item.get("title", "")).strip()
        if title:
            source = str(item.get("source", "")).strip()
            if len(title) > self.max_chars_per_item:
                title = title[: self.max_chars_per_item].rstrip()
            if source:
                return f"Source: {source}\nTitle: {title}"
            return f"Title: {title}"
        return ""

    def _summarize_chunk(self, texts: List[str], source: Optional[str] = None) -> List[str]:
        """Request one summary per text via the chat completions endpoint.

        Returns a list of summaries aligned with *texts*, or [] on any
        transport, HTTP, or format error (the caller treats [] as "skip").
        """
        system_prompt = (
            "You are a financial news summarizer. "
            "Return concise, factual summaries in 1-2 sentences, <=240 characters each. "
            "Do not add speculation or new facts."
        )
        source_line = f"Source: {source}" if source else ""
        items_text = []
        for idx, text in enumerate(texts, start=1):
            items_text.append(f"{idx}. {text}")
        user_prompt = (
            "Summarize each item below. "
            "Return a JSON array of strings in the same order. "
            "No extra text.\n"
            f"{source_line}\n\n" + "\n\n".join(items_text)
        )
        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            # Low temperature for factual, reproducible summaries.
            "temperature": 0.2,
        }
        headers = {"Content-Type": "application/json"}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        try:
            response = requests.post(self._chat_url, json=payload, headers=headers, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()
            content = (
                data.get("choices", [{}])[0]
                .get("message", {})
                .get("content", "")
                .strip()
            )
            summaries = self._parse_json_array(content)
            # Require an exact 1:1 mapping; anything else is unusable.
            if summaries and len(summaries) == len(texts):
                return summaries
            logger.warning("LLM summarizer returned unexpected format or length")
            return []
        except Exception as exc:
            # Broad catch is intentional: summarization is best-effort and
            # must never break the caller's pipeline.
            logger.warning("LLM summarization failed: %s", exc)
            return []

    def _parse_json_array(self, content: str) -> List[str]:
        """Parse the model reply into a list of stripped strings.

        Tolerates replies wrapped in Markdown code fences (``` or ```json),
        which many models emit despite the "no extra text" instruction.
        Returns [] on empty input, parse failure, or a non-list payload.
        """
        if not content:
            return []
        text = content.strip()
        if text.startswith("```"):
            # Drop the opening fence (with optional language tag) and the
            # closing fence if present, keeping only the payload between.
            lines = text.splitlines()
            lines = lines[1:]
            if lines and lines[-1].strip() == "```":
                lines = lines[:-1]
            text = "\n".join(lines).strip()
        try:
            parsed = json.loads(text)
            if isinstance(parsed, list):
                return [str(x).strip() for x in parsed]
            return []
        except Exception:
            return []

    def _chunked(self, items: List[Tuple[Dict, str]], size: int) -> List[List[Tuple[Dict, str]]]:
        """Split *items* into consecutive batches of at most *size* entries.

        A non-positive size means "no batching": one chunk with everything.
        """
        if size <= 0:
            return [items]
        return [items[i : i + size] for i in range(0, len(items), size)]

    def _truncate_to_total(self, texts: List[str], max_total: int) -> List[str]:
        """Trim texts so their combined length stays within *max_total* chars.

        Only the tail of the list is shortened or dropped, so the surviving
        entries stay aligned with their original positions. An entry that
        would be truncated to nothing is dropped rather than sent to the
        model as an empty numbered item.
        """
        if max_total <= 0:
            return texts
        truncated: List[str] = []
        total = 0
        for text in texts:
            if total >= max_total:
                break
            remaining = max_total - total
            if len(text) > remaining:
                text = text[:remaining].rstrip()
            if not text:
                # Nothing meaningful left in the budget for this entry.
                break
            truncated.append(text)
            total += len(text)
        return truncated