Spaces:
Running
Running
| """ | |
| ai_processor.py | |
| ─────────────── | |
| AI layer powered by Groq API — llama-3.1-8b-instant (FREE tier). | |
| Free-tier limits (Groq — completely free, no credit card): | |
| • 14,400 RPD (requests per day) ← handles up to 14,000 news/day | |
| • 30 RPM (requests per minute) | |
| • 131,072 TPM (tokens per minute) | |
| Design goal: ONE API call per message to stay well within limits. | |
| The single call does two things simultaneously: | |
| 1. Semantic duplicate detection — is this news already covered recently? | |
| 2. Twitter/X-style English translation — punchy, hash-tagged, emoji-rich. | |
| At 1,000 news/day → 1,000 RPD / ~0.7 RPM → uses only 7% of the free quota. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| import re | |
| from collections import deque | |
| from typing import Optional, Tuple | |
| from groq import Groq | |
# Module-level logger used by every method of AIProcessor below.
log = logging.getLogger("telegram_monitor.ai")

# ─────────────────────────────────────────────────────────────────────────────
# Prompt engineering
# ─────────────────────────────────────────────────────────────────────────────
# System message sent with every chat-completion request (see
# AIProcessor._call_groq).  It asks the model to do both jobs in one reply —
# duplicate detection against the "RECENT NEWS LIST" and the English tweet
# rewrite — and to answer with a single JSON object whose keys
# ("is_duplicate", "duplicate_reason", "twitter_text") are the ones
# AIProcessor.process reads back.  The text is runtime data: changing it
# changes model behaviour.
_SYSTEM_PROMPT = """You are a professional news editor and social-media expert.
Your job for each incoming news snippet:
STEP 1 — DUPLICATE CHECK
Compare the NEW news against the RECENT NEWS LIST.
Mark as duplicate if they describe the **same event or story** — even when
phrased differently, in different languages, or from different sources.
Do NOT mark as duplicate just because they share the same topic/sport/team.
STEP 2 — TWITTER TRANSLATION (only if NOT a duplicate)
Translate the news into English and rewrite it as a viral Twitter/X post:
• Maximum 240 characters (including hashtags)
• Start with a strong hook or emoji
• Include 2–4 relevant hashtags (e.g. #Football #PremierLeague #Transfer)
• Use emojis naturally — do NOT stuff them
• Keep it factual but punchy
• If the news mentions a score, keep the exact numbers
RESPONSE FORMAT — Respond ONLY with valid JSON, no extra text, no markdown:
{
"is_duplicate": true or false,
"duplicate_reason": "one-sentence reason if duplicate, otherwise null",
"twitter_text": "the English tweet if not duplicate, otherwise null"
}"""
| # ═════════════════════════════════════════════════════════════════════════════ | |
| # Main processor class | |
| # ═════════════════════════════════════════════════════════════════════════════ | |
class AIProcessor:
    """Combined semantic deduplicator + Twitter formatter using Groq.

    One chat-completion request is issued per incoming message; the model's
    JSON reply says whether the message duplicates a recently seen story and,
    if not, carries an English tweet-style rewrite.

    Parameters
    ----------
    api_key:
        Groq API key (free tier — 14,400 requests/day, no credit card needed).
    similarity_threshold:
        Reserved for config compatibility — Groq/LLM decides duplication.
        Stored but not read by any method of this class.
    window_size:
        Number of recent news items kept in the sliding context window.
    model:
        Groq model to use. Default: llama-3.1-8b-instant (14,400 RPD free).
        Alternative: llama-3.3-70b-versatile (better quality, 1,000 RPD).
    """

    _DEFAULT_MODEL = "llama-3.1-8b-instant"  # 14,400 RPD free
    # Declared as a backup model; no method of this class switches to it.
    _FALLBACK_MODEL = "gemma2-9b-it"  # 14,400 RPD free (backup)

    # Context window limits
    _CONTEXT_ITEMS = 50   # max recent news sent per request
    _ITEM_PREVIEW = 220   # max chars per item in context
    _NEWS_PREVIEW = 600   # max chars of incoming news

    # HTTP statuses treated as transient when the raised error exposes a
    # ``status_code`` attribute.
    _RETRYABLE_STATUS = frozenset({429, 500, 502, 503, 504})
    # Fallback substrings (compared lower-cased) for errors that carry no
    # ``status_code`` — e.g. client-side time-outs and connection failures.
    _RETRYABLE_MARKERS = (
        "429", "rate_limit", "rate limit", "503", "500",
        "timeout", "timed out", "connection error",
    )

    # Markdown code fences some models wrap around their JSON answer.
    _FENCE_OPEN = re.compile(r"^```(?:json)?\s*", re.IGNORECASE)
    _FENCE_CLOSE = re.compile(r"\s*```$")

    def __init__(
        self,
        api_key: str,
        similarity_threshold: float = 0.80,
        window_size: int = 200,
        model: str = _DEFAULT_MODEL,
    ) -> None:
        self._threshold = similarity_threshold
        self._window: deque[str] = deque(maxlen=window_size)
        self._model = model
        self._max_retries = 3
        # Back-off is base * 2**attempt, i.e. 5 s, 10 s, 20 s.
        self._retry_base_delay = 5.0
        # Groq client (synchronous — we run it in a thread pool for async)
        self._client = Groq(api_key=api_key)
        log.info(
            "AIProcessor initialised | provider=Groq | model=%s | window=%d",
            model, window_size,
        )

    # ── Public API ────────────────────────────────────────────────────────────
    async def process(self, text: str) -> Tuple[bool, Optional[str]]:
        """Analyse *text* for duplication and produce a Twitter translation.

        Returns
        -------
        (is_duplicate, twitter_text)
            • is_duplicate=True → caller should silently skip this message.
            • twitter_text      → ready-to-send English tweet (None if dup,
              if the model supplied none, or if the AI call failed).

        Never raises for AI/API failures: any ``Exception`` from the Groq call
        is logged and the message is reported as non-duplicate (fail-open).
        Blank input returns ``(False, None)`` without calling the API.
        """
        clean = text.strip()
        if not clean:
            return False, None

        try:
            result = await self._call_groq(self._build_prompt(clean))
        except Exception as exc:
            log.warning(
                "AI processing failed — forwarding original without AI: %s", exc
            )
            # Fail-open: never block genuine news due to AI errors.  The item
            # is still remembered so later copies can be recognised.
            self._window.append(clean[: self._ITEM_PREVIEW])
            return False, None

        # Strict coercion: bool("false") is True, which would silently drop
        # real news whenever the model quotes the boolean.
        if self._as_bool(result.get("is_duplicate", False)):
            reason = result.get("duplicate_reason") or "same story"
            log.info("🔄 Duplicate suppressed — %s", reason)
            return True, None

        # Persist to sliding window (duplicates are deliberately not stored).
        self._window.append(clean[: self._ITEM_PREVIEW])

        twitter = self._clean_tweet(result.get("twitter_text"))
        if twitter:
            log.info("🐦 Tweet ready (%d chars): %s…", len(twitter), twitter[:70])
        else:
            log.warning("Groq returned no twitter_text — will forward original.")
        return False, twitter

    # ── Internal helpers ──────────────────────────────────────────────────────
    @staticmethod
    def _as_bool(value: object) -> bool:
        """Interpret a JSON value as a boolean, accepting quoted booleans."""
        if isinstance(value, str):
            return value.strip().lower() in ("true", "yes", "1")
        return bool(value)

    @staticmethod
    def _clean_tweet(value: object) -> Optional[str]:
        """Return a usable tweet string, or None for null/blank/non-string."""
        if not isinstance(value, str):
            return None
        tweet = value.strip()
        # Some models emit the *string* "null" instead of JSON null.
        if not tweet or tweet.lower() in ("null", "none"):
            return None
        return tweet

    @classmethod
    def _is_retryable(cls, exc: BaseException) -> bool:
        """Decide whether *exc* is a rate-limit or transient failure.

        A numeric ``status_code`` attribute on the exception is authoritative.
        Without one, built-in timeout/connection errors are retryable, local
        parsing/shape errors are not, and anything else falls back to matching
        the lower-cased message against ``_RETRYABLE_MARKERS``.
        """
        status = getattr(exc, "status_code", None)
        if isinstance(status, int):
            return status in cls._RETRYABLE_STATUS
        if isinstance(exc, (asyncio.TimeoutError, TimeoutError, ConnectionError)):
            return True
        # Includes json.JSONDecodeError (a ValueError), whose message contains
        # character offsets such as "char 500" that must not look like HTTP 500.
        if isinstance(
            exc, (ValueError, TypeError, KeyError, IndexError, AttributeError)
        ):
            return False
        message = str(exc).lower()
        return any(marker in message for marker in cls._RETRYABLE_MARKERS)

    @classmethod
    def _parse_response(cls, content: Optional[str]) -> dict:
        """Parse the model's reply text into a dict.

        Raises
        ------
        ValueError
            If the text is empty / not valid JSON (``json.JSONDecodeError``)
            or the top-level JSON value is not an object.
        """
        raw = (content or "").strip()
        # Strip accidental markdown fences
        raw = cls._FENCE_OPEN.sub("", raw)
        raw = cls._FENCE_CLOSE.sub("", raw)
        parsed = json.loads(raw)
        if not isinstance(parsed, dict):
            raise ValueError(
                f"expected a JSON object from Groq, got {type(parsed).__name__}"
            )
        return parsed

    def _build_prompt(self, text: str) -> str:
        """Build the user message: the new item plus the recent-news context.

        At most ``_CONTEXT_ITEMS`` of the newest window entries are listed,
        each cut to ``_ITEM_PREVIEW`` characters; *text* is cut to
        ``_NEWS_PREVIEW`` characters.
        """
        recent = list(self._window)[-self._CONTEXT_ITEMS:]
        if recent:
            numbered = "\n".join(
                f" {i + 1}. {item[: self._ITEM_PREVIEW]}"
                for i, item in enumerate(recent)
            )
            context_block = f"\nRECENT NEWS LIST ({len(recent)} items):\n{numbered}\n"
        else:
            context_block = "\nRECENT NEWS LIST: (empty — this is the first item)\n"
        return (
            f"NEW NEWS:\n\"{text[: self._NEWS_PREVIEW]}\"\n"
            f"{context_block}\n"
            "Apply STEP 1 and STEP 2. Return JSON only."
        )

    async def _call_groq(self, prompt: str) -> dict:
        """Run the Groq API call in a thread pool (Groq SDK is synchronous).

        Retries up to ``_max_retries`` times on rate-limit (429) or transient
        errors with exponential back-off (``_retry_base_delay * 2**attempt``).

        Returns
        -------
        dict
            The parsed JSON object from the model's reply.

        Raises
        ------
        Exception
            The last error from the SDK or from parsing, re-raised unchanged
            once it is non-retryable or the retry budget is exhausted.
        """
        # We are inside a coroutine, so a loop is guaranteed to be running.
        loop = asyncio.get_running_loop()

        def _request():
            return self._client.chat.completions.create(
                model=self._model,
                messages=[
                    {"role": "system", "content": _SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.2,
                max_tokens=300,
                response_format={"type": "json_object"},
            )

        total_attempts = self._max_retries + 1
        for attempt in range(total_attempts):
            try:
                response = await loop.run_in_executor(None, _request)
                return self._parse_response(response.choices[0].message.content)
            except Exception as exc:
                if attempt >= self._max_retries or not self._is_retryable(exc):
                    raise  # bare raise keeps the original traceback
                delay = self._retry_base_delay * (2 ** attempt)
                log.warning(
                    "Groq call failed (attempt %d/%d) — retrying in %.0fs: %s",
                    attempt + 1, total_attempts, delay, exc,
                )
                await asyncio.sleep(delay)
        # Only reachable if _max_retries was set to a negative value.
        raise RuntimeError("No attempts made")