""" ai_processor.py ─────────────── AI layer powered by Groq API — llama-3.1-8b-instant (FREE tier). Free-tier limits (Groq — completely free, no credit card): • 14,400 RPD (requests per day) ← handles up to 14,000 news/day • 30 RPM (requests per minute) • 131,072 TPM (tokens per minute) Design goal: ONE API call per message to stay well within limits. The single call does two things simultaneously: 1. Semantic duplicate detection — is this news already covered recently? 2. Twitter/X-style English translation — punchy, hash-tagged, emoji-rich. At 1,000 news/day → 1,000 RPD / ~0.7 RPM → uses only 7% of the free quota. """ from __future__ import annotations import asyncio import json import logging import re from collections import deque from typing import Optional, Tuple from groq import Groq log = logging.getLogger("telegram_monitor.ai") # ───────────────────────────────────────────────────────────────────────────── # Prompt engineering # ───────────────────────────────────────────────────────────────────────────── _SYSTEM_PROMPT = """You are a professional news editor and social-media expert. Your job for each incoming news snippet: STEP 1 — DUPLICATE CHECK Compare the NEW news against the RECENT NEWS LIST. Mark as duplicate if they describe the **same event or story** — even when phrased differently, in different languages, or from different sources. Do NOT mark as duplicate just because they share the same topic/sport/team. STEP 2 — TWITTER TRANSLATION (only if NOT a duplicate) Translate the news into English and rewrite it as a viral Twitter/X post: • Maximum 240 characters (including hashtags) • Start with a strong hook or emoji • Include 2–4 relevant hashtags (e.g. #Football #PremierLeague #Transfer) • Use emojis naturally — do NOT stuff them • Keep it factual but punchy • If the news mentions a score, keep the exact numbers RESPONSE FORMAT — Respond ONLY with valid JSON, no extra text, no markdown: { "is_duplicate": true or false, "duplicate_reason": "one-sentence reason if duplicate, otherwise null", "twitter_text": "the English tweet if not duplicate, otherwise null" }""" # ═════════════════════════════════════════════════════════════════════════════ # Main processor class # ═════════════════════════════════════════════════════════════════════════════ class AIProcessor: """Combined semantic deduplicator + Twitter formatter using Groq. Parameters ---------- api_key: Groq API key (free tier — 14,400 requests/day, no credit card needed). similarity_threshold: Reserved for config compatibility — Groq/LLM decides duplication. window_size: Number of recent news items kept in the sliding context window. model: Groq model to use. Default: llama-3.1-8b-instant (14,400 RPD free). Alternative: llama-3.3-70b-versatile (better quality, 1,000 RPD). """ _DEFAULT_MODEL = "llama-3.1-8b-instant" # 14,400 RPD free _FALLBACK_MODEL = "gemma2-9b-it" # 14,400 RPD free (backup) # Context window limits _CONTEXT_ITEMS = 50 # max recent news sent per request _ITEM_PREVIEW = 220 # max chars per item in context _NEWS_PREVIEW = 600 # max chars of incoming news def __init__( self, api_key: str, similarity_threshold: float = 0.80, window_size: int = 200, model: str = _DEFAULT_MODEL, ) -> None: self._threshold = similarity_threshold self._window: deque[str] = deque(maxlen=window_size) self._model = model self._max_retries = 3 self._retry_base_delay = 5.0 # Groq client (synchronous — we run it in a thread pool for async) self._client = Groq(api_key=api_key) log.info( "AIProcessor initialised | provider=Groq | model=%s | window=%d", model, window_size, ) # ── Public API ──────────────────────────────────────────────────────────── async def process(self, text: str) -> Tuple[bool, Optional[str]]: """Analyse *text* for duplication and produce a Twitter translation. Returns ------- (is_duplicate, twitter_text) • is_duplicate=True → caller should silently skip this message. • twitter_text → ready-to-send English tweet (None if dup). """ clean = text.strip() if not clean: return False, None prompt = self._build_prompt(clean) try: result = await self._call_groq(prompt) is_dup: bool = bool(result.get("is_duplicate", False)) twitter: Optional[str] = result.get("twitter_text") or None if is_dup: reason = result.get("duplicate_reason") or "same story" log.info("🔄 Duplicate suppressed — %s", reason) return True, None # Persist to sliding window self._window.append(clean[: self._ITEM_PREVIEW]) if twitter: log.info("🐦 Tweet ready (%d chars): %s…", len(twitter), twitter[:70]) else: log.warning("Groq returned no twitter_text — will forward original.") return False, twitter except Exception as exc: log.warning( "AI processing failed — forwarding original without AI: %s", exc ) # Fail-open: never block genuine news due to AI errors self._window.append(clean[: self._ITEM_PREVIEW]) return False, None # ── Internal helpers ────────────────────────────────────────────────────── def _build_prompt(self, text: str) -> str: recent = list(self._window)[-self._CONTEXT_ITEMS:] if recent: numbered = "\n".join( f" {i + 1}. {item[: self._ITEM_PREVIEW]}" for i, item in enumerate(recent) ) context_block = f"\nRECENT NEWS LIST ({len(recent)} items):\n{numbered}\n" else: context_block = "\nRECENT NEWS LIST: (empty — this is the first item)\n" return ( f"NEW NEWS:\n\"{text[: self._NEWS_PREVIEW]}\"\n" f"{context_block}\n" "Apply STEP 1 and STEP 2. Return JSON only." ) async def _call_groq(self, prompt: str) -> dict: """Run the Groq API call in a thread pool (Groq SDK is synchronous). Retries up to ``_max_retries`` times on rate-limit (429) or transient errors with exponential back-off. """ loop = asyncio.get_event_loop() last_exc: Exception = RuntimeError("No attempts made") for attempt in range(self._max_retries + 1): try: response = await loop.run_in_executor( None, lambda: self._client.chat.completions.create( model=self._model, messages=[ {"role": "system", "content": _SYSTEM_PROMPT}, {"role": "user", "content": prompt}, ], temperature=0.2, max_tokens=300, response_format={"type": "json_object"}, ), ) raw = response.choices[0].message.content.strip() # Strip accidental markdown fences raw = re.sub(r"^```(?:json)?\s*", "", raw, flags=re.IGNORECASE) raw = re.sub(r"\s*```$", "", raw) return json.loads(raw) except Exception as exc: last_exc = exc err_str = str(exc) # Check for rate-limit or transient server errors is_retryable = any( code in err_str for code in ("429", "rate_limit", "503", "500", "timeout") ) if is_retryable and attempt < self._max_retries: delay = self._retry_base_delay * (2 ** attempt) log.warning( "Groq rate-limit (attempt %d/%d) — retrying in %.0fs…", attempt + 1, self._max_retries, delay, ) await asyncio.sleep(delay) else: raise last_exc raise last_exc # Should never reach here