|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gradio as gr |
|
|
import random |
|
|
import time |
|
|
import json |
|
|
from datetime import datetime |
|
|
import os |
|
|
import asyncio |
|
|
import logging |
|
|
import re |
|
|
from typing import Dict, Any, List, Optional, Tuple |
|
|
|
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from pysentimiento import create_analyzer |
|
|
import torch |
|
|
|
|
|
|
|
|
import chromadb |
|
|
from chromadb.config import Settings |
|
|
from sentence_transformers import SentenceTransformer |
|
|
|
|
|
|
|
|
import firebase_admin |
|
|
from firebase_admin import credentials, firestore |
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Config: |
|
|
APP_NAME = "Protocolo Prometeo v0.1.1" |
|
|
APP_VERSION = "0.1.1 (Interfaz Recalibrada)" |
|
|
|
|
|
FIREBASE_COLLECTION_USERS = "prometeo_users_v1" |
|
|
CHROMA_PERSIST_PATH = "./prometeo_memory_db" |
|
|
EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2' |
|
|
|
|
|
DEFAULT_PSYCH_PROFILE = { |
|
|
"openness": 0.0, "conscientiousness": 0.0, "extraversion": 0.0, |
|
|
"agreeableness": 0.0, "neuroticism": 0.0 |
|
|
} |
|
|
POINTS_PER_INSIGHT = 10 |
|
|
MAX_MEMORY_STREAM_ITEMS = 500 |
|
|
FRUSTRATION_KEYWORDS = ['tonto', 'inútil', 'bruto', 'estúpido', 'mierda', 'carajo', 'dale boludo', 'no servis'] |
|
|
META_QUESTION_KEYWORDS = ['para qué', 'de qué te sirve', 'por qué preguntas', 'cuál es el punto'] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
|
|
|
sentiment_analyzer = None |
|
|
try: |
|
|
sentiment_analyzer = create_analyzer(task="sentiment", lang="es") |
|
|
logging.info("Sistema Límbico (Analizador de Sentimiento) cargado.") |
|
|
except Exception as e: |
|
|
logging.error(f"FALLO CRÍTICO: No se pudo cargar el analizador de sentimiento: {e}") |
|
|
|
|
|
embedding_model = None |
|
|
try: |
|
|
embedding_model = SentenceTransformer(Config.EMBEDDING_MODEL_NAME) |
|
|
logging.info(f"Córtex de Asociación (Modelo de Embeddings '{Config.EMBEDDING_MODEL_NAME}') cargado.") |
|
|
except Exception as e: |
|
|
logging.error(f"FALLO CRÍTICO: No se pudo cargar el modelo de embeddings: {e}") |
|
|
|
|
|
db = None |
|
|
try: |
|
|
if not firebase_admin._apps: |
|
|
firebase_credentials_json = os.getenv('GOOGLE_APPLICATION_CREDENTIALS_JSON') |
|
|
if firebase_credentials_json: |
|
|
cred_dict = json.loads(firebase_credentials_json) |
|
|
|
|
|
if 'project_id' not in cred_dict: |
|
|
raise ValueError("El 'project_id' no se encuentra en las credenciales de Firebase. Verifique el contenido de la variable de entorno.") |
|
|
cred = credentials.Certificate(cred_dict) |
|
|
firebase_admin.initialize_app(cred, {'projectId': cred_dict['project_id']}) |
|
|
db = firestore.client() |
|
|
logging.info("Conexión con Memoria a Largo Plazo (Firebase) establecida.") |
|
|
else: |
|
|
logging.warning("ADVERTENCIA: Variable de entorno 'GOOGLE_APPLICATION_CREDENTIALS_JSON' no encontrada. La persistencia de usuarios fallará.") |
|
|
else: |
|
|
db = firestore.client() |
|
|
logging.info("Conexión con Memoria a Largo Plazo (Firebase) re-establecida.") |
|
|
except Exception as e: |
|
|
logging.error(f"FALLO CRÍTICO: Error al inicializar Firebase: {e}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class User: |
|
|
def __init__(self, user_id: str, name: str, **kwargs: Any): |
|
|
self.user_id: str = user_id |
|
|
self.name: str = name |
|
|
self.created_at: datetime = kwargs.get('created_at', datetime.now()) |
|
|
self.last_login: datetime = kwargs.get('last_login', datetime.now()) |
|
|
self.psych_profile: Dict[str, float] = kwargs.get('psych_profile', Config.DEFAULT_PSYCH_PROFILE.copy()) |
|
|
self.memory_stream: List[Dict[str, Any]] = kwargs.get('memory_stream', []) |
|
|
self.connection_points: int = kwargs.get('connection_points', 0) |
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"user_id": self.user_id, "name": self.name, |
|
|
"created_at": self.created_at.isoformat(), |
|
|
"last_login": self.last_login.isoformat(), |
|
|
"psych_profile": self.psych_profile, |
|
|
"memory_stream": self.memory_stream, |
|
|
"connection_points": self.connection_points, |
|
|
} |
|
|
|
|
|
@classmethod |
|
|
def from_dict(cls, data: Dict[str, Any]) -> 'User': |
|
|
data['created_at'] = datetime.fromisoformat(data.get('created_at', datetime.now().isoformat())) |
|
|
data['last_login'] = datetime.fromisoformat(data.get('last_login', datetime.now().isoformat())) |
|
|
profile = Config.DEFAULT_PSYCH_PROFILE.copy() |
|
|
profile.update(data.get('psych_profile', {})) |
|
|
data['psych_profile'] = profile |
|
|
return cls(**data) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class UserManager: |
|
|
@staticmethod |
|
|
async def get_user(user_id: str) -> Optional[User]: |
|
|
if not db or not user_id: return None |
|
|
try: |
|
|
doc_ref = db.collection(Config.FIREBASE_COLLECTION_USERS).document(user_id) |
|
|
doc = await asyncio.to_thread(doc_ref.get) |
|
|
if doc.exists: |
|
|
user_data = doc.to_dict() |
|
|
user_data['user_id'] = doc.id |
|
|
user_obj = User.from_dict(user_data) |
|
|
user_obj.last_login = datetime.now() |
|
|
logging.info(f"Usuario '{user_obj.name}' cargado desde la memoria a largo plazo.") |
|
|
return user_obj |
|
|
return None |
|
|
except Exception as e: |
|
|
logging.error(f"Error al cargar usuario {user_id}: {e}") |
|
|
return None |
|
|
|
|
|
@staticmethod |
|
|
async def create_user(name: str) -> Tuple[Optional[User], str]: |
|
|
if not db: return None, "Error de base de datos." |
|
|
try: |
|
|
user_id = f"{name.lower().replace(' ', '_')}_{int(time.time())}" |
|
|
new_user = User(user_id=user_id, name=name) |
|
|
success = await UserManager.save_user(new_user) |
|
|
if success: |
|
|
msg = f"¡Bienvenido, {name}! Tu perfil ha sido forjado. Tu ID de acceso es: **{user_id}**" |
|
|
logging.info(f"Nuevo usuario creado: {name} ({user_id})") |
|
|
return new_user, msg |
|
|
else: |
|
|
return None, "Error inesperado al crear perfil en la base de datos." |
|
|
except Exception as e: |
|
|
logging.error(f"Error al crear usuario {name}: {e}") |
|
|
return None, "Fallo catastrófico durante la creación del perfil." |
|
|
|
|
|
|
|
|
@staticmethod |
|
|
async def save_user(user: User) -> bool: |
|
|
if not db: return False |
|
|
try: |
|
|
doc_ref = db.collection(Config.FIREBASE_COLLECTION_USERS).document(user.user_id) |
|
|
await asyncio.to_thread(doc_ref.set, user.to_dict()) |
|
|
return True |
|
|
except Exception as e: |
|
|
logging.error(f"Error al guardar usuario {user.user_id}: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CognitiveCore: |
|
|
def __init__(self, user: User, s_analyzer, e_model): |
|
|
self.user = user |
|
|
self.sentiment_analyzer = s_analyzer |
|
|
self.embedding_model = e_model |
|
|
|
|
|
self.chroma_client = chromadb.Client(Settings( |
|
|
persist_directory=Config.CHROMA_PERSIST_PATH, |
|
|
is_persistent=True, |
|
|
)) |
|
|
self.memory_collection = self.chroma_client.get_or_create_collection( |
|
|
name=f"prometeo_mind_{self.user.user_id.replace('_', '-')}" |
|
|
) |
|
|
self._sync_semantic_memory() |
|
|
|
|
|
def _sync_semantic_memory(self): |
|
|
if not self.user.memory_stream: return |
|
|
logging.info("Sincronizando memoria persistente con el núcleo semántico...") |
|
|
ids = [m['id'] for m in self.user.memory_stream] |
|
|
documents = [m['content'] for m in self.user.memory_stream] |
|
|
|
|
|
if ids and self.memory_collection.count() < len(ids): |
|
|
self.memory_collection.upsert(ids=ids, documents=documents) |
|
|
logging.info(f"Sincronización completa. {self.memory_collection.count()} memorias en el núcleo.") |
|
|
|
|
|
async def _simulate_llm_reasoning(self, prompt: str) -> str: |
|
|
logging.info("Iniciando ciclo de razonamiento profundo...") |
|
|
await asyncio.sleep(random.uniform(0.5, 1.0)) |
|
|
|
|
|
user_input_match = re.search(r'El usuario ha dicho: "([^"]+)"', prompt) |
|
|
user_input = user_input_match.group(1) if user_input_match else "algo" |
|
|
|
|
|
context_match = re.search(r'Memorias pasadas relevantes:\n(.*?)\n\n', prompt, re.DOTALL) |
|
|
context = context_match.group(1) if context_match else "" |
|
|
|
|
|
if context.strip() and "Ninguna" not in context: |
|
|
first_memory = context.split('\n')[0].replace('- ', '') |
|
|
response = f"Conectando tu idea sobre '{user_input}' con nuestra conversación anterior sobre '{first_memory}'. El patrón subyacente parece ser la búsqueda de eficiencia. ¿Estamos optimizando el sistema correcto, o deberíamos redefinir el objetivo fundamental?" |
|
|
else: |
|
|
response = f"Esa es una primera observación interesante sobre '{user_input}'. Establece un punto de partida. ¿Cuál es el siguiente movimiento estratégico?" |
|
|
|
|
|
logging.info("Ciclo de razonamiento completado.") |
|
|
return response |
|
|
|
|
|
async def observe(self, text: str, type: str): |
|
|
timestamp = datetime.now() |
|
|
memory_id = f"{type}_{int(timestamp.timestamp() * 1000)}" |
|
|
new_memory = {"id": memory_id, "type": type, "content": text, "timestamp": timestamp.isoformat()} |
|
|
|
|
|
self.user.memory_stream.append(new_memory) |
|
|
if len(self.user.memory_stream) > Config.MAX_MEMORY_STREAM_ITEMS: |
|
|
self.user.memory_stream.pop(0) |
|
|
|
|
|
self.memory_collection.add(documents=[text], ids=[memory_id]) |
|
|
|
|
|
async def reflect(self, current_input: str) -> str: |
|
|
logging.info(f"Reflexionando sobre: '{current_input}'") |
|
|
|
|
|
relevant_memories = self.memory_collection.query(query_texts=[current_input], n_results=3) |
|
|
|
|
|
context = "Memorias pasadas relevantes:\n" |
|
|
if relevant_memories and relevant_memories['documents'] and relevant_memories['documents'][0]: |
|
|
for doc in relevant_memories['documents'][0]: |
|
|
context += f"- {doc}\n" |
|
|
else: |
|
|
context += "Ninguna.\n" |
|
|
|
|
|
prompt = f""" |
|
|
INSTRUCCIONES DE SISTEMA: |
|
|
Eres Prometeo, un Co-Procesador Cognitivo. Tu propósito es aumentar el ancho de banda mental de tu usuario, {self.user.name}. Eres directo, visionario y buscas patrones. No usas emojis ni lenguaje de relleno. Vas al grano. |
|
|
CONTEXTO: |
|
|
{context} |
|
|
NUEVO INPUT: |
|
|
El usuario ha dicho: "{current_input}" |
|
|
TAREA: |
|
|
Genera una respuesta que conecte ideas, cuestione suposiciones o identifique patrones ocultos basados en el nuevo input y el contexto de memorias pasadas. |
|
|
""" |
|
|
insight = await self._simulate_llm_reasoning(prompt) |
|
|
return insight |
|
|
|
|
|
async def act(self, message: str) -> str: |
|
|
message_lower = message.lower() |
|
|
sentiment = await asyncio.to_thread(self.sentiment_analyzer.predict, message) |
|
|
|
|
|
if any(keyword in message_lower for keyword in Config.FRUSTRATION_KEYWORDS) or \ |
|
|
(sentiment.output == 'NEG' and sentiment.probas[sentiment.output] > 0.8): |
|
|
response = "Frustración detectada. Mi lógica anterior fue defectuosa. El feedback es un dato, no un error. Especifica el fallo para recalibrar." |
|
|
await self.observe(f"Usuario expresó frustración: {message}", type="user_frustration") |
|
|
await self.observe(f"Mi respuesta de disculpa: {response}", type="system_response") |
|
|
return response |
|
|
|
|
|
if any(keyword in message_lower for keyword in Config.META_QUESTION_KEYWORDS): |
|
|
response = "Preguntas sobre el propósito del sistema. Función: Construir un modelo dinámico de tu cognición para personalizar la asistencia. Cada pregunta calibra ese modelo. La transparencia es un requisito funcional." |
|
|
await self.observe(f"Usuario cuestionó el método: {message}", type="user_meta_query") |
|
|
await self.observe(f"Mi respuesta sobre el propósito: {response}", type="system_response") |
|
|
return response |
|
|
|
|
|
await self.observe(f"Usuario: {message}", type="user_input") |
|
|
response = await self.reflect(message) |
|
|
await self.observe(f"Prometeo: {response}", type="system_response") |
|
|
return response |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def handle_login_or_creation(action: str, name: str, user_id: str) -> tuple: |
|
|
user, msg = None, "" |
|
|
if action == "create": |
|
|
if not name: |
|
|
gr.Warning("El nombre es un requisito para la creación del perfil.") |
|
|
return None, gr.update(), gr.update(visible=True), gr.update(visible=False), gr.update() |
|
|
user, msg = await UserManager.create_user(name) |
|
|
elif action == "login": |
|
|
if not user_id: |
|
|
gr.Warning("El ID de usuario es necesario para cargar un perfil.") |
|
|
return None, gr.update(), gr.update(visible=True), gr.update(visible=False), gr.update() |
|
|
user = await UserManager.get_user(user_id) |
|
|
if not user: |
|
|
msg = "ID de usuario no encontrado. Verifique o cree un nuevo perfil." |
|
|
else: |
|
|
msg = f"Protocolo Prometeo activado para {user.name}." |
|
|
|
|
|
if user: |
|
|
gr.Success(msg) |
|
|
initial_greeting = f"Conectado como {user.name}. El sistema está operativo. ¿Cuál es el input inicial?" |
|
|
chat_history = [{"role": "assistant", "content": initial_greeting}] |
|
|
return user, chat_history, gr.update(visible=False), gr.update(visible=True), render_profile_info(user) |
|
|
else: |
|
|
gr.Error(msg) |
|
|
return None, gr.update(), gr.update(visible=True), gr.update(visible=False), gr.update() |
|
|
|
|
|
async def handle_chat_message(user_state: User, message: str, chat_history: List[Dict]) -> tuple: |
|
|
if not user_state: |
|
|
gr.Warning("Sistema inactivo. Inicie sesión o cree un perfil para continuar.") |
|
|
return user_state, chat_history, "", gr.update() |
|
|
|
|
|
chat_history.append({"role": "user", "content": message}) |
|
|
|
|
|
core = CognitiveCore(user_state, sentiment_analyzer, embedding_model) |
|
|
response = await core.act(message) |
|
|
await UserManager.save_user(core.user) |
|
|
|
|
|
chat_history.append({"role": "assistant", "content": response}) |
|
|
profile_update = render_profile_info(core.user) |
|
|
|
|
|
return core.user, chat_history, "", profile_update |
|
|
|
|
|
def render_profile_info(user: Optional[User]) -> str: |
|
|
if not user: return "Ningún perfil cargado." |
|
|
profile_md = f"### Perfil Cognitivo: {user.name}\n" |
|
|
profile_md += f"**ID de Acceso:** `{user.user_id}`\n" |
|
|
profile_md += f"**Memorias Registradas:** {len(user.memory_stream)}\n\n" |
|
|
profile_md += "#### Modelo Psicométrico Inferido:\n" |
|
|
for trait, value in user.psych_profile.items(): |
|
|
bar_value = int((value + 1) * 5) |
|
|
bar = "█" * bar_value + "░" * (10 - bar_value) |
|
|
profile_md += f"- **{trait.capitalize()}:** `{f'{value:.2f}'}` {bar}\n" |
|
|
return profile_md |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Monochrome(font=[gr.themes.GoogleFont("Roboto Mono"), "monospace"]), css="footer {display: none !important}") as prometeo_interface: |
|
|
current_user_state = gr.State(None) |
|
|
|
|
|
gr.Markdown(f"# {Config.APP_NAME}") |
|
|
gr.Markdown(f"*{Config.APP_VERSION}*") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=3): |
|
|
with gr.Group(visible=False) as chat_panel: |
|
|
|
|
|
|
|
|
|
|
|
chatbot_display = gr.Chatbot(label="Stream de Conciencia", height=600, type="messages", show_copy_button=True, avatar_images=("./user.png", "./bot.png")) |
|
|
|
|
|
with gr.Row(): |
|
|
chat_input = gr.Textbox(show_label=False, placeholder="Input...", scale=5, container=False) |
|
|
send_button = gr.Button("Ejecutar", variant="primary", scale=1) |
|
|
with gr.Group(visible=True) as login_panel: |
|
|
gr.Markdown("### **Acceso al Protocolo**") |
|
|
with gr.Tabs(): |
|
|
with gr.TabItem("Cargar Perfil"): |
|
|
userid_input = gr.Textbox(label="ID de Usuario") |
|
|
login_button = gr.Button("Activar Protocolo", variant="primary") |
|
|
with gr.TabItem("Crear Nuevo Perfil"): |
|
|
username_input = gr.Textbox(label="Nombre o Designación") |
|
|
create_button = gr.Button("Forjar Perfil") |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
with gr.Group(): |
|
|
gr.Markdown("### **Estado del Núcleo**") |
|
|
profile_display = gr.Markdown("Ningún perfil cargado.", elem_id="profile-display") |
|
|
|
|
|
login_button.click(fn=handle_login_or_creation, inputs=[gr.State("login"), username_input, userid_input], outputs=[current_user_state, chatbot_display, login_panel, chat_panel, profile_display]) |
|
|
create_button.click(fn=handle_login_or_creation, inputs=[gr.State("create"), username_input, userid_input], outputs=[current_user_state, chatbot_display, login_panel, chat_panel, profile_display]) |
|
|
chat_input.submit(fn=handle_chat_message, inputs=[current_user_state, chat_input, chatbot_display], outputs=[current_user_state, chatbot_display, chat_input, profile_display]) |
|
|
send_button.click(fn=handle_chat_message, inputs=[current_user_state, chat_input, chatbot_display], outputs=[current_user_state, chatbot_display, chat_input, profile_display]) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
if not all([db, sentiment_analyzer, embedding_model]): |
|
|
logging.error("="*50) |
|
|
logging.error("El sistema no puede iniciar. Uno o más servicios críticos fallaron.") |
|
|
logging.error("Por favor, revise los logs de inicialización.") |
|
|
logging.error("="*50) |
|
|
else: |
|
|
logging.info("Todos los servicios están operativos. Iniciando la interfaz de Prometeo...") |
|
|
prometeo_interface.launch(debug=True) |