MagicFaceTG / bot.py
rastof9's picture
Update Gradio app with multiple files
e6c006b verified
# bot.py
import asyncio
import functools
import logging
import os
import traceback

from telegram import Update, ReplyKeyboardMarkup, ReplyKeyboardRemove, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    Application,
    CommandHandler,
    ContextTypes,
    ConversationHandler,
    MessageHandler,
    CallbackQueryHandler,
    filters,
)
from telegram.helpers import escape_markdown

import config
import database
from generate import GenerationService
# --- Logging configuration ---
# Records are emitted at INFO and above, stamped with time, logger name and level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# --- Conversation states ---
# Integer keys of the ConversationHandler state table built in main().
SELECTING_STYLE = 0
SELECTING_GENDER = 1
# --- Style Presets ---
# These are the options the user will see. Each entry provides the button
# label ("title"), the "emoji" shown in front of it, and the "prompt" that is
# passed to the generation service when the style is chosen.
# NOTE: the emoji of the last three presets used to contain mojibake (UTF-8
# bytes decoded with a legacy code page); the intended characters are restored.
STYLE_PRESETS = [
    {
        "title": "Mona Lisa",
        "prompt": "A mesmerizing portrait in the style of Leonardo da Vinci's Mona Lisa, renaissance oil painting, soft sfumato technique",
        "emoji": "🎨",
    },
    {
        "title": "Iron Hero",
        "prompt": "Hyper realistic portrait as a high-tech superhero, wearing advanced metallic suit, arc reactor glow",
        "emoji": "🦾",
    },
    {
        "title": "Van Gogh",
        "prompt": "Self-portrait in the style of Vincent van Gogh, bold brushstrokes, vibrant colors, post-impressionist style",
        "emoji": "🎨",
    },
    {
        "title": "Jedi Master",
        "prompt": "Epic portrait as a Jedi Master from Star Wars, wearing traditional robes, holding a lightsaber",
        "emoji": "⚔️",
    },
    {
        "title": "Greek God",
        "prompt": "Mythological portrait as a Greek God, wearing flowing robes, golden laurel wreath, Mount Olympus background",
        "emoji": "⚡",
    },
    {
        "title": "Medieval Knight",
        "prompt": "Noble knight portrait, wearing ornate plate armor, holding a sword, castle background",
        "emoji": "🛡️",
    },
]
# --- Initialize the Generation Service ---
# The service is constructed once at import time. If construction fails, the
# name is bound to None so that main() can detect the failure and refuse to
# start the bot.
logger.info("Initializing Generation Service... This may take a while.")
try:
    generation_service = GenerationService()
except Exception as e:
    # logger.exception keeps the traceback, which the message text alone loses.
    logger.exception("FATAL: Could not initialize GenerationService. Bot cannot start. Error: %s", e)
    generation_service = None
else:
    logger.info("Generation Service initialized successfully.")
# --- Helper Functions ---
def get_style_keyboard() -> InlineKeyboardMarkup:
    """Build the inline keyboard that lists every style preset.

    Each preset gets its own row holding a single button labelled
    "<emoji> <title>". The button's callback data is ``style_<index>``, where
    the index is the preset's position in ``STYLE_PRESETS``.
    """
    rows = []
    for index, preset in enumerate(STYLE_PRESETS):
        label = f"{preset['emoji']} {preset['title']}"
        button = InlineKeyboardButton(label, callback_data=f"style_{index}")
        rows.append([button])
    return InlineKeyboardMarkup(rows)
