"""OpenAI-compatible LLM summarizer for news items.""" import json import logging import os import time from typing import Dict, List, Optional, Tuple import requests logger = logging.getLogger(__name__) class OpenAICompatSummarizer: """ Summarize news items using an OpenAI-compatible chat completions API. """ def __init__( self, api_base: Optional[str] = None, api_key: Optional[str] = None, model: Optional[str] = None, timeout: Optional[int] = None, max_items_per_request: Optional[int] = None, max_chars_per_item: Optional[int] = None, max_chars_total: Optional[int] = None, ): self.api_base = (api_base or os.getenv("LLM_API_BASE") or "https://researchengineering-agi.hf.space").rstrip("/") self.api_key = api_key if api_key is not None else os.getenv("LLM_API_KEY", "") self.model = model or os.getenv("LLM_MODEL", "gpt-4o-mini") self.timeout = timeout or int(os.getenv("LLM_TIMEOUT", "600")) # Conservative defaults to avoid large token bursts on slow servers. self.max_items_per_request = max_items_per_request or int(os.getenv("LLM_SUMMARY_BATCH", "2")) self.max_chars_per_item = max_chars_per_item or int(os.getenv("LLM_SUMMARY_MAX_CHARS", "600")) self.max_chars_total = max_chars_total or int(os.getenv("LLM_SUMMARY_MAX_CHARS_TOTAL", "1200")) self.enabled = os.getenv("ENABLE_AI_SUMMARIZATION", "true").lower() in {"1", "true", "yes"} self.sleep_seconds = float(os.getenv("LLM_SUMMARY_SLEEP_SECONDS", "0")) self._chat_url = f"{self.api_base}/v1/chat/completions" def summarize_items(self, items: List[Dict], source: Optional[str] = None) -> List[Dict]: if not self.enabled or not items: return items candidates: List[Tuple[Dict, str]] = [] for item in items: if str(item.get("summary_ai", "")).strip(): continue text = self._build_input_text(item) if text: candidates.append((item, text)) if not candidates: return items chunks = self._chunked(candidates, self.max_items_per_request) for idx, chunk in enumerate(chunks, start=1): texts = [text for _, text in chunk] if self.max_chars_total > 0: texts = self._truncate_to_total(texts, self.max_chars_total) summaries = self._summarize_chunk(texts, source=source) if not summaries: continue for (item, _), summary in zip(chunk, summaries): if summary: item["summary_ai"] = summary item["summary"] = summary if self.sleep_seconds > 0 and idx < len(chunks): time.sleep(self.sleep_seconds) return items def _build_input_text(self, item: Dict) -> str: title = str(item.get("title", "")).strip() if title: source = str(item.get("source", "")).strip() if len(title) > self.max_chars_per_item: title = title[: self.max_chars_per_item].rstrip() if source: return f"Source: {source}\nTitle: {title}" return f"Title: {title}" return "" def _summarize_chunk(self, texts: List[str], source: Optional[str] = None) -> List[str]: system_prompt = ( "You are a financial news summarizer. " "Return concise, factual summaries in 1-2 sentences, <=240 characters each. " "Do not add speculation or new facts." ) source_line = f"Source: {source}" if source else "" items_text = [] for idx, text in enumerate(texts, start=1): items_text.append(f"{idx}. {text}") user_prompt = ( "Summarize each item below. " "Return a JSON array of strings in the same order. " "No extra text.\n" f"{source_line}\n\n" + "\n\n".join(items_text) ) payload = { "model": self.model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], "temperature": 0.2, } headers = {"Content-Type": "application/json"} if self.api_key: headers["Authorization"] = f"Bearer {self.api_key}" try: response = requests.post(self._chat_url, json=payload, headers=headers, timeout=self.timeout) response.raise_for_status() data = response.json() content = ( data.get("choices", [{}])[0] .get("message", {}) .get("content", "") .strip() ) summaries = self._parse_json_array(content) if summaries and len(summaries) == len(texts): return summaries logger.warning("LLM summarizer returned unexpected format or length") return [] except Exception as exc: logger.warning(f"LLM summarization failed: {exc}") return [] def _parse_json_array(self, content: str) -> List[str]: if not content: return [] try: parsed = json.loads(content) if isinstance(parsed, list): return [str(x).strip() for x in parsed] return [] except Exception: return [] def _chunked(self, items: List[Tuple[Dict, str]], size: int) -> List[List[Tuple[Dict, str]]]: if size <= 0: return [items] return [items[i : i + size] for i in range(0, len(items), size)] def _truncate_to_total(self, texts: List[str], max_total: int) -> List[str]: if max_total <= 0: return texts truncated = [] total = 0 for text in texts: if total >= max_total: break remaining = max_total - total if len(text) > remaining: text = text[:remaining].rstrip() truncated.append(text) total += len(text) return truncated