Spaces:
Sleeping
Sleeping
| """ | |
| ResearchRadar β Telegram Bot notification system. | |
| Sends formatted paper digests to the user's Telegram chat. | |
| Replaces plyer notifications for phone delivery. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import os | |
| from typing import Dict, List, Optional | |
| import requests | |
| from app.core.models import Digest, Paper | |
| from app.core.config import CATEGORY_LABELS | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Configuration | |
| # --------------------------------------------------------------------------- | |
| _CONFIG_KEYS = ('telegram_bot_token', 'telegram_chat_id') | |
| def _load_telegram_config(data_dir: str) -> dict: | |
| """Load Telegram config from settings.json.""" | |
| path = os.path.join(data_dir, 'settings.json') | |
| if not os.path.exists(path): | |
| return {} | |
| try: | |
| with open(path, 'r', encoding='utf-8') as f: | |
| return json.load(f) | |
| except (json.JSONDecodeError, OSError): | |
| return {} | |
| def _get_credentials(data_dir: str) -> tuple: | |
| """ | |
| Get bot token and chat ID from settings or environment variables. | |
| Priority: env vars > settings.json | |
| """ | |
| config = _load_telegram_config(data_dir) | |
| token = ( | |
| os.getenv('TELEGRAM_BOT_TOKEN') | |
| or config.get('telegram_bot_token', '') | |
| ) | |
| chat_id = ( | |
| os.getenv('TELEGRAM_CHAT_ID') | |
| or config.get('telegram_chat_id', '') | |
| ) | |
| return token, chat_id | |
| # --------------------------------------------------------------------------- | |
| # Message formatting | |
| # --------------------------------------------------------------------------- | |
| def _format_paper(rank: int, paper: Paper) -> str: | |
| """Format a single paper as a Telegram message block.""" | |
| # Authors (first author + et al.) | |
| if paper.authors: | |
| if len(paper.authors) > 2: | |
| authors = f"{paper.authors[0]} et al." | |
| else: | |
| authors = ", ".join(paper.authors) | |
| else: | |
| authors = "Unknown" | |
| # Score badge | |
| score = f"{paper.composite_score:.2f}" | |
| lines = [ | |
| f"*{rank}.* [{paper.title}]({paper.abstract_url})", | |
| f" π€ _{authors}_", | |
| f" π {paper.published_date.isoformat()} β’ π Score: {score} β’ π Citations: {paper.citation_count}", | |
| ] | |
| # LLM Summary (Structured) | |
| if paper.summary_llm: | |
| lines.append("") | |
| lines.append(f"π€ *AI Summary:*") | |
| # Indent the summary for readability | |
| for slink in paper.summary_llm.split('\n'): | |
| if slink.strip(): | |
| lines.append(f" _{slink.strip()}_") | |
| if paper.pdf_url: | |
| lines.append("") | |
| lines.append(f" π [PDF]({paper.pdf_url})") | |
| return "\n".join(lines) | |
| def format_digest_message(digest: Digest) -> str: | |
| """Format a full digest as a Telegram-ready Markdown message.""" | |
| lines = [ | |
| "π‘ *ResearchRadar β Daily Paper Digest*", | |
| f"π Week of {digest.week_start.isoformat()}", | |
| f"π Generated: {digest.generated_at.strftime('%Y-%m-%d %H:%M UTC')}", | |
| "", | |
| ] | |
| total_papers = 0 | |
| for cat_slug, papers in digest.papers.items(): | |
| if not papers: | |
| continue | |
| cat_name = CATEGORY_LABELS.get(cat_slug, cat_slug.title()) | |
| total_papers += len(papers) | |
| lines.append(f"ββββββββββββββββββββ") | |
| lines.append(f"π¬ *{cat_name}* ({len(papers)} papers)") | |
| lines.append("") | |
| for i, paper in enumerate(papers, 1): | |
| lines.append(_format_paper(i, paper)) | |
| lines.append("") | |
| if total_papers == 0: | |
| lines.append("_No new papers found this cycle. Check back tomorrow!_") | |
| if digest.videos: | |
| lines.append("ββββββββββββββββββββ") | |
| lines.append("π¬ *While You Eat: AI Video Updates*") | |
| lines.append("") | |
| for vid in digest.videos: | |
| lines.append(f"β’ [{vid['title']}]({vid['url']})") | |
| lines.append("") | |
| # Summary footer | |
| lines.append("ββββββββββββββββββββ") | |
| lines.append( | |
| f"π *Summary:* {digest.total_fetched} fetched β " | |
| f"{digest.total_ranked} ranked β {total_papers} delivered" | |
| ) | |
| if digest.fetch_errors: | |
| lines.append(f"β οΈ {len(digest.fetch_errors)} non-fatal errors logged") | |
| return "\n".join(lines) | |
| def format_short_notification(digest: Digest) -> str: | |
| """Format a short notification summary.""" | |
| counts = [] | |
| for cat_slug, papers in digest.papers.items(): | |
| if papers: | |
| label = CATEGORY_LABELS.get(cat_slug, cat_slug.title()) | |
| counts.append(f"{label}: {len(papers)}") | |
| if not counts: | |
| return "π‘ ResearchRadar: No new papers found today." | |
| summary = " | ".join(counts) | |
| total = sum(len(p) for p in digest.papers.values()) | |
| return f"π‘ *ResearchRadar* β {total} new papers!\n{summary}" | |
| # --------------------------------------------------------------------------- | |
| # Sending | |
| # --------------------------------------------------------------------------- | |
| def send_message( | |
| token: str, | |
| chat_id: str, | |
| text: str, | |
| parse_mode: str = 'Markdown', | |
| disable_preview: bool = True, | |
| ) -> bool: | |
| """ | |
| Send a message via Telegram Bot API. | |
| Returns True on success, False on failure (never raises). | |
| """ | |
| url = f"https://api.telegram.org/bot{token}/sendMessage" | |
| # Telegram has a 4096 char limit per message | |
| if len(text) > 4000: | |
| return _send_chunked(token, chat_id, text, parse_mode, disable_preview) | |
| try: | |
| resp = requests.post( | |
| url, | |
| json={ | |
| 'chat_id': chat_id, | |
| 'text': text, | |
| 'parse_mode': parse_mode, | |
| 'disable_web_page_preview': disable_preview, | |
| }, | |
| timeout=15, | |
| ) | |
| if resp.status_code == 200: | |
| data = resp.json() | |
| if data.get('ok'): | |
| logger.info('Telegram message sent to chat %s', chat_id) | |
| return True | |
| else: | |
| logger.error('Telegram API error: %s', data.get('description')) | |
| return False | |
| else: | |
| logger.error('Telegram HTTP %d: %s', resp.status_code, resp.text[:200]) | |
| return False | |
| except requests.exceptions.RequestException as exc: | |
| logger.error('Telegram send failed: %s', exc) | |
| return False | |
| def _send_chunked( | |
| token: str, | |
| chat_id: str, | |
| text: str, | |
| parse_mode: str, | |
| disable_preview: bool, | |
| ) -> bool: | |
| """Split long messages at section boundaries and send sequentially.""" | |
| chunks = [] | |
| current = "" | |
| for line in text.split("\n"): | |
| if len(current) + len(line) + 1 > 3800 and current: | |
| chunks.append(current) | |
| current = line | |
| else: | |
| current = current + "\n" + line if current else line | |
| if current: | |
| chunks.append(current) | |
| success = True | |
| for i, chunk in enumerate(chunks): | |
| if i > 0: | |
| import time | |
| time.sleep(0.5) # Rate limiting courtesy | |
| ok = send_message(token, chat_id, chunk, parse_mode, disable_preview) | |
| if not ok: | |
| success = False | |
| return success | |
| # --------------------------------------------------------------------------- | |
| # High-level API | |
| # --------------------------------------------------------------------------- | |
| def send_digest_notification(digest: Digest, data_dir: str) -> bool: | |
| """ | |
| Send the full digest to Telegram. | |
| Reads credentials from env vars or settings.json. | |
| Returns True on success, False on failure (never raises). | |
| """ | |
| token, chat_id = _get_credentials(data_dir) | |
| if not token or not chat_id: | |
| logger.warning( | |
| 'Telegram not configured β set TELEGRAM_BOT_TOKEN and ' | |
| 'TELEGRAM_CHAT_ID in environment or settings.json' | |
| ) | |
| return False | |
| # Send short notification first | |
| short = format_short_notification(digest) | |
| send_message(token, chat_id, short) | |
| # Then send the full digest | |
| full = format_digest_message(digest) | |
| return send_message(token, chat_id, full) | |
| def send_test_message(data_dir: str) -> bool: | |
| """Send a test message to verify Telegram setup.""" | |
| token, chat_id = _get_credentials(data_dir) | |
| if not token or not chat_id: | |
| print("β Telegram not configured!") | |
| print(" Set TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID in settings.json") | |
| print(" or as environment variables.") | |
| return False | |
| text = ( | |
| "β *ResearchRadar β Test Message*\n\n" | |
| "Your Telegram notifications are working!\n" | |
| "You'll receive daily paper digests at your configured time." | |
| ) | |
| success = send_message(token, chat_id, text) | |
| if success: | |
| print("β Test message sent! Check your Telegram.") | |
| else: | |
| print("β Failed to send test message. Check your bot token and chat ID.") | |
| return success | |