| |
| |
| |
|
|
| import os |
| import json |
| import re |
| import logging |
| import hmac |
| import httpx |
| from html import escape |
| from typing import Dict, Any, Optional |
| from fastapi import APIRouter, Request, HTTPException, Header |
| from utils.generation import request_generation |
| from api.endpoints import enhance_system_prompt |
| from utils.constants import API_ENDPOINT, MODEL_NAME, TTS_MODEL |
| from utils.generation import HF_TOKEN, BACKUP_HF_TOKEN |
|
|
| logger = logging.getLogger(__name__) |
| router = APIRouter() |
|
|
| |
| |
| |
| TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") |
| TELEGRAM_SECRET_TOKEN = None |
| CLOUDFLARE_WORKER_URL = os.getenv("CLOUDFLARE_WORKER_URL", "https://mgzon-tg-proxy.amarlasfar0.workers.dev") |
|
|
| if not TELEGRAM_TOKEN: |
| logger.warning("⚠️ TELEGRAM_TOKEN is not set. Bot will not work.") |
|
|
| |
| |
| |
|
|
| def detect_language(text: str) -> str: |
| """Detect user's language from their message. Returns ISO 639-1 language code.""" |
| if not text: |
| return "en" |
| |
| if any(0x0600 <= ord(char) <= 0x06FF for char in text): |
| return "ar" |
| if any(0x4E00 <= ord(char) <= 0x9FFF for char in text): |
| return "zh" |
| if any(0x0400 <= ord(char) <= 0x04FF for char in text): |
| return "ru" |
| if any(char in "áéíóúñü¿¡" for char in text.lower()): |
| return "es" |
| if any(char in "àâçéèêëîïôûùÿæœ" for char in text.lower()): |
| return "fr" |
| if any(char in "äöüß" for char in text.lower()): |
| return "de" |
| |
| return "en" |
|
|
| |
| |
| |
|
|
| def get_welcome_message(lang: str) -> str: |
| messages = { |
| "en": "🤖 *Welcome to MGZon AI Assistant!*\n\nI'm your intelligent AI companion powered by advanced language models.\n\n📌 *What I can do:*\n• Answer programming and technical questions\n• Explain and debug code\n• Research and analyze information\n• Generate creative content\n\nJust send me a message and I'll help! 🚀", |
| "ar": "🤖 *مرحباً بك في مساعد MGZon AI!*\n\nأنا مساعدك الذكي المدعوم بأحدث نماذج الذكاء الاصطناعي.\n\n📌 *ماذا يمكنني أن أفعل؟*\n• الإجابة عن أسئلة البرمجة والتقنية\n• شرح وتصحيح الأكواد البرمجية\n• البحث وتحليل المعلومات\n• توليد محتوى إبداعي\n\nفقط أرسل لي رسالة وسأساعدك! 🚀", |
| "zh": "🤖 *欢迎使用 MGZon AI 助手!*\n\n我是由先进语言模型驱动的智能AI助手。\n\n📌 *我能做什么:*\n• 回答编程和技术问题\n• 解释和调试代码\n• 研究和分析信息\n• 生成创意内容\n\n直接给我发消息,我会帮助你!🚀", |
| "ru": "🤖 *Добро пожаловать в MGZon AI Assistant!*\n\nЯ ваш интеллектуальный помощник на основе передовых языковых моделей.\n\n📌 *Что я могу:*\n• Отвечать на вопросы по программированию\n• Объяснять и отлаживать код\n• Исследовать и анализировать информацию\n• Генерировать креативный контент\n\nПросто отправьте мне сообщение! 🚀", |
| "es": "🤖 *¡Bienvenido a MGZon AI Assistant!*\n\nSoy tu compañero de IA impulsado por modelos de lenguaje avanzados.\n\n📌 *Lo que puedo hacer:*\n• Responder preguntas de programación\n• Explicar y depurar código\n• Investigar y analizar información\n• Generar contenido creativo\n\n¡Solo envíame un mensaje y te ayudaré! 🚀", |
| "fr": "🤖 *Bienvenue sur MGZon AI Assistant!*\n\nJe suis votre compagnon IA alimenté par des modèles de langage avancés.\n\n📌 *Ce que je peux faire:*\n• Répondre aux questions de programmation\n• Expliquer et déboguer le code\n• Rechercher et analyser des informations\n• Générer du contenu créatif\n\nEnvoyez-moi simplement un message! 🚀", |
| "de": "🤖 *Willkommen bei MGZon AI Assistant!*\n\nIch bin Ihr intelligenter KI-Begleiter mit fortschrittlichen Sprachmodellen.\n\n📌 *Was ich kann:*\n• Beantwortung von Programmierfragen\n• Erklären und Debuggen von Code\n• Recherchieren und Analysieren von Informationen\n• Generieren von kreativen Inhalten\n\nSenden Sie mir einfach eine Nachricht! 🚀" |
| } |
| return messages.get(lang, messages["en"]) |
|
|
| def get_help_message(lang: str) -> str: |
| messages = { |
| "en": "❓ *MGZon AI Commands*\n\n📝 *Usage:*\nSimply type your question or paste code\n\n🔧 *Commands:*\n/start - Welcome message\n/help - Show this help\n/about - About MGZon AI\n/reset - Reset conversation\n\n💡 *Tips:*\n• Ask in any language\n• Share code for debugging\n• Request web search or analysis", |
| "ar": "❓ *أوامر مساعد MGZon AI*\n\n📝 *الاستخدام:*\nفقط اكتب سؤالك أو الصق الكود\n\n🔧 *الأوامر:*\n/start - رسالة الترحيب\n/help - عرض هذه المساعدة\n/about - معلومات عن MGZon AI\n/reset - إعادة تعيين المحادثة\n\n💡 *نصائح:*\n• اسأل بأي لغة\n• شارك الكود لتصحيحه\n• اطلب البحث أو التحليل", |
| "zh": "❓ *MGZon AI 命令*\n\n📝 *使用方法:*\n直接输入问题或粘贴代码\n\n🔧 *命令:*\n/start - 欢迎消息\n/help - 显示帮助\n/about - 关于 MGZon AI\n/reset - 重置对话\n\n💡 *提示:*\n• 可以用任何语言提问\n• 分享代码进行调试\n• 请求搜索或分析", |
| "ru": "❓ *Команды MGZon AI*\n\n📝 *Использование:*\nПросто напишите вопрос или вставьте код\n\n🔧 *Команды:*\n/start - Приветственное сообщение\n/help - Показать эту справку\n/about - О MGZon AI\n/reset - Сбросить диалог\n\n💡 *Советы:*\n• Спрашивайте на любом языке\n• Делитесь кодом для отладки\n• Запрашивайте поиск или анализ" |
| } |
| return messages.get(lang, messages["en"]) |
|
|
| def get_about_message(lang: str) -> str: |
| messages = { |
| "en": "ℹ️ *About MGZon AI*\n\nMGZon AI is an advanced AI assistant built on state-of-the-art language models.\n\n🌐 *Features:*\n• Multi-language support\n• Code analysis and generation\n• Web search integration\n• Image understanding\n\nCreated with ❤️ by the MGZon team\nhttps://mgzon-mgzon-app.hf.space", |
| "ar": "ℹ️ *حول MGZon AI*\n\nMGZon AI هو مساعد ذكاء اصطناعي متقدم يعتمد على أحدث نماذج اللغة.\n\n🌐 *الميزات:*\n• دعم متعدد اللغات\n• تحليل وتوليد الأكواد\n• تكامل البحث في الويب\n• فهم الصور\n\nتم الإنشاء بحب بواسطة فريق MGZon\nhttps://mgzon-mgzon-app.hf.space", |
| "zh": "ℹ️ *关于 MGZon AI*\n\nMGZon AI 是基于最先进语言模型构建的先进AI助手。\n\n🌐 *功能:*\n• 多语言支持\n• 代码分析和生成\n• 网络搜索集成\n• 图像理解\n\n由 MGZon 团队用心打造 ❤️\nhttps://mgzon-mgzon-app.hf.space" |
| } |
| return messages.get(lang, messages["en"]) |
|
|
| def get_reset_message(lang: str) -> str: |
| messages = { |
| "en": "🔄 *Conversation context reset*\n\nStart fresh with a new conversation!", |
| "ar": "🔄 *تم إعادة تعيين سياق المحادثة*\n\nابدأ محادثة جديدة!", |
| "zh": "🔄 *对话上下文已重置*\n\n开始新的对话!", |
| "ru": "🔄 *Контекст разговора сброшен*\n\nНачните новый разговор!" |
| } |
| return messages.get(lang, messages["en"]) |
|
|
| def get_non_text_message(lang: str) -> str: |
| messages = { |
| "en": "📎 *I can only process text messages for now.*\n\nPlease send me a text question or code snippet.", |
| "ar": "📎 *يمكنني معالجة الرسائل النصية فقط حالياً.*\n\nيرجى إرسال سؤال نصي أو مقتطف برمجي.", |
| "zh": "📎 *我目前只能处理文本消息。*\n\n请发送文本问题或代码片段。", |
| "ru": "📎 *Пока я могу обрабатывать только текстовые сообщения.*\n\nПожалуйста, отправьте текстовый вопрос или фрагмент кода." |
| } |
| return messages.get(lang, messages["en"]) |
|
|
| def get_error_message(lang: str) -> str: |
| messages = { |
| "en": "⚠️ *An error occurred while processing your request.*\n\nPlease try again in a few moments.", |
| "ar": "⚠️ *حدث خطأ أثناء معالجة طلبك.*\n\nيرجى المحاولة مرة أخرى بعد قليل.", |
| "zh": "⚠️ *处理您的请求时发生错误。*\n\n请稍后再试。", |
| "ru": "⚠️ *Произошла ошибка при обработке вашего запроса.*\n\nПожалуйста, попробуйте позже." |
| } |
| return messages.get(lang, messages["en"]) |
|
|
| def get_empty_response_message(lang: str) -> str: |
| messages = { |
| "en": "🤔 I couldn't generate a proper response. Could you rephrase your question?", |
| "ar": "🤔 لم أتمكن من إنشاء رد مناسب. هل يمكنك إعادة صياغة سؤالك؟", |
| "zh": "🤔 我无法生成合适的回复。你能重新表述一下你的问题吗?", |
| "ru": "🤔 Я не смог сгенерировать подходящий ответ. Можете переформулировать вопрос?" |
| } |
| return messages.get(lang, messages["en"]) |
|
|
| |
| |
| |
|
|
| def verify_telegram_signature(secret_token: Optional[str], header_token: Optional[str]) -> bool: |
| if not secret_token: |
| return True |
| if not header_token: |
| logger.warning("Missing X-Telegram-Bot-Api-Secret-Token header") |
| return False |
| return hmac.compare_digest(header_token, secret_token) |
|
|
| |
| |
| |
|
|
| async def send_telegram_message(chat_id: int, text: str, parse_mode: str = "HTML", reply_to_message_id: Optional[int] = None) -> bool: |
| """Send message to Telegram via Cloudflare Worker""" |
| if not TELEGRAM_TOKEN: |
| return False |
| |
| if not text or len(text.strip()) == 0: |
| text = "⚠️ No response generated. Please try again." |
| |
| text = text.replace('<br>', '\n').replace('<br/>', '\n').replace('<br />', '\n') |
|
|
| async with httpx.AsyncClient(timeout=30.0) as client: |
| try: |
| response = await client.post( |
| f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/sendMessage", |
| json={ |
| "chat_id": chat_id, |
| "text": text[:4096], |
| "parse_mode": parse_mode, |
| "disable_web_page_preview": True, |
| "reply_to_message_id": reply_to_message_id |
| } |
| ) |
| if response.status_code == 200: |
| logger.info(f"✅ Message sent to {chat_id}") |
| return True |
| else: |
| logger.error(f"Failed to send message: {response.text}") |
| return False |
| except Exception as e: |
| logger.error(f"Error sending message: {e}") |
| return False |
|
|
| async def send_typing_action(chat_id: int) -> None: |
| """Show typing indicator via Cloudflare Worker""" |
| if not TELEGRAM_TOKEN: |
| return |
| |
| async with httpx.AsyncClient(timeout=10.0) as client: |
| try: |
| await client.post( |
| f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/sendChatAction", |
| json={"chat_id": chat_id, "action": "typing"} |
| ) |
| except Exception as e: |
| logger.warning(f"Typing action failed: {e}") |
|
|
| async def set_bot_commands() -> None: |
| """Register bot commands globally via Cloudflare Worker""" |
| if not TELEGRAM_TOKEN: |
| return |
| |
| commands = [ |
| {"command": "start", "description": "🚀 Start the bot"}, |
| {"command": "help", "description": "❓ Get help"}, |
| {"command": "about", "description": "ℹ️ About"}, |
| {"command": "reset", "description": "🔄 Reset conversation"} |
| ] |
| |
| async with httpx.AsyncClient(timeout=30.0) as client: |
| try: |
| response = await client.post( |
| f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/setMyCommands", |
| json={"commands": commands} |
| ) |
| if response.status_code == 200: |
| logger.info("✅ Bot commands registered successfully") |
| else: |
| logger.warning(f"Failed to register commands: {response.text}") |
| except Exception as e: |
| logger.error(f"Error registering commands: {e}") |
|
|
| def format_ai_response(raw_response: str) -> str: |
| """Format AI response for Telegram HTML with better table and code support""" |
| if not raw_response: |
| return "" |
| |
| |
| formatted = raw_response |
| |
| |
| |
| |
| table_lines = [] |
| in_table = False |
| lines = formatted.split('\n') |
| new_lines = [] |
| i = 0 |
| |
| while i < len(lines): |
| line = lines[i] |
| |
| |
| if '|' in line and not in_table: |
| next_line = lines[i+1] if i+1 < len(lines) else '' |
| if '|' in next_line and re.search(r'\|[\s\-:]+\|', next_line): |
| in_table = True |
| table_lines = [line, next_line] |
| i += 2 |
| continue |
| |
| |
| if in_table and '|' in line: |
| table_lines.append(line) |
| i += 1 |
| continue |
| elif in_table: |
| |
| formatted_table = format_markdown_table(table_lines) |
| new_lines.append(formatted_table) |
| in_table = False |
| table_lines = [] |
| continue |
| |
| |
| new_lines.append(line) |
| i += 1 |
| |
| if in_table and table_lines: |
| new_lines.append(format_markdown_table(table_lines)) |
| |
| formatted = '\n'.join(new_lines) |
| |
| |
| |
| |
| |
| formatted = re.sub( |
| r'```(\w*)\n(.*?)\n```', |
| lambda m: f'<pre><code>{escape(m.group(2))}</code></pre>', |
| formatted, |
| flags=re.DOTALL |
| ) |
| |
| |
| formatted = re.sub(r'`([^`]+)`', r'<code>\1</code>', formatted) |
| |
| |
| |
| |
| |
| formatted = re.sub(r'\*\*(.*?)\*\*', r'<b>\1</b>', formatted) |
| |
| |
| formatted = re.sub(r'(?<!\*)\*(?!\*)(.*?)(?<!\*)\*(?!\*)', r'<i>\1</i>', formatted) |
| |
| |
| formatted = re.sub(r'^#+\s+(.*?)$', r'<b>\1</b>', formatted, flags=re.MULTILINE) |
| |
| |
| formatted = re.sub(r'^[\-\*]\s+(.*?)$', r'• \1', formatted, flags=re.MULTILINE) |
| |
| |
| formatted = re.sub(r'^(\d+)\.\s+(.*?)$', r'\1) \2', formatted, flags=re.MULTILINE) |
| |
| |
| |
| |
| |
| if len(formatted) > 4000: |
| formatted = formatted[:4000] + "\n\n... (response truncated)" |
| |
| return formatted |
|
|
|
|
| def format_markdown_table(table_lines: list) -> str: |
| """Convert markdown table to formatted text for Telegram""" |
| if len(table_lines) < 2: |
| return '\n'.join(table_lines) |
| |
| |
| rows = [] |
| for line in table_lines: |
| |
| cells = [cell.strip() for cell in line.split('|')] |
| |
| if cells and cells[0] == '': |
| cells = cells[1:] |
| if cells and cells[-1] == '': |
| cells = cells[:-1] |
| if cells: |
| rows.append(cells) |
| |
| if len(rows) < 2: |
| return '\n'.join(table_lines) |
| |
| |
| col_widths = [] |
| for col_idx in range(len(rows[0])): |
| max_width = max(len(row[col_idx]) if col_idx < len(row) else 0 for row in rows) |
| col_widths.append(min(max_width, 20)) |
| |
| |
| result = [] |
| |
| |
| separator = '+' + '+'.join(['-' * (w + 2) for w in col_widths]) + '+' |
| |
| for row_idx, row in enumerate(rows): |
| |
| while len(row) < len(col_widths): |
| row.append('') |
| |
| |
| row_line = '| ' + ' | '.join([ |
| cell[:col_widths[i]].ljust(col_widths[i]) if i < len(cell) else ' ' * col_widths[i] |
| for i, cell in enumerate(row) |
| ]) + ' |' |
| result.append(row_line) |
| |
| |
| if row_idx == 0: |
| result.append(separator) |
| |
| result.append(separator) |
| |
| return '\n'.join(result) |
|
|
|
|
| def clean_text_for_tts(text: str) -> str: |
| """ |
| تزيل الأكواد البرمجية من النص قبل تحويله إلى صوت |
| وتحل محل الرموز الخاصة بنظيرها المنطوق |
| """ |
| if not text: |
| return "" |
| |
| |
| text = re.sub(r'```[\s\S]*?```', '', text) |
| |
| |
| text = re.sub(r'`([^`]+)`', r'\1', text) |
| |
| |
| text = re.sub(r'https?://\S+', 'رابط', text) |
| |
| |
| text = re.sub(r'\*\*(.*?)\*\*', r'\1', text) |
| text = re.sub(r'\*(.*?)\*', r'\1', text) |
| text = re.sub(r'\[(.*?)\]\(.*?\)', r'\1', text) |
| |
| |
| replacements = { |
| '#': 'hashtag ', |
| '```': '', |
| '`': '', |
| '**': '', |
| '*': '', |
| '_': ' ', |
| '-': ' ', |
| '\n': '. ', |
| } |
| for old, new in replacements.items(): |
| text = text.replace(old, new) |
| |
| |
| text = re.sub(r'\s+', ' ', text).strip() |
| |
| return text |
|
|
| |
| |
| |
|
|
| async def call_ai_with_fallback(user_message: str, enhanced_prompt: str) -> str: |
| """Calls the AI with automatic fallback to backup tokens""" |
| available_tokens = [] |
| if HF_TOKEN: |
| available_tokens.append(HF_TOKEN) |
| if BACKUP_HF_TOKEN: |
| available_tokens.append(BACKUP_HF_TOKEN) |
| |
| if not available_tokens: |
| logger.error("No HF tokens available!") |
| return get_error_message("en") |
| |
| for i, token in enumerate(available_tokens): |
| try: |
| logger.info(f"🔄 Trying token #{i+1}...") |
| response_chunks = [] |
| stream = request_generation( |
| api_key=token, |
| api_base=API_ENDPOINT, |
| message=user_message, |
| system_prompt=enhanced_prompt, |
| model_name=MODEL_NAME, |
| temperature=0.7, |
| max_new_tokens=8000, |
| deep_search=True, |
| input_type="text", |
| output_format="text" |
| ) |
| |
| for chunk in stream: |
| if isinstance(chunk, str) and chunk not in ["analysis", "assistantfinal"]: |
| response_chunks.append(chunk) |
| |
| bot_reply = "".join(response_chunks).strip() |
| if bot_reply and len(bot_reply) >= 5: |
| logger.info(f"✅ Token #{i+1} succeeded") |
| return bot_reply |
| else: |
| raise Exception("Empty or insufficient response") |
| |
| except Exception as e: |
| logger.warning(f"⚠️ Token #{i+1} failed: {e}") |
| continue |
| |
| logger.error("❌ All tokens failed") |
| return get_error_message("en") |
|
|
| |
| |
| |
|
|
| async def text_to_speech_audio(text: str, lang: str = "ar") -> Optional[bytes]: |
| """Convert text to speech using TTS_MODEL without reading code""" |
| try: |
| |
| clean_text = clean_text_for_tts(text) |
| |
| if not clean_text: |
| logger.warning("⚠️ No text left after cleaning for TTS") |
| return None |
| |
| logger.info(f"🎤 Converting cleaned text to speech for language: {lang}") |
| logger.debug(f"Original text length: {len(text)}, Cleaned: {len(clean_text)}") |
| |
| stream = request_generation( |
| api_key=HF_TOKEN, |
| api_base=API_ENDPOINT, |
| message=clean_text[:500], |
| system_prompt=f"Convert this text to natural speech in {lang} language. Speak clearly and naturally.", |
| model_name=TTS_MODEL, |
| temperature=0.7, |
| max_new_tokens=len(clean_text) * 2, |
| input_type="tts", |
| output_format="audio" |
| ) |
| |
| audio_data = b"" |
| for chunk in stream: |
| if isinstance(chunk, bytes): |
| audio_data += chunk |
| |
| if audio_data: |
| logger.info(f"✅ Audio generated: {len(audio_data)} bytes") |
| return audio_data |
| return None |
| except Exception as e: |
| logger.error(f"❌ Text-to-speech failed: {e}") |
| return None |
|
|
| async def send_voice_message(chat_id: int, audio_data: bytes, reply_to_message_id: Optional[int] = None): |
| """Send voice message via Cloudflare Worker""" |
| try: |
| from pydub import AudioSegment |
| import io |
| |
| audio_segment = AudioSegment.from_wav(io.BytesIO(audio_data)) |
| ogg_io = io.BytesIO() |
| audio_segment.export(ogg_io, format="ogg") |
| ogg_data = ogg_io.getvalue() |
| except Exception as e: |
| logger.warning(f"Could not convert to OGG, using original: {e}") |
| ogg_data = audio_data |
| |
| async with httpx.AsyncClient(timeout=30.0) as client: |
| files = {"voice": ("speech.ogg", ogg_data, "audio/ogg")} |
| data = {"chat_id": chat_id} |
| if reply_to_message_id: |
| data["reply_to_message_id"] = reply_to_message_id |
| |
| response = await client.post( |
| f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/sendVoice", |
| data=data, |
| files=files |
| ) |
| |
| if response.status_code == 200: |
| logger.info(f"🎵 Voice message sent to {chat_id}") |
| else: |
| logger.error(f"Failed to send voice: {response.text}") |
|
|
| async def answer_callback_query(callback_query_id: str, text: str): |
| """Answer callback query""" |
| async with httpx.AsyncClient() as client: |
| await client.post( |
| f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/answerCallbackQuery", |
| json={"callback_query_id": callback_query_id, "text": text, "show_alert": False} |
| ) |
|
|
| |
| |
| |
|
|
| @router.post("/telegram/webhook") |
| async def telegram_webhook(request: Request, x_telegram_bot_api_secret_token: Optional[str] = Header(None)) -> Dict[str, Any]: |
| """Main webhook endpoint - receives all updates from Telegram""" |
| |
| if not TELEGRAM_TOKEN: |
| logger.error("TELEGRAM_TOKEN not configured") |
| raise HTTPException(status_code=500, detail="Bot not configured") |
| |
| if TELEGRAM_SECRET_TOKEN and not verify_telegram_signature(TELEGRAM_SECRET_TOKEN, x_telegram_bot_api_secret_token): |
| logger.warning("Invalid or missing secret token") |
| raise HTTPException(status_code=403, detail="Forbidden") |
| |
| try: |
| update = await request.json() |
| logger.debug(f"Received update: {json.dumps(update)[:500]}") |
| |
| |
| |
| |
| callback_query = update.get("callback_query") |
| if callback_query: |
| data = callback_query.get("data", "") |
| if data.startswith("speak_"): |
| parts = data.split("_", 2) |
| if len(parts) >= 3: |
| original_chat_id = int(parts[1]) |
| original_text = parts[2] |
| |
| await answer_callback_query(callback_query["id"], "🎵 جاري تجهيز الصوت...") |
| user_lang = detect_language(original_text) |
| audio_data = await text_to_speech_audio(original_text, user_lang) |
| |
| if audio_data: |
| await send_voice_message(original_chat_id, audio_data) |
| await answer_callback_query(callback_query["id"], "✅ تم إرسال الصوت") |
| else: |
| await answer_callback_query(callback_query["id"], "❌ عذراً، فشل تحويل النص إلى صوت") |
| return {"ok": True} |
| |
| |
| |
| |
| message = update.get("message") |
| edited_message = update.get("edited_message") |
| |
| if not message and not edited_message: |
| return {"ok": True} |
| |
| msg = message or edited_message |
| is_edited = edited_message is not None |
| |
| chat_id = msg.get("chat", {}).get("id") |
| user_message = msg.get("text") |
| message_id = msg.get("message_id") |
| |
| if not chat_id or not user_message: |
| return {"ok": True} |
| |
| user_lang = detect_language(user_message) |
| |
| |
| if user_message.startswith("/"): |
| command = user_message.split()[0].lower() |
| if command == "/start": |
| await send_telegram_message(chat_id, get_welcome_message(user_lang)) |
| return {"ok": True} |
| elif command == "/help": |
| await send_telegram_message(chat_id, get_help_message(user_lang)) |
| return {"ok": True} |
| elif command == "/about": |
| await send_telegram_message(chat_id, get_about_message(user_lang)) |
| return {"ok": True} |
| elif command == "/reset": |
| await send_telegram_message(chat_id, get_reset_message(user_lang)) |
| return {"ok": True} |
| |
| |
| await send_typing_action(chat_id) |
| |
| |
| enhanced_prompt = enhance_system_prompt("", user_message, user=None) |
| |
| |
| try: |
| bot_reply = await call_ai_with_fallback(user_message, enhanced_prompt) |
| |
| if not bot_reply or len(bot_reply) < 5: |
| bot_reply = get_empty_response_message(user_lang) |
| |
| formatted_reply = format_ai_response(bot_reply) |
| |
| if is_edited: |
| edit_prefix = { |
| "en": "✏️ *Message edited*\n\n", |
| "ar": "✏️ *تم تعديل الرسالة*\n\n", |
| "zh": "✏️ *消息已编辑*\n\n", |
| "ru": "✏️ *Сообщение отредактировано*\n\n" |
| }.get(user_lang, "✏️ *Message edited*\n\n") |
| formatted_reply = edit_prefix + formatted_reply |
| |
| |
| listen_button = { |
| "inline_keyboard": [ |
| [{"text": "🔊 استمع إلى الرد", "callback_data": f"speak_{chat_id}_{bot_reply[:200]}"}] |
| ] |
| } |
| |
| await send_telegram_message(chat_id, formatted_reply, parse_mode="HTML", reply_to_message_id=message_id) |
| |
| |
| async with httpx.AsyncClient() as client: |
| await client.post( |
| f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/sendMessage", |
| json={ |
| "chat_id": chat_id, |
| "text": "🔊 اضغط للاستماع إلى الرد الصوتي", |
| "reply_markup": listen_button, |
| "reply_to_message_id": message_id |
| } |
| ) |
| |
| except Exception as ai_error: |
| logger.error(f"AI generation error: {ai_error}", exc_info=True) |
| await send_telegram_message(chat_id, get_error_message(user_lang)) |
| |
| return {"ok": True} |
| |
| except json.JSONDecodeError as e: |
| logger.error(f"Invalid JSON from Telegram: {e}") |
| raise HTTPException(status_code=400, detail="Invalid JSON") |
| except Exception as e: |
| logger.error(f"Unexpected error: {e}", exc_info=True) |
| return {"ok": False, "description": str(e)} |
|
|
| |
| |
| |
|
|
| @router.post("/telegram/set-commands") |
| async def set_commands_endpoint() -> Dict[str, Any]: |
| await set_bot_commands() |
| return {"status": "ok", "message": "Commands registered"} |
|
|
| @router.get("/telegram/webhook-info") |
| async def get_webhook_info() -> Dict[str, Any]: |
| if not TELEGRAM_TOKEN: |
| raise HTTPException(status_code=500, detail="Bot not configured") |
| |
| async with httpx.AsyncClient(timeout=30.0) as client: |
| response = await client.get(f"{CLOUDFLARE_WORKER_URL}/bot{TELEGRAM_TOKEN}/getWebhookInfo") |
| return response.json() |