Spaces:
Paused
Paused
| # bot.py | |
| """ | |
| Основной файл Telegram-бота для арт-терапии. | |
| ФИНАЛЬНАЯ ВЕРСИЯ. Решает все проблемы совместимости и развертывания. | |
| Принцип работы: | |
| 1. Веб-интерфейс Gradio запускается в отдельном фоновом потоке. | |
| Он явно слушает хост 0.0.0.0 и порт 7860, чтобы быть видимым для | |
| платформы Hugging Face и менять статус на "Running". | |
| 2. Telegram-бот на aiogram запускается в ОСНОВНОМ потоке, что является | |
| требованием для корректной обработки системных сигналов. | |
| """ | |
| import asyncio | |
| import logging | |
| import os | |
| import tempfile | |
| import threading | |
| from collections import Counter | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Union, Callable, Awaitable | |
| import gradio as gr | |
| from aiogram import Bot, Dispatcher, types | |
| from aiogram.exceptions import TelegramBadRequest | |
| from aiogram.types import BotCommand, InlineKeyboardButton, InlineKeyboardMarkup | |
| from dotenv import load_dotenv | |
| from art_therapy_data import DEFAULT_ADVICE, RECOMMENDATIONS | |
| from database import get_user_history, save_emotion | |
| from emotion_analyzer import analyze_emotion_hf_photo, analyze_emotion_hf_drawing | |
| # --- Конфигурация и инициализация --- | |
| load_dotenv() | |
| API_TOKEN = os.getenv("TELEGRAM_API_TOKEN") | |
| if not API_TOKEN: | |
| raise ValueError("ОШИБКА: Не удалось найти API_TOKEN. Добавьте его в .env или в секреты Space.") | |
| EMOTION_TRANSLATIONS: Dict[str, str] = { | |
| 'angry': 'Гнев', 'disgust': 'Отвращение', 'fear': 'Страх', 'happy': 'Счастье', | |
| 'sad': 'Грусть', 'surprise': 'Удивление', 'neutral': 'Нейтральность' | |
| } | |
| EMOJI_MAP: Dict[str, str] = { | |
| 'angry': '😠', 'disgust': '🤢', 'fear': '😨', 'happy': '😄', | |
| 'sad': '😢', 'surprise': '😮', 'neutral': '😐' | |
| } | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| bot = Bot(token=API_TOKEN) | |
| dp = Dispatcher() | |
| # Хранилища временного состояния (утеряются при перезапуске) | |
| detailed_advice_messages: Dict[int, int] = {} | |
| pending_emotions: Dict[int, str] = {} | |
| async def set_commands(bot_instance: Bot) -> None: | |
| """Устанавливает список команд (меню) для бота.""" | |
| commands = [ | |
| BotCommand(command="/start", description="Запустить/Перезапустить бота"), | |
| BotCommand(command="/manual", description="Выбрать эмоцию вручную"), | |
| BotCommand(command="/history", description="Посмотреть историю эмоций"), | |
| BotCommand(command="/stats", description="Посмотреть статистику за неделю"), | |
| ] | |
| await bot_instance.set_my_commands(commands) | |
| # --- Функции для управления состоянием --- | |
| def commit_pending_emotion(user_id: int): | |
| """Сохраняет эмоцию от ИИ, если пользователь выполнил подтверждающее действие.""" | |
| if user_id in pending_emotions: | |
| confirmed_emotion = pending_emotions.pop(user_id) | |
| save_emotion(user_id, confirmed_emotion) | |
| logger.info(f"Сохранена подтвержденная эмоция '{confirmed_emotion}' для пользователя {user_id}") | |
| def clear_pending_emotion(user_id: int): | |
| """Удаляет временную эмоцию, если пользователь выбрал другую.""" | |
| if user_id in pending_emotions: | |
| rejected_emotion = pending_emotions.pop(user_id) | |
| logger.info(f"Отклонена (очищена) эмоция '{rejected_emotion}' от ИИ для пользователя {user_id}") | |
| # --- Основная логика обработки изображений --- | |
| async def download_and_process_image(message: types.Message, file_id: str) -> None: | |
| """Скачивает, анализирует изображение и отправляет результат.""" | |
| commit_pending_emotion(message.from_user.id) | |
| status_message = await message.reply("🔍 Анализирую эмоцию...") | |
| temp_file_path = None | |
| try: | |
| with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: | |
| temp_file_path = tmp.name | |
| file_info = await bot.get_file(file_id) | |
| await bot.download_file(file_info.file_path, destination=temp_file_path) | |
| await process_and_reply(message, temp_file_path) | |
| await status_message.delete() | |
| except Exception as e: | |
| logger.error(f"Ошибка при загрузке или обработке файла: {e}", exc_info=True) | |
| await status_message.edit_text("⚠️ Произошла ошибка при обработке файла.") | |
| finally: | |
| if temp_file_path and os.path.exists(temp_file_path): | |
| os.remove(temp_file_path) | |
| async def process_and_reply(message: types.Message, file_path: str) -> None: | |
| """Запускает анализ и отправляет ответ пользователю.""" | |
| loop = asyncio.get_running_loop() | |
| emotion = None | |
| try: | |
| logger.info("Запуск анализа как ФОТО...") | |
| emotion = await loop.run_in_executor(None, analyze_emotion_hf_photo, file_path) | |
| if not emotion: | |
| logger.info("Анализ фото не дал результата. Запуск анализа как РИСУНКА...") | |
| emotion = await loop.run_in_executor(None, analyze_emotion_hf_drawing, file_path) | |
| if emotion: | |
| pending_emotions[message.from_user.id] = emotion | |
| await send_emotion_response(message.reply, message.from_user.id, emotion, is_manual=False) | |
| else: | |
| raise ValueError("Ни один из методов анализа не сработал.") | |
| except Exception as final_e: | |
| logger.error(f"Все методы анализа не увенчались успехом: {final_e}", exc_info=True) | |
| await message.reply("⚠️ Не удалось распознать эмоции. Попробуйте другое фото или рисунок.") | |
| finally: | |
| if os.path.exists(file_path): | |
| os.remove(file_path) | |
| # --- Функции для формирования и отправки ответов --- | |
| async def send_emotion_response(send_method: Callable[..., Awaitable[types.Message]], user_id: int, emotion: str, is_manual: bool = False, advice_index: int = 0) -> None: | |
| """Формирует и отправляет сообщение с результатом анализа и кнопками.""" | |
| translated_emotion = EMOTION_TRANSLATIONS.get(emotion, emotion.capitalize()) | |
| emoji = EMOJI_MAP.get(emotion, '🧠') | |
| advice_list = RECOMMENDATIONS.get(emotion, [DEFAULT_ADVICE]) | |
| current_advice = advice_list[advice_index % len(advice_list)] | |
| response_text = ( | |
| f"{emoji} {'Выбранная' if is_manual else 'Обнаруженная'} эмоция: *{translated_emotion}*\n\n" | |
| f"🎨 Совет по арт-терапии:\n{current_advice['short']}" | |
| ) | |
| first_row_buttons = [InlineKeyboardButton(text="Подробнее ✨", callback_data=f"more:{emotion}:{advice_index}")] | |
| if len(advice_list) > 1: | |
| first_row_buttons.append(InlineKeyboardButton(text="Другой совет ➡️", callback_data=f"another:{emotion}:{advice_index}")) | |
| keyboard = InlineKeyboardMarkup(inline_keyboard=[ | |
| first_row_buttons, | |
| [InlineKeyboardButton(text="Не та эмоция? 🤔", callback_data="manual_start"), | |
| InlineKeyboardButton(text="Новая эмоция 🆕", callback_data="start_new_session")] | |
| ]) | |
| await send_method(response_text, parse_mode='Markdown', reply_markup=keyboard) | |
| def get_manual_emotion_keyboard() -> InlineKeyboardMarkup: | |
| """Создает клавиатуру для ручного выбора эмоции.""" | |
| buttons = [InlineKeyboardButton(text=f"{EMOJI_MAP[k]} {EMOTION_TRANSLATIONS[k]}", callback_data=f"manual_select:{k}") | |
| for k in EMOTION_TRANSLATIONS.keys()] | |
| keyboard_layout = [buttons[i:i + 2] for i in range(0, len(buttons), 2)] | |
| return InlineKeyboardMarkup(inline_keyboard=keyboard_layout) | |
| # --- Обработчики команд (message handlers) --- | |
| async def send_welcome(message: types.Message) -> None: | |
| commit_pending_emotion(message.from_user.id) | |
| keyboard = InlineKeyboardMarkup(inline_keyboard=[ | |
| [InlineKeyboardButton(text="Выбрать эмоцию вручную", callback_data="manual_start")], | |
| [InlineKeyboardButton(text="Статистика 📊", callback_data="show_stats")] | |
| ]) | |
| await message.answer( | |
| "👋 Привет! Отправь мне фото или рисунок, и я предложу арт-терапию.\n\n" | |
| "Или используй кнопки ниже для выбора эмоции вручную и для просмотра своей статистики.", | |
| reply_markup=keyboard | |
| ) | |
| async def show_history(message: types.Message) -> None: | |
| commit_pending_emotion(message.from_user.id) | |
| history = get_user_history(message.from_user.id, limit=10) | |
| if not history: | |
| await message.reply("📜 Ваша история пока пуста. Отправьте фото для анализа!") | |
| return | |
| response_text = "📜 Ваши последние 10 эмоций:\n\n" | |
| for record in reversed(history): | |
| emotion_key = record['emotion'] | |
| translated = EMOTION_TRANSLATIONS.get(emotion_key, emotion_key.capitalize()) | |
| response_text += f"▪️ {record['date']}: *{translated}*\n" | |
| await message.reply(response_text, parse_mode='Markdown') | |
| async def stats_command_handler(message: types.Message) -> None: await send_statistics(message) | |
| async def manual_choice_command(message: types.Message) -> None: | |
| clear_pending_emotion(message.from_user.id) | |
| await show_manual_choice(message) | |
| async def handle_photo(message: types.Message) -> None: await download_and_process_image(message, message.photo[-1].file_id) | |
| async def handle_document(message: types.Message) -> None: await download_and_process_image(message, message.document.file_id) | |
| # --- Обработчики колбэков (callback query handlers) --- | |
| async def process_callback_another(callback_query: types.CallbackQuery) -> None: | |
| commit_pending_emotion(callback_query.from_user.id) | |
| await delete_previous_detailed_message(callback_query.message.chat.id) | |
| _, emotion, advice_index_str = callback_query.data.split(':') | |
| next_index = int(advice_index_str) + 1 | |
| is_manual = 'Выбранная' in callback_query.message.text | |
| try: | |
| await send_emotion_response(callback_query.message.edit_text, callback_query.from_user.id, emotion, is_manual, next_index) | |
| await callback_query.answer() | |
| except TelegramBadRequest as e: | |
| if "message is not modified" in e.message: await callback_query.answer("Вы просмотрели все советы.", show_alert=True) | |
| else: logger.error(f"Ошибка Telegram в 'another': {e}") | |
| async def process_callback_more(callback_query: types.CallbackQuery) -> None: | |
| commit_pending_emotion(callback_query.from_user.id) | |
| await delete_previous_detailed_message(callback_query.message.chat.id) | |
| _, emotion, advice_index_str = callback_query.data.split(':') | |
| advice_list = RECOMMENDATIONS.get(emotion, [DEFAULT_ADVICE]) | |
| advice = advice_list[int(advice_index_str) % len(advice_list)] | |
| keyboard = InlineKeyboardMarkup(inline_keyboard=[[InlineKeyboardButton(text="Скрыть совет 🗑️", callback_data="delete_detailed_advice")]]) | |
| sent_message = await callback_query.message.reply(f"✨ *Подробный совет:*\n\n{advice['long']}", parse_mode='Markdown', reply_markup=keyboard) | |
| detailed_advice_messages[callback_query.message.chat.id] = sent_message.message_id | |
| await callback_query.answer() | |
| async def process_manual_emotion_selection(callback_query: types.CallbackQuery) -> None: | |
| user_id = callback_query.from_user.id | |
| clear_pending_emotion(user_id) | |
| await delete_previous_detailed_message(callback_query.message.chat.id) | |
| emotion = callback_query.data.split(':')[1] | |
| save_emotion(user_id, emotion) | |
| await send_emotion_response(callback_query.message.edit_text, user_id, emotion, is_manual=True) | |
| await callback_query.answer() | |
| # --- ИСПРАВЛЕННЫЙ ОБРАБОТЧИК --- | |
| async def process_navigation_callbacks(callback_query: types.CallbackQuery) -> None: | |
| user_id = callback_query.from_user.id | |
| await delete_previous_detailed_message(callback_query.message.chat.id) | |
| if callback_query.data == 'manual_start': | |
| clear_pending_emotion(user_id) | |
| await show_manual_choice(callback_query) | |
| elif callback_query.data == 'start_new_session': | |
| commit_pending_emotion(user_id) | |
| # ИСПРАВЛЕНИЕ: Мы больше ничего не делаем с исходным сообщением. | |
| # Оно остается в чате без изменений. | |
| # Просто отправляем новое приветственное сообщение. | |
| await send_welcome(callback_query.message) | |
| await callback_query.answer() | |
| # --- КОНЕЦ ИСПРАВЛЕНИЯ --- | |
| async def process_stats_callback(callback_query: types.CallbackQuery) -> None: | |
| await send_statistics(callback_query) | |
| await callback_query.answer() | |
| async def process_delete_detailed_advice(callback_query: types.CallbackQuery) -> None: | |
| try: | |
| await callback_query.message.delete() | |
| except Exception: | |
| pass | |
| await callback_query.answer() | |
| # --- Вспомогательные функции --- | |
| async def show_manual_choice(message_or_query: Union[types.Message, types.CallbackQuery]) -> None: | |
| """Отображает меню ручного выбора эмоций.""" | |
| keyboard = get_manual_emotion_keyboard() | |
| text = "Пожалуйста, выберите вашу текущую эмоцию:" | |
| if isinstance(message_or_query, types.CallbackQuery): | |
| await message_or_query.message.edit_text(text, reply_markup=keyboard) | |
| else: | |
| await message_or_query.reply(text, reply_markup=keyboard) | |
| async def send_statistics(message_or_query: Union[types.Message, types.CallbackQuery]) -> None: | |
| """Отправляет статистику эмоций за последнюю неделю.""" | |
| user_id = message_or_query.from_user.id | |
| commit_pending_emotion(user_id) | |
| all_history = get_user_history(user_id, limit=None) | |
| seven_days_ago = datetime.now() - timedelta(days=7) | |
| recent_history = [rec for rec in all_history if datetime.strptime(rec['date'], "%Y-%m-%d %H:%M:%S") >= seven_days_ago] | |
| keyboard = InlineKeyboardMarkup(inline_keyboard=[[InlineKeyboardButton(text="Новая эмоция 🆕", callback_data="start_new_session")]]) | |
| if not recent_history: | |
| text = "📈 За последнюю неделю у вас нет записей об эмоциях. Время творить!" | |
| else: | |
| emotion_counts = Counter(rec['emotion'] for rec in recent_history) | |
| total = len(recent_history) | |
| text = "📈 *Ваша статистика эмоций за последнюю неделю:*\n\n" | |
| for emotion, count in emotion_counts.most_common(): | |
| emoji, translated = EMOJI_MAP.get(emotion, '▪️'), EMOTION_TRANSLATIONS.get(emotion, emotion.capitalize()) | |
| percentage = (count / total) * 100 | |
| text += f"{emoji} *{translated}:* {count} раз ({percentage:.1f}%)\n" | |
| send_method = message_or_query.answer if isinstance(message_or_query, types.Message) else message_or_query.message.answer | |
| await send_method(text, parse_mode='Markdown', reply_markup=keyboard) | |
| async def delete_previous_detailed_message(chat_id: int) -> None: | |
| """Удаляет старое сообщение с подробным советом, если оно есть.""" | |
| if chat_id in detailed_advice_messages: | |
| try: | |
| await bot.delete_message(chat_id, detailed_advice_messages.pop(chat_id)) | |
| except Exception: | |
| pass | |
| # --- Запуск приложения --- | |
| def run_web_interface(): | |
| """Запускает веб-интерфейс Gradio в отдельном фоновом потоке.""" | |
| def web_status(): | |
| return "Telegram-бот работает. Этот интерфейс нужен для UptimeRobot, чтобы Space не засыпал." | |
| web_app = gr.Interface( | |
| fn=web_status, | |
| inputs=[], | |
| outputs="text", | |
| title="Статус Telegram-бота 'Арт-Терапевт'", | |
| description="Эта страница подтверждает, что приложение на Hugging Face Space запущено и отвечает на запросы.", | |
| allow_flagging="never" | |
| ) | |
| web_app.launch(server_name="0.0.0.0", server_port=7860) | |
| async def main(): | |
| """ | |
| Основная асинхронная функция, которая запускает бота в основном потоке. | |
| """ | |
| gradio_thread = threading.Thread(target=run_web_interface, daemon=True) | |
| gradio_thread.start() | |
| await set_commands(bot) | |
| logger.info("Бот готов к работе и запускается в основном потоке...") | |
| await dp.start_polling(bot) | |
| if __name__ == "__main__": | |
| try: | |
| asyncio.run(main()) | |
| except (KeyboardInterrupt, SystemExit): | |
| logger.info("Бот остановлен вручную.") |