| """ |
| Telegram Platform Adapter |
| |
| Implements MessagingPlatform for Telegram using python-telegram-bot. |
| """ |
|
|
| import asyncio |
| import contextlib |
| import os |
| import tempfile |
| from pathlib import Path |
|
|
| |
| |
| os.environ["PTB_TIMEDELTA"] = "1" |
|
|
| from collections.abc import Awaitable, Callable |
| from typing import TYPE_CHECKING, Any |
|
|
| from loguru import logger |
|
|
| from providers.common import get_user_facing_error_message |
|
|
| if TYPE_CHECKING: |
| from telegram import Update |
| from telegram.ext import ContextTypes |
|
|
| from ..models import IncomingMessage |
| from ..rendering.telegram_markdown import escape_md_v2, format_status |
| from .base import MessagingPlatform |
|
|
| |
| try: |
| from telegram import Update |
| from telegram.error import NetworkError, RetryAfter, TelegramError |
| from telegram.ext import ( |
| Application, |
| CommandHandler, |
| ContextTypes, |
| MessageHandler, |
| filters, |
| ) |
| from telegram.request import HTTPXRequest |
|
|
| TELEGRAM_AVAILABLE = True |
| except ImportError: |
| TELEGRAM_AVAILABLE = False |
|
|
|
|
| class TelegramPlatform(MessagingPlatform): |
| """ |
| Telegram messaging platform adapter. |
| |
| Uses python-telegram-bot (Bot API) for Telegram access. |
| Requires a Bot Token from @BotFather. |
| """ |
|
|
| name = "telegram" |
|
|
def __init__(
    self,
    bot_token: str | None = None,
    allowed_user_id: str | None = None,
):
    """Prepare adapter state; nothing is sent over the network here.

    Args:
        bot_token: Bot token. When falsy, the ``TELEGRAM_BOT_TOKEN``
            environment variable is used instead.
        allowed_user_id: Id of the only user whose updates are processed.
            When falsy, ``ALLOWED_TELEGRAM_USER_ID`` from the environment is
            used; if that is unset too, no user filtering is applied.

    Raises:
        ImportError: If python-telegram-bot could not be imported.
    """
    if not TELEGRAM_AVAILABLE:
        raise ImportError(
            "python-telegram-bot is required. Install with: pip install python-telegram-bot"
        )

    # Runtime state, filled in once the platform is started.
    self._connected = False
    self._limiter: Any | None = None
    self._application: Application | None = None
    self._message_handler: Callable[[IncomingMessage], Awaitable[None]] | None = (
        None
    )

    # Voice notes being transcribed: (chat_id, msg_id) -> (voice_msg_id, status_msg_id)
    self._pending_voice: dict[tuple[str, str], tuple[str, str]] = {}
    self._pending_voice_lock = asyncio.Lock()

    # Explicit arguments take precedence over the environment.
    token = bot_token if bot_token else os.getenv("TELEGRAM_BOT_TOKEN")
    user = allowed_user_id if allowed_user_id else os.getenv("ALLOWED_TELEGRAM_USER_ID")
    self.bot_token = token
    self.allowed_user_id = user

    # A missing token is only warned about at construction time.
    if not token:
        logger.warning("TELEGRAM_BOT_TOKEN not set")