# --- Conversation Handlers ---
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /start command.

    Registers the user through ``database.get_or_create_user``, reads the
    current credit balance and replies with a Markdown-formatted welcome
    message that shows it.
    """
    user = update.effective_user
    logger.info(f"User {user.id} ({user.username}) started the bot.")
    database.get_or_create_user(user.id, user.username)
    user_credits = database.get_user_credits(user.id)

    # The reply is parsed as Markdown, so characters such as "_" or "*" in the
    # user's name must be escaped; otherwise Telegram may reject the message.
    safe_name = escape_markdown(user.first_name)
    welcome_message = (
        f"👋 Welcome, {safe_name}!\n\n"
        "To transform your face, just send me a clear photo.\n\n"
        # Single asterisks: the bold syntax of this parse mode, as used by the
        # style-selection message elsewhere in this file.
        f"You have *{user_credits}* credits."
    )
    # effective_message also covers edited-message updates, for which
    # update.message is None.
    await update.effective_message.reply_text(welcome_message, parse_mode='Markdown')
async def photo_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Store the photo the user sent and ask which style to apply.

    The photo is downloaded into the ``temp_user_photos`` directory (created on
    demand) and its path is remembered in ``context.user_data['photo_path']``
    for the later conversation steps.

    Returns:
        ``SELECTING_STYLE``, the next conversation state.
    """
    sender = update.effective_user
    # The last entry of message.photo is used (presumably the largest size).
    telegram_file = await update.message.photo[-1].get_file()

    # Directory that holds the uploaded photos until generation has finished.
    download_dir = "temp_user_photos"
    os.makedirs(download_dir, exist_ok=True)

    file_name = f"{sender.id}_{telegram_file.file_unique_id}.jpg"
    local_path = os.path.join(download_dir, file_name)
    await telegram_file.download_to_drive(local_path)
    logger.info(f"Photo from user {sender.id} saved to {local_path}")

    context.user_data['photo_path'] = local_path

    style_markup = get_style_keyboard()
    await update.message.reply_text(
        "Great photo! Now, pick a style for your transformation:",
        reply_markup=style_markup,
    )
    return SELECTING_STYLE
async def style_selected_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Record the chosen style preset and ask the user for a gender.

    The callback data has the form ``style_<index>``; the index selects an
    entry of ``STYLE_PRESETS`` whose prompt is stored in
    ``context.user_data['prompt']``. The original message is then edited to
    show the two gender buttons.

    Returns:
        ``SELECTING_GENDER``, the next conversation state.
    """
    callback = update.callback_query
    await callback.answer()  # Acknowledge the button press

    preset_number = int(callback.data.split('_')[1])
    preset = STYLE_PRESETS[preset_number]
    context.user_data['prompt'] = preset['prompt']
    logger.info(f"User {update.effective_user.id} selected style: {preset['title']}")

    # One row with both gender choices side by side.
    gender_row = [
        InlineKeyboardButton("♀️ Female", callback_data="gender_Female"),
        InlineKeyboardButton("♂️ Male", callback_data="gender_Male"),
    ]
    gender_markup = InlineKeyboardMarkup([gender_row])
    prompt_text = f"Style selected: *{preset['title']}*.\n\nNow, please select your gender:"

    await callback.edit_message_text(
        text=prompt_text,
        reply_markup=gender_markup,
        parse_mode='Markdown',
    )
    return SELECTING_GENDER
def _remove_temp_photo(context: ContextTypes.DEFAULT_TYPE) -> None:
    """Delete the user's uploaded photo, if any, and forget its path."""
    photo_path = context.user_data.pop('photo_path', None)
    if not photo_path:
        return
    try:
        os.remove(photo_path)
    except FileNotFoundError:
        # Already gone; nothing left to clean up.
        pass
    except OSError:
        # A failed cleanup must not break the handler; record it instead.
        logger.warning("Could not remove temporary photo %s", photo_path, exc_info=True)


