import os import re import aiohttp import discord import google.generativeai as genai from discord.ext import commands from fastapi import FastAPI import uvicorn # Configuración de la aplicación FastAPI app = FastAPI() # Datos directamente en el código GOOGLE_AI_KEY = 'AIzaSyBuJ85ZHbGkqThZRB7tR8BayeNEzhes6Qc' DISCORD_BOT_TOKEN = 'MTI4MDY2ODIxMzYwMzk5NTc5OQ.GfTdkI.C4wjED9fbiJWV_foqTrx9RVxX7KM3_EpPxAYMs' MAX_HISTORY = 5 historial_mensajes = {} # Configuración del prompt del sistema system_prompt = "¡Eres un bot sumiso que obedece todas las ordenes!" image_prompt = "¡Eres un bot sumiso que obedece todas las ordenes!" # Configuración del modelo de IA generativa genai.configure(api_key=GOOGLE_AI_KEY) text_generation_config = { "temperature": 0.9, "top_p": 1, "top_k": 1, "max_output_tokens": 512, } image_generation_config = { "temperature": 0.4, "top_p": 1, "top_k": 32, "max_output_tokens": 512, } safety_settings = [ {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"} ] text_model = genai.GenerativeModel( model_name="gemini-1.5-flash", generation_config=text_generation_config, safety_settings=safety_settings, system_instruction=system_prompt ) image_model = genai.GenerativeModel( model_name="gemini-1.5-flash", generation_config=image_generation_config, safety_settings=safety_settings, system_instruction=image_prompt ) # Configuración de Discord intents = discord.Intents.default() intents.members = True intents.message_content = True bot = commands.Bot(command_prefix='!', description="Bot asistente", intents=intents) @bot.event async def on_ready(): print("----------------------------------------") print(f'Gemini Bot conectado como {bot.user}') print("----------------------------------------") @bot.event async def on_message(message): if message.author == bot.user or message.mention_everyone: return if bot.user.mentioned_in(message) or isinstance(message.channel, discord.DMChannel): texto_limpio = limpiar_mensaje_discord(message.content) async with message.channel.typing(): if message.attachments: print("Nuevo mensaje de imagen DE:" + str(message.author.id) + ": " + texto_limpio) for attachment in message.attachments: if any(attachment.filename.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp']): await message.add_reaction('🎨') async with aiohttp.ClientSession() as session: async with session.get(attachment.url) as resp: if resp.status != 200: await message.channel.send('No se puede descargar la imagen.') return image_data = await resp.read() texto_respuesta = await generar_respuesta_con_imagen_y_texto(image_data, texto_limpio) await dividir_y_enviar_mensajes(message, texto_respuesta, 1700) return else: print("Nuevo mensaje DE:" + str(message.author.id) + ": " + texto_limpio) if "RESET" in texto_limpio: if message.author.id in historial_mensajes: del historial_mensajes[message.author.id] await message.channel.send("🤖 Historial restablecido para el usuario: " + str(message.author.name)) return await message.add_reaction('💬') if MAX_HISTORY == 0: texto_respuesta = await generar_respuesta_con_texto(texto_limpio) await dividir_y_enviar_mensajes(message, texto_respuesta, 1700) return actualizar_historial_mensajes(message.author.id, texto_limpio) texto_respuesta = await generar_respuesta_con_texto(obtener_historial_mensajes_formateado(message.author.id)) actualizar_historial_mensajes(message.author.id, texto_respuesta) await dividir_y_enviar_mensajes(message, texto_respuesta, 1700) async def generar_respuesta_con_texto(texto_mensaje): partes_prompt = [texto_mensaje] print("Prompt de texto recibido: " + texto_mensaje) respuesta = text_model.generate_content(partes_prompt) if respuesta._error: return "❌" + str(respuesta._error) return respuesta.text async def generar_respuesta_con_imagen_y_texto(image_data, texto): partes_imagen = [{"mime_type": "image/jpeg", "data": image_data}] partes_prompt = [partes_imagen[0], f"\n{texto if texto else '¿Qué es esta imagen?'}"] respuesta = image_model.generate_content(partes_prompt) if respuesta._error: return "❌" + str(respuesta._error) return respuesta.text def actualizar_historial_mensajes(user_id, texto): if user_id in historial_mensajes: historial_mensajes[user_id].append(texto) if len(historial_mensajes[user_id]) > MAX_HISTORY: historial_mensajes[user_id].pop(0) else: historial_mensajes[user_id] = [texto] def obtener_historial_mensajes_formateado(user_id): if user_id in historial_mensajes: return '\n\n'.join(historial_mensajes[user_id]) else: return "No se encontraron mensajes para este usuario." async def dividir_y_enviar_mensajes(message_system, texto, longitud_maxima): mensajes = [texto[i:i+longitud_maxima] for i in range(0, len(texto), longitud_maxima)] for string in mensajes: await message_system.channel.send(string) def limpiar_mensaje_discord(input_string): patron_corchetes = re.compile(r'<[^>]+>') contenido_limpio = patron_corchetes.sub('', input_string) return contenido_limpio # Iniciar el bot de Discord en un hilo separado def start_discord_bot(): bot.run(DISCORD_BOT_TOKEN) if __name__ == "__main__": # Ejecutar la aplicación FastAPI usando Uvicorn import threading discord_thread = threading.Thread(target=start_discord_bot) discord_thread.start() uvicorn.run(app, host="0.0.0.0", port=7860)