|
|
| async def _register_pending_voice( |
| self, chat_id: str, voice_msg_id: str, status_msg_id: str |
| ) -> None: |
| """Register a voice note as pending transcription (for /clear reply during transcription).""" |
| async with self._pending_voice_lock: |
| self._pending_voice[(chat_id, voice_msg_id)] = (voice_msg_id, status_msg_id) |
| self._pending_voice[(chat_id, status_msg_id)] = ( |
| voice_msg_id, |
| status_msg_id, |
| ) |
|
|
| async def cancel_pending_voice( |
| self, chat_id: str, reply_id: str |
| ) -> tuple[str, str] | None: |
| """Cancel a pending voice transcription. Returns (voice_msg_id, status_msg_id) if found.""" |
| async with self._pending_voice_lock: |
| entry = self._pending_voice.pop((chat_id, reply_id), None) |
| if entry is None: |
| return None |
| voice_msg_id, status_msg_id = entry |
| self._pending_voice.pop((chat_id, voice_msg_id), None) |
| self._pending_voice.pop((chat_id, status_msg_id), None) |
| return (voice_msg_id, status_msg_id) |
|
|
| async def _is_voice_still_pending(self, chat_id: str, voice_msg_id: str) -> bool: |
| """Check if a voice note is still pending (not cancelled).""" |
| async with self._pending_voice_lock: |
| return (chat_id, voice_msg_id) in self._pending_voice |
|
|
async def start(self) -> None:
    """Build the Application, register handlers and begin polling.

    The connection phase is attempted up to three times, waiting 2s and then
    4s between attempts. Afterwards the shared rate limiter is obtained and,
    if ``allowed_user_id`` is set, a best-effort "online" message is sent to
    that user.

    Raises:
        ValueError: If no bot token is configured.
        Exception: Whatever the final connection attempt raised.
    """
    if not self.bot_token:
        raise ValueError("TELEGRAM_BOT_TOKEN is required")

    request = HTTPXRequest(
        connection_pool_size=8, connect_timeout=30.0, read_timeout=30.0
    )
    application = (
        Application.builder().token(self.bot_token).request(request).build()
    )
    self._application = application

    # Plain text, /start, any other command, and voice notes.
    application.add_handler(
        MessageHandler(filters.TEXT & (~filters.COMMAND), self._on_telegram_message)
    )
    application.add_handler(CommandHandler("start", self._on_start_command))
    application.add_handler(
        MessageHandler(filters.COMMAND, self._on_telegram_message)
    )
    application.add_handler(
        MessageHandler(filters.VOICE, self._on_telegram_voice)
    )

    max_retries = 3
    # Tracks whether Application.start() already succeeded in an earlier
    # attempt. Calling it a second time raises "already running", which
    # would make every retry after a polling failure fail for the wrong
    # reason and hide the original error.
    app_started = False
    for attempt in range(max_retries):
        try:
            await application.initialize()
            if not app_started:
                await application.start()
                app_started = True

            if application.updater:
                await application.updater.start_polling(
                    drop_pending_updates=False
                )

            self._connected = True
            break
        except Exception as e:
            if attempt < max_retries - 1:
                wait_time = 2 * (attempt + 1)
                logger.warning(
                    f"Connection failed (attempt {attempt + 1}/{max_retries}): {e}. Retrying in {wait_time}s..."
                )
                await asyncio.sleep(wait_time)
            else:
                logger.error(f"Failed to connect after {max_retries} attempts")
                raise

    # Imported here rather than at module level, as in the original code.
    from ..limiter import MessagingRateLimiter

    self._limiter = await MessagingRateLimiter.get_instance()

    # Startup notification is best-effort: a failure is logged, not raised.
    try:
        target = self.allowed_user_id
        if target:
            startup_text = (
                f"🚀 *{escape_md_v2('Claude Code Proxy is online!')}* "
                f"{escape_md_v2('(Bot API)')}"
            )
            await self.send_message(
                target,
                startup_text,
            )
    except Exception as e:
        logger.warning(f"Could not send startup message: {e}")

    logger.info("Telegram platform started (Bot API)")
|
|
async def stop(self) -> None:
    """Stop polling, stop the application and release its resources.

    Each stage is skipped when it is not running, so calling this after a
    failed or partial ``start()`` does not raise "not running" errors. An
    application without an updater is still stopped and shut down. The
    connected flag is cleared even if a shutdown step raises.
    """
    app = self._application
    try:
        if app is not None:
            updater = app.updater
            if updater and updater.running:
                await updater.stop()
            if app.running:
                await app.stop()
            await app.shutdown()
    finally:
        self._connected = False

    logger.info("Telegram platform stopped")