async def gender_selected_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Handle the gender choice and run the final image generation.

    The callback data has the form ``gender_<value>``. If the user has no
    credits left, they are told so and the conversation ends. Otherwise the
    stored photo path and prompt are passed to the generation service; on
    success one credit is deducted and the result is sent as a photo, on
    failure an apology message is sent. The temporary photo is removed on
    every exit path.

    Returns:
        ``ConversationHandler.END`` in all cases.
    """
    query = update.callback_query
    await query.answer()
    user_id = update.effective_user.id
    gender = query.data.split('_')[1]
    context.user_data['gender'] = gender
    logger.info(f"User {user_id} selected gender: {gender}. Preparing for generation.")

    try:
        # Check for credits before starting
        if database.get_user_credits(user_id) <= 0:
            await query.edit_message_text("Oh no! You're out of credits. Use /buy to get more.")
            return ConversationHandler.END

        await query.edit_message_text("✨ Magic in progress... Your masterpiece is being created. This can take several minutes!")

        try:
            # Get all data from context
            photo_path = context.user_data['photo_path']
            prompt = context.user_data['prompt']

            # generate_magic_image is a plain synchronous call that can take
            # minutes; run it in the default executor so the event loop is not
            # blocked while it works.
            loop = asyncio.get_running_loop()
            image_url = await loop.run_in_executor(
                None,
                functools.partial(
                    generation_service.generate_magic_image,
                    face_images=[photo_path],
                    gender=gender,
                    prompt=prompt,
                ),
            )

            if image_url:
                logger.info(f"Successfully generated image for user {user_id}. URL: {image_url}")
                database.deduct_credit(user_id)
                new_credits = database.get_user_credits(user_id)
                await context.bot.send_photo(
                    chat_id=user_id,
                    photo=image_url,
                    caption=f"Here is your masterpiece! ✨\n\nYou have {new_credits} credits left."
                )
            else:
                logger.error(f"Image generation failed for user {user_id}.")
                await context.bot.send_message(user_id, "😕 Something went wrong during the magic spell. Please try again with a different photo.")
        except Exception as e:
            # logger.exception records the traceback through the logging system.
            logger.exception("An error occurred in the generation process for user %s: %s", user_id, e)
            await context.bot.send_message(user_id, "Sorry, a critical error occurred. The developer has been notified.")
    finally:
        # Clean up the temporary photo, including on the out-of-credits path.
        _remove_temp_photo(context)

    # End the conversation
    return ConversationHandler.END
async def cancel_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Cancel the conversation, discard any uploaded photo and confirm.

    Returns:
        ``ConversationHandler.END``.
    """
    # The photo downloaded earlier in the conversation would otherwise stay on
    # disk forever once the user cancels.
    photo_path = context.user_data.pop('photo_path', None)
    if photo_path:
        try:
            os.remove(photo_path)
        except FileNotFoundError:
            # Already gone; nothing left to clean up.
            pass
        except OSError:
            logger.warning("Could not remove temporary photo %s", photo_path, exc_info=True)

    # effective_message also covers edited-message updates, for which
    # update.message is None.
    await update.effective_message.reply_text("Action cancelled.", reply_markup=ReplyKeyboardRemove())
    return ConversationHandler.END
# --- Main Application ---
def main() -> None:
    """Build the Telegram application, register the handlers and start polling.

    Returns right after logging an error when the module-level generation
    service is not available.
    """
    if not generation_service:
        logger.error("Generation service not available. Bot is shutting down.")
        return

    application = Application.builder().token(config.TELEGRAM_TOKEN).build()

    # Conversation flow: photo -> style choice -> gender choice.
    photo_entry = MessageHandler(filters.PHOTO, photo_handler)
    state_table = {
        SELECTING_STYLE: [CallbackQueryHandler(style_selected_handler, pattern="^style_")],
        SELECTING_GENDER: [CallbackQueryHandler(gender_selected_handler, pattern="^gender_")],
    }
    fallback_handlers = [
        CommandHandler("start", start_command),
        CommandHandler("cancel", cancel_command),
    ]
    conv_handler = ConversationHandler(
        entry_points=[photo_entry],
        states=state_table,
        fallbacks=fallback_handlers,
    )

    # The stand-alone /start handler is registered before the conversation.
    application.add_handler(CommandHandler("start", start_command))
    application.add_handler(conv_handler)
    # Other handlers can be added here

    logger.info("Bot is starting to poll for updates...")
    application.run_polling()


if __name__ == "__main__":
    main()