ResearchRadar / app /core /telegram_bot.py
ak0601's picture
Update app/core/telegram_bot.py
0e84075 verified
"""
ResearchRadar β€” Telegram Bot notification system.
Sends formatted paper digests to the user's Telegram chat.
Replaces plyer notifications for phone delivery.
"""
from __future__ import annotations
import json
import logging
import os
from typing import Dict, List, Optional
import requests
from app.core.models import Digest, Paper
from app.core.config import CATEGORY_LABELS
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
_CONFIG_KEYS = ('telegram_bot_token', 'telegram_chat_id')
def _load_telegram_config(data_dir: str) -> dict:
"""Load Telegram config from settings.json."""
path = os.path.join(data_dir, 'settings.json')
if not os.path.exists(path):
return {}
try:
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return {}
def _get_credentials(data_dir: str) -> tuple:
"""
Get bot token and chat ID from settings or environment variables.
Priority: env vars > settings.json
"""
config = _load_telegram_config(data_dir)
token = (
os.getenv('TELEGRAM_BOT_TOKEN')
or config.get('telegram_bot_token', '')
)
chat_id = (
os.getenv('TELEGRAM_CHAT_ID')
or config.get('telegram_chat_id', '')
)
return token, chat_id
# ---------------------------------------------------------------------------
# Message formatting
# ---------------------------------------------------------------------------
def _format_paper(rank: int, paper: Paper) -> str:
"""Format a single paper as a Telegram message block."""
# Authors (first author + et al.)
if paper.authors:
if len(paper.authors) > 2:
authors = f"{paper.authors[0]} et al."
else:
authors = ", ".join(paper.authors)
else:
authors = "Unknown"
# Score badge
score = f"{paper.composite_score:.2f}"
lines = [
f"*{rank}.* [{paper.title}]({paper.abstract_url})",
f" πŸ‘€ _{authors}_",
f" πŸ“… {paper.published_date.isoformat()} β€’ πŸ“Š Score: {score} β€’ πŸ“ Citations: {paper.citation_count}",
]
# LLM Summary (Structured)
if paper.summary_llm:
lines.append("")
lines.append(f"πŸ€– *AI Summary:*")
# Indent the summary for readability
for slink in paper.summary_llm.split('\n'):
if slink.strip():
lines.append(f" _{slink.strip()}_")
if paper.pdf_url:
lines.append("")
lines.append(f" πŸ“„ [PDF]({paper.pdf_url})")
return "\n".join(lines)
def format_digest_message(digest: Digest) -> str:
"""Format a full digest as a Telegram-ready Markdown message."""
lines = [
"πŸ“‘ *ResearchRadar β€” Daily Paper Digest*",
f"πŸ“… Week of {digest.week_start.isoformat()}",
f"πŸ• Generated: {digest.generated_at.strftime('%Y-%m-%d %H:%M UTC')}",
"",
]
total_papers = 0
for cat_slug, papers in digest.papers.items():
if not papers:
continue
cat_name = CATEGORY_LABELS.get(cat_slug, cat_slug.title())
total_papers += len(papers)
lines.append(f"━━━━━━━━━━━━━━━━━━━━")
lines.append(f"πŸ”¬ *{cat_name}* ({len(papers)} papers)")
lines.append("")
for i, paper in enumerate(papers, 1):
lines.append(_format_paper(i, paper))
lines.append("")
if total_papers == 0:
lines.append("_No new papers found this cycle. Check back tomorrow!_")
if digest.videos:
lines.append("━━━━━━━━━━━━━━━━━━━━")
lines.append("🎬 *While You Eat: AI Video Updates*")
lines.append("")
for vid in digest.videos:
lines.append(f"β€’ [{vid['title']}]({vid['url']})")
lines.append("")
# Summary footer
lines.append("━━━━━━━━━━━━━━━━━━━━")
lines.append(
f"πŸ“Š *Summary:* {digest.total_fetched} fetched β†’ "
f"{digest.total_ranked} ranked β†’ {total_papers} delivered"
)
if digest.fetch_errors:
lines.append(f"⚠️ {len(digest.fetch_errors)} non-fatal errors logged")
return "\n".join(lines)
def format_short_notification(digest: Digest) -> str:
"""Format a short notification summary."""
counts = []
for cat_slug, papers in digest.papers.items():
if papers:
label = CATEGORY_LABELS.get(cat_slug, cat_slug.title())
counts.append(f"{label}: {len(papers)}")
if not counts:
return "πŸ“‘ ResearchRadar: No new papers found today."
summary = " | ".join(counts)
total = sum(len(p) for p in digest.papers.values())
return f"πŸ“‘ *ResearchRadar* β€” {total} new papers!\n{summary}"
# ---------------------------------------------------------------------------
# Sending
# ---------------------------------------------------------------------------
def send_message(
token: str,
chat_id: str,
text: str,
parse_mode: str = 'Markdown',
disable_preview: bool = True,
) -> bool:
"""
Send a message via Telegram Bot API.
Returns True on success, False on failure (never raises).
"""
url = f"https://api.telegram.org/bot{token}/sendMessage"
# Telegram has a 4096 char limit per message
if len(text) > 4000:
return _send_chunked(token, chat_id, text, parse_mode, disable_preview)
try:
resp = requests.post(
url,
json={
'chat_id': chat_id,
'text': text,
'parse_mode': parse_mode,
'disable_web_page_preview': disable_preview,
},
timeout=15,
)
if resp.status_code == 200:
data = resp.json()
if data.get('ok'):
logger.info('Telegram message sent to chat %s', chat_id)
return True
else:
logger.error('Telegram API error: %s', data.get('description'))
return False
else:
logger.error('Telegram HTTP %d: %s', resp.status_code, resp.text[:200])
return False
except requests.exceptions.RequestException as exc:
logger.error('Telegram send failed: %s', exc)
return False
def _send_chunked(
token: str,
chat_id: str,
text: str,
parse_mode: str,
disable_preview: bool,
) -> bool:
"""Split long messages at section boundaries and send sequentially."""
chunks = []
current = ""
for line in text.split("\n"):
if len(current) + len(line) + 1 > 3800 and current:
chunks.append(current)
current = line
else:
current = current + "\n" + line if current else line
if current:
chunks.append(current)
success = True
for i, chunk in enumerate(chunks):
if i > 0:
import time
time.sleep(0.5) # Rate limiting courtesy
ok = send_message(token, chat_id, chunk, parse_mode, disable_preview)
if not ok:
success = False
return success
# ---------------------------------------------------------------------------
# High-level API
# ---------------------------------------------------------------------------
def send_digest_notification(digest: Digest, data_dir: str) -> bool:
"""
Send the full digest to Telegram.
Reads credentials from env vars or settings.json.
Returns True on success, False on failure (never raises).
"""
token, chat_id = _get_credentials(data_dir)
if not token or not chat_id:
logger.warning(
'Telegram not configured β€” set TELEGRAM_BOT_TOKEN and '
'TELEGRAM_CHAT_ID in environment or settings.json'
)
return False
# Send short notification first
short = format_short_notification(digest)
send_message(token, chat_id, short)
# Then send the full digest
full = format_digest_message(digest)
return send_message(token, chat_id, full)
def send_test_message(data_dir: str) -> bool:
"""Send a test message to verify Telegram setup."""
token, chat_id = _get_credentials(data_dir)
if not token or not chat_id:
print("❌ Telegram not configured!")
print(" Set TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID in settings.json")
print(" or as environment variables.")
return False
text = (
"βœ… *ResearchRadar β€” Test Message*\n\n"
"Your Telegram notifications are working!\n"
"You'll receive daily paper digests at your configured time."
)
success = send_message(token, chat_id, text)
if success:
print("βœ… Test message sent! Check your Telegram.")
else:
print("❌ Failed to send test message. Check your bot token and chat ID.")
return success