|
|
async def _with_retry(
    self, func: Callable[..., Awaitable[Any]], *args, **kwargs
) -> Any:
    """Call ``func`` with Telegram-specific error handling and backoff.

    Handling, in order:

    * ``RetryAfter``: sleep for the requested time, then try once more.
    * Benign errors (message not modified / already gone / not editable):
      return ``None``.
    * "Can't parse entities" while a ``parse_mode`` keyword is set: retry
      once with ``parse_mode=None``.
    * ``TimeoutError`` / ``NetworkError``: up to three attempts with 1s and
      2s pauses, then re-raise.
    * Any other ``TelegramError``: re-raise immediately.

    The message-text checks run before the network retry because
    python-telegram-bot's ``BadRequest`` derives from ``NetworkError``;
    otherwise those errors would be retried and re-raised instead of being
    handled.

    Args:
        func: Awaitable-returning callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        The result of ``func``, or ``None`` for a benign error.
    """
    benign_fragments = (
        "message is not modified",
        "message to edit not found",
        "message to delete not found",
        "message can't be deleted",
        "message can't be edited",
        "not enough rights to delete",
    )
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except RetryAfter as e:
            from datetime import timedelta

            # retry_after may be a timedelta or a plain number.
            retry_after = e.retry_after
            if isinstance(retry_after, timedelta):
                wait_secs = retry_after.total_seconds()
            else:
                wait_secs = float(retry_after)

            logger.warning(f"Rate limited by Telegram, waiting {wait_secs}s...")
            await asyncio.sleep(wait_secs)
            return await func(*args, **kwargs)
        except (TimeoutError, TelegramError) as e:
            err_lower = str(e).lower()
            if any(fragment in err_lower for fragment in benign_fragments):
                return None
            if "can't parse entities" in err_lower and kwargs.get("parse_mode"):
                logger.warning("Markdown failed, retrying without parse_mode")
                kwargs["parse_mode"] = None
                return await func(*args, **kwargs)
            if not isinstance(e, (TimeoutError, NetworkError)):
                # Not a transport problem: retrying will not help.
                raise
            if attempt < max_retries - 1:
                wait_time = 2**attempt
                logger.warning(
                    f"Telegram API network error (attempt {attempt + 1}/{max_retries}): {e}. Retrying in {wait_time}s..."
                )
                await asyncio.sleep(wait_time)
            else:
                logger.error(
                    f"Telegram API failed after {max_retries} attempts: {e}"
                )
                raise
|
|
| async def send_message( |
| self, |
| chat_id: str, |
| text: str, |
| reply_to: str | None = None, |
| parse_mode: str | None = "MarkdownV2", |
| message_thread_id: str | None = None, |
| ) -> str: |
| """Send a message to a chat.""" |
| app = self._application |
| if not app or not app.bot: |
| raise RuntimeError("Telegram application or bot not initialized") |
|
|
| async def _do_send(parse_mode=parse_mode): |
| bot = app.bot |
| kwargs: dict[str, Any] = { |
| "chat_id": chat_id, |
| "text": text, |
| "reply_to_message_id": int(reply_to) if reply_to else None, |
| "parse_mode": parse_mode, |
| } |
| if message_thread_id is not None: |
| kwargs["message_thread_id"] = int(message_thread_id) |
| msg = await bot.send_message(**kwargs) |
| return str(msg.message_id) |
|
|
| return await self._with_retry(_do_send, parse_mode=parse_mode) |
|
|
| async def edit_message( |
| self, |
| chat_id: str, |
| message_id: str, |
| text: str, |
| parse_mode: str | None = "MarkdownV2", |
| ) -> None: |
| """Edit an existing message.""" |
| app = self._application |
| if not app or not app.bot: |
| raise RuntimeError("Telegram application or bot not initialized") |
|
|
| async def _do_edit(parse_mode=parse_mode): |
| bot = app.bot |
| await bot.edit_message_text( |
| chat_id=chat_id, |
| message_id=int(message_id), |
| text=text, |
| parse_mode=parse_mode, |
| ) |
|
|
| await self._with_retry(_do_edit, parse_mode=parse_mode) |
|
|
async def delete_message(
    self,
    chat_id: str,
    message_id: str,
) -> None:
    """Delete one message from a chat.

    Args:
        chat_id: Chat containing the message.
        message_id: Id of the message to delete; converted to ``int``.

    Raises:
        RuntimeError: If the application or its bot is not available.
    """
    application = self._application
    if not (application and application.bot):
        raise RuntimeError("Telegram application or bot not initialized")

    async def _attempt():
        target = int(message_id)
        await application.bot.delete_message(chat_id=chat_id, message_id=target)

    await self._with_retry(_attempt)
