# type: ignore """ ================================================================================ AKIRA V21 ULTIMATE - DATABASE MODULE ================================================================================ Banco de dados SQLite extremamente robusto, moderno e completo. Gerencia: mensagens, embeddings, gírias, tom, aprendizados, API logs, training sessions. Features: - SQLite com WAL mode para performance máxima - Pool de conexões permanente (NÃO fecha a cada operação!) - Sistema neural de aprendizado contínuo - 3 níveis de transição de tom gradual - Full-text search com FTS5 - Vector storage para embeddings - Tracking de interações para análise de padrões ================================================================================ """ import sqlite3 import os import json import re import random import threading from typing import Optional, List, Dict, Any, Tuple, Union, Callable from datetime import datetime, timedelta from loguru import logger # numpy é opcional - usado apenas para embeddings try: import numpy as np NUMPY_AVAILABLE = True except ImportError: NUMPY_AVAILABLE = False np = None # type: ignore class DatabasePool: """ Pool de conexões SQLite para performance máxima. Mantém conexões abertas permanentemente para evitar overhead. """ _instance = None _lock = threading.Lock() def __new__(cls, db_path: str = "akira.db"): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, db_path: str = "akira.db"): if self._initialized: return self.db_path = db_path self._local = threading.local() self._initialized = True db_dir = os.path.dirname(db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True) self._init_connection() def _init_connection(self): """Inicializa conexão permanente.""" try: conn = sqlite3.connect( self.db_path, timeout=30.0, check_same_thread=False ) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") conn.execute("PRAGMA cache_size=-64000") conn.execute("PRAGMA temp_store=MEMORY") conn.execute("PRAGMA busy_timeout=60000") conn.execute("PRAGMA mmap_size=268435456") conn.execute("PRAGMA foreign_keys=ON") conn.row_factory = sqlite3.Row self._local.conn = conn logger.info(f"✅ Conexão DB permanente: {self.db_path}") except Exception as e: logger.error(f"❌ Erro ao criar conexão DB: {e}") raise def _get_connection(self) -> sqlite3.Connection: if not hasattr(self._local, 'conn') or self._local.conn is None: self._init_connection() return self._local.conn def cursor(self): return self._get_connection().cursor() def close(self): if hasattr(self._local, 'conn') and self._local.conn: try: self._local.conn.close() self._local.conn = None except: pass _db_pool: Optional[DatabasePool] = None def get_db_pool() -> DatabasePool: global _db_pool if _db_pool is None: from .config import DB_PATH _db_pool = DatabasePool(DB_PATH) return _db_pool class Database: """ Banco de dados Akira V21 Ultimate. Pool de conexões permanente - NÃO fecha a cada operação! Sistema neural de aprendizado contínuo com 3 níveis de transição. """ # Class variables (used as defaults) CODIGOS_VERIFICACAO: Dict[str, Dict] = {} USUARIOS_PRIVILEGIADOS_DEFAULT: List[str] = [] def __init__(self, db_path: str = None): from .config import DB_PATH as CONFIG_DB_PATH, PRIVILEGED_USERS self.db_path = db_path or CONFIG_DB_PATH self._pool = get_db_pool() self._ensure_tables() # Instance variables from config self.USUARIOS_PRIVILEGIADOS = list(PRIVILEGED_USERS) logger.info("Database Akira V21 inicializado (pool permanente)") def _ensure_tables(self): """Cria todas as tabelas necessárias.""" conn = self._pool._get_connection() c = conn.cursor() # Tabela de mensagens c.execute(""" CREATE TABLE IF NOT EXISTS mensagens ( id INTEGER PRIMARY KEY AUTOINCREMENT, usuario TEXT, mensagem TEXT, resposta TEXT, numero TEXT, is_reply INTEGER DEFAULT 0, mensagem_original TEXT, humor TEXT DEFAULT 'neutro', modo_resposta TEXT DEFAULT 'normal', nivel_transicao INTEGER DEFAULT 1, usuario_privilegiado INTEGER DEFAULT 0, tom_detectado TEXT DEFAULT 'neutro', intensidade_tom REAL DEFAULT 0.5, reply_contexto TEXT, conversation_id TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # 🔥 MIGRAÇÃO: Adiciona coluna conversation_id se não existir try: c.execute("ALTER TABLE mensagens ADD COLUMN conversation_id TEXT") logger.info("✅ Coluna conversation_id adicionada à tabela mensagens") except sqlite3.OperationalError: # Coluna já existe pass # Usuários privilegiados c.execute(""" CREATE TABLE IF NOT EXISTS usuarios_privilegiados ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero TEXT UNIQUE, nome TEXT, apelido TEXT, modo_fala TEXT, codigo_verificacao TEXT, ativo INTEGER DEFAULT 1, nivel INTEGER DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Tom do usuário c.execute(""" CREATE TABLE IF NOT EXISTS tom_usuario ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT, tom_detectado TEXT, intensidade REAL DEFAULT 0.5, contexto TEXT, humor TEXT DEFAULT 'neutro', nivel_transicao INTEGER DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Contexto persistente c.execute(""" CREATE TABLE IF NOT EXISTS contexto ( user_key TEXT PRIMARY KEY, historico TEXT, emocao_atual TEXT, humor_atual TEXT DEFAULT 'neutro', modo_resposta TEXT DEFAULT 'normal', nivel_transicao INTEGER DEFAULT 1, usuario_privilegiado INTEGER DEFAULT 0, termos TEXT, girias TEXT, tom TEXT, pesos_neurais TEXT, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Pesos neurais c.execute(""" CREATE TABLE IF NOT EXISTS pesos_neurais ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT, tipo_peso TEXT, valor REAL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Histórico de interações c.execute(""" CREATE TABLE IF NOT EXISTS historico_interacoes ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT, mensagem TEXT, resposta TEXT, emocao_detectada TEXT, tom_usado TEXT, tom_usuario TEXT, intensidade REAL, nivel_transicao INTEGER, reply_contexto TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Aprendizados c.execute(""" CREATE TABLE IF NOT EXISTS aprendizados ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT, chave TEXT, valor TEXT, peso REAL DEFAULT 1.0, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Códigos de verificação c.execute(""" CREATE TABLE IF NOT EXISTS codigos_verificacao ( numero TEXT PRIMARY KEY, codigo TEXT, expira_em DATETIME, permissoes TEXT, nivel INTEGER, ativo INTEGER DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Embeddings vetoriais c.execute(""" CREATE TABLE IF NOT EXISTS embeddings ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT, mensagem TEXT, embedding TEXT, tipo TEXT DEFAULT 'mensagem', created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Índices c.execute("CREATE INDEX IF NOT EXISTS idx_mensagens_numero ON mensagens(numero)") c.execute("CREATE INDEX IF NOT EXISTS idx_mensagens_created ON mensagens(created_at)") c.execute("CREATE INDEX IF NOT EXISTS idx_mensagens_conversation_id ON mensagens(conversation_id)") # 🔥 FIX c.execute("CREATE INDEX IF NOT EXISTS idx_tom_usuario ON tom_usuario(numero_usuario, created_at)") c.execute("CREATE INDEX IF NOT EXISTS idx_historico_interacoes ON historico_interacoes(numero_usuario, created_at)") c.execute("CREATE INDEX IF NOT EXISTS idx_contexto_key ON contexto(user_key)") c.execute("CREATE INDEX IF NOT EXISTS idx_pesos_neurais ON pesos_neurais(numero_usuario, tipo_peso)") # Usuários default usuarios_default = [ ('244937035662', 'Isaac Quarenta', 'Isaac', 'tecnico_formal', 3), ('244978787009', 'Isaac Quarenta 2', 'Isaac', 'tecnico_formal', 3) ] for numero, nome, apelido, modo, nivel in usuarios_default: c.execute( "INSERT OR IGNORE INTO usuarios_privilegiados (numero, nome, apelido, modo_fala, nivel) VALUES (?, ?, ?, ?, ?)", (numero, nome, apelido, modo, nivel) ) conn.commit() def _execute(self, query: str, params: tuple = None) -> Optional[List[sqlite3.Row]]: """Executa query na conexão permanente.""" conn = self._pool._get_connection() c = conn.cursor() c.execute(query, params or ()) if query.strip().upper().startswith("SELECT"): return c.fetchall() conn.commit() return None def _execute_with_retry(self, query: str, params: tuple = None, max_retries: int = 3) -> Optional[List[sqlite3.Row]]: """Executa query com retry em caso de falha.""" for attempt in range(max_retries): try: return self._execute(query, params) except Exception as e: if attempt < max_retries - 1: time.sleep(0.1 * (attempt + 1)) # Exponential backoff else: logger.warning(f"Query falhou após {max_retries} tentativas: {e}") return None # ================================================================ # SISTEMA NEURAL DE APRENDIZADO 3-NÍVEIS # ================================================================ def _update_neural_weights(self, numero_usuario: str, interacao: Dict[str, Any]) -> None: """Atualiza pesos neurais baseado na interação.""" try: tom_usuario = interacao.get('tom_detectado', 'neutro') emocao = interacao.get('emocao', 'neutral') intensidade = interacao.get('intensidade', 0.5) nivel = interacao.get('nivel_transicao', 1) tom_mapping = { 'rude': 'tom_rude', 'grosseiro': 'tom_rude', 'raivoso': 'tom_rude', 'formal': 'tom_neutro', 'neutro': 'tom_neutro', 'debochado': 'tom_proximo', 'proximo': 'tom_proximo', 'amoroso': 'tom_proximo', 'informal': 'tom_neutro' } tom_key = tom_mapping.get(tom_usuario, 'tom_neutro') current_weights = self._get_neural_weights(numero_usuario) learning_rate = 0.1 * nivel current_tom = current_weights.get(tom_key, 0.5) new_tom = current_tom + (intensidade - current_tom) * learning_rate new_tom = max(0.0, min(1.0, new_tom)) emocao_key = f'emocao_{emocao}' current_emocao = current_weights.get(emocao_key, 0.5) new_emocao = current_emocao + (intensidade - current_emocao) * learning_rate * 0.5 new_emocao = max(0.0, min(1.0, new_emocao)) eng_key = 'engajamento' current_eng = current_weights.get(eng_key, 0.5) new_eng = current_eng + (0.05 * nivel) new_eng = max(0.0, min(1.0, new_eng)) self._set_neural_weight(numero_usuario, tom_key, new_tom) self._set_neural_weight(numero_usuario, emocao_key, new_emocao) self._set_neural_weight(numero_usuario, eng_key, new_eng) except Exception as e: logger.warning(f"Erro pesos neurais: {e}") def _get_neural_weights(self, numero_usuario: str) -> Dict[str, float]: try: rows = self._execute( "SELECT tipo_peso, valor FROM pesos_neurais WHERE numero_usuario = ?", (numero_usuario,) ) return {r['tipo_peso']: r['valor'] for r in rows} if rows else {} except: return {} def _set_neural_weight(self, numero_usuario: str, tipo_peso: str, valor: float) -> None: self._execute( "INSERT OR REPLACE INTO pesos_neurais (numero_usuario, tipo_peso, valor, updated_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP)", (numero_usuario, tipo_peso, valor) ) def get_next_tom_level(self, numero_usuario: str, user_tom: str) -> Tuple[str, int]: """Determina próximo nível de tom baseado no padrão do usuário.""" weights = self._get_neural_weights(numero_usuario) if user_tom in ['rude', 'grosseiro', 'raivoso']: rude_weight = weights.get('tom_rude', 0.0) if rude_weight > 0.7: return ('rude_forte', 3) elif rude_weight > 0.4: return ('rude_moderado', 2) else: return ('rude_sutil', 1) elif user_tom in ['debochado', 'proximo', 'amoroso']: prox_weight = weights.get('tom_proximo', 0.0) if prox_weight > 0.7: return ('proximo_forte', 3) elif prox_weight > 0.4: return ('proximo_moderado', 2) else: return ('proximo_sutil', 1) return ('neutro', 1) # ================================================================ # REGISTRO DE INTERAÇÕES # ================================================================ def registrar_interacao_completa( self, numero_usuario: str, mensagem: str, resposta: str, emocao_detectada: str, tom_usado: str, tom_usuario: str, intensidade: float = 0.5, nivel_transicao: int = 1, reply_contexto: Optional[str] = None, is_reply: bool = False, mensagem_original: Optional[str] = None, humor: str = "neutro" ) -> bool: """Registra interação completa para aprendizado neural.""" try: self._execute( """INSERT INTO historico_interacoes (numero_usuario, mensagem, resposta, emocao_detectada, tom_usado, tom_usuario, intensidade, nivel_transicao, reply_contexto) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", (numero_usuario, mensagem[:1000], resposta[:2000], emocao_detectada, tom_usado, tom_usuario, intensidade, nivel_transicao, reply_contexto) ) self._update_neural_weights(numero_usuario, { 'tom_detectado': tom_usuario, 'emocao': emocao_detectada, 'intensidade': intensidade, 'nivel_transicao': nivel_transicao }) self._execute( """INSERT INTO tom_usuario (numero_usuario, tom_detectado, intensidade, contexto, humor, nivel_transicao) VALUES (?, ?, ?, ?, ?, ?)""", (numero_usuario, tom_usuario, intensidade, reply_contexto, humor, nivel_transicao) ) self._execute( """INSERT INTO mensagens (usuario, mensagem, resposta, numero, is_reply, mensagem_original, humor, modo_resposta, nivel_transicao, tom_detectado, intensidade_tom, reply_contexto) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (numero_usuario, mensagem[:1000], resposta[:2000], numero_usuario, 1 if is_reply else 0, mensagem_original, humor, tom_usado, nivel_transicao, tom_usuario, intensidade, reply_contexto) ) pesos = self._get_neural_weights(numero_usuario) self._execute( """INSERT OR REPLACE INTO contexto (user_key, emocao_atual, humor_atual, modo_resposta, nivel_transicao, tom, pesos_neurais, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)""", (numero_usuario, emocao_detectada, humor, tom_usado, nivel_transicao, tom_usuario, json.dumps(pesos)) ) return True except Exception as e: logger.warning(f"Erro ao registrar interação: {e}") return False def recuperar_contexto_persistente(self, numero_usuario: str) -> Dict[str, Any]: try: rows = self._execute( "SELECT * FROM contexto WHERE user_key = ?", (numero_usuario,) ) if rows: row = rows[0] pesos = json.loads(row['pesos_neurais']) if row['pesos_neurais'] else {} return { 'emocao': row['emocao_atual'], 'humor': row['humor_atual'], 'tom': row['tom'], 'nivel': row['nivel_transicao'], 'pesos_neurais': pesos, 'updated_at': row['updated_at'] } return {} except: return {} def get_historical_interactions(self, numero_usuario: str, limit: int = 50) -> List[Dict]: try: rows = self._execute( """SELECT * FROM historico_interacoes WHERE numero_usuario = ? ORDER BY created_at DESC LIMIT ?""", (numero_usuario, limit) ) return [ { 'mensagem': r['mensagem'], 'resposta': r['resposta'], 'emocao': r['emocao_detectada'], 'tom_usuario': r['tom_usuario'], 'intensidade': r['intensidade'], 'nivel': r['nivel_transicao'], 'reply_contexto': r['reply_contexto'], 'created_at': r['created_at'] } for r in rows ] if rows else [] except: return [] # ================================================================ # PRIVILÉGIOS # ================================================================ def verificar_privilegios_usuario(self, numero: str) -> Dict[str, Any]: if not numero: return {"privilegiado": False, "nivel": 0} numero_limpo = re.sub(r'[^\d]', '', str(numero)) if numero_limpo in self.USUARIOS_PRIVILEGIADOS: return {"privilegiado": True, "nivel": 3, "permissoes": ["all"]} try: rows = self._execute( "SELECT nivel, permissoes FROM usuarios_privilegiados WHERE numero = ? AND ativo = 1", (numero_limpo,) ) if rows: permissoes = json.loads(rows[0]['permissoes']) if rows[0]['permissoes'] else [] return {"privilegiado": True, "nivel": rows[0]['nivel'], "permissoes": permissoes} except: pass return {"privilegiado": False, "nivel": 0} def eh_privilegiado(self, numero: str) -> bool: return self.verificar_privilegios_usuario(numero).get("privilegiado", False) def obter_modo_fala_privilegiado(self, numero: str) -> Optional[str]: """Obtém o modo de fala de um usuário privilegiado.""" try: rows = self._execute( "SELECT modo_fala FROM usuarios_privilegiados WHERE numero = ? AND ativo = 1", (numero,) ) return rows[0]['modo_fala'] if rows else None except: return None # ================================================================ # OPERAÇÕES BÁSICAS # ================================================================ def salvar_mensagem(self, usuario=None, mensagem=None, resposta=None, numero=None, is_reply=False, mensagem_original=None, humor='neutro', modo_resposta='normal', nivel_transicao=1, conversation_id=None, **kwargs) -> bool: """ Salva mensagem no banco com suporte a isolamento de contexto. Args: conversation_id: ID único da conversa (pv:numero ou grupo:grupo_id) 🔥 FIX """ try: self._execute( """INSERT INTO mensagens (usuario, mensagem, resposta, numero, is_reply, mensagem_original, humor, modo_resposta, nivel_transicao, conversation_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (usuario, mensagem, resposta, numero, 1 if is_reply else 0, mensagem_original, humor, modo_resposta, nivel_transicao, conversation_id) ) if conversation_id: logger.debug(f"💾 Mensagem salva com conversation_id: {conversation_id[:30]}...") return True except Exception as e: logger.warning(f"Erro ao salvar mensagem: {e}") return False def recuperar_historico( self, usuario: str, limite: int = 20, conversation_id: Optional[str] = None # 🔥 CONTEXT ISOLATION FIX ) -> List[Tuple[str, str]]: """ Recupera histórico de mensagens. Args: usuario: Nome ou número do usuário limite: Máximo de mensagens a retornar conversation_id: ID da conversa isolada (PV ou grupo específico) Returns: Lista de tuplas (mensagem, resposta) """ try: if conversation_id: # 🔥 ISOLADO: Filtra por conversation_id (contexto específico) rows = self._execute( """SELECT mensagem, resposta FROM mensagens WHERE conversation_id = ? ORDER BY id DESC LIMIT ?""", (conversation_id, limite) ) logger.debug(f"📝 Histórico recuperado por conversation_id: {len(rows) if rows else 0} mensagens") else: # LEGACY: Filtra por usuário (mantido para retrocompatibilidade) rows = self._execute( """SELECT mensagem, resposta FROM mensagens WHERE usuario = ? OR numero = ? ORDER BY id DESC LIMIT ?""", (usuario, usuario, limite) ) logger.debug(f"📝 Histórico recuperado por usuário (legacy): {len(rows) if rows else 0} mensagens") return [(r['mensagem'], r['resposta']) for r in rows] if rows else [] except Exception as e: logger.warning(f"Erro ao recuperar histórico: {e}") return [] def recuperar_mensagens(self, usuario: str, limite: int = 20) -> List[Tuple[str, str]]: """Alias para recuperar_historico para compatibilidade.""" return self.recuperar_historico(usuario, limite) def salvar_aprendizado(self, numero_usuario: str, chave: str, valor: str) -> bool: try: self._execute( "INSERT INTO aprendizados (numero_usuario, chave, valor) VALUES (?, ?, ?)", (numero_usuario, chave, valor) ) return True except: return False def salvar_aprendizado_detalhado(self, numero_usuario: str, chave: str, valor: str) -> bool: """Salva um aprendizado detalhado.""" return self.salvar_aprendizado(numero_usuario, chave, valor) def salvar_giria_aprendida(self, numero_usuario: str, giria: str, significado: str, contexto: str) -> bool: """Salva uma gíria aprendida.""" try: chave = f"giria_{giria}" valor = json.dumps({ 'significado': significado, 'contexto': contexto, 'frequencia': 1 }) return self.salvar_aprendizado(numero_usuario, chave, valor) except: return False def salvar_contexto(self, user_key: str, historico: str, emocao_atual: str, termos: str, girias: str, tom: str) -> bool: """Salva o contexto do usuário.""" try: self._execute( "INSERT OR REPLACE INTO contexto (user_key, historico, emocao_atual, humor_atual, termos, girias, tom, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)", (user_key, historico, emocao_atual, emocao_atual, termos, girias, tom) ) return True except: return False def salvar_embedding(self, numero_usuario: str, mensagem: str, embedding: List[float], tipo: str = "mensagem") -> bool: """Salva embedding vetorial da mensagem.""" try: # Convert numpy array to list if necessary if NUMPY_AVAILABLE and isinstance(embedding, np.ndarray): # Handle both 1D and multi-dimensional arrays if embedding.ndim == 1: embedding = embedding.tolist() else: # Flatten multi-dimensional arrays embedding = embedding.flatten().tolist() # Also handle nested numpy arrays and ensure all elements are JSON serializable elif isinstance(embedding, (list, tuple)): embedding = [ float(x) if NUMPY_AVAILABLE and isinstance(x, (np.number, np.ndarray)) else (x.tolist() if NUMPY_AVAILABLE and isinstance(x, np.ndarray) else x) for x in embedding ] # Final check: ensure all elements are JSON serializable (float, int, etc.) def make_serializable(obj): if NUMPY_AVAILABLE and isinstance(obj, np.ndarray): return obj.tolist() elif NUMPY_AVAILABLE and isinstance(obj, np.number): return float(obj) elif isinstance(obj, (list, tuple)): return [make_serializable(x) for x in obj] else: return obj embedding = make_serializable(embedding) embedding_json = json.dumps(embedding) self._execute( "INSERT INTO embeddings (numero_usuario, mensagem, embedding, tipo) VALUES (?, ?, ?, ?)", (numero_usuario, mensagem[:500], embedding_json, tipo) ) return True except Exception as e: logger.warning(f"Erro ao salvar embedding: {e}") return False def registrar_tom_usuario(self, numero_usuario: str, tom_detectado: str, intensidade: float = 0.5, contexto: str = "", humor: str = "neutro", nivel_transicao: int = 1) -> bool: """Registra o tom detectado do usuário.""" try: self._execute( """INSERT INTO tom_usuario (numero_usuario, tom_detectado, intensidade, contexto, humor, nivel_transicao) VALUES (?, ?, ?, ?, ?, ?)""", (numero_usuario, tom_detectado, intensidade, contexto, humor, nivel_transicao) ) return True except Exception as e: logger.warning(f"Erro ao registrar tom do usuário: {e}") return False def obter_tom_predominante(self, numero_usuario: str) -> Optional[str]: """Obtém o tom predominante do usuário baseado no histórico.""" try: rows = self._execute( """SELECT tom_detectado, COUNT(*) as freq FROM tom_usuario WHERE numero_usuario = ? GROUP BY tom_detectado ORDER BY freq DESC LIMIT 1""", (numero_usuario,) ) return rows[0]['tom_detectado'] if rows else None except Exception as e: logger.warning(f"Erro ao obter tom predominante: {e}") return None def recuperar_aprendizados(self, numero_usuario: str) -> Dict[str, str]: try: rows = self._execute( "SELECT chave, valor FROM aprendizados WHERE numero_usuario = ?", (numero_usuario,) ) return {r['chave']: r['valor'] for r in rows} if rows else {} except: return {} def recuperar_aprendizado_detalhado(self, numero_usuario: str, chave: str) -> Optional[str]: """Recupera um aprendizado detalhado específico.""" try: rows = self._execute( "SELECT valor FROM aprendizados WHERE numero_usuario = ? AND chave = ?", (numero_usuario, chave) ) return rows[0]['valor'] if rows else None except: return None def recuperar_girias_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]: """Recupera gírias aprendidas pelo usuário.""" try: # Assuming girias are stored in aprendizados with chave starting with 'giria_' rows = self._execute( "SELECT chave, valor FROM aprendizados WHERE numero_usuario = ? AND chave LIKE 'giria_%'", (numero_usuario,) ) girias = [] for row in rows: chave = row['chave'] valor = row['valor'] try: # Try to parse as JSON data = json.loads(valor) girias.append({ 'giria': chave.replace('giria_', ''), 'significado': data.get('significado', valor), 'frequencia': data.get('frequencia', 1) }) except: girias.append({ 'giria': chave.replace('giria_', ''), 'significado': valor, 'frequencia': 1 }) return girias except: return [] def get_stats(self) -> Dict[str, int]: try: total_msgs = self._execute("SELECT COUNT(*) FROM mensagens") total_users = self._execute("SELECT COUNT(DISTINCT numero) FROM mensagens") total_interacoes = self._execute("SELECT COUNT(*) FROM historico_interacoes") return { 'mensagens': total_msgs[0][0] if total_msgs else 0, 'usuarios': total_users[0][0] if total_users else 0, 'interacoes_neurais': total_interacoes[0][0] if total_interacoes else 0 } except: return {'mensagens': 0, 'usuarios': 0, 'interacoes_neurais': 0} # ================================================================ # CONTEXT ISOLATION - NOVOS MÉTODOS # ================================================================ def _ensure_context_isolation_tables(self): """Garante criação das tabelas de isolamento de contexto.""" conn = self._pool._get_connection() c = conn.cursor() # Tabela de contextos isolados c.execute(""" CREATE TABLE IF NOT EXISTS contextos_isolados ( id INTEGER PRIMARY KEY AUTOINCREMENT, context_id TEXT UNIQUE, numero_usuario TEXT, grupo_id TEXT, tipo_conversa TEXT, short_memory TEXT, estado_emocional TEXT DEFAULT 'neutral', nivel_intimidade INTEGER DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (context_id) ) """) # Tabela de embeddings vetoriais (para memória persistente) c.execute(""" CREATE TABLE IF NOT EXISTS embeddings_vectoriais ( id INTEGER PRIMARY KEY AUTOINCREMENT, conversation_id TEXT, mensagem TEXT, embedding TEXT, embedding_model TEXT, tipo TEXT DEFAULT 'mensagem', metadata TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Índices para performance c.execute("CREATE INDEX IF NOT EXISTS idx_ci_context_id ON contextos_isolados(context_id)") c.execute("CREATE INDEX IF NOT EXISTS idx_ci_numero ON contextos_isolados(numero_usuario)") c.execute("CREATE INDEX IF NOT EXISTS idx_ci_grupo ON contextos_isolados(grupo_id)") c.execute("CREATE INDEX IF NOT EXISTS idx_ev_conversation ON embeddings_vectoriais(conversation_id)") c.execute("CREATE INDEX IF NOT EXISTS idx_ev_tipo ON embeddings_vectoriais(tipo)") conn.commit() logger.debug("✅ Tabelas de isolamento de contexto criadas/verificadas") def salvar_contexto_isolado(self, context_data: Dict[str, Any]) -> bool: """ Salva contexto isolado no banco. Args: context_data: Dicionário com dados do ConversationContext Returns: True se salvo com sucesso """ try: self._ensure_context_isolation_tables() self._execute( """INSERT OR REPLACE INTO contextos_isolados (context_id, numero_usuario, grupo_id, tipo_conversa, short_memory, estado_emocional, nivel_intimidade, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)""", ( context_data.get('context_id'), context_data.get('numero_usuario'), context_data.get('grupo_id'), context_data.get('tipo_conversa'), json.dumps(context_data.get('short_memory', [])), context_data.get('estado_emocional', 'neutral'), context_data.get('nivel_intimidade', 1) ) ) return True except Exception as e: logger.warning(f"Erro ao salvar contexto isolado: {e}") return False def recuperar_contexto_isolado(self, context_id: str) -> Optional[Dict[str, Any]]: """ Recupera contexto isolado pelo ID. Args: context_id: ID do contexto Returns: Dicionário com dados ou None """ try: self._ensure_context_isolation_tables() rows = self._execute( "SELECT * FROM contextos_isolados WHERE context_id = ?", (context_id,) ) if rows: row = rows[0] return { 'context_id': row['context_id'], 'numero_usuario': row['numero_usuario'], 'grupo_id': row['grupo_id'], 'tipo_conversa': row['tipo_conversa'], 'short_memory': json.loads(row['short_memory']) if row['short_memory'] else [], 'estado_emocional': row['estado_emocional'], 'nivel_intimidade': row['nivel_intimidade'], 'created_at': row['created_at'], 'updated_at': row['updated_at'] } return None except Exception as e: logger.warning(f"Erro ao recuperar contexto isolado: {e}") return None def deletar_contexto_isolado(self, context_id: str) -> bool: """ Deleta contexto isolado. Args: context_id: ID do contexto Returns: True se deletado com sucesso """ try: self._ensure_context_isolation_tables() # Deleta contexto self._execute( "DELETE FROM contextos_isolados WHERE context_id = ?", (context_id,) ) # Deleta embeddings associados self._execute( "DELETE FROM embeddings_vectoriais WHERE conversation_id = ?", (context_id,) ) return True except Exception as e: logger.warning(f"Erro ao deletar contexto isolado: {e}") return False def listar_contextos_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]: """ Lista todos os contextos de um usuário. Args: numero_usuario: Número do usuário Returns: Lista de contextos """ try: self._ensure_context_isolation_tables() rows = self._execute( "SELECT * FROM contextos_isolados WHERE numero_usuario = ? ORDER BY updated_at DESC", (numero_usuario,) ) contexts = [] for row in rows: contexts.append({ 'context_id': row['context_id'], 'numero_usuario': row['numero_usuario'], 'grupo_id': row['grupo_id'], 'tipo_conversa': row['tipo_conversa'], 'short_memory': json.loads(row['short_memory']) if row['short_memory'] else [], 'estado_emocional': row['estado_emocional'], 'nivel_intimidade': row['nivel_intimidade'], 'created_at': row['created_at'], 'updated_at': row['updated_at'] }) return contexts except Exception as e: logger.warning(f"Erro ao listar contextos do usuário: {e}") return [] def salvar_embedding_vetorial( self, conversation_id: str, mensagem: str, embedding: List[float], embedding_model: str = "paraphrase-multilingual-MiniLM-L12-v2", tipo: str = "mensagem", metadata: Optional[Dict[str, Any]] = None ) -> bool: """ Salva embedding vetorial para memória persistente. Args: conversation_id: ID da conversa isolada mensagem: Texto original embedding: Vetor de embedding embedding_model: Modelo usado tipo: Tipo de embedding metadata: Metadados adicionais Returns: True se salvo com sucesso """ try: self._ensure_context_isolation_tables() # Convert numpy array to list if necessary if NUMPY_AVAILABLE and isinstance(embedding, np.ndarray): embedding = embedding.tolist() # Also handle nested numpy arrays elif isinstance(embedding, (list, tuple)): embedding = [ float(x) if NUMPY_AVAILABLE and isinstance(x, (np.number, np.ndarray)) else x for x in embedding ] embedding_json = json.dumps(embedding) metadata_json = json.dumps(metadata) if metadata else None self._execute( """INSERT INTO embeddings_vectoriais (conversation_id, mensagem, embedding, embedding_model, tipo, metadata) VALUES (?, ?, ?, ?, ?, ?)""", (conversation_id, mensagem[:500], embedding_json, embedding_model, tipo, metadata_json) ) return True except Exception as e: logger.warning(f"Erro ao salvar embedding vetorial: {e}") return False def buscar_similaridade_vetorial( self, conversation_id: str, query_embedding: List[float], top_k: int = 5 ) -> List[Dict[str, Any]]: """ Busca embeddings similares (similaridade por cosseno). Args: conversation_id: ID da conversa query_embedding: Vetor de query top_k: Quantidade de resultados Returns: Lista de resultados ordenados por similaridade """ try: self._ensure_context_isolation_tables() # Busca todos os embeddings da conversa rows = self._execute( "SELECT * FROM embeddings_vectoriais WHERE conversation_id = ?", (conversation_id,) ) if not rows: return [] # Calcula similaridade por cosseno results = [] for row in rows: try: stored_embedding = json.loads(row['embedding']) similarity = self._cosine_similarity(query_embedding, stored_embedding) results.append({ 'mensagem': row['mensagem'], 'similarity': similarity, 'tipo': row['tipo'], 'metadata': json.loads(row['metadata']) if row['metadata'] else {}, 'created_at': row['created_at'] }) except Exception: continue # Ordena por similaridade e limita results.sort(key=lambda x: x['similarity'], reverse=True) return results[:top_k] except Exception as e: logger.warning(f"Erro ao buscar similaridade vetorial: {e}") return [] def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float: """Calcula similaridade por cosseno entre dois vetores.""" try: if not vec1 or not vec2: return 0.0 import math # Converte para float v1 = [float(x) for x in vec1] v2 = [float(x) for x in vec2] # Garante mesmo tamanho min_len = min(len(v1), len(v2)) v1 = v1[:min_len] v2 = v2[:min_len] # Produto escalar dot_product = sum(a * b for a, b in zip(v1, v2)) # Normas norm1 = math.sqrt(sum(a * a for a in v1)) norm2 = math.sqrt(sum(a * a for a in v2)) if norm1 == 0 or norm2 == 0: return 0.0 return dot_product / (norm1 * norm2) except Exception: return 0.0 def atualizar_short_memory_contexto( self, context_id: str, short_memory: List[Dict[str, Any]] ) -> bool: """ Atualiza apenas a memória de curto prazo de um contexto. Args: context_id: ID do contexto short_memory: Nova lista de mensagens Returns: True se atualizado com sucesso """ try: self._ensure_context_isolation_tables() self._execute( """UPDATE contextos_isolados SET short_memory = ?, updated_at = CURRENT_TIMESTAMP WHERE context_id = ?""", (json.dumps(short_memory), context_id) ) return True except Exception as e: logger.warning(f"Erro ao atualizar short memory: {e}") return False def get_conversation_stats(self, conversation_id: str) -> Dict[str, Any]: """ Retorna estatísticas de uma conversa isolada. Args: conversation_id: ID da conversa Returns: Dicionário com estatísticas """ try: self._ensure_context_isolation_tables() # Dados do contexto context = self.recuperar_contexto_isolado(conversation_id) # Contagem de embeddings embedding_count = self._execute( "SELECT COUNT(*) FROM embeddings_vectoriais WHERE conversation_id = ?", (conversation_id,) ) return { 'context_exists': context is not None, 'numero_usuario': context.get('numero_usuario') if context else None, 'tipo_conversa': context.get('tipo_conversa') if context else None, 'grupo_id': context.get('grupo_id') if context else None, 'short_memory_count': len(context.get('short_memory', [])) if context else 0, 'nivel_intimidade': context.get('nivel_intimidade', 1) if context else 1, 'estado_emocional': context.get('estado_emocional', 'neutral') if context else 'neutral', 'embeddings_count': embedding_count[0][0] if embedding_count else 0, 'updated_at': context.get('updated_at') if context else None } except Exception as e: logger.warning(f"Erro ao obter stats da conversa: {e}") return {} def get_database() -> Database: return Database() def init_pool(): global _db_pool if _db_pool is None: from .config import DB_PATH _db_pool = DatabasePool(DB_PATH) logger.info("✅ Database pool inicializado") return _db_pool