# SkyAlone / utils/helpers.py
# Hosting-page residue kept for provenance: FreshPixels's picture ·
# "Update utils/helpers.py" · 9d9590a verified · Raw · History · Blame ·
# Contribute · Delete · 3.11 kB
import logging
import re
from aiogram import types
from aiogram.enums import ParseMode
from config import config
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Upper bound on the length of a single outgoing message, read from the
# project config. It is the default chunk size for split_message_smart and
# the cut-off for in-place edits in edit_or_send_message.
# NOTE(review): presumably mirrors Telegram's per-message character limit —
# confirm against the config module.
MAX_LENGTH = config.MAX_MESSAGE_LENGTH
def escape_markdown_v2(text: str) -> str:
    """Escape text so Telegram renders it literally under MarkdownV2.

    Every character that MarkdownV2 treats as markup, plus the backslash
    itself, is prefixed with a single backslash.

    Args:
        text: Raw text to escape.

    Returns:
        The text with each reserved character preceded by a backslash.
        Text without reserved characters is returned unchanged.
    """
    # Characters reserved by MarkdownV2, plus the escape character itself.
    reserved = "_*[]()~`>#+-=|{}.!\\"
    # In the replacement template, "\\" is one literal backslash and "\1" is
    # the matched character. The shorter r"\\1" would emit a backslash
    # followed by the digit 1 and drop the matched character entirely.
    return re.sub(f"([{re.escape(reserved)}])", r"\\\1", text)
def format_code_blocks(text: str) -> str:
    """Return *text* with a closing code fence appended when one is missing.

    An odd number of triple-backtick markers means the last code block was
    opened but never closed, so a newline plus a closing fence is added.
    Text with an even number of markers is returned unchanged.
    """
    fence = "```"
    has_open_block = text.count(fence) % 2 == 1
    return f"{text}\n{fence}" if has_open_block else text
def split_message_smart(text: str, max_length: int = MAX_LENGTH) -> list[str]:
    """Split text into chunks of at most ``max_length`` characters.

    Each cut is made at the last newline inside the window, else at the last
    space, else exactly at ``max_length``. Chunks are stripped of surrounding
    whitespace and empty chunks are dropped. The splitter is not aware of
    code fences: a fenced block longer than ``max_length`` is cut like any
    other text.

    Args:
        text: The text to split.
        max_length: Maximum length of one chunk; defaults to ``MAX_LENGTH``.

    Returns:
        The chunks in order. Text that already fits is returned unchanged as
        a single-element list.

    Raises:
        ValueError: If ``text`` does not fit and ``max_length`` is below 1.
    """
    if len(text) <= max_length:
        return [text]

    # A non-positive limit can never be satisfied by non-empty text; the loop
    # below would stop consuming input and spin forever.
    if max_length < 1:
        raise ValueError(f"max_length must be at least 1, got {max_length}")

    chunks = []
    while text:
        if len(text) <= max_length:
            chunks.append(text)
            break

        # Prefer a paragraph/line boundary, then a word boundary, and only
        # cut mid-word when the window contains no whitespace at all.
        split_at = text.rfind("\n", 0, max_length)
        if split_at == -1:
            split_at = text.rfind(" ", 0, max_length)
        if split_at == -1:
            split_at = max_length

        chunk = text[:split_at].strip()
        if chunk:
            chunks.append(chunk)
        # Stripping removes the separator itself, so the remainder never
        # starts with whitespace and the next cut is always past index 0.
        text = text[split_at:].strip()
    return chunks
async def send_long_message(
    message: types.Message,
    text: str,
    parse_mode: ParseMode = ParseMode.MARKDOWN,
    reply_markup=None,
) -> list[types.Message]:
    """Reply to *message* with *text*, split into several messages if needed.

    The text is first passed through ``format_code_blocks`` and then through
    ``split_message_smart``. Each chunk is sent with ``parse_mode``; if that
    raises, the same chunk is retried without a parse mode. A chunk that
    fails both attempts is logged and skipped. ``reply_markup`` is attached
    only to the final chunk.

    Returns:
        The messages that were sent successfully, in sending order.
    """
    chunks = split_message_smart(format_code_blocks(text))
    final_index = len(chunks) - 1
    delivered = []

    for index, chunk in enumerate(chunks):
        # Only the last message of the series carries the keyboard.
        markup = reply_markup if index == final_index else None
        try:
            sent = await message.answer(
                chunk,
                parse_mode=parse_mode,
                reply_markup=markup,
            )
        except Exception as markdown_error:
            logger.warning(
                "Failed to send chunk with Markdown: %s. Sending as plain text.",
                markdown_error,
            )
            try:
                sent = await message.answer(
                    chunk,
                    parse_mode=None,
                    reply_markup=markup,
                )
            except Exception as plain_error:
                logger.error("Failed to send chunk: %s", plain_error)
                continue
        delivered.append(sent)

    return delivered
async def edit_or_send_message(
    message: types.Message,
    text: str,
    last_bot_message: types.Message = None,
    parse_mode: ParseMode = ParseMode.MARKDOWN,
) -> types.Message:
    """Edit the last bot message in place, or send a new one.

    Used for streaming responses. The text is first passed through
    ``format_code_blocks``. When ``last_bot_message`` is given and the text
    is no longer than ``MAX_LENGTH``, that message is edited. If the edit
    fails, or no previous message is available, or the text is too long, the
    text is sent as a new reply to ``message``.

    Args:
        message: The incoming message to reply to when a new message is sent.
        text: The full text to display.
        last_bot_message: Previously sent bot message to update; may be None.
        parse_mode: Parse mode used for both the edit and the new message.

    Returns:
        ``last_bot_message`` when it was edited (or already showed this
        content), otherwise the newly sent message.
    """
    text = format_code_blocks(text)
    if last_bot_message and len(text) <= MAX_LENGTH:
        try:
            await last_bot_message.edit_text(text, parse_mode=parse_mode)
            return last_bot_message
        except Exception as e:
            # Telegram rejects an edit whose content equals what is already
            # displayed ("message is not modified"). The message is already
            # correct, so sending a new one would only post a duplicate.
            if "message is not modified" in str(e).lower():
                return last_bot_message
            logger.debug("Edit failed, sending new message: %s", e)
    return await message.answer(text, parse_mode=parse_mode)