|
|
async def delete_messages(self, chat_id: str, message_ids: list[str]) -> None:
    """Delete several messages, using the bulk endpoint when the bot has one.

    Ids that cannot be converted to ``int`` are skipped on the bulk path.
    Bulk requests are sent in batches of at most 100 ids, the per-call
    maximum of Telegram's ``deleteMessages`` method; a single oversized
    request would be rejected as a whole.

    Args:
        chat_id: Chat containing the messages.
        message_ids: Ids of the messages to delete; an empty list is a no-op.

    Raises:
        RuntimeError: If the application or its bot is not available.
    """
    if not message_ids:
        return
    app = self._application
    if not app or not app.bot:
        raise RuntimeError("Telegram application or bot not initialized")

    bot = app.bot
    if not hasattr(bot, "delete_messages"):
        # Bot without a bulk method: delete one message at a time.
        for mid in message_ids:
            await self.delete_message(chat_id, mid)
        return

    numeric_ids = []
    for mid in message_ids:
        try:
            numeric_ids.append(int(mid))
        except (TypeError, ValueError):
            continue

    batch_size = 100
    for offset in range(0, len(numeric_ids), batch_size):
        batch = numeric_ids[offset : offset + batch_size]

        # Bind the batch as a default so a retry resends the same ids.
        async def _do_bulk(batch=batch):
            await bot.delete_messages(chat_id=chat_id, message_ids=batch)

        await self._with_retry(_do_bulk)
|
|
| async def queue_send_message( |
| self, |
| chat_id: str, |
| text: str, |
| reply_to: str | None = None, |
| parse_mode: str | None = "MarkdownV2", |
| fire_and_forget: bool = True, |
| message_thread_id: str | None = None, |
| ) -> str | None: |
| """Enqueue a message to be sent (using limiter).""" |
| |
| if not self._limiter: |
| return await self.send_message( |
| chat_id, text, reply_to, parse_mode, message_thread_id |
| ) |
|
|
| async def _send(): |
| return await self.send_message( |
| chat_id, text, reply_to, parse_mode, message_thread_id |
| ) |
|
|
| if fire_and_forget: |
| self._limiter.fire_and_forget(_send) |
| return None |
| else: |
| return await self._limiter.enqueue(_send) |
|
|
| async def queue_edit_message( |
| self, |
| chat_id: str, |
| message_id: str, |
| text: str, |
| parse_mode: str | None = "MarkdownV2", |
| fire_and_forget: bool = True, |
| ) -> None: |
| """Enqueue a message edit.""" |
| if not self._limiter: |
| return await self.edit_message(chat_id, message_id, text, parse_mode) |
|
|
| async def _edit(): |
| return await self.edit_message(chat_id, message_id, text, parse_mode) |
|
|
| dedup_key = f"edit:{chat_id}:{message_id}" |
| if fire_and_forget: |
| self._limiter.fire_and_forget(_edit, dedup_key=dedup_key) |
| else: |
| await self._limiter.enqueue(_edit, dedup_key=dedup_key) |
|
|
async def queue_delete_message(
    self,
    chat_id: str,
    message_id: str,
    fire_and_forget: bool = True,
) -> None:
    """Delete a message through the rate limiter when one is configured.

    Limiter submissions carry the dedup key ``del:<chat_id>:<message_id>``.
    Without a limiter the delete is performed directly.
    """

    async def _delete():
        return await self.delete_message(chat_id, message_id)

    limiter = self._limiter
    if not limiter:
        return await _delete()

    key = f"del:{chat_id}:{message_id}"
    if not fire_and_forget:
        await limiter.enqueue(_delete, dedup_key=key)
        return
    limiter.fire_and_forget(_delete, dedup_key=key)
|
|
async def queue_delete_messages(
    self,
    chat_id: str,
    message_ids: list[str],
    fire_and_forget: bool = True,
) -> None:
    """Delete several messages through the rate limiter when one is configured.

    An empty ``message_ids`` is a no-op. Limiter submissions carry a dedup
    key built from the chat id and the hash of the id tuple.
    """
    if not message_ids:
        return

    async def _bulk():
        return await self.delete_messages(chat_id, message_ids)

    limiter = self._limiter
    if not limiter:
        return await _bulk()

    key = f"del_bulk:{chat_id}:{hash(tuple(message_ids))}"
    if not fire_and_forget:
        await limiter.enqueue(_bulk, dedup_key=key)
        return
    limiter.fire_and_forget(_bulk, dedup_key=key)
