PepBielsaBot / ai_processor.py
thehnx's picture
Initial commit: Telegram Monitor Bot with AI + HF persistent store + Render deployment
da49547
Raw
History Blame Contribute Delete
9.39 kB
"""
ai_processor.py
───────────────
AI layer powered by Groq API — llama-3.1-8b-instant (FREE tier).
Free-tier limits (Groq — completely free, no credit card):
• 14,400 RPD (requests per day) ← handles up to 14,000 news/day
• 30 RPM (requests per minute)
• 131,072 TPM (tokens per minute)
Design goal: ONE API call per message to stay well within limits.
The single call does two things simultaneously:
1. Semantic duplicate detection — is this news already covered recently?
2. Twitter/X-style English translation — punchy, hash-tagged, emoji-rich.
At 1,000 news/day → 1,000 RPD / ~0.7 RPM → uses only 7% of the free quota.
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
from collections import deque
from typing import Optional, Tuple
from groq import Groq
# Module-level logger shared by everything in this file.
log = logging.getLogger("telegram_monitor.ai")
# ─────────────────────────────────────────────────────────────────────────────
# Prompt engineering
# ─────────────────────────────────────────────────────────────────────────────
# System message sent with every Groq chat-completion request made by
# AIProcessor. It asks the model to do both jobs in one call (duplicate check,
# then tweet-style English rewrite) and to answer with a single JSON object
# whose keys are "is_duplicate", "duplicate_reason" and "twitter_text" — the
# same keys AIProcessor reads back from the response.
# NOTE: this text is runtime data, not documentation; editing it changes what
# the model is asked to do.
_SYSTEM_PROMPT = """You are a professional news editor and social-media expert.
Your job for each incoming news snippet:
STEP 1 — DUPLICATE CHECK
Compare the NEW news against the RECENT NEWS LIST.
Mark as duplicate if they describe the **same event or story** — even when
phrased differently, in different languages, or from different sources.
Do NOT mark as duplicate just because they share the same topic/sport/team.
STEP 2 — TWITTER TRANSLATION (only if NOT a duplicate)
Translate the news into English and rewrite it as a viral Twitter/X post:
• Maximum 240 characters (including hashtags)
• Start with a strong hook or emoji
• Include 2–4 relevant hashtags (e.g. #Football #PremierLeague #Transfer)
• Use emojis naturally — do NOT stuff them
• Keep it factual but punchy
• If the news mentions a score, keep the exact numbers
RESPONSE FORMAT — Respond ONLY with valid JSON, no extra text, no markdown:
{
"is_duplicate": true or false,
"duplicate_reason": "one-sentence reason if duplicate, otherwise null",
"twitter_text": "the English tweet if not duplicate, otherwise null"
}"""
# ═════════════════════════════════════════════════════════════════════════════
# Main processor class
# ═════════════════════════════════════════════════════════════════════════════
class AIProcessor:
    """Combined semantic deduplicator + Twitter formatter using Groq.

    One chat-completion request is made per incoming message. The model is
    shown the new item together with a sliding window of recently seen items
    and replies with a JSON object saying whether the item is a duplicate
    and, if not, an English tweet-style rewrite.

    The processor is fail-open: any error while talking to Groq or decoding
    its reply results in ``(False, None)`` so the caller can still forward
    the original message.

    Parameters
    ----------
    api_key:
        Groq API key (free tier — 14,400 requests/day, no credit card needed).
    similarity_threshold:
        Reserved for config compatibility — Groq/LLM decides duplication.
        Stored but not consulted by this class.
    window_size:
        Number of recent news items kept in the sliding context window.
    model:
        Groq model to use. Default: llama-3.1-8b-instant (14,400 RPD free).
        Alternative: llama-3.3-70b-versatile (better quality, 1,000 RPD).
    """

    _DEFAULT_MODEL = "llama-3.1-8b-instant"  # 14,400 RPD free
    # Kept for compatibility; not referenced anywhere inside this class.
    _FALLBACK_MODEL = "gemma2-9b-it"  # 14,400 RPD free (backup)

    # Context window limits
    _CONTEXT_ITEMS = 50  # max recent news sent per request
    _ITEM_PREVIEW = 220  # max chars per item in context
    _NEWS_PREVIEW = 600  # max chars of incoming news

    # Retry classification. Status codes are compared exactly when the
    # exception exposes one; the text markers are a fallback for exceptions
    # that only carry a message.
    _RETRYABLE_STATUS = frozenset({429, 500, 503})
    _RETRYABLE_MARKERS = ("429", "rate_limit", "503", "500", "timeout", "timed out")

    # Markdown code fences a model may wrap around its JSON despite the prompt.
    _FENCE_OPEN = re.compile(r"^```(?:json)?\s*", re.IGNORECASE)
    _FENCE_CLOSE = re.compile(r"\s*```$")

    def __init__(
        self,
        api_key: str,
        similarity_threshold: float = 0.80,
        window_size: int = 200,
        model: str = _DEFAULT_MODEL,
    ) -> None:
        self._threshold = similarity_threshold
        self._window: deque[str] = deque(maxlen=window_size)
        self._model = model
        self._max_retries = 3
        self._retry_base_delay = 5.0
        # Groq client (synchronous — we run it in a thread pool for async)
        self._client = Groq(api_key=api_key)
        log.info(
            "AIProcessor initialised | provider=Groq | model=%s | window=%d",
            model, window_size,
        )

    # ── Public API ────────────────────────────────────────────────────────────
    async def process(self, text: str) -> Tuple[bool, Optional[str]]:
        """Analyse *text* for duplication and produce a Twitter translation.

        Returns
        -------
        (is_duplicate, twitter_text)
            • is_duplicate=True → caller should silently skip this message.
            • twitter_text → ready-to-send English tweet (None if dup, if the
              model produced none, or if the AI call failed).
        """
        clean = text.strip()
        if not clean:
            return False, None

        prompt = self._build_prompt(clean)
        # Only the remote call and the decoding of its reply are guarded, so
        # the fail-open path can never run *after* the success path has
        # already recorded the item in the window.
        try:
            result = await self._call_groq(prompt)
            is_dup, reason, twitter = self._interpret_result(result)
        except Exception as exc:
            log.warning(
                "AI processing failed — forwarding original without AI: %s", exc
            )
            # Fail-open: never block genuine news due to AI errors
            self._window.append(clean[: self._ITEM_PREVIEW])
            return False, None

        if is_dup:
            log.info("🔄 Duplicate suppressed — %s", reason)
            return True, None

        # Persist to sliding window
        self._window.append(clean[: self._ITEM_PREVIEW])
        if twitter:
            log.info("🐦 Tweet ready (%d chars): %s…", len(twitter), twitter[:70])
        else:
            log.warning("Groq returned no twitter_text — will forward original.")
        return False, twitter

    # ── Internal helpers ──────────────────────────────────────────────────────
    @staticmethod
    def _interpret_result(result: dict) -> Tuple[bool, str, Optional[str]]:
        """Turn the model's JSON object into ``(is_duplicate, reason, tweet)``.

        The duplicate flag is parsed strictly: a quoted ``"false"`` is a
        non-empty string and would be truthy under plain ``bool()``, which
        would silently drop genuine news. A tweet that is missing, not a
        string, or blank is normalised to ``None``.
        """
        flag = result.get("is_duplicate", False)
        if isinstance(flag, str):
            is_dup = flag.strip().lower() in ("true", "yes", "1")
        else:
            is_dup = bool(flag)

        reason = str(result.get("duplicate_reason") or "same story")

        tweet = result.get("twitter_text")
        tweet = tweet.strip() if isinstance(tweet, str) else None
        return is_dup, reason, tweet or None

    @classmethod
    def _parse_payload(cls, raw: Optional[str]) -> dict:
        """Decode the raw completion text into a JSON object.

        Raises
        ------
        ValueError
            If the text is empty, is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError``), or decodes to something other than an object.
        """
        body = (raw or "").strip()
        if not body:
            raise ValueError("Groq returned an empty response")
        # Strip accidental markdown fences
        body = cls._FENCE_OPEN.sub("", body)
        body = cls._FENCE_CLOSE.sub("", body)
        payload = json.loads(body)
        if not isinstance(payload, dict):
            raise ValueError(
                f"Expected a JSON object from Groq, got {type(payload).__name__}"
            )
        return payload

    @classmethod
    def _is_retryable(cls, exc: BaseException) -> bool:
        """Return True when *exc* looks like a rate-limit or transient failure."""
        # A malformed reply is not a transport problem; its message may also
        # contain digits (line/column numbers) that would confuse the text
        # markers below.
        if isinstance(exc, ValueError):
            return False
        if isinstance(exc, (TimeoutError, asyncio.TimeoutError)):
            return True
        # NOTE(review): assumes the SDK's HTTP errors expose an integer
        # ``status_code`` attribute — confirm against the installed groq version.
        status = getattr(exc, "status_code", None)
        if isinstance(status, int):
            return status in cls._RETRYABLE_STATUS
        message = str(exc).lower()
        return any(marker in message for marker in cls._RETRYABLE_MARKERS)

    def _build_prompt(self, text: str) -> str:
        """Build the user message: the new item followed by recent context.

        At most ``_CONTEXT_ITEMS`` of the newest window entries are listed,
        each cut to ``_ITEM_PREVIEW`` characters; the incoming text is cut to
        ``_NEWS_PREVIEW`` characters.
        """
        recent = list(self._window)[-self._CONTEXT_ITEMS:]
        if recent:
            numbered = "\n".join(
                f" {i + 1}. {item[: self._ITEM_PREVIEW]}"
                for i, item in enumerate(recent)
            )
            context_block = f"\nRECENT NEWS LIST ({len(recent)} items):\n{numbered}\n"
        else:
            context_block = "\nRECENT NEWS LIST: (empty — this is the first item)\n"
        return (
            f"NEW NEWS:\n\"{text[: self._NEWS_PREVIEW]}\"\n"
            f"{context_block}\n"
            "Apply STEP 1 and STEP 2. Return JSON only."
        )

    async def _call_groq(self, prompt: str) -> dict:
        """Run the Groq API call in a thread pool (Groq SDK is synchronous).

        Retries up to ``_max_retries`` times on rate-limit (429) or transient
        errors with exponential back-off (``_retry_base_delay * 2**attempt``
        seconds). Non-retryable errors, and the last retryable one, propagate
        to the caller.
        """
        # We are always inside a coroutine here, so ask for the running loop
        # explicitly instead of the policy-dependent get_event_loop().
        loop = asyncio.get_running_loop()

        def _request():
            return self._client.chat.completions.create(
                model=self._model,
                messages=[
                    {"role": "system", "content": _SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.2,
                max_tokens=300,
                response_format={"type": "json_object"},
            )

        for attempt in range(self._max_retries + 1):
            try:
                response = await loop.run_in_executor(None, _request)
                return self._parse_payload(response.choices[0].message.content)
            except Exception as exc:
                if attempt >= self._max_retries or not self._is_retryable(exc):
                    raise
                delay = self._retry_base_delay * (2 ** attempt)
                # Worded generically: this path also handles 5xx and timeouts,
                # not only rate limits.
                log.warning(
                    "Groq transient error (attempt %d/%d) — retrying in %.0fs: %s",
                    attempt + 1, self._max_retries, delay, exc,
                )
                await asyncio.sleep(delay)
        # Only reachable when the loop body never ran (negative _max_retries).
        raise RuntimeError("No attempts made")