Photobot / main.py
Dmitry1313's picture
Update main.py
97a3683 verified
import asyncio
import logging
import socket
from pathlib import Path
from aiogram import Bot, Dispatcher, F, types
from aiogram.filters import Command, CommandStart
from aiogram.fsm.context import FSMContext
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import BufferedInputFile
from deep_translator import GoogleTranslator
# Логирование должно быть настроено до всего остального
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Импорты конфигурации и модулей
from config import (
TG_BOT_TOKEN, check_config, BOT_NAME, MAX_REFERENCE_IMAGES,
WORKER_URL, CF_WORKER_URL, FACEFUSION_URL, DAILY_LIMIT
)
from states import UserStates
from keyboards import (
get_main_menu, get_reference_menu, get_progress_keyboard,
get_result_keyboard, get_style_menu
)
from services.cloudflare import generate_with_phoenix as generate_image
from services.face_fusion_api import FaceFusionClient
from services.usage import UsageTracker
# ------------------------------------------------------------
# Диагностика DNS для Cloudflare Worker
# ------------------------------------------------------------
try:
# Извлекаем имя хоста из WORKER_URL (например, telegram-forwarder.rubl1313.workers.dev)
worker_host = WORKER_URL.split('://')[1].split('/')[0]
ip = socket.gethostbyname(worker_host)
logger.info(f"✅ DNS для Cloudflare Worker: {worker_host} -> {ip}")
except Exception as e:
logger.error(f"❌ Ошибка DNS для Cloudflare Worker ({WORKER_URL}): {e}")
# ------------------------------------------------------------
# Настройка прокси (можно отключить для тестирования прямого доступа)
# ------------------------------------------------------------
USE_PROXY = True # Установите False, чтобы использовать прямой доступ к api.telegram.org
if USE_PROXY:
from aiogram.client.telegram import TelegramAPIServer
from aiogram.client.session.aiohttp import AiohttpSession
api_server = TelegramAPIServer(base=WORKER_URL, file=WORKER_URL, is_local=True)
session = AiohttpSession(api=api_server)
bot = Bot(token=TG_BOT_TOKEN, session=session)
logger.info(f"🔄 Используется прокси: {WORKER_URL}")
else:
bot = Bot(token=TG_BOT_TOKEN)
logger.info("🌐 Используется прямой доступ к api.telegram.org")
# ------------------------------------------------------------
# Инициализация остальных компонентов
# ------------------------------------------------------------
storage = MemoryStorage()
dp = Dispatcher(storage=storage)
facefusion_client = FaceFusionClient(api_url=FACEFUSION_URL)
DATA_DIR = Path("/data")
DATA_DIR.mkdir(exist_ok=True)
tracker = UsageTracker(daily_limit=DAILY_LIMIT, data_dir=DATA_DIR)
# Хранилище пользовательских данных (временное)
user_data: dict[int, dict] = {}
# ------------------------------------------------------------
# КОМАНДЫ
# ------------------------------------------------------------
@dp.message(CommandStart())
async def cmd_start(message: types.Message, state: FSMContext):
logger.info(f"Command /start from user {message.from_user.id}")
await state.clear()
user_data.pop(message.from_user.id, None)
await message.answer(
f"👋 Привет, {message.from_user.first_name}!\n\n"
f"Я — **{BOT_NAME}** на FLUX.1 Schnell 🎨\n\n"
f"⚡ **Преимущества:**\n"
f"• Быстрая генерация (3-8 сек)\n"
f"• Автоматически добавляю лицо к любому промпту\n"
f"• Поддержка референс-изображений\n"
f"• {tracker.daily_limit} генераций/день бесплатно\n\n"
f"💡 **Просто напиши что хочешь** — я сам добавлю лицо!\n\n"
f"🚀 **Начни:** нажми «🎨 Создать фото»!",
reply_markup=get_main_menu(),
parse_mode="Markdown"
)
@dp.message(Command("stats"))
async def cmd_stats(message: types.Message):
stats = tracker.get_stats_text(message.from_user.id)
await message.answer(stats, parse_mode="Markdown")
@dp.message(Command("help"))
async def cmd_help(message: types.Message):
await message.answer(
"❓ **Помощь**\n\n"
"🎨 **Создать фото:**\n"
"1. Нажми «🎨 Создать фото»\n"
"2. Отправь фото лица (как фото ИЛИ как файл)\n"
"3. Напиши что хочешь увидеть (на любом языке)\n"
"4. Выбери стиль (опционально)\n"
"5. Жди результат!\n\n"
"🤖 **Я сам добавлю лицо к твоему запросу!**\n\n"
"📝 **Примеры:**\n"
"• `astronaut on mars` → астронавт с твоим лицом\n"
"• `businessman in office` → бизнес-портрет\n\n"
"📸 **Референсы:**\n"
"• Можно добавить 1 референс (стиль/поза)\n"
"• Отправляй как фото или файл\n\n"
"⏰ Лимит сбрасывается в 00:00 UTC",
parse_mode="Markdown"
)
# ------------------------------------------------------------
# ОБРАБОТКА КНОПОК (ReplyKeyboard)
# ------------------------------------------------------------
@dp.message(F.text == "🎨 Создать фото")
async def start_creation(message: types.Message, state: FSMContext):
user_id = message.from_user.id
allowed, remaining = tracker.can_generate(user_id)
if not allowed:
await message.answer(
f"❌ **Лимит исчерпан!**\n\n"
f"Использовано: {tracker.daily_limit}/{tracker.daily_limit}\n"
f"Попробуй завтра после 00:00 UTC",
parse_mode="Markdown"
)
return
await state.set_state(UserStates.waiting_for_face_photo)
user_data[user_id] = {'face': None, 'references': []}
await message.answer(
"📸 **Отправь фото лица**\n\n"
"💡 Советы:\n"
"• Лицо крупным планом\n"
"• Хорошее освещение\n"
"• Анфас\n\n"
"📎 **Можно отправить:**\n"
"• Как фото (📷)\n"
"• Как файл (📎 Выбрать файл)\n"
"• Перетащить в чат",
parse_mode="Markdown"
)
@dp.message(F.text == "🖼️ С референсами")
async def start_with_references(message: types.Message, state: FSMContext):
await state.set_state(UserStates.waiting_for_face_photo)
user_id = message.from_user.id
user_data[user_id] = {'face': None, 'references': []}
await message.answer(
"📸 **Сначала отправь фото лица**\n\n"
f"После этого сможешь добавить 1 референс (стиль или поза)\n\n"
"📎 **Отправляй как фото или файл!**",
parse_mode="Markdown",
reply_markup=get_reference_menu()
)
@dp.message(F.text == "📊 Моя статистика")
async def show_user_stats(message: types.Message):
stats = tracker.get_stats_text(message.from_user.id)
await message.answer(stats, parse_mode="Markdown")
@dp.message(F.text == "⚙️ Настройки")
async def show_settings(message: types.Message):
await message.answer(
"⚙️ **Настройки**\n\n"
f"📌 Лимит: {tracker.daily_limit} генераций/день\n"
"🎨 Качество: Высокое (1024×1024)\n"
"⏱ Время: ~20-40 секунд\n"
"🖼️ Референсы: 1 фото",
parse_mode="Markdown"
)
@dp.message(F.text == "❓ Помощь")
async def show_help(message: types.Message):
await cmd_help(message)
# ------------------------------------------------------------
# ОБРАБОТКА ФОТО И ФАЙЛОВ
# ------------------------------------------------------------
@dp.message(UserStates.waiting_for_face_photo, F.photo)
async def handle_face_photo(message: types.Message, state: FSMContext):
await process_face_photo(message, state)
@dp.message(UserStates.waiting_for_face_photo, F.document)
async def handle_face_document(message: types.Message, state: FSMContext):
if message.document.mime_type.startswith('image/'):
await process_face_photo(message, state)
else:
await message.answer("❌ Это не изображение. Отправь фото (JPG, PNG)")
async def process_face_photo(message: types.Message, state: FSMContext):
user_id = message.from_user.id
try:
file = message.photo[-1] if message.photo else message.document
downloaded = await bot.download(file)
photo_bytes = downloaded.read()
if len(photo_bytes) == 0:
await message.answer("❌ Файл пустой")
return
user_data[user_id]['face'] = photo_bytes
await state.set_state(UserStates.choosing_references)
await message.answer(
"✅ **Фото лица принято!**\n\n"
"🖼️ **Добавить референс?**\n\n"
"⚠️ Можно добавить только **1 референс**",
reply_markup=get_reference_menu(),
parse_mode="Markdown"
)
except Exception as e:
logger.error(f"Error processing face photo: {e}")
await message.answer(f"❌ Ошибка: {e}")
# ------------------------------------------------------------
# CALLBACK QUERY (Inline-кнопки)
# ------------------------------------------------------------
@dp.callback_query(UserStates.choosing_references, F.data == "ref_face_only")
async def ref_face_only(callback: types.CallbackQuery, state: FSMContext):
await callback.answer()
await proceed_to_prompt(callback.message, state)
@dp.callback_query(UserStates.choosing_references, F.data.startswith("ref_"))
async def ref_selected(callback: types.CallbackQuery, state: FSMContext):
await callback.answer()
await state.set_state(UserStates.waiting_for_reference_photos)
await callback.message.edit_text(
"🖼️ **Отправь референс** (или напиши «пропустить»)\n\n"
"📎 Отправь как фото или файл",
parse_mode="Markdown"
)
@dp.message(UserStates.waiting_for_reference_photos, F.photo)
@dp.message(UserStates.waiting_for_reference_photos, F.document)
async def handle_reference_photo(message: types.Message, state: FSMContext):
user_id = message.from_user.id
try:
file = message.photo[-1] if message.photo else message.document
if not file.mime_type.startswith('image/'):
await message.answer("❌ Это не изображение")
return
downloaded = await bot.download(file)
photo_bytes = downloaded.read()
if len(user_data[user_id]['references']) >= MAX_REFERENCE_IMAGES:
await message.answer(f"❌ Максимум {MAX_REFERENCE_IMAGES} референс")
return
user_data[user_id]['references'].append(photo_bytes)
await message.answer(f"✅ Референс добавлен! Напиши «пропустить» чтобы продолжить")
except Exception as e:
await message.answer(f"❌ Ошибка: {e}")
@dp.message(UserStates.waiting_for_reference_photos, F.text)
async def skip_reference(message: types.Message, state: FSMContext):
if message.text.lower() in ["готово", "done", "хватит", "стоп", "пропустить", "skip"]:
await proceed_to_prompt(message, state)
async def proceed_to_prompt(message: types.Message, state: FSMContext):
await state.set_state(UserStates.waiting_for_prompt)
await message.answer(
"📝 **Напиши описание изображения**\n\n"
"💡 **Я сам добавлю лицо к твоему запросу!**\n\n"
"Примеры:\n"
"• `astronaut on mars`\n"
"• `business portrait in office`",
parse_mode="Markdown"
)
@dp.message(UserStates.waiting_for_prompt, F.text)
async def handle_prompt(message: types.Message, state: FSMContext):
prompt = message.text.strip()
if len(prompt) < 3:
await message.answer("❌ Промпт слишком короткий")
return
await state.update_data(prompt=prompt)
await state.set_state(UserStates.choosing_style)
await message.answer(
"🎨 **Выбери стиль** (опционально):",
reply_markup=get_style_menu()
)
@dp.callback_query(UserStates.choosing_style, F.data.startswith("style_"))
async def style_selected(callback: types.CallbackQuery, state: FSMContext):
await callback.answer()
style = callback.data
data = await state.get_data()
prompt = data.get("prompt")
user_id = callback.from_user.id
await callback.message.edit_text(
f"⏳ **Запускаю генерацию...**\n\n"
f"📝 Промпт: `{prompt}`\n"
f"🎨 Стиль: `{style}`\n\n"
"20-40 секунд ⏱",
parse_mode="Markdown"
)
await state.set_state(UserStates.generating)
await generate_and_send(callback.message, user_id, prompt, style)
@dp.callback_query(UserStates.choosing_style, F.data == "style_skip")
async def style_skipped(callback: types.CallbackQuery, state: FSMContext):
await callback.answer()
await style_selected(callback, state) # передаём пустой стиль (будет обработан в generate_and_send)
@dp.callback_query(F.data.startswith("back_to_main"))
async def back_to_main(callback: types.CallbackQuery, state: FSMContext):
await callback.answer()
await state.set_state(UserStates.idle)
await callback.message.answer(
"🏠 **Главное меню**",
reply_markup=get_main_menu()
)
try:
await callback.message.delete()
except:
pass
# ------------------------------------------------------------
# ГЕНЕРАЦИЯ
# ------------------------------------------------------------
async def generate_and_send(message: types.Message, user_id: int, prompt: str, style: str):
try:
user_face = user_data[user_id]['face']
references = user_data[user_id]['references']
# 🔥 АВТОПЕРЕВОД НА АНГЛИЙСКИЙ
try:
translator = GoogleTranslator(source='auto', target='en')
loop = asyncio.get_event_loop()
translated_prompt = await loop.run_in_executor(
None, translator.translate, prompt
)
except:
translated_prompt = prompt
# 🔥 БАЗОВЫЙ ПРОМПТ С ЛИЦОМ (добавляется всегда)
base_face_prompt = "portrait of a person with clear face, detailed facial features, visible face, looking at camera"
style_prompts = {
"style_cinematic": "cinematic lighting, film grain, dramatic",
"style_portrait": "professional portrait, studio lighting, sharp focus",
"style_art": "digital art, illustration, vibrant colors",
"style_realistic": "photorealistic, natural lighting, highly detailed",
"style_cyberpunk": "neon lights, cyberpunk, futuristic",
"style_fantasy": "fantasy art, magical, ethereal",
}
full_prompt = f"{base_face_prompt}, {translated_prompt}, {style_prompts.get(style, '')}".strip(", ")
await message.edit_text("🎨 Шаг 1/3: Генерация изображения...")
# Генерируем изображение через Cloudflare Worker (Phoenix)
generated_image = await generate_image(
prompt=full_prompt,
width=1024,
height=1024,
negative_prompt="ugly, blurry, low quality, distorted, watermark, no face, faceless, helmet, mask",
steps=12, # Phoenix использует steps
guidance=7.5
)
if not generated_image:
await message.edit_text("❌ Ошибка генерации изображения")
return
await message.edit_text("👤 Шаг 2/3: Замена лица...")
# Заменяем лицо через FaceFusion API
final_image = await facefusion_client.swap_face_simple(
source_face_bytes=user_face,
target_image_bytes=generated_image
)
if not final_image:
logger.warning("Face swap failed, sending original image")
final_image = generated_image
await message.edit_text("✅ Готово! Отправляю...")
await message.answer_photo(
photo=BufferedInputFile(final_image, filename="result.jpg"),
caption=f"📸 **Готово!**\n\n📝 `{prompt}`",
reply_markup=get_result_keyboard(message.message_id),
parse_mode="Markdown"
)
tracker.increment(user_id)
user_data.pop(user_id, None)
await message.delete()
except Exception as e:
logger.error(f"Generation error: {e}", exc_info=True)
await message.edit_text(f"❌ Ошибка: {str(e)}")
user_data.pop(user_id, None)
# ------------------------------------------------------------
# ЗАПУСК
# ------------------------------------------------------------
async def main():
check_config()
await bot.delete_webhook()
await bot.get_updates(offset=-1)
logger.info(f"🚀 {BOT_NAME} запущен!")
await dp.start_polling(bot)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("👋 Бот остановлен")