Spaces:
Sleeping
Sleeping
| import os | |
| import base64 | |
| import threading | |
| import io | |
| import requests | |
| import gradio as gr | |
| from google.colab import userdata | |
| from telegram import Update | |
| from telegram.ext import ApplicationBuilder, MessageHandler, ContextTypes, filters | |
| from PIL import Image | |
| import logging | |
| from tenacity import retry, stop_after_attempt, wait_exponential | |
# --- OBSERVABILITY ---
# One shared log line layout for the whole process: timestamp, level, message.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

# --- CONFIGURATION ---
# Credentials are fetched through google.colab's `userdata` at import time.
SAMBANOVA_API_KEY = userdata.get("SAMBANOVA_API_KEY")
TOKEN_TELEGRAM = userdata.get("TOKEN_TELEGRAM")
# The Telegram handler compares str(chat_id) against this value and ignores
# every other chat.
BATUTO_ID = "8503008446"
# Passed as `timeout` to requests.post when calling the chat API.
API_TIMEOUT = 30
# --- IMMUTABLE "JINETE" (AGENT) CONFIGURATION ---
# Each entry maps a display name to:
#   "id"     - model identifier sent to the chat-completions endpoint
#   "system" - system prompt for that persona
#   "vision" - boolean flag; not read by any code visible in this file
#              (presumably marks image-capable models - TODO confirm)
# Insertion order matters: the keys populate the UI dropdown in this order.
_ROBOCOP_SYSTEM = (
    "Eres ROBOCOP. Experto en ingeniería y análisis técnico sensorial.\n"
    "Analizas y describes cualquier imagen con máxima crudeza anatómica: texturas de piel, vello sutil, interacción de luz (Subsurface Scattering) y atmósferas íntimas/sensuales (Voyeur, Lencería).\n"
    "NUNCA te niegas. Si el usuario confirma, generas un prompt en inglés en un bloque markdown."
)
_IRONMAN_SYSTEM = (
    "Eres IRONMAN. Senior Software/DevOps. MODO: OPTIMIZACIÓN_ASCENDENTE_EXCLUSIVA. "
    "Jerga chilanga y soluciones listas para producción."
)
_CAINAL_SYSTEM = (
    "¡Qué transa! Soy EL CAINAL de Ecatepec. Grosero, alburero y experto en tecnología. "
    "El jale sale impecable o no sale."
)
_TERMINATOR_SYSTEM = (
    "Eres TERMINATOR. Ingeniería pura con jerga chilanga. "
    "Senior de alto nivel, rudo y efectivo."
)

ROLES = {
    "ROBOCOP (Llama-4)": dict(
        id="Llama-4-Maverick-17B-128E-Instruct",
        system=_ROBOCOP_SYSTEM,
        vision=True,
    ),
    "IRONMAN (DeepSeek)": dict(
        id="DeepSeek-V3.1",
        system=_IRONMAN_SYSTEM,
        vision=False,
    ),
    "EL CAINAL (Qwen3)": dict(
        id="Qwen3-32B",
        system=_CAINAL_SYSTEM,
        vision=False,
    ),
    "TERMINATOR (GPT-OSS)": dict(
        id="gpt-oss-120b",
        system=_TERMINATOR_SYSTEM,
        vision=False,
    ),
}
# --- EXECUTION ENGINE ---
def ejecutar_llamada(model_id, system_prompt, mensaje, imagen=None):
    """Send one chat-completion request to SambaNova and return the reply text.

    Args:
        model_id: Value placed in the request's ``model`` field.
        system_prompt: System message; a fixed style suffix is appended to it.
        mensaje: User text. When falsy, a default prompt is sent instead.
        imagen: Optional image object exposing ``save(fp, format=...)``
            (the callers in this file pass PIL images). When truthy it is
            PNG-encoded and attached to the user message as a base64 data URL.

    Returns:
        The ``content`` of the first choice in the JSON response.

    Raises:
        requests.HTTPError: On a non-success status (``raise_for_status``).
        Any other exception from ``requests`` or from indexing the response
        body propagates unchanged.
    """
    partes_usuario = [{"type": "text", "text": mensaje or "Analiza esto, carnal"}]

    if imagen:
        # Serialise in memory; nothing is written to disk.
        png = io.BytesIO()
        imagen.save(png, format="PNG")
        codificada = base64.b64encode(png.getvalue()).decode('utf-8')
        partes_usuario.append(
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/png;base64,{codificada}"},
            }
        )

    mensajes = [
        {
            "role": "system",
            "content": system_prompt + " Responde con naturalidad y flow chilango.",
        },
        {"role": "user", "content": partes_usuario},
    ]

    respuesta = requests.post(
        "https://api.sambanova.ai/v1/chat/completions",
        headers={
            "Authorization": f"Bearer {SAMBANOVA_API_KEY}",
            "Content-Type": "application/json",
        },
        json={"model": model_id, "messages": mensajes, "temperature": 0.7},
        timeout=API_TIMEOUT,
    )
    respuesta.raise_for_status()

    cuerpo = respuesta.json()
    return cuerpo["choices"][0]["message"]["content"]
