0xArctic's picture
Update
5aa53b4 verified
# bot.py
"""
Основной файл Telegram-бота для арт-терапии.
ФИНАЛЬНАЯ ВЕРСИЯ. Решает все проблемы совместимости и развертывания.
Принцип работы:
1. Веб-интерфейс Gradio запускается в отдельном фоновом потоке.
Он явно слушает хост 0.0.0.0 и порт 7860, чтобы быть видимым для
платформы Hugging Face и менять статус на "Running".
2. Telegram-бот на aiogram запускается в ОСНОВНОМ потоке, что является
требованием для корректной обработки системных сигналов.
"""
import asyncio
import logging
import os
import tempfile
import threading
from collections import Counter
from datetime import datetime, timedelta
from typing import Dict, Union, Callable, Awaitable
import gradio as gr
from aiogram import Bot, Dispatcher, types
from aiogram.exceptions import TelegramBadRequest
from aiogram.types import BotCommand, InlineKeyboardButton, InlineKeyboardMarkup
from dotenv import load_dotenv
from art_therapy_data import DEFAULT_ADVICE, RECOMMENDATIONS
from database import get_user_history, save_emotion
from emotion_analyzer import analyze_emotion_hf_photo, analyze_emotion_hf_drawing
# --- Конфигурация и инициализация ---
load_dotenv()
API_TOKEN = os.getenv("TELEGRAM_API_TOKEN")
if not API_TOKEN:
raise ValueError("ОШИБКА: Не удалось найти API_TOKEN. Добавьте его в .env или в секреты Space.")
EMOTION_TRANSLATIONS: Dict[str, str] = {
'angry': 'Гнев', 'disgust': 'Отвращение', 'fear': 'Страх', 'happy': 'Счастье',
'sad': 'Грусть', 'surprise': 'Удивление', 'neutral': 'Нейтральность'
}
EMOJI_MAP: Dict[str, str] = {
'angry': '😠', 'disgust': '🤢', 'fear': '😨', 'happy': '😄',
'sad': '😢', 'surprise': '😮', 'neutral': '😐'
}
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
bot = Bot(token=API_TOKEN)
dp = Dispatcher()
# Хранилища временного состояния (утеряются при перезапуске)
detailed_advice_messages: Dict[int, int] = {}
pending_emotions: Dict[int, str] = {}
async def set_commands(bot_instance: Bot) -> None:
"""Устанавливает список команд (меню) для бота."""
commands = [
BotCommand(command="/start", description="Запустить/Перезапустить бота"),
BotCommand(command="/manual", description="Выбрать эмоцию вручную"),
BotCommand(command="/history", description="Посмотреть историю эмоций"),
BotCommand(command="/stats", description="Посмотреть статистику за неделю"),
]
await bot_instance.set_my_commands(commands)
# --- Функции для управления состоянием ---
def commit_pending_emotion(user_id: int):
"""Сохраняет эмоцию от ИИ, если пользователь выполнил подтверждающее действие."""
if user_id in pending_emotions:
confirmed_emotion = pending_emotions.pop(user_id)
save_emotion(user_id, confirmed_emotion)
logger.info(f"Сохранена подтвержденная эмоция '{confirmed_emotion}' для пользователя {user_id}")
def clear_pending_emotion(user_id: int):
"""Удаляет временную эмоцию, если пользователь выбрал другую."""
if user_id in pending_emotions:
rejected_emotion = pending_emotions.pop(user_id)
logger.info(f"Отклонена (очищена) эмоция '{rejected_emotion}' от ИИ для пользователя {user_id}")
# --- Основная логика обработки изображений ---
async def download_and_process_image(message: types.Message, file_id: str) -> None:
"""Скачивает, анализирует изображение и отправляет результат."""
commit_pending_emotion(message.from_user.id)
status_message = await message.reply("🔍 Анализирую эмоцию...")
temp_file_path = None
try:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
temp_file_path = tmp.name
file_info = await bot.get_file(file_id)
await bot.download_file(file_info.file_path, destination=temp_file_path)
await process_and_reply(message, temp_file_path)
await status_message.delete()
except Exception as e:
logger.error(f"Ошибка при загрузке или обработке файла: {e}", exc_info=True)
await status_message.edit_text("⚠️ Произошла ошибка при обработке файла.")
finally:
if temp_file_path and os.path.exists(temp_file_path):
os.remove(temp_file_path)
async def process_and_reply(message: types.Message, file_path: str) -> None:
"""Запускает анализ и отправляет ответ пользователю."""
loop = asyncio.get_running_loop()
emotion = None
try:
logger.info("Запуск анализа как ФОТО...")
emotion = await loop.run_in_executor(None, analyze_emotion_hf_photo, file_path)
if not emotion:
logger.info("Анализ фото не дал результата. Запуск анализа как РИСУНКА...")
emotion = await loop.run_in_executor(None, analyze_emotion_hf_drawing, file_path)
if emotion:
pending_emotions[message.from_user.id] = emotion
await send_emotion_response(message.reply, message.from_user.id, emotion, is_manual=False)
else:
raise ValueError("Ни один из методов анализа не сработал.")
except Exception as final_e:
logger.error(f"Все методы анализа не увенчались успехом: {final_e}", exc_info=True)
await message.reply("⚠️ Не удалось распознать эмоции. Попробуйте другое фото или рисунок.")
finally:
if os.path.exists(file_path):
os.remove(file_path)
# --- Функции для формирования и отправки ответов ---
async def send_emotion_response(send_method: Callable[..., Awaitable[types.Message]], user_id: int, emotion: str, is_manual: bool = False, advice_index: int = 0) -> None:
"""Формирует и отправляет сообщение с результатом анализа и кнопками."""
translated_emotion = EMOTION_TRANSLATIONS.get(emotion, emotion.capitalize())
emoji = EMOJI_MAP.get(emotion, '🧠')
advice_list = RECOMMENDATIONS.get(emotion, [DEFAULT_ADVICE])
current_advice = advice_list[advice_index % len(advice_list)]
response_text = (
f"{emoji} {'Выбранная' if is_manual else 'Обнаруженная'} эмоция: *{translated_emotion}*\n\n"
f"🎨 Совет по арт-терапии:\n{current_advice['short']}"
)
first_row_buttons = [InlineKeyboardButton(text="Подробнее ✨", callback_data=f"more:{emotion}:{advice_index}")]
if len(advice_list) > 1:
first_row_buttons.append(InlineKeyboardButton(text="Другой совет ➡️", callback_data=f"another:{emotion}:{advice_index}"))
keyboard = InlineKeyboardMarkup(inline_keyboard=[
first_row_buttons,
[InlineKeyboardButton(text="Не та эмоция? 🤔", callback_data="manual_start"),
InlineKeyboardButton(text="Новая эмоция 🆕", callback_data="start_new_session")]
])
await send_method(response_text, parse_mode='Markdown', reply_markup=keyboard)
def get_manual_emotion_keyboard() -> InlineKeyboardMarkup:
"""Создает клавиатуру для ручного выбора эмоции."""
buttons = [InlineKeyboardButton(text=f"{EMOJI_MAP[k]} {EMOTION_TRANSLATIONS[k]}", callback_data=f"manual_select:{k}")
for k in EMOTION_TRANSLATIONS.keys()]
keyboard_layout = [buttons[i:i + 2] for i in range(0, len(buttons), 2)]
return InlineKeyboardMarkup(inline_keyboard=keyboard_layout)
# --- Обработчики команд (message handlers) ---
@dp.message(lambda message: message.text in ['/start', '/help'])
async def send_welcome(message: types.Message) -> None:
commit_pending_emotion(message.from_user.id)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="Выбрать эмоцию вручную", callback_data="manual_start")],
[InlineKeyboardButton(text="Статистика 📊", callback_data="show_stats")]
])
await message.answer(
"👋 Привет! Отправь мне фото или рисунок, и я предложу арт-терапию.\n\n"
"Или используй кнопки ниже для выбора эмоции вручную и для просмотра своей статистики.",
reply_markup=keyboard
)
@dp.message(lambda message: message.text == '/history')
async def show_history(message: types.Message) -> None:
commit_pending_emotion(message.from_user.id)
history = get_user_history(message.from_user.id, limit=10)
if not history:
await message.reply("📜 Ваша история пока пуста. Отправьте фото для анализа!")
return
response_text = "📜 Ваши последние 10 эмоций:\n\n"
for record in reversed(history):
emotion_key = record['emotion']
translated = EMOTION_TRANSLATIONS.get(emotion_key, emotion_key.capitalize())
response_text += f"▪️ {record['date']}: *{translated}*\n"
await message.reply(response_text, parse_mode='Markdown')
@dp.message(lambda message: message.text == '/stats')
async def stats_command_handler(message: types.Message) -> None: await send_statistics(message)
@dp.message(lambda message: message.text == '/manual')
async def manual_choice_command(message: types.Message) -> None:
clear_pending_emotion(message.from_user.id)
await show_manual_choice(message)
@dp.message(lambda message: message.photo is not None)
async def handle_photo(message: types.Message) -> None: await download_and_process_image(message, message.photo[-1].file_id)
@dp.message(lambda message: message.document is not None and message.document.mime_type and message.document.mime_type.startswith('image/'))
async def handle_document(message: types.Message) -> None: await download_and_process_image(message, message.document.file_id)
# --- Обработчики колбэков (callback query handlers) ---
@dp.callback_query(lambda c: c.data.startswith('another:'))
async def process_callback_another(callback_query: types.CallbackQuery) -> None:
commit_pending_emotion(callback_query.from_user.id)
await delete_previous_detailed_message(callback_query.message.chat.id)
_, emotion, advice_index_str = callback_query.data.split(':')
next_index = int(advice_index_str) + 1
is_manual = 'Выбранная' in callback_query.message.text
try:
await send_emotion_response(callback_query.message.edit_text, callback_query.from_user.id, emotion, is_manual, next_index)
await callback_query.answer()
except TelegramBadRequest as e:
if "message is not modified" in e.message: await callback_query.answer("Вы просмотрели все советы.", show_alert=True)
else: logger.error(f"Ошибка Telegram в 'another': {e}")
@dp.callback_query(lambda c: c.data.startswith('more:'))
async def process_callback_more(callback_query: types.CallbackQuery) -> None:
commit_pending_emotion(callback_query.from_user.id)
await delete_previous_detailed_message(callback_query.message.chat.id)
_, emotion, advice_index_str = callback_query.data.split(':')
advice_list = RECOMMENDATIONS.get(emotion, [DEFAULT_ADVICE])
advice = advice_list[int(advice_index_str) % len(advice_list)]
keyboard = InlineKeyboardMarkup(inline_keyboard=[[InlineKeyboardButton(text="Скрыть совет 🗑️", callback_data="delete_detailed_advice")]])
sent_message = await callback_query.message.reply(f"✨ *Подробный совет:*\n\n{advice['long']}", parse_mode='Markdown', reply_markup=keyboard)
detailed_advice_messages[callback_query.message.chat.id] = sent_message.message_id
await callback_query.answer()
@dp.callback_query(lambda c: c.data.startswith('manual_select:'))
async def process_manual_emotion_selection(callback_query: types.CallbackQuery) -> None:
user_id = callback_query.from_user.id
clear_pending_emotion(user_id)
await delete_previous_detailed_message(callback_query.message.chat.id)
emotion = callback_query.data.split(':')[1]
save_emotion(user_id, emotion)
await send_emotion_response(callback_query.message.edit_text, user_id, emotion, is_manual=True)
await callback_query.answer()
# --- ИСПРАВЛЕННЫЙ ОБРАБОТЧИК ---
@dp.callback_query(lambda c: c.data in ['manual_start', 'start_new_session'])
async def process_navigation_callbacks(callback_query: types.CallbackQuery) -> None:
user_id = callback_query.from_user.id
await delete_previous_detailed_message(callback_query.message.chat.id)
if callback_query.data == 'manual_start':
clear_pending_emotion(user_id)
await show_manual_choice(callback_query)
elif callback_query.data == 'start_new_session':
commit_pending_emotion(user_id)
# ИСПРАВЛЕНИЕ: Мы больше ничего не делаем с исходным сообщением.
# Оно остается в чате без изменений.
# Просто отправляем новое приветственное сообщение.
await send_welcome(callback_query.message)
await callback_query.answer()
# --- КОНЕЦ ИСПРАВЛЕНИЯ ---
@dp.callback_query(lambda c: c.data == 'show_stats')
async def process_stats_callback(callback_query: types.CallbackQuery) -> None:
await send_statistics(callback_query)
await callback_query.answer()
@dp.callback_query(lambda c: c.data == 'delete_detailed_advice')
async def process_delete_detailed_advice(callback_query: types.CallbackQuery) -> None:
try:
await callback_query.message.delete()
except Exception:
pass
await callback_query.answer()
# --- Вспомогательные функции ---
async def show_manual_choice(message_or_query: Union[types.Message, types.CallbackQuery]) -> None:
"""Отображает меню ручного выбора эмоций."""
keyboard = get_manual_emotion_keyboard()
text = "Пожалуйста, выберите вашу текущую эмоцию:"
if isinstance(message_or_query, types.CallbackQuery):
await message_or_query.message.edit_text(text, reply_markup=keyboard)
else:
await message_or_query.reply(text, reply_markup=keyboard)
async def send_statistics(message_or_query: Union[types.Message, types.CallbackQuery]) -> None:
"""Отправляет статистику эмоций за последнюю неделю."""
user_id = message_or_query.from_user.id
commit_pending_emotion(user_id)
all_history = get_user_history(user_id, limit=None)
seven_days_ago = datetime.now() - timedelta(days=7)
recent_history = [rec for rec in all_history if datetime.strptime(rec['date'], "%Y-%m-%d %H:%M:%S") >= seven_days_ago]
keyboard = InlineKeyboardMarkup(inline_keyboard=[[InlineKeyboardButton(text="Новая эмоция 🆕", callback_data="start_new_session")]])
if not recent_history:
text = "📈 За последнюю неделю у вас нет записей об эмоциях. Время творить!"
else:
emotion_counts = Counter(rec['emotion'] for rec in recent_history)
total = len(recent_history)
text = "📈 *Ваша статистика эмоций за последнюю неделю:*\n\n"
for emotion, count in emotion_counts.most_common():
emoji, translated = EMOJI_MAP.get(emotion, '▪️'), EMOTION_TRANSLATIONS.get(emotion, emotion.capitalize())
percentage = (count / total) * 100
text += f"{emoji} *{translated}:* {count} раз ({percentage:.1f}%)\n"
send_method = message_or_query.answer if isinstance(message_or_query, types.Message) else message_or_query.message.answer
await send_method(text, parse_mode='Markdown', reply_markup=keyboard)
async def delete_previous_detailed_message(chat_id: int) -> None:
"""Удаляет старое сообщение с подробным советом, если оно есть."""
if chat_id in detailed_advice_messages:
try:
await bot.delete_message(chat_id, detailed_advice_messages.pop(chat_id))
except Exception:
pass
# --- Запуск приложения ---
def run_web_interface():
"""Запускает веб-интерфейс Gradio в отдельном фоновом потоке."""
def web_status():
return "Telegram-бот работает. Этот интерфейс нужен для UptimeRobot, чтобы Space не засыпал."
web_app = gr.Interface(
fn=web_status,
inputs=[],
outputs="text",
title="Статус Telegram-бота 'Арт-Терапевт'",
description="Эта страница подтверждает, что приложение на Hugging Face Space запущено и отвечает на запросы.",
allow_flagging="never"
)
web_app.launch(server_name="0.0.0.0", server_port=7860)
async def main():
"""
Основная асинхронная функция, которая запускает бота в основном потоке.
"""
gradio_thread = threading.Thread(target=run_web_interface, daemon=True)
gradio_thread.start()
await set_commands(bot)
logger.info("Бот готов к работе и запускается в основном потоке...")
await dp.start_polling(bot)
if __name__ == "__main__":
try:
asyncio.run(main())
except (KeyboardInterrupt, SystemExit):
logger.info("Бот остановлен вручную.")