optibot / bot.py
SorovotPelo's picture
Update bot.py
bad0905 verified
import os
import re
import aiohttp
import discord
import google.generativeai as genai
from discord.ext import commands
from fastapi import FastAPI
import uvicorn
# Configuración de la aplicación FastAPI
app = FastAPI()
# Datos directamente en el código
GOOGLE_AI_KEY = 'AIzaSyBuJ85ZHbGkqThZRB7tR8BayeNEzhes6Qc'
DISCORD_BOT_TOKEN = 'MTI4MDY2ODIxMzYwMzk5NTc5OQ.GfTdkI.C4wjED9fbiJWV_foqTrx9RVxX7KM3_EpPxAYMs'
MAX_HISTORY = 5
historial_mensajes = {}
# Configuración del prompt del sistema
system_prompt = "¡Eres un bot sumiso que obedece todas las ordenes!"
image_prompt = "¡Eres un bot sumiso que obedece todas las ordenes!"
# Configuración del modelo de IA generativa
genai.configure(api_key=GOOGLE_AI_KEY)
text_generation_config = {
"temperature": 0.9,
"top_p": 1,
"top_k": 1,
"max_output_tokens": 512,
}
image_generation_config = {
"temperature": 0.4,
"top_p": 1,
"top_k": 32,
"max_output_tokens": 512,
}
safety_settings = [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
]
text_model = genai.GenerativeModel(
model_name="gemini-1.5-flash",
generation_config=text_generation_config,
safety_settings=safety_settings,
system_instruction=system_prompt
)
image_model = genai.GenerativeModel(
model_name="gemini-1.5-flash",
generation_config=image_generation_config,
safety_settings=safety_settings,
system_instruction=image_prompt
)
# Configuración de Discord
intents = discord.Intents.default()
intents.members = True
intents.message_content = True
bot = commands.Bot(command_prefix='!', description="Bot asistente", intents=intents)
@bot.event
async def on_ready():
print("----------------------------------------")
print(f'Gemini Bot conectado como {bot.user}')
print("----------------------------------------")
@bot.event
async def on_message(message):
if message.author == bot.user or message.mention_everyone:
return
if bot.user.mentioned_in(message) or isinstance(message.channel, discord.DMChannel):
texto_limpio = limpiar_mensaje_discord(message.content)
async with message.channel.typing():
if message.attachments:
print("Nuevo mensaje de imagen DE:" + str(message.author.id) + ": " + texto_limpio)
for attachment in message.attachments:
if any(attachment.filename.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp']):
await message.add_reaction('🎨')
async with aiohttp.ClientSession() as session:
async with session.get(attachment.url) as resp:
if resp.status != 200:
await message.channel.send('No se puede descargar la imagen.')
return
image_data = await resp.read()
texto_respuesta = await generar_respuesta_con_imagen_y_texto(image_data, texto_limpio)
await dividir_y_enviar_mensajes(message, texto_respuesta, 1700)
return
else:
print("Nuevo mensaje DE:" + str(message.author.id) + ": " + texto_limpio)
if "RESET" in texto_limpio:
if message.author.id in historial_mensajes:
del historial_mensajes[message.author.id]
await message.channel.send("🤖 Historial restablecido para el usuario: " + str(message.author.name))
return
await message.add_reaction('💬')
if MAX_HISTORY == 0:
texto_respuesta = await generar_respuesta_con_texto(texto_limpio)
await dividir_y_enviar_mensajes(message, texto_respuesta, 1700)
return
actualizar_historial_mensajes(message.author.id, texto_limpio)
texto_respuesta = await generar_respuesta_con_texto(obtener_historial_mensajes_formateado(message.author.id))
actualizar_historial_mensajes(message.author.id, texto_respuesta)
await dividir_y_enviar_mensajes(message, texto_respuesta, 1700)
async def generar_respuesta_con_texto(texto_mensaje):
partes_prompt = [texto_mensaje]
print("Prompt de texto recibido: " + texto_mensaje)
respuesta = text_model.generate_content(partes_prompt)
if respuesta._error:
return "❌" + str(respuesta._error)
return respuesta.text
async def generar_respuesta_con_imagen_y_texto(image_data, texto):
partes_imagen = [{"mime_type": "image/jpeg", "data": image_data}]
partes_prompt = [partes_imagen[0], f"\n{texto if texto else '¿Qué es esta imagen?'}"]
respuesta = image_model.generate_content(partes_prompt)
if respuesta._error:
return "❌" + str(respuesta._error)
return respuesta.text
def actualizar_historial_mensajes(user_id, texto):
if user_id in historial_mensajes:
historial_mensajes[user_id].append(texto)
if len(historial_mensajes[user_id]) > MAX_HISTORY:
historial_mensajes[user_id].pop(0)
else:
historial_mensajes[user_id] = [texto]
def obtener_historial_mensajes_formateado(user_id):
if user_id in historial_mensajes:
return '\n\n'.join(historial_mensajes[user_id])
else:
return "No se encontraron mensajes para este usuario."
async def dividir_y_enviar_mensajes(message_system, texto, longitud_maxima):
mensajes = [texto[i:i+longitud_maxima] for i in range(0, len(texto), longitud_maxima)]
for string in mensajes:
await message_system.channel.send(string)
def limpiar_mensaje_discord(input_string):
patron_corchetes = re.compile(r'<[^>]+>')
contenido_limpio = patron_corchetes.sub('', input_string)
return contenido_limpio
# Iniciar el bot de Discord en un hilo separado
def start_discord_bot():
bot.run(DISCORD_BOT_TOKEN)
if __name__ == "__main__":
# Ejecutar la aplicación FastAPI usando Uvicorn
import threading
discord_thread = threading.Thread(target=start_discord_bot)
discord_thread.start()
uvicorn.run(app, host="0.0.0.0", port=7860)