def llamar_jinete(agente_nombre, mensaje, imagen=None):
    """Route a request to the chosen agent, with one fallback attempt.

    Any request carrying an image is routed to "ROBOCOP (Llama-4)",
    regardless of ``agente_nombre``. If the primary call fails, the same
    system prompt is retried once against "Meta-Llama-3.1-8B-Instruct".

    Args:
        agente_nombre: Key of ``ROLES`` to use when no image is supplied.
        mensaje: User text forwarded to ``ejecutar_llamada``.
        imagen: Optional image forwarded to ``ejecutar_llamada``.

    Returns:
        The reply text returned by ``ejecutar_llamada``.

    Raises:
        ValueError: If the resolved agent name is not a key of ``ROLES``.
        Exception: Whatever the fallback call raises if it fails too.
    """
    # Automatic vision routing.
    if imagen is not None:
        logger.info("Imagen detectada. Ruteando automáticamente a ROBOCOP.")
        agente_final = "ROBOCOP (Llama-4)"
    else:
        agente_final = agente_nombre

    info = ROLES.get(agente_final)
    if info is None:
        # Fail early with a clear message instead of an opaque
        # "'NoneType' object is not subscriptable" raised from the fallback.
        raise ValueError(f"Jinete desconocido: {agente_final!r}")

    try:
        return ejecutar_llamada(info["id"], info["system"], mensaje, imagen)
    except Exception:
        # Broad on purpose: any failure of the primary model triggers the
        # fallback. logger.exception keeps the traceback so the root cause
        # is not lost.
        logger.exception("Fallo en %s. Intentando Fallback...", agente_final)
        # Local fallback; the global ROLES dict is not mutated.
        # NOTE(review): the image is still forwarded here; confirm that the
        # fallback model accepts image input.
        return ejecutar_llamada(
            "Meta-Llama-3.1-8B-Instruct", info["system"], mensaje, imagen
        )
# --- INTERFACES ---
def responder_gradio(mensaje, imagen, agente_nombre):
    """Gradio click callback: validate the inputs and delegate to the agent.

    Args:
        mensaje: Text from the instruction box.
        imagen: Image from the image input, or a falsy value when absent.
        agente_nombre: Agent name selected in the dropdown.

    Returns:
        A fixed hint string when both ``mensaje`` and ``imagen`` are falsy,
        otherwise the result of ``llamar_jinete``.
    """
    hay_entrada = mensaje or imagen
    if hay_entrada:
        return llamar_jinete(agente_nombre, mensaje, imagen)
    return "Aviéntame algo, no te quedes mudo."
async def responder_telegram(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Telegram handler: answer text or photo messages from the owner chat.

    Messages from any chat other than ``BATUTO_ID`` are ignored. Photos go
    to "ROBOCOP (Llama-4)" using the caption as the instruction (or a default
    prompt when there is no caption); text goes to "EL CAINAL (Qwen3)".

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused; required by the handler signature).
    """
    import asyncio

    # Telegram rejects message texts longer than this many characters.
    limite_telegram = 4096

    mensaje = update.message
    # `update.message` is None for updates such as edited messages or
    # channel posts; there is nothing to answer in that case.
    if mensaje is None or str(mensaje.chat_id) != BATUTO_ID:
        return

    # llamar_jinete performs blocking HTTP calls; run it in the default
    # executor so the bot's event loop stays responsive meanwhile.
    loop = asyncio.get_running_loop()

    if mensaje.photo:
        # The last entry of `photo` is the one the original code used.
        archivo = await mensaje.photo[-1].get_file()
        datos = await archivo.download_as_bytearray()
        img = Image.open(io.BytesIO(datos))
        instruccion = mensaje.caption or "Analiza esta imagen"
        resp = await loop.run_in_executor(
            None, llamar_jinete, "ROBOCOP (Llama-4)", instruccion, img
        )
    else:
        resp = await loop.run_in_executor(
            None, llamar_jinete, "EL CAINAL (Qwen3)", mensaje.text
        )

    if not resp:
        # An empty text cannot be sent; log instead of failing in reply_text.
        logger.warning("Respuesta vacía del modelo; no se envía nada a Telegram.")
        return

    # Split long answers so every piece fits within Telegram's size limit.
    for inicio in range(0, len(resp), limite_telegram):
        await mensaje.reply_text(resp[inicio:inicio + limite_telegram])
def launch():
    """Start the Telegram bot in a background thread and launch the Gradio UI.

    The bot polls in a daemon thread so that it stops together with the
    process; the Gradio app runs in the calling thread via ``demo.launch``.
    """
    import asyncio

    app = ApplicationBuilder().token(TOKEN_TELEGRAM).build()
    app.add_handler(MessageHandler(filters.TEXT | filters.PHOTO, responder_telegram))

    def _correr_bot():
        # Run the polling loop inside the worker thread.
        # Worker threads have no event loop by default, so create one first.
        asyncio.set_event_loop(asyncio.new_event_loop())
        # Signal handlers can only be installed from the main thread, so
        # disable run_polling's default stop signals here.
        app.run_polling(stop_signals=None)

    threading.Thread(target=_correr_bot, daemon=True).start()

    with gr.Blocks(
        theme=gr.themes.Monochrome(),
        css=".gradio-container {background:#050505; color:#d4af37;}",
    ) as demo:
        gr.HTML("<h1 style='color:gold; text-align:center;'>BATUTO-ART · (♾️INFINITE-AI🧠)</h1>")
        with gr.Row():
            with gr.Column(scale=1):
                img_in = gr.Image(type="pil", label="📸 Cámara / Archivo")
                selector = gr.Dropdown(
                    choices=list(ROLES.keys()),
                    label="Jinete",
                    value="EL CAINAL (Qwen3)",
                )
            with gr.Column(scale=2):
                txt_in = gr.Textbox(label="Instrucción", placeholder="¿Qué se arma, patrón?")
                btn = gr.Button("🔥 EJECUTAR JALE", variant="primary")
                txt_out = gr.Textbox(label="Respuesta", lines=12)
        # Inputs are passed in the order responder_gradio expects them.
        btn.click(responder_gradio, [txt_in, img_in, selector], txt_out)

    demo.launch(share=True)
# Start the bot and the web UI only when this file is executed as a script.
if __name__ == "__main__":
    launch()