# utils/telegram_bot.py
# Telegram Bot Webhook Handler - Global Production Ready
# Supports ALL languages (Arabic, English, Spanish, Chinese, etc.)
import os
import json
import re
import logging
import hmac
import httpx
from html import escape
from typing import Dict, Any, Optional
from fastapi import APIRouter, Request, HTTPException, Header
from utils.generation import request_generation
from api.endpoints import enhance_system_prompt
from utils.constants import API_ENDPOINT, MODEL_NAME, TTS_MODEL
from utils.generation import HF_TOKEN, BACKUP_HF_TOKEN
logger = logging.getLogger(__name__)
router = APIRouter()
# ============================================================
# Configuration
# ============================================================
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
TELEGRAM_SECRET_TOKEN = None
CLOUDFLARE_WORKER_URL = os.getenv("CLOUDFLARE_WORKER_URL", "https://mgzon-tg-proxy.amarlasfar0.workers.dev")
if not TELEGRAM_TOKEN:
logger.warning("⚠️ TELEGRAM_TOKEN is not set. Bot will not work.")
# ============================================================
# Language Detection Helper
# ============================================================
def detect_language(text: str) -> str:
"""Detect user's language from their message. Returns ISO 639-1 language code."""
if not text:
return "en"
if any(0x0600 <= ord(char) <= 0x06FF for char in text):
return "ar"
if any(0x4E00 <= ord(char) <= 0x9FFF for char in text):
return "zh"
if any(0x0400 <= ord(char) <= 0x04FF for char in text):
return "ru"
if any(char in "áéíóúñü¿¡" for char in text.lower()):
return "es"
if any(char in "àâçéèêëîïôûùÿæœ" for char in text.lower()):
return "fr"
if any(char in "äöüß" for char in text.lower()):
return "de"
return "en"
# ============================================================
# Multilingual Messages
# ============================================================
def get_welcome_message(lang: str) -> str:
messages = {
"en": "🤖 *Welcome to MGZon AI Assistant!*\n\nI'm your intelligent AI companion powered by advanced language models.\n\n📌 *What I can do:*\n• Answer programming and technical questions\n• Explain and debug code\n• Research and analyze information\n• Generate creative content\n\nJust send me a message and I'll help! 🚀",
"ar": "🤖 *مرحباً بك في مساعد MGZon AI!*\n\nأنا مساعدك الذكي المدعوم بأحدث نماذج الذكاء الاصطناعي.\n\n📌 *ماذا يمكنني أن أفعل؟*\n• الإجابة عن أسئلة البرمجة والتقنية\n• شرح وتصحيح الأكواد البرمجية\n• البحث وتحليل المعلومات\n• توليد محتوى إبداعي\n\nفقط أرسل لي رسالة وسأساعدك! 🚀",
"zh": "🤖 *欢迎使用 MGZon AI 助手!*\n\n我是由先进语言模型驱动的智能AI助手。\n\n📌 *我能做什么:*\n• 回答编程和技术问题\n• 解释和调试代码\n• 研究和分析信息\n• 生成创意内容\n\n直接给我发消息,我会帮助你!🚀",
"ru": "🤖 *Добро пожаловать в MGZon AI Assistant!*\n\nЯ ваш интеллектуальный помощник на основе передовых языковых моделей.\n\n📌 *Что я могу:*\n• Отвечать на вопросы по программированию\n• Объяснять и отлаживать код\n• Исследовать и анализировать информацию\n• Генерировать креативный контент\n\nПросто отправьте мне сообщение! 🚀",
"es": "🤖 *¡Bienvenido a MGZon AI Assistant!*\n\nSoy tu compañero de IA impulsado por modelos de lenguaje avanzados.\n\n📌 *Lo que puedo hacer:*\n• Responder preguntas de programación\n• Explicar y depurar código\n• Investigar y analizar información\n• Generar contenido creativo\n\n¡Solo envíame un mensaje y te ayudaré! 🚀",
"fr": "🤖 *Bienvenue sur MGZon AI Assistant!*\n\nJe suis votre compagnon IA alimenté par des modèles de langage avancés.\n\n📌 *Ce que je peux faire:*\n• Répondre aux questions de programmation\n• Expliquer et déboguer le code\n• Rechercher et analyser des informations\n• Générer du contenu créatif\n\nEnvoyez-moi simplement un message! 🚀",
"de": "🤖 *Willkommen bei MGZon AI Assistant!*\n\nIch bin Ihr intelligenter KI-Begleiter mit fortschrittlichen Sprachmodellen.\n\n📌 *Was ich kann:*\n• Beantwortung von Programmierfragen\n• Erklären und Debuggen von Code\n• Recherchieren und Analysieren von Informationen\n• Generieren von kreativen Inhalten\n\nSenden Sie mir einfach eine Nachricht! 🚀"
}
return messages.get(lang, messages["en"])
def get_help_message(lang: str) -> str:
messages = {
"en": "❓ *MGZon AI Commands*\n\n📝 *Usage:*\nSimply type your question or paste code\n\n🔧 *Commands:*\n/start - Welcome message\n/help - Show this help\n/about - About MGZon AI\n/reset - Reset conversation\n\n💡 *Tips:*\n• Ask in any language\n• Share code for debugging\n• Request web search or analysis",
"ar": "❓ *أوامر مساعد MGZon AI*\n\n📝 *الاستخدام:*\nفقط اكتب سؤالك أو الصق الكود\n\n🔧 *الأوامر:*\n/start - رسالة الترحيب\n/help - عرض هذه المساعدة\n/about - معلومات عن MGZon AI\n/reset - إعادة تعيين المحادثة\n\n💡 *نصائح:*\n• اسأل بأي لغة\n• شارك الكود لتصحيحه\n• اطلب البحث أو التحليل",
"zh": "❓ *MGZon AI 命令*\n\n📝 *使用方法:*\n直接输入问题或粘贴代码\n\n🔧 *命令:*\n/start - 欢迎消息\n/help - 显示帮助\n/about - 关于 MGZon AI\n/reset - 重置对话\n\n💡 *提示:*\n• 可以用任何语言提问\n• 分享代码进行调试\n• 请求搜索或分析",
"ru": "❓ *Команды MGZon AI*\n\n📝 *Использование:*\nПросто напишите вопрос или вставьте код\n\n🔧 *Команды:*\n/start - Приветственное сообщение\n/help - Показать эту справку\n/about - О MGZon AI\n/reset - Сбросить диалог\n\n💡 *Советы:*\n• Спрашивайте на любом языке\n• Делитесь кодом для отладки\n• Запрашивайте поиск или анализ"
}
return messages.get(lang, messages["en"])
def get_about_message(lang: str) -> str:
messages = {
"en": "ℹ️ *About MGZon AI*\n\nMGZon AI is an advanced AI assistant built on state-of-the-art language models.\n\n🌐 *Features:*\n• Multi-language support\n• Code analysis and generation\n• Web search integration\n• Image understanding\n\nCreated with ❤️ by the MGZon team\nhttps://mgzon-mgzon-app.hf.space",
"ar": "ℹ️ *حول MGZon AI*\n\nMGZon AI هو مساعد ذكاء اصطناعي متقدم يعتمد على أحدث نماذج اللغة.\n\n🌐 *الميزات:*\n• دعم متعدد اللغات\n• تحليل وتوليد الأكواد\n• تكامل البحث في الويب\n• فهم الصور\n\nتم الإنشاء بحب بواسطة فريق MGZon\nhttps://mgzon-mgzon-app.hf.space",
"zh": "ℹ️ *关于 MGZon AI*\n\nMGZon AI 是基于最先进语言模型构建的先进AI助手。\n\n🌐 *功能:*\n• 多语言支持\n• 代码分析和生成\n• 网络搜索集成\n• 图像理解\n\n由 MGZon 团队用心打造 ❤️\nhttps://mgzon-mgzon-app.hf.space"
}
return messages.get(lang, messages["en"])
def get_reset_message(lang: str) -> str:
messages = {
"en": "🔄 *Conversation context reset*\n\nStart fresh with a new conversation!",
"ar": "🔄 *تم إعادة تعيين سياق المحادثة*\n\nابدأ محادثة جديدة!",
"zh": "🔄 *对话上下文已重置*\n\n开始新的对话!",
"ru": "🔄 *Контекст разговора сброшен*\n\nНачните новый разговор!"
}
return messages.get(lang, messages["en"])
def get_non_text_message(lang: str) -> str:
messages = {
"en": "📎 *I can only process text messages for now.*\n\nPlease send me a text question or code snippet.",
"ar": "📎 *يمكنني معالجة الرسائل النصية فقط حالياً.*\n\nيرجى إرسال سؤال نصي أو مقتطف برمجي.",
"zh": "📎 *我目前只能处理文本消息。*\n\n请发送文本问题或代码片段。",
"ru": "📎 *Пока я могу обрабатывать только текстовые сообщения.*\n\nПожалуйста, отправьте текстовый вопрос или фрагмент кода."
}
return messages.get(lang, messages["en"])
def get_error_message(lang: str) -> str:
messages = {
"en": "⚠️ *An error occurred while processing your request.*\n\nPlease try again in a few moments.",
"ar": "⚠️ *حدث خطأ أثناء معالجة طلبك.*\n\nيرجى المحاولة مرة أخرى بعد قليل.",
"zh": "⚠️ *处理您的请求时发生错误。*\n\n请稍后再试。",
"ru": "⚠️ *Произошла ошибка при обработке вашего запроса.*\n\nПожалуйста, попробуйте позже."
}
return messages.get(lang, messages["en"])
def get_empty_response_message(lang: str) -> str:
messages = {
"en": "🤔 I couldn't generate a proper response. Could you rephrase your question?",
"ar": "🤔 لم أتمكن من إنشاء رد مناسب. هل يمكنك إعادة صياغة سؤالك؟",
"zh": "🤔 我无法生成合适的回复。你能重新表述一下你的问题吗?",
"ru": "🤔 Я не смог сгенерировать подходящий ответ. Можете переформулировать вопрос?"
}
return messages.get(lang, messages["en"])
# ============================================================
# Security Helpers
# ============================================================
def verify_telegram_signature(secret_token: Optional[str], header_token: Optional[str]) -> bool:
if not secret_token:
return True
if not header_token:
logger.warning("Missing X-Telegram-Bot-Api-Secret-Token header")
return False
return hmac.compare_digest(header_token, secret_token)
# ============================================================
# Telegram API Helpers (via Cloudflare Worker)
# ============================================================
async def send_telegram_message(chat_id: int, text: str, parse_mode: str = "HTML", reply_to_message_id: Optional[int] = None) -> bool:
"""Send message to Telegram via Cloudflare Worker"""
if not TELEGRAM_TOKEN:
return False
if not text or len(text.strip()) == 0:
text = "⚠️ No response generated. Please try again."
text = text.replace('
', '\n').replace('
', '\n').replace('
', '\n')
async with httpx.AsyncClient(timeout=30.0) as client:
try:
response = await client.post(
f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/sendMessage",
json={
"chat_id": chat_id,
"text": text[:4096],
"parse_mode": parse_mode,
"disable_web_page_preview": True,
"reply_to_message_id": reply_to_message_id
}
)
if response.status_code == 200:
logger.info(f"✅ Message sent to {chat_id}")
return True
else:
logger.error(f"Failed to send message: {response.text}")
return False
except Exception as e:
logger.error(f"Error sending message: {e}")
return False
async def send_typing_action(chat_id: int) -> None:
"""Show typing indicator via Cloudflare Worker"""
if not TELEGRAM_TOKEN:
return
async with httpx.AsyncClient(timeout=10.0) as client:
try:
await client.post(
f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/sendChatAction",
json={"chat_id": chat_id, "action": "typing"}
)
except Exception as e:
logger.warning(f"Typing action failed: {e}")
async def set_bot_commands() -> None:
"""Register bot commands globally via Cloudflare Worker"""
if not TELEGRAM_TOKEN:
return
commands = [
{"command": "start", "description": "🚀 Start the bot"},
{"command": "help", "description": "❓ Get help"},
{"command": "about", "description": "ℹ️ About"},
{"command": "reset", "description": "🔄 Reset conversation"}
]
async with httpx.AsyncClient(timeout=30.0) as client:
try:
response = await client.post(
f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/setMyCommands",
json={"commands": commands}
)
if response.status_code == 200:
logger.info("✅ Bot commands registered successfully")
else:
logger.warning(f"Failed to register commands: {response.text}")
except Exception as e:
logger.error(f"Error registering commands: {e}")
def format_ai_response(raw_response: str) -> str:
"""Format AI response for Telegram HTML with better table and code support"""
if not raw_response:
return ""
# أولاً: تنظيف HTML والإكساب
formatted = raw_response
# ============================================================
# 1. معالجة الجداول (تحويل Markdown Tables إلى نص منسق)
# ============================================================
table_lines = []
in_table = False
lines = formatted.split('\n')
new_lines = []
i = 0
while i < len(lines):
line = lines[i]
# كشف بداية الجدول (سطر فيه | وأسفله سطر فيه |---|)
if '|' in line and not in_table:
next_line = lines[i+1] if i+1 < len(lines) else ''
if '|' in next_line and re.search(r'\|[\s\-:]+\|', next_line):
in_table = True
table_lines = [line, next_line]
i += 2
continue
# جمع خطوط الجدول
if in_table and '|' in line:
table_lines.append(line)
i += 1
continue
elif in_table:
# نهاية الجدول - تحويله إلى نص منسق
formatted_table = format_markdown_table(table_lines)
new_lines.append(formatted_table)
in_table = False
table_lines = []
continue
# معالجة الخطوط العادية
new_lines.append(line)
i += 1
if in_table and table_lines:
new_lines.append(format_markdown_table(table_lines))
formatted = '\n'.join(new_lines)
# ============================================================
# 2. تنسيق الأكواد (Code Blocks)
# ============================================================
# تحويل ```code``` إلى
code
formatted = re.sub(
r'```(\w*)\n(.*?)\n```',
lambda m: f'{escape(m.group(2))}',
formatted,
flags=re.DOTALL
)
# تحويل `inline code` إلى ...
formatted = re.sub(r'`([^`]+)`', r'\1', formatted)
# ============================================================
# 3. تنسيق النصوص العادية
# ============================================================
# Bold: **text** -> text
formatted = re.sub(r'\*\*(.*?)\*\*', r'\1', formatted)
# Italic: *text* -> text (ليس في بداية أو نهاية الكلمة)
formatted = re.sub(r'(?\1', formatted)
# العناوين (# Title) -> Title مع سطر جديد
formatted = re.sub(r'^#+\s+(.*?)$', r'\1', formatted, flags=re.MULTILINE)
# القوائم: - item -> • item
formatted = re.sub(r'^[\-\*]\s+(.*?)$', r'• \1', formatted, flags=re.MULTILINE)
# الأرقام: 1. item -> 1) item
formatted = re.sub(r'^(\d+)\.\s+(.*?)$', r'\1) \2', formatted, flags=re.MULTILINE)
# Escape كامل للـ HTML لمنع الثغرات
# formatted = escape(formatted) # لو عايز تمنع أي HTML خالص
# التأكد من عدم تجاوز 4096 حرف
if len(formatted) > 4000:
formatted = formatted[:4000] + "\n\n... (response truncated)"
return formatted
def format_markdown_table(table_lines: list) -> str:
"""Convert markdown table to formatted text for Telegram"""
if len(table_lines) < 2:
return '\n'.join(table_lines)
# استخراج الخلايا
rows = []
for line in table_lines:
# تقسيم حسب | وتنظيف
cells = [cell.strip() for cell in line.split('|')]
# إزالة أول وآخر عنصر (فارغين بسبب بداية ونهاية السطر بـ |)
if cells and cells[0] == '':
cells = cells[1:]
if cells and cells[-1] == '':
cells = cells[:-1]
if cells:
rows.append(cells)
if len(rows) < 2:
return '\n'.join(table_lines)
# تحديد أقصى عرض لكل عمود
col_widths = []
for col_idx in range(len(rows[0])):
max_width = max(len(row[col_idx]) if col_idx < len(row) else 0 for row in rows)
col_widths.append(min(max_width, 20)) # حد أقصى 20 حرف
# بناء الجدول النصي
result = []
# خط الفاصل العلوي
separator = '+' + '+'.join(['-' * (w + 2) for w in col_widths]) + '+'
for row_idx, row in enumerate(rows):
# ملء الخلايا الناقصة
while len(row) < len(col_widths):
row.append('')
# بناء الصف
row_line = '| ' + ' | '.join([
cell[:col_widths[i]].ljust(col_widths[i]) if i < len(cell) else ' ' * col_widths[i]
for i, cell in enumerate(row)
]) + ' |'
result.append(row_line)
# بعد الصف الأول (الرأس)، أضف خط فاصل
if row_idx == 0:
result.append(separator)
result.append(separator)
return '\n'.join(result)
def clean_text_for_tts(text: str) -> str:
"""
تزيل الأكواد البرمجية من النص قبل تحويله إلى صوت
وتحل محل الرموز الخاصة بنظيرها المنطوق
"""
if not text:
return ""
# إزالة blocks كاملة للكود (``` ... ```)
text = re.sub(r'```[\s\S]*?```', '', text)
# إزالة الكود المضمن (inline code) `code`
text = re.sub(r'`([^`]+)`', r'\1', text)
# استبدال الروابط بالكلمة "رابط"
text = re.sub(r'https?://\S+', 'رابط', text)
# استبدال علامات Markdown (إزالتها)
text = re.sub(r'\*\*(.*?)\*\*', r'\1', text) # **bold**
text = re.sub(r'\*(.*?)\*', r'\1', text) # *italic*
text = re.sub(r'\[(.*?)\]\(.*?\)', r'\1', text) # [link](url)
# استبدال الرموز الشائعة
replacements = {
'#': 'hashtag ',
'```': '',
'`': '',
'**': '',
'*': '',
'_': ' ',
'-': ' ',
'\n': '. ',
}
for old, new in replacements.items():
text = text.replace(old, new)
# إزالة المسافات الزائدة
text = re.sub(r'\s+', ' ', text).strip()
return text
# ============================================================
# AI Call with Multi-Token Fallback
# ============================================================
async def call_ai_with_fallback(user_message: str, enhanced_prompt: str) -> str:
"""Calls the AI with automatic fallback to backup tokens"""
available_tokens = []
if HF_TOKEN:
available_tokens.append(HF_TOKEN)
if BACKUP_HF_TOKEN:
available_tokens.append(BACKUP_HF_TOKEN)
if not available_tokens:
logger.error("No HF tokens available!")
return get_error_message("en")
for i, token in enumerate(available_tokens):
try:
logger.info(f"🔄 Trying token #{i+1}...")
response_chunks = []
stream = request_generation(
api_key=token,
api_base=API_ENDPOINT,
message=user_message,
system_prompt=enhanced_prompt,
model_name=MODEL_NAME,
temperature=0.7,
max_new_tokens=8000,
deep_search=True,
input_type="text",
output_format="text"
)
for chunk in stream:
if isinstance(chunk, str) and chunk not in ["analysis", "assistantfinal"]:
response_chunks.append(chunk)
bot_reply = "".join(response_chunks).strip()
if bot_reply and len(bot_reply) >= 5:
logger.info(f"✅ Token #{i+1} succeeded")
return bot_reply
else:
raise Exception("Empty or insufficient response")
except Exception as e:
logger.warning(f"⚠️ Token #{i+1} failed: {e}")
continue
logger.error("❌ All tokens failed")
return get_error_message("en")
# ============================================================
# Text-to-Speech Helpers
# ============================================================
async def text_to_speech_audio(text: str, lang: str = "ar") -> Optional[bytes]:
"""Convert text to speech using TTS_MODEL without reading code"""
try:
# تنظيف النص من الأكواد قبل التحويل
clean_text = clean_text_for_tts(text)
if not clean_text:
logger.warning("⚠️ No text left after cleaning for TTS")
return None
logger.info(f"🎤 Converting cleaned text to speech for language: {lang}")
logger.debug(f"Original text length: {len(text)}, Cleaned: {len(clean_text)}")
stream = request_generation(
api_key=HF_TOKEN,
api_base=API_ENDPOINT,
message=clean_text[:500], # حد 500 حرف عشان السرعة
system_prompt=f"Convert this text to natural speech in {lang} language. Speak clearly and naturally.",
model_name=TTS_MODEL,
temperature=0.7,
max_new_tokens=len(clean_text) * 2,
input_type="tts",
output_format="audio"
)
audio_data = b""
for chunk in stream:
if isinstance(chunk, bytes):
audio_data += chunk
if audio_data:
logger.info(f"✅ Audio generated: {len(audio_data)} bytes")
return audio_data
return None
except Exception as e:
logger.error(f"❌ Text-to-speech failed: {e}")
return None
async def send_voice_message(chat_id: int, audio_data: bytes, reply_to_message_id: Optional[int] = None):
"""Send voice message via Cloudflare Worker"""
try:
from pydub import AudioSegment
import io
audio_segment = AudioSegment.from_wav(io.BytesIO(audio_data))
ogg_io = io.BytesIO()
audio_segment.export(ogg_io, format="ogg")
ogg_data = ogg_io.getvalue()
except Exception as e:
logger.warning(f"Could not convert to OGG, using original: {e}")
ogg_data = audio_data
async with httpx.AsyncClient(timeout=30.0) as client:
files = {"voice": ("speech.ogg", ogg_data, "audio/ogg")}
data = {"chat_id": chat_id}
if reply_to_message_id:
data["reply_to_message_id"] = reply_to_message_id
response = await client.post(
f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/sendVoice",
data=data,
files=files
)
if response.status_code == 200:
logger.info(f"🎵 Voice message sent to {chat_id}")
else:
logger.error(f"Failed to send voice: {response.text}")
async def answer_callback_query(callback_query_id: str, text: str):
"""Answer callback query"""
async with httpx.AsyncClient() as client:
await client.post(
f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/answerCallbackQuery",
json={"callback_query_id": callback_query_id, "text": text, "show_alert": False}
)
# ============================================================
# Main Webhook Endpoint
# ============================================================
@router.post("/telegram/webhook")
async def telegram_webhook(request: Request, x_telegram_bot_api_secret_token: Optional[str] = Header(None)) -> Dict[str, Any]:
"""Main webhook endpoint - receives all updates from Telegram"""
if not TELEGRAM_TOKEN:
logger.error("TELEGRAM_TOKEN not configured")
raise HTTPException(status_code=500, detail="Bot not configured")
if TELEGRAM_SECRET_TOKEN and not verify_telegram_signature(TELEGRAM_SECRET_TOKEN, x_telegram_bot_api_secret_token):
logger.warning("Invalid or missing secret token")
raise HTTPException(status_code=403, detail="Forbidden")
try:
update = await request.json()
logger.debug(f"Received update: {json.dumps(update)[:500]}")
# ============================================================
# Handle Callback Query (Listen Button)
# ============================================================
callback_query = update.get("callback_query")
if callback_query:
data = callback_query.get("data", "")
if data.startswith("speak_"):
parts = data.split("_", 2)
if len(parts) >= 3:
original_chat_id = int(parts[1])
original_text = parts[2]
await answer_callback_query(callback_query["id"], "🎵 جاري تجهيز الصوت...")
user_lang = detect_language(original_text)
audio_data = await text_to_speech_audio(original_text, user_lang)
if audio_data:
await send_voice_message(original_chat_id, audio_data)
await answer_callback_query(callback_query["id"], "✅ تم إرسال الصوت")
else:
await answer_callback_query(callback_query["id"], "❌ عذراً، فشل تحويل النص إلى صوت")
return {"ok": True}
# ============================================================
# Handle Regular Messages
# ============================================================
message = update.get("message")
edited_message = update.get("edited_message")
if not message and not edited_message:
return {"ok": True}
msg = message or edited_message
is_edited = edited_message is not None
chat_id = msg.get("chat", {}).get("id")
user_message = msg.get("text")
message_id = msg.get("message_id")
if not chat_id or not user_message:
return {"ok": True}
user_lang = detect_language(user_message)
# Handle Commands
if user_message.startswith("/"):
command = user_message.split()[0].lower()
if command == "/start":
await send_telegram_message(chat_id, get_welcome_message(user_lang))
return {"ok": True}
elif command == "/help":
await send_telegram_message(chat_id, get_help_message(user_lang))
return {"ok": True}
elif command == "/about":
await send_telegram_message(chat_id, get_about_message(user_lang))
return {"ok": True}
elif command == "/reset":
await send_telegram_message(chat_id, get_reset_message(user_lang))
return {"ok": True}
# Show typing indicator
await send_typing_action(chat_id)
# Prepare system prompt
enhanced_prompt = enhance_system_prompt("", user_message, user=None)
# Call AI
try:
bot_reply = await call_ai_with_fallback(user_message, enhanced_prompt)
if not bot_reply or len(bot_reply) < 5:
bot_reply = get_empty_response_message(user_lang)
formatted_reply = format_ai_response(bot_reply)
if is_edited:
edit_prefix = {
"en": "✏️ *Message edited*\n\n",
"ar": "✏️ *تم تعديل الرسالة*\n\n",
"zh": "✏️ *消息已编辑*\n\n",
"ru": "✏️ *Сообщение отредактировано*\n\n"
}.get(user_lang, "✏️ *Message edited*\n\n")
formatted_reply = edit_prefix + formatted_reply
# Listen button
listen_button = {
"inline_keyboard": [
[{"text": "🔊 استمع إلى الرد", "callback_data": f"speak_{chat_id}_{bot_reply[:200]}"}]
]
}
await send_telegram_message(chat_id, formatted_reply, parse_mode="HTML", reply_to_message_id=message_id)
# Send button separately (Telegram doesn't support buttons with reply_to_message_id easily)
async with httpx.AsyncClient() as client:
await client.post(
f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/sendMessage",
json={
"chat_id": chat_id,
"text": "🔊 اضغط للاستماع إلى الرد الصوتي",
"reply_markup": listen_button,
"reply_to_message_id": message_id
}
)
except Exception as ai_error:
logger.error(f"AI generation error: {ai_error}", exc_info=True)
await send_telegram_message(chat_id, get_error_message(user_lang))
return {"ok": True}
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON from Telegram: {e}")
raise HTTPException(status_code=400, detail="Invalid JSON")
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
return {"ok": False, "description": str(e)}
# ============================================================
# Utility Endpoints
# ============================================================
@router.post("/telegram/set-commands")
async def set_commands_endpoint() -> Dict[str, Any]:
await set_bot_commands()
return {"status": "ok", "message": "Commands registered"}
@router.get("/telegram/webhook-info")
async def get_webhook_info() -> Dict[str, Any]:
if not TELEGRAM_TOKEN:
raise HTTPException(status_code=500, detail="Bot not configured")
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/getWebhookInfo")
return response.json()