|
|
def fire_and_forget(self, task: Awaitable[Any]) -> None:
    """Schedule ``task`` on the running event loop without awaiting it.

    A strong reference to the scheduled task is kept on the instance until
    it finishes. The event loop itself only holds weak references, so a task
    nobody else references may be garbage-collected before it completes.

    Args:
        task: Coroutine or other awaitable to run in the background.
    """
    if asyncio.iscoroutine(task):
        scheduled = asyncio.create_task(task)
    else:
        scheduled = asyncio.ensure_future(task)

    # Created lazily so this method does not depend on __init__ defining it.
    background = getattr(self, "_background_tasks", None)
    if background is None:
        background = set()
        self._background_tasks = background
    background.add(scheduled)
    # Drop the reference once the task is done so the set does not grow.
    scheduled.add_done_callback(background.discard)
|
|
def on_message(
    self,
    handler: Callable[[IncomingMessage], Awaitable[None]],
) -> None:
    """Install the callback that receives each accepted incoming message.

    Any previously registered handler is replaced.
    """
    self._message_handler = handler
|
|
@property
def is_connected(self) -> bool:
    """Current value of the adapter's connected flag."""
    connected = self._connected
    return connected
|
|
async def _on_start_command(
    self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Handle /start: greet the sender, then process it like any message.

    When ``allowed_user_id`` is set, the greeting is only sent to that
    user, so the bot does not reply to senders whose messages it would
    otherwise ignore. The update is always passed on to the regular message
    handler, which performs (and logs) the authorization check itself.
    """
    allowed = self.allowed_user_id
    sender = update.effective_user
    authorized = not allowed or (
        sender is not None and str(sender.id) == str(allowed).strip()
    )

    if authorized and update.message:
        await update.message.reply_text("👋 Hello! I am the Claude Code Proxy Bot.")

    await self._on_telegram_message(update, context)
|
|
async def _on_telegram_message(
    self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Turn a text update into an ``IncomingMessage`` and dispatch it.

    Updates without a message, text, user or chat are ignored, as are
    updates from users other than ``allowed_user_id`` (when set). If the
    registered handler raises, the error is logged and a short error reply
    is attempted; failures of that reply are suppressed.
    """
    message = update.message
    if not (
        message
        and message.text
        and update.effective_user
        and update.effective_chat
    ):
        return

    user_id = str(update.effective_user.id)
    chat_id = str(update.effective_chat.id)

    # Only the configured user, if any, may talk to the bot.
    if self.allowed_user_id and user_id != str(self.allowed_user_id).strip():
        logger.warning(f"Unauthorized access attempt from {user_id}")
        return

    message_id = str(message.message_id)

    replied = message.reply_to_message
    reply_to = str(replied.message_id) if replied else None

    raw_thread = getattr(message, "message_thread_id", None)
    thread_id = None if raw_thread is None else str(raw_thread)

    # Log at most the first 80 characters of the text.
    body = message.text or ""
    text_preview = body[:80]
    if len(body) > 80:
        text_preview += "..."
    logger.info(
        "TELEGRAM_MSG: chat_id={} message_id={} reply_to={} text_preview={!r}",
        chat_id,
        message_id,
        reply_to,
        text_preview,
    )

    handler = self._message_handler
    if not handler:
        return

    incoming = IncomingMessage(
        text=message.text,
        chat_id=chat_id,
        user_id=user_id,
        message_id=message_id,
        platform="telegram",
        reply_to_message_id=reply_to,
        message_thread_id=thread_id,
        raw_event=update,
    )

    try:
        await handler(incoming)
    except Exception as e:
        logger.error(f"Error handling message: {e}")
        # Best-effort notification; never let the error reply itself raise.
        with contextlib.suppress(Exception):
            await self.send_message(
                chat_id,
                f"❌ *{escape_md_v2('Error:')}* {escape_md_v2(get_user_facing_error_message(e)[:200])}",
                reply_to=incoming.message_id,
                message_thread_id=thread_id,
                parse_mode="MarkdownV2",
            )
|
|
async def _on_telegram_voice(
    self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Transcribe an incoming voice note and dispatch it as a text message.

    Flow: authorize the sender, check that voice notes are enabled, post a
    "Transcribing" status message, register the note as pending, download
    the audio to a temporary file, transcribe it in a worker thread, and
    hand the transcript to the registered message handler. If the note was
    cancelled while transcribing, the status message is deleted instead.

    The pending-voice entries and the temporary file are always cleaned up,
    including when download or transcription fails, so failed notes do not
    leave stale entries behind.
    """
    message = update.message
    if (
        not message
        or not message.voice
        or not update.effective_user
        or not update.effective_chat
    ):
        return

    user_id = str(update.effective_user.id)
    chat_id = str(update.effective_chat.id)

    # Authorize before anything else so unauthorized senders get no reply,
    # not even the "disabled" notice.
    if self.allowed_user_id and user_id != str(self.allowed_user_id).strip():
        logger.warning(f"Unauthorized voice access attempt from {user_id}")
        return

    from config.settings import get_settings

    settings = get_settings()
    if not settings.voice_note_enabled:
        await message.reply_text("Voice notes are disabled.")
        return

    if not self._message_handler:
        return

    thread_id = (
        str(message.message_thread_id)
        if getattr(message, "message_thread_id", None) is not None
        else None
    )
    message_id = str(message.message_id)
    status_msg_id = await self.queue_send_message(
        chat_id,
        format_status("⏳", "Transcribing voice note..."),
        reply_to=message_id,
        parse_mode="MarkdownV2",
        fire_and_forget=False,
        message_thread_id=thread_id,
    )
    status_key = str(status_msg_id)

    await self._register_pending_voice(chat_id, message_id, status_key)
    reply_to = (
        str(message.reply_to_message.message_id)
        if message.reply_to_message
        else None
    )

    # Choose a file suffix from the MIME type; default is OGG.
    voice = message.voice
    suffix = ".ogg"
    if voice.mime_type and "mpeg" in voice.mime_type:
        suffix = ".mp3"
    elif voice.mime_type and "mp4" in voice.mime_type:
        suffix = ".mp4"

    # Only the path is needed; the download below writes the content.
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        tmp_path = Path(tmp.name)

    try:
        tg_file = await context.bot.get_file(voice.file_id)
        await tg_file.download_to_drive(custom_path=str(tmp_path))

        from ..transcription import transcribe_audio

        # Transcription is blocking, so it runs in a worker thread.
        transcribed = await asyncio.to_thread(
            transcribe_audio,
            tmp_path,
            voice.mime_type or "audio/ogg",
            whisper_model=settings.whisper_model,
            whisper_device=settings.whisper_device,
        )

        # Cancelled while transcribing: remove the status message and stop.
        if not await self._is_voice_still_pending(chat_id, message_id):
            await self.queue_delete_message(chat_id, status_key)
            return

        async with self._pending_voice_lock:
            self._pending_voice.pop((chat_id, message_id), None)
            self._pending_voice.pop((chat_id, status_key), None)

        incoming = IncomingMessage(
            text=transcribed,
            chat_id=chat_id,
            user_id=user_id,
            message_id=message_id,
            platform="telegram",
            reply_to_message_id=reply_to,
            message_thread_id=thread_id,
            raw_event=update,
            status_message_id=status_msg_id,
        )

        logger.info(
            "TELEGRAM_VOICE: chat_id={} message_id={} transcribed={!r}",
            chat_id,
            message_id,
            (transcribed[:80] + "..." if len(transcribed) > 80 else transcribed),
        )

        await self._message_handler(incoming)
    except (ValueError, ImportError) as e:
        await message.reply_text(get_user_facing_error_message(e)[:200])
    except Exception as e:
        logger.error(f"Voice transcription failed: {e}")
        await message.reply_text(
            "Could not transcribe voice note. Please try again or send text."
        )
    finally:
        # No-ops on the success and cancelled paths; on the error paths this
        # is what removes the stale pending entries.
        async with self._pending_voice_lock:
            self._pending_voice.pop((chat_id, message_id), None)
            self._pending_voice.pop((chat_id, status_key), None)
        with contextlib.suppress(OSError):
            tmp_path.unlink(missing_ok=True)
|
|