Spaces:
Sleeping
Sleeping
| import asyncio | |
| import logging | |
| import socket | |
| from pathlib import Path | |
| from aiogram import Bot, Dispatcher, F, types | |
| from aiogram.filters import Command, CommandStart | |
| from aiogram.fsm.context import FSMContext | |
| from aiogram.fsm.storage.memory import MemoryStorage | |
| from aiogram.types import BufferedInputFile | |
| from deep_translator import GoogleTranslator | |
| # Логирование должно быть настроено до всего остального | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Импорты конфигурации и модулей | |
| from config import ( | |
| TG_BOT_TOKEN, check_config, BOT_NAME, MAX_REFERENCE_IMAGES, | |
| WORKER_URL, CF_WORKER_URL, FACEFUSION_URL, DAILY_LIMIT | |
| ) | |
| from states import UserStates | |
| from keyboards import ( | |
| get_main_menu, get_reference_menu, get_progress_keyboard, | |
| get_result_keyboard, get_style_menu | |
| ) | |
| from services.cloudflare import generate_with_phoenix as generate_image | |
| from services.face_fusion_api import FaceFusionClient | |
| from services.usage import UsageTracker | |
| # ------------------------------------------------------------ | |
| # Диагностика DNS для Cloudflare Worker | |
| # ------------------------------------------------------------ | |
| try: | |
| # Извлекаем имя хоста из WORKER_URL (например, telegram-forwarder.rubl1313.workers.dev) | |
| worker_host = WORKER_URL.split('://')[1].split('/')[0] | |
| ip = socket.gethostbyname(worker_host) | |
| logger.info(f"✅ DNS для Cloudflare Worker: {worker_host} -> {ip}") | |
| except Exception as e: | |
| logger.error(f"❌ Ошибка DNS для Cloudflare Worker ({WORKER_URL}): {e}") | |
| # ------------------------------------------------------------ | |
| # Настройка прокси (можно отключить для тестирования прямого доступа) | |
| # ------------------------------------------------------------ | |
| USE_PROXY = True # Установите False, чтобы использовать прямой доступ к api.telegram.org | |
| if USE_PROXY: | |
| from aiogram.client.telegram import TelegramAPIServer | |
| from aiogram.client.session.aiohttp import AiohttpSession | |
| api_server = TelegramAPIServer(base=WORKER_URL, file=WORKER_URL, is_local=True) | |
| session = AiohttpSession(api=api_server) | |
| bot = Bot(token=TG_BOT_TOKEN, session=session) | |
| logger.info(f"🔄 Используется прокси: {WORKER_URL}") | |
| else: | |
| bot = Bot(token=TG_BOT_TOKEN) | |
| logger.info("🌐 Используется прямой доступ к api.telegram.org") | |
| # ------------------------------------------------------------ | |
| # Инициализация остальных компонентов | |
| # ------------------------------------------------------------ | |
| storage = MemoryStorage() | |
| dp = Dispatcher(storage=storage) | |
| facefusion_client = FaceFusionClient(api_url=FACEFUSION_URL) | |
| DATA_DIR = Path("/data") | |
| DATA_DIR.mkdir(exist_ok=True) | |
| tracker = UsageTracker(daily_limit=DAILY_LIMIT, data_dir=DATA_DIR) | |
| # Хранилище пользовательских данных (временное) | |
| user_data: dict[int, dict] = {} | |
| # ------------------------------------------------------------ | |
| # КОМАНДЫ | |
| # ------------------------------------------------------------ | |
| async def cmd_start(message: types.Message, state: FSMContext): | |
| logger.info(f"Command /start from user {message.from_user.id}") | |
| await state.clear() | |
| user_data.pop(message.from_user.id, None) | |
| await message.answer( | |
| f"👋 Привет, {message.from_user.first_name}!\n\n" | |
| f"Я — **{BOT_NAME}** на FLUX.1 Schnell 🎨\n\n" | |
| f"⚡ **Преимущества:**\n" | |
| f"• Быстрая генерация (3-8 сек)\n" | |
| f"• Автоматически добавляю лицо к любому промпту\n" | |
| f"• Поддержка референс-изображений\n" | |
| f"• {tracker.daily_limit} генераций/день бесплатно\n\n" | |
| f"💡 **Просто напиши что хочешь** — я сам добавлю лицо!\n\n" | |
| f"🚀 **Начни:** нажми «🎨 Создать фото»!", | |
| reply_markup=get_main_menu(), | |
| parse_mode="Markdown" | |
| ) | |
| async def cmd_stats(message: types.Message): | |
| stats = tracker.get_stats_text(message.from_user.id) | |
| await message.answer(stats, parse_mode="Markdown") | |
| async def cmd_help(message: types.Message): | |
| await message.answer( | |
| "❓ **Помощь**\n\n" | |
| "🎨 **Создать фото:**\n" | |
| "1. Нажми «🎨 Создать фото»\n" | |
| "2. Отправь фото лица (как фото ИЛИ как файл)\n" | |
| "3. Напиши что хочешь увидеть (на любом языке)\n" | |
| "4. Выбери стиль (опционально)\n" | |
| "5. Жди результат!\n\n" | |
| "🤖 **Я сам добавлю лицо к твоему запросу!**\n\n" | |
| "📝 **Примеры:**\n" | |
| "• `astronaut on mars` → астронавт с твоим лицом\n" | |
| "• `businessman in office` → бизнес-портрет\n\n" | |
| "📸 **Референсы:**\n" | |
| "• Можно добавить 1 референс (стиль/поза)\n" | |
| "• Отправляй как фото или файл\n\n" | |
| "⏰ Лимит сбрасывается в 00:00 UTC", | |
| parse_mode="Markdown" | |
| ) | |
| # ------------------------------------------------------------ | |
| # ОБРАБОТКА КНОПОК (ReplyKeyboard) | |
| # ------------------------------------------------------------ | |
| async def start_creation(message: types.Message, state: FSMContext): | |
| user_id = message.from_user.id | |
| allowed, remaining = tracker.can_generate(user_id) | |
| if not allowed: | |
| await message.answer( | |
| f"❌ **Лимит исчерпан!**\n\n" | |
| f"Использовано: {tracker.daily_limit}/{tracker.daily_limit}\n" | |
| f"Попробуй завтра после 00:00 UTC", | |
| parse_mode="Markdown" | |
| ) | |
| return | |
| await state.set_state(UserStates.waiting_for_face_photo) | |
| user_data[user_id] = {'face': None, 'references': []} | |
| await message.answer( | |
| "📸 **Отправь фото лица**\n\n" | |
| "💡 Советы:\n" | |
| "• Лицо крупным планом\n" | |
| "• Хорошее освещение\n" | |
| "• Анфас\n\n" | |
| "📎 **Можно отправить:**\n" | |
| "• Как фото (📷)\n" | |
| "• Как файл (📎 Выбрать файл)\n" | |
| "• Перетащить в чат", | |
| parse_mode="Markdown" | |
| ) | |
| async def start_with_references(message: types.Message, state: FSMContext): | |
| await state.set_state(UserStates.waiting_for_face_photo) | |
| user_id = message.from_user.id | |
| user_data[user_id] = {'face': None, 'references': []} | |
| await message.answer( | |
| "📸 **Сначала отправь фото лица**\n\n" | |
| f"После этого сможешь добавить 1 референс (стиль или поза)\n\n" | |
| "📎 **Отправляй как фото или файл!**", | |
| parse_mode="Markdown", | |
| reply_markup=get_reference_menu() | |
| ) | |
| async def show_user_stats(message: types.Message): | |
| stats = tracker.get_stats_text(message.from_user.id) | |
| await message.answer(stats, parse_mode="Markdown") | |
| async def show_settings(message: types.Message): | |
| await message.answer( | |
| "⚙️ **Настройки**\n\n" | |
| f"📌 Лимит: {tracker.daily_limit} генераций/день\n" | |
| "🎨 Качество: Высокое (1024×1024)\n" | |
| "⏱ Время: ~20-40 секунд\n" | |
| "🖼️ Референсы: 1 фото", | |
| parse_mode="Markdown" | |
| ) | |
| async def show_help(message: types.Message): | |
| await cmd_help(message) | |
| # ------------------------------------------------------------ | |
| # ОБРАБОТКА ФОТО И ФАЙЛОВ | |
| # ------------------------------------------------------------ | |
| async def handle_face_photo(message: types.Message, state: FSMContext): | |
| await process_face_photo(message, state) | |
| async def handle_face_document(message: types.Message, state: FSMContext): | |
| if message.document.mime_type.startswith('image/'): | |
| await process_face_photo(message, state) | |
| else: | |
| await message.answer("❌ Это не изображение. Отправь фото (JPG, PNG)") | |
| async def process_face_photo(message: types.Message, state: FSMContext): | |
| user_id = message.from_user.id | |
| try: | |
| file = message.photo[-1] if message.photo else message.document | |
| downloaded = await bot.download(file) | |
| photo_bytes = downloaded.read() | |
| if len(photo_bytes) == 0: | |
| await message.answer("❌ Файл пустой") | |
| return | |
| user_data[user_id]['face'] = photo_bytes | |
| await state.set_state(UserStates.choosing_references) | |
| await message.answer( | |
| "✅ **Фото лица принято!**\n\n" | |
| "🖼️ **Добавить референс?**\n\n" | |
| "⚠️ Можно добавить только **1 референс**", | |
| reply_markup=get_reference_menu(), | |
| parse_mode="Markdown" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error processing face photo: {e}") | |
| await message.answer(f"❌ Ошибка: {e}") | |
| # ------------------------------------------------------------ | |
| # CALLBACK QUERY (Inline-кнопки) | |
| # ------------------------------------------------------------ | |
| async def ref_face_only(callback: types.CallbackQuery, state: FSMContext): | |
| await callback.answer() | |
| await proceed_to_prompt(callback.message, state) | |
| async def ref_selected(callback: types.CallbackQuery, state: FSMContext): | |
| await callback.answer() | |
| await state.set_state(UserStates.waiting_for_reference_photos) | |
| await callback.message.edit_text( | |
| "🖼️ **Отправь референс** (или напиши «пропустить»)\n\n" | |
| "📎 Отправь как фото или файл", | |
| parse_mode="Markdown" | |
| ) | |
| async def handle_reference_photo(message: types.Message, state: FSMContext): | |
| user_id = message.from_user.id | |
| try: | |
| file = message.photo[-1] if message.photo else message.document | |
| if not file.mime_type.startswith('image/'): | |
| await message.answer("❌ Это не изображение") | |
| return | |
| downloaded = await bot.download(file) | |
| photo_bytes = downloaded.read() | |
| if len(user_data[user_id]['references']) >= MAX_REFERENCE_IMAGES: | |
| await message.answer(f"❌ Максимум {MAX_REFERENCE_IMAGES} референс") | |
| return | |
| user_data[user_id]['references'].append(photo_bytes) | |
| await message.answer(f"✅ Референс добавлен! Напиши «пропустить» чтобы продолжить") | |
| except Exception as e: | |
| await message.answer(f"❌ Ошибка: {e}") | |
| async def skip_reference(message: types.Message, state: FSMContext): | |
| if message.text.lower() in ["готово", "done", "хватит", "стоп", "пропустить", "skip"]: | |
| await proceed_to_prompt(message, state) | |
| async def proceed_to_prompt(message: types.Message, state: FSMContext): | |
| await state.set_state(UserStates.waiting_for_prompt) | |
| await message.answer( | |
| "📝 **Напиши описание изображения**\n\n" | |
| "💡 **Я сам добавлю лицо к твоему запросу!**\n\n" | |
| "Примеры:\n" | |
| "• `astronaut on mars`\n" | |
| "• `business portrait in office`", | |
| parse_mode="Markdown" | |
| ) | |
| async def handle_prompt(message: types.Message, state: FSMContext): | |
| prompt = message.text.strip() | |
| if len(prompt) < 3: | |
| await message.answer("❌ Промпт слишком короткий") | |
| return | |
| await state.update_data(prompt=prompt) | |
| await state.set_state(UserStates.choosing_style) | |
| await message.answer( | |
| "🎨 **Выбери стиль** (опционально):", | |
| reply_markup=get_style_menu() | |
| ) | |
| async def style_selected(callback: types.CallbackQuery, state: FSMContext): | |
| await callback.answer() | |
| style = callback.data | |
| data = await state.get_data() | |
| prompt = data.get("prompt") | |
| user_id = callback.from_user.id | |
| await callback.message.edit_text( | |
| f"⏳ **Запускаю генерацию...**\n\n" | |
| f"📝 Промпт: `{prompt}`\n" | |
| f"🎨 Стиль: `{style}`\n\n" | |
| "20-40 секунд ⏱", | |
| parse_mode="Markdown" | |
| ) | |
| await state.set_state(UserStates.generating) | |
| await generate_and_send(callback.message, user_id, prompt, style) | |
| async def style_skipped(callback: types.CallbackQuery, state: FSMContext): | |
| await callback.answer() | |
| await style_selected(callback, state) # передаём пустой стиль (будет обработан в generate_and_send) | |
| async def back_to_main(callback: types.CallbackQuery, state: FSMContext): | |
| await callback.answer() | |
| await state.set_state(UserStates.idle) | |
| await callback.message.answer( | |
| "🏠 **Главное меню**", | |
| reply_markup=get_main_menu() | |
| ) | |
| try: | |
| await callback.message.delete() | |
| except: | |
| pass | |
| # ------------------------------------------------------------ | |
| # ГЕНЕРАЦИЯ | |
| # ------------------------------------------------------------ | |
| async def generate_and_send(message: types.Message, user_id: int, prompt: str, style: str): | |
| try: | |
| user_face = user_data[user_id]['face'] | |
| references = user_data[user_id]['references'] | |
| # 🔥 АВТОПЕРЕВОД НА АНГЛИЙСКИЙ | |
| try: | |
| translator = GoogleTranslator(source='auto', target='en') | |
| loop = asyncio.get_event_loop() | |
| translated_prompt = await loop.run_in_executor( | |
| None, translator.translate, prompt | |
| ) | |
| except: | |
| translated_prompt = prompt | |
| # 🔥 БАЗОВЫЙ ПРОМПТ С ЛИЦОМ (добавляется всегда) | |
| base_face_prompt = "portrait of a person with clear face, detailed facial features, visible face, looking at camera" | |
| style_prompts = { | |
| "style_cinematic": "cinematic lighting, film grain, dramatic", | |
| "style_portrait": "professional portrait, studio lighting, sharp focus", | |
| "style_art": "digital art, illustration, vibrant colors", | |
| "style_realistic": "photorealistic, natural lighting, highly detailed", | |
| "style_cyberpunk": "neon lights, cyberpunk, futuristic", | |
| "style_fantasy": "fantasy art, magical, ethereal", | |
| } | |
| full_prompt = f"{base_face_prompt}, {translated_prompt}, {style_prompts.get(style, '')}".strip(", ") | |
| await message.edit_text("🎨 Шаг 1/3: Генерация изображения...") | |
| # Генерируем изображение через Cloudflare Worker (Phoenix) | |
| generated_image = await generate_image( | |
| prompt=full_prompt, | |
| width=1024, | |
| height=1024, | |
| negative_prompt="ugly, blurry, low quality, distorted, watermark, no face, faceless, helmet, mask", | |
| steps=12, # Phoenix использует steps | |
| guidance=7.5 | |
| ) | |
| if not generated_image: | |
| await message.edit_text("❌ Ошибка генерации изображения") | |
| return | |
| await message.edit_text("👤 Шаг 2/3: Замена лица...") | |
| # Заменяем лицо через FaceFusion API | |
| final_image = await facefusion_client.swap_face_simple( | |
| source_face_bytes=user_face, | |
| target_image_bytes=generated_image | |
| ) | |
| if not final_image: | |
| logger.warning("Face swap failed, sending original image") | |
| final_image = generated_image | |
| await message.edit_text("✅ Готово! Отправляю...") | |
| await message.answer_photo( | |
| photo=BufferedInputFile(final_image, filename="result.jpg"), | |
| caption=f"📸 **Готово!**\n\n📝 `{prompt}`", | |
| reply_markup=get_result_keyboard(message.message_id), | |
| parse_mode="Markdown" | |
| ) | |
| tracker.increment(user_id) | |
| user_data.pop(user_id, None) | |
| await message.delete() | |
| except Exception as e: | |
| logger.error(f"Generation error: {e}", exc_info=True) | |
| await message.edit_text(f"❌ Ошибка: {str(e)}") | |
| user_data.pop(user_id, None) | |
| # ------------------------------------------------------------ | |
| # ЗАПУСК | |
| # ------------------------------------------------------------ | |
| async def main(): | |
| check_config() | |
| await bot.delete_webhook() | |
| await bot.get_updates(offset=-1) | |
| logger.info(f"🚀 {BOT_NAME} запущен!") | |
| await dp.start_polling(bot) | |
| if __name__ == "__main__": | |
| try: | |
| asyncio.run(main()) | |
| except KeyboardInterrupt: | |
| logger.info("👋 Бот остановлен") |