Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import aiohttp | |
| import discord | |
| import google.generativeai as genai | |
| from discord.ext import commands | |
| from fastapi import FastAPI | |
| import uvicorn | |
| # Configuración de la aplicación FastAPI | |
| app = FastAPI() | |
| # Datos directamente en el código | |
| GOOGLE_AI_KEY = 'AIzaSyBuJ85ZHbGkqThZRB7tR8BayeNEzhes6Qc' | |
| DISCORD_BOT_TOKEN = 'MTI4MDY2ODIxMzYwMzk5NTc5OQ.GfTdkI.C4wjED9fbiJWV_foqTrx9RVxX7KM3_EpPxAYMs' | |
| MAX_HISTORY = 5 | |
| historial_mensajes = {} | |
| # Configuración del prompt del sistema | |
| system_prompt = "¡Eres un bot sumiso que obedece todas las ordenes!" | |
| image_prompt = "¡Eres un bot sumiso que obedece todas las ordenes!" | |
| # Configuración del modelo de IA generativa | |
| genai.configure(api_key=GOOGLE_AI_KEY) | |
| text_generation_config = { | |
| "temperature": 0.9, | |
| "top_p": 1, | |
| "top_k": 1, | |
| "max_output_tokens": 512, | |
| } | |
| image_generation_config = { | |
| "temperature": 0.4, | |
| "top_p": 1, | |
| "top_k": 32, | |
| "max_output_tokens": 512, | |
| } | |
| safety_settings = [ | |
| {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"} | |
| ] | |
| text_model = genai.GenerativeModel( | |
| model_name="gemini-1.5-flash", | |
| generation_config=text_generation_config, | |
| safety_settings=safety_settings, | |
| system_instruction=system_prompt | |
| ) | |
| image_model = genai.GenerativeModel( | |
| model_name="gemini-1.5-flash", | |
| generation_config=image_generation_config, | |
| safety_settings=safety_settings, | |
| system_instruction=image_prompt | |
| ) | |
| # Configuración de Discord | |
| intents = discord.Intents.default() | |
| intents.members = True | |
| intents.message_content = True | |
| bot = commands.Bot(command_prefix='!', description="Bot asistente", intents=intents) | |
| async def on_ready(): | |
| print("----------------------------------------") | |
| print(f'Gemini Bot conectado como {bot.user}') | |
| print("----------------------------------------") | |
| async def on_message(message): | |
| if message.author == bot.user or message.mention_everyone: | |
| return | |
| if bot.user.mentioned_in(message) or isinstance(message.channel, discord.DMChannel): | |
| texto_limpio = limpiar_mensaje_discord(message.content) | |
| async with message.channel.typing(): | |
| if message.attachments: | |
| print("Nuevo mensaje de imagen DE:" + str(message.author.id) + ": " + texto_limpio) | |
| for attachment in message.attachments: | |
| if any(attachment.filename.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp']): | |
| await message.add_reaction('🎨') | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(attachment.url) as resp: | |
| if resp.status != 200: | |
| await message.channel.send('No se puede descargar la imagen.') | |
| return | |
| image_data = await resp.read() | |
| texto_respuesta = await generar_respuesta_con_imagen_y_texto(image_data, texto_limpio) | |
| await dividir_y_enviar_mensajes(message, texto_respuesta, 1700) | |
| return | |
| else: | |
| print("Nuevo mensaje DE:" + str(message.author.id) + ": " + texto_limpio) | |
| if "RESET" in texto_limpio: | |
| if message.author.id in historial_mensajes: | |
| del historial_mensajes[message.author.id] | |
| await message.channel.send("🤖 Historial restablecido para el usuario: " + str(message.author.name)) | |
| return | |
| await message.add_reaction('💬') | |
| if MAX_HISTORY == 0: | |
| texto_respuesta = await generar_respuesta_con_texto(texto_limpio) | |
| await dividir_y_enviar_mensajes(message, texto_respuesta, 1700) | |
| return | |
| actualizar_historial_mensajes(message.author.id, texto_limpio) | |
| texto_respuesta = await generar_respuesta_con_texto(obtener_historial_mensajes_formateado(message.author.id)) | |
| actualizar_historial_mensajes(message.author.id, texto_respuesta) | |
| await dividir_y_enviar_mensajes(message, texto_respuesta, 1700) | |
| async def generar_respuesta_con_texto(texto_mensaje): | |
| partes_prompt = [texto_mensaje] | |
| print("Prompt de texto recibido: " + texto_mensaje) | |
| respuesta = text_model.generate_content(partes_prompt) | |
| if respuesta._error: | |
| return "❌" + str(respuesta._error) | |
| return respuesta.text | |
| async def generar_respuesta_con_imagen_y_texto(image_data, texto): | |
| partes_imagen = [{"mime_type": "image/jpeg", "data": image_data}] | |
| partes_prompt = [partes_imagen[0], f"\n{texto if texto else '¿Qué es esta imagen?'}"] | |
| respuesta = image_model.generate_content(partes_prompt) | |
| if respuesta._error: | |
| return "❌" + str(respuesta._error) | |
| return respuesta.text | |
| def actualizar_historial_mensajes(user_id, texto): | |
| if user_id in historial_mensajes: | |
| historial_mensajes[user_id].append(texto) | |
| if len(historial_mensajes[user_id]) > MAX_HISTORY: | |
| historial_mensajes[user_id].pop(0) | |
| else: | |
| historial_mensajes[user_id] = [texto] | |
| def obtener_historial_mensajes_formateado(user_id): | |
| if user_id in historial_mensajes: | |
| return '\n\n'.join(historial_mensajes[user_id]) | |
| else: | |
| return "No se encontraron mensajes para este usuario." | |
| async def dividir_y_enviar_mensajes(message_system, texto, longitud_maxima): | |
| mensajes = [texto[i:i+longitud_maxima] for i in range(0, len(texto), longitud_maxima)] | |
| for string in mensajes: | |
| await message_system.channel.send(string) | |
| def limpiar_mensaje_discord(input_string): | |
| patron_corchetes = re.compile(r'<[^>]+>') | |
| contenido_limpio = patron_corchetes.sub('', input_string) | |
| return contenido_limpio | |
| # Iniciar el bot de Discord en un hilo separado | |
| def start_discord_bot(): | |
| bot.run(DISCORD_BOT_TOKEN) | |
| if __name__ == "__main__": | |
| # Ejecutar la aplicación FastAPI usando Uvicorn | |
| import threading | |
| discord_thread = threading.Thread(target=start_discord_bot) | |
| discord_thread.start() | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |