Spaces:
Sleeping
Sleeping
| import logging | |
| import re | |
| from aiogram import types | |
| from aiogram.enums import ParseMode | |
| from config import config | |
| logger = logging.getLogger(__name__) | |
| MAX_LENGTH = config.MAX_MESSAGE_LENGTH | |
def escape_markdown_v2(text: str) -> str:
    """Escape special characters for MarkdownV2 (Telegram).

    Every character that has syntactic meaning in MarkdownV2, as well as the
    backslash itself, is prefixed with a backslash so that the text is
    rendered literally.

    Args:
        text: Raw text to embed in a MarkdownV2 message.

    Returns:
        A copy of ``text`` with each special character preceded by a
        backslash. Text without special characters is returned unchanged.
    """
    # The replacement has to be r"\\\1": an escaped backslash followed by the
    # group-1 backreference. r"\\1" expands to a backslash plus the digit "1"
    # and throws away the character that was matched.
    return re.sub(r"([_*\[\]()~`>#+\-=|{}.!\\])", r"\\\1", text)
def format_code_blocks(text: str) -> str:
    """Close a dangling Markdown code fence, if there is one.

    An odd number of triple-backtick markers means the last fence was opened
    but never closed, so a closing fence is appended on a new line. Text with
    an even number of markers (including none) is returned unchanged.
    """
    fence = "```"
    has_open_fence = text.count(fence) % 2 == 1
    return f"{text}\n{fence}" if has_open_fence else text
def split_message_smart(text: str, max_length: int = MAX_LENGTH) -> list[str]:
    """Split text into chunks of at most ``max_length`` characters.

    Text that already fits is returned unchanged as a single chunk.
    Otherwise each cut is made at the last newline inside the window, or
    failing that at the last space, or failing that at exactly
    ``max_length`` characters. Chunks produced by a cut are stripped of
    surrounding whitespace and empty chunks are dropped.

    Code fences are not tracked: a fenced block longer than ``max_length``
    can still be cut in the middle.

    Args:
        text: The text to split.
        max_length: Maximum number of characters per chunk; must be >= 1.

    Returns:
        The chunks, in order.

    Raises:
        ValueError: If ``max_length`` is less than 1.
    """
    # With a non-positive limit no cut can make progress and the loop below
    # would never terminate, so reject it up front.
    if max_length < 1:
        raise ValueError(f"max_length must be a positive integer, got {max_length!r}")

    if len(text) <= max_length:
        return [text]

    chunks = []
    while text:
        if len(text) <= max_length:
            chunks.append(text)
            break

        # Prefer a line break, then a word break, then a hard cut.
        split_at = text.rfind("\n", 0, max_length)
        if split_at == -1:
            split_at = text.rfind(" ", 0, max_length)
        if split_at == -1:
            split_at = max_length

        chunk = text[:split_at].strip()
        if chunk:
            chunks.append(chunk)
        # Stripping the remainder removes the separator the cut was made on,
        # which also guarantees progress when the cut position is 0.
        text = text[split_at:].strip()
    return chunks
async def send_long_message(
    message: types.Message,
    text: str,
    parse_mode: ParseMode = ParseMode.MARKDOWN,
    reply_markup=None,
) -> list[types.Message]:
    """Send ``text`` as one or more replies, falling back to plain text.

    The text first gets any dangling code fence closed and is then split
    into chunks. Each chunk is sent with ``message.answer`` using
    ``parse_mode``; if that raises, the failure is logged and the same chunk
    is retried with ``parse_mode=None``. A chunk that fails both attempts is
    logged and skipped.

    Args:
        message: The message to answer.
        text: The full text to deliver.
        parse_mode: Parse mode used for the first attempt of every chunk.
        reply_markup: Markup attached to the last chunk only.

    Returns:
        The messages that were sent successfully, in order.
    """
    parts = split_message_smart(format_code_blocks(text))
    last_index = len(parts) - 1
    delivered = []

    for index, part in enumerate(parts):
        # Only the final chunk carries the reply markup.
        keyboard = reply_markup if index == last_index else None
        try:
            reply = await message.answer(
                part,
                parse_mode=parse_mode,
                reply_markup=keyboard,
            )
        except Exception as markdown_error:
            logger.warning(
                "Failed to send chunk with Markdown: %s. Sending as plain text.",
                markdown_error,
            )
            try:
                reply = await message.answer(
                    part,
                    parse_mode=None,
                    reply_markup=keyboard,
                )
            except Exception as plain_error:
                logger.error("Failed to send chunk: %s", plain_error)
                continue
        delivered.append(reply)

    return delivered
async def edit_or_send_message(
    message: types.Message,
    text: str,
    last_bot_message: types.Message = None,
    parse_mode: ParseMode = ParseMode.MARKDOWN,
) -> types.Message:
    """Edit last bot message or send a new one. Used for streaming responses.

    Any dangling code fence in ``text`` is closed first. If
    ``last_bot_message`` is given and the text fits within ``MAX_LENGTH``,
    that message is edited in place. If the edit fails, or is not attempted,
    the text is sent as a new answer to ``message``.

    Args:
        message: The message to answer when a new message has to be sent.
        text: The text to show.
        last_bot_message: A previously sent message to edit, or ``None``.
        parse_mode: Parse mode used for both the edit and the new message.

    Returns:
        The message that now shows the text: ``last_bot_message`` when it was
        edited (or already showed this content), otherwise the new message.
    """
    text = format_code_blocks(text)
    if last_bot_message and len(text) <= MAX_LENGTH:
        try:
            await last_bot_message.edit_text(text, parse_mode=parse_mode)
            return last_bot_message
        except Exception as e:
            # Telegram rejects an edit whose content equals what the message
            # already shows. The message is already up to date in that case,
            # and sending a new one would duplicate it in the chat.
            if "message is not modified" in str(e).lower():
                return last_bot_message
            logger.debug("Edit failed, sending new message: %s", e)
    return await message.answer(text, parse_mode=parse_mode)