diff --git "a/modules/database.py" "b/modules/database.py" --- "a/modules/database.py" +++ "b/modules/database.py" @@ -1,1347 +1,1336 @@ -""" -================================================================================ -AKIRA V21 ULTIMATE - DATABASE MODULE -================================================================================ -Banco de dados SQLite extremamente robusto, moderno e completo. -Gerencia: mensagens, embeddings, gírias, tom, aprendizados, API logs, training sessions. - -Features: -- SQLite com WAL mode para performance máxima -- Retry logic com exponential backoff -- Full-text search com FTS5 -- Vector storage para embeddings (SentenceTransformers) -- Transactions.atomic() -- Backup/restore automático -- Health checks e métricas detalhadas -- Índices otimizados -- Migration system completo -- Logging detalhado -- Singleton pattern para conexões -- Suporte a numpy arrays para embeddings -- API performance tracking -- Training sessions tracking -================================================================================ -""" - -import sqlite3 -import time -import os -import json -import hashlib -import random -from typing import Optional, List, Dict, Any, Tuple, Union -from datetime import datetime -from loguru import logger - - -class Database: - """ - Classe de banco de dados robusta para Akira V21 Ultimate. - Suporta múltiplas tabelas, migrações automáticas e operações com retry. - """ - - # Códigos de verificação para usuários privilegiados - CODIGOS_VERIFICACAO: Dict[str, str] = {} - - def __init__(self, db_path: str = "/akira/data/akira.db"): - """ - Inicializa a conexão com o banco de dados. - - Args: - db_path: Caminho para o arquivo do banco de dados SQLite - """ - self.db_path = db_path - self.max_retries = 5 - self.retry_delay = 0.1 - - # Garante que o diretório /akira/data existe (Docker) - db_dir = os.path.dirname(db_path) - if db_dir and not os.path.exists(db_dir): - os.makedirs(db_dir, exist_ok=True) - - self._init_db() - self._init_context_isolation_tables() - self._ensure_all_columns_and_indexes() - logger.info(f"Database inicializado: {self.db_path}") - - # ================================================================ - # CONEXÃO + RETRY - # ================================================================ - def _get_connection(self) -> sqlite3.Connection: - """Obtém conexão com retry automático.""" - for attempt in range(self.max_retries): - try: - conn = sqlite3.connect( - self.db_path, - timeout=30.0, - check_same_thread=False - ) - # Otimizações SQLite para performance - conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA synchronous=NORMAL") - conn.execute("PRAGMA cache_size=1000") - conn.execute("PRAGMA temp_store=MEMORY") - conn.execute("PRAGMA busy_timeout=30000") - conn.execute("PRAGMA foreign_keys=ON") - conn.row_factory = sqlite3.Row - return conn - except sqlite3.OperationalError as e: - if "locked" in str(e) and attempt < self.max_retries - 1: - time.sleep(self.retry_delay * (2 ** attempt)) - continue - logger.error(f"Erro de conexão DB: {e}") - raise - raise sqlite3.OperationalError("Falha ao conectar ao banco após várias tentativas") - - def _execute_with_retry( - self, - query: str, - params: Optional[tuple] = None, - commit: bool = False - ) -> Optional[List[sqlite3.Row]]: - """Executa query com retry automático.""" - for attempt in range(self.max_retries): - try: - with self._get_connection() as conn: - cur = conn.cursor() - cur.execute(query, params or ()) - - if query.strip().upper().startswith("SELECT"): - result = cur.fetchall() - return result - - if commit: - conn.commit() - return None - - except sqlite3.OperationalError as e: - if "locked" in str(e) and attempt < self.max_retries - 1: - time.sleep(self.retry_delay * (2 ** attempt)) - continue - logger.error(f"Erro SQL: {e}") - raise - raise sqlite3.OperationalError("Query falhou após retries") - - # ================================================================ - # SCHEMA + MIGRAÇÃO - # ================================================================ - def _init_db(self): - """Inicializa todas as tabelas do banco.""" - try: - with self._get_connection() as conn: - c = conn.cursor() - - # Tabela de mensagens - c.executescript(""" - CREATE TABLE IF NOT EXISTS mensagens ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - usuario TEXT, - mensagem TEXT, - resposta TEXT, - numero TEXT, - is_reply BOOLEAN DEFAULT 0, - mensagem_original TEXT, - humor TEXT DEFAULT 'neutro', - modo_resposta TEXT DEFAULT 'normal', - nivel_transicao INTEGER DEFAULT 1, - usuario_privilegiado BOOLEAN DEFAULT 0, - modelo_usado TEXT DEFAULT 'desconhecido', - conversation_id TEXT DEFAULT '', - message_id TEXT UNIQUE, -- ✅ IDEMPOTENCY KEY - created_at DATETIME DEFAULT CURRENT_TIMESTAMP - ); - """) - - # Tabela de usuários privilegiados - c.executescript(""" - CREATE TABLE IF NOT EXISTS usuarios_privilegiados ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - numero TEXT UNIQUE, - nome TEXT, - apelido TEXT, - modo_fala TEXT, - codigo_verificacao TEXT, - ativo BOOLEAN DEFAULT 1, - privilegio_temporario_ativo BOOLEAN DEFAULT 0, - expira_em REAL, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP - ); - """) - - # Tabela de embeddings - c.executescript(""" - CREATE TABLE IF NOT EXISTS embeddings ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - numero_usuario TEXT, - source_type TEXT, - texto TEXT, - embedding BLOB - ); - """) - - # Tabela de aprendizados - c.executescript(""" - CREATE TABLE IF NOT EXISTS aprendizados ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - numero_usuario TEXT, - chave TEXT, - valor TEXT, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP - ); - """) - - # Tabela de gírias aprendidas - c.executescript(""" - CREATE TABLE IF NOT EXISTS girias_aprendidas ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - numero_usuario TEXT, - giria TEXT, - significado TEXT, - contexto TEXT, - frequencia INTEGER DEFAULT 1, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP - ); - """) - - # Tabela de tom do usuário - c.executescript(""" - CREATE TABLE IF NOT EXISTS tom_usuario ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - numero_usuario TEXT, - tom_detectado TEXT, - intensidade REAL DEFAULT 0.5, - contexto TEXT, - humor TEXT DEFAULT 'neutro', - created_at DATETIME DEFAULT CURRENT_TIMESTAMP - ); - """) - - # Tabela de contexto - c.executescript(""" - CREATE TABLE IF NOT EXISTS contexto ( - user_key TEXT PRIMARY KEY, - historico TEXT, - emocao_atual TEXT, - humor_atual TEXT DEFAULT 'neutro', - modo_resposta TEXT DEFAULT 'normal', - nivel_transicao INTEGER DEFAULT 1, - usuario_privilegiado BOOLEAN DEFAULT 0, - termos TEXT, - girias TEXT, - tom TEXT, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP - ); - """) - - # Tabela de pronomes por tom - c.executescript(""" - CREATE TABLE IF NOT EXISTS pronomes_por_tom ( - tom TEXT PRIMARY KEY, - pronomes TEXT - ); - """) - - # Tabela de Persona do Usuário (Character.AI style LTM) - c.executescript(""" - CREATE TABLE IF NOT EXISTS persona_usuario ( - numero_usuario TEXT PRIMARY KEY, - personalidade TEXT, - vicios_linguagem TEXT, - gostos TEXT, - desgostos TEXT, - emocional TEXT, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP - ); - """) - - # Insere dados padrão de pronomes - c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", - ('neutro', 'tu/você')) - c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", - ('formal', 'o senhor/a senhora')) - c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", - ('informal', 'puto/kota')) - c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", - ('tecnico_formal', 'senhor')) - - # Insere usuários privilegiados padrão - usuarios_default = [ - ('244937035662', 'Isaac Quarenta', 'Isaac', 'tecnico_formal'), - ('244978787009', 'Isaac Quarenta 2', 'Isaac', 'tecnico_formal') - ] - for numero, nome, apelido, modo in usuarios_default: - c.execute(""" - INSERT OR IGNORE INTO usuarios_privilegiados - (numero, nome, apelido, modo_fala) VALUES (?, ?, ?, ?) - """, (numero, nome, apelido, modo)) - - # ===== LSTM MEMORY SYSTEM TABLES ===== - c.executescript(""" - CREATE TABLE IF NOT EXISTS lstm_contexto ( - context_id VARCHAR(255) PRIMARY KEY, - numero_usuario VARCHAR(50) NOT NULL, - topic_principal VARCHAR(255), - subtopicas JSON, - conversation_path JSON, - interaction_pattern VARCHAR(50), - emotional_state VARCHAR(50), - unanswered_questions JSON, - assumed_knowledge JSON, - last_key_message TEXT, - context_switches INTEGER DEFAULT 0, - contradictions JSON, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - metadata JSON, - UNIQUE(context_id), - UNIQUE(numero_usuario, context_id) - ); - """) - - c.executescript(""" - CREATE INDEX IF NOT EXISTS idx_lstm_usuario ON lstm_contexto(numero_usuario); - CREATE INDEX IF NOT EXISTS idx_lstm_created ON lstm_contexto(created_at); - """) - - c.executescript(""" - CREATE TABLE IF NOT EXISTS lstm_message_links ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - context_id VARCHAR(255) NOT NULL, - message_id VARCHAR(255) NOT NULL, - parent_message_id VARCHAR(255), - topic_changed BOOLEAN DEFAULT FALSE, - context_switch_type VARCHAR(50), - relevance_score FLOAT DEFAULT 0.0, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - UNIQUE(context_id, message_id), - FOREIGN KEY (context_id) REFERENCES lstm_contexto(context_id) ON DELETE CASCADE - ); - """) - - c.executescript(""" - CREATE INDEX IF NOT EXISTS idx_lstm_msg_context ON lstm_message_links(context_id); - CREATE INDEX IF NOT EXISTS idx_lstm_msg_message ON lstm_message_links(message_id); - """) - - # Tabela de uso de ferramentas (Skills Usage) - c.executescript(""" - CREATE TABLE IF NOT EXISTS tool_usage ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - numero_usuario TEXT, - month_year TEXT, - usage_count INTEGER DEFAULT 0, - UNIQUE(numero_usuario, month_year) - ); - """) - - # ===== AUTONOMOUS AGENT EVENTS TABLE ===== - c.executescript(""" - CREATE TABLE IF NOT EXISTS system_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - tipo TEXT NOT NULL, - servidor TEXT DEFAULT 'hf_spaces', - descricao TEXT, - acao_tomada TEXT, - resolvido BOOLEAN DEFAULT 0, - notificado BOOLEAN DEFAULT 0, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP - ); - CREATE INDEX IF NOT EXISTS idx_events_tipo ON system_events(tipo); - CREATE INDEX IF NOT EXISTS idx_events_created ON system_events(created_at); - """) - - conn.commit() - - logger.info(f"Banco de dados inicializado: {self.db_path}") - - except Exception as e: - logger.error(f"Erro ao criar tabelas: {e}") - raise - - def _ensure_all_columns_and_indexes(self): - """Garante que todas as colunas e índices existam.""" - try: - with self._get_connection() as conn: - c = conn.cursor() - - # Adiciona colunas faltantes na tabela mensagens - columns_to_add = { - 'mensagens': [ - ('humor', 'TEXT DEFAULT "neutro"'), - ('modo_resposta', 'TEXT DEFAULT "normal"'), - ('nivel_transicao', 'INTEGER DEFAULT 1'), - ('usuario_privilegiado', 'BOOLEAN DEFAULT 0'), - ('modelo_usado', 'TEXT DEFAULT "desconhecido"'), - ('conversation_id', 'TEXT DEFAULT ""') - ], - 'tom_usuario': [ - ('humor', 'TEXT DEFAULT "neutro"') - ], - 'contexto': [ - ('humor_atual', 'TEXT DEFAULT "neutro"'), - ('modo_resposta', 'TEXT DEFAULT "normal"'), - ('nivel_transicao', 'INTEGER DEFAULT 1'), - ('usuario_privilegiado', 'BOOLEAN DEFAULT 0'), - ('updated_at', 'DATETIME DEFAULT CURRENT_TIMESTAMP') - ], - 'usuarios_privilegiados': [ - ('privilegio_temporario_ativo', 'BOOLEAN DEFAULT 0'), - ('expira_em', 'REAL') - ] - } - - for table, cols in columns_to_add.items(): - c.execute(f"PRAGMA table_info('{table}')") - existing = {row[1] for row in c.fetchall()} - for col_name, col_def in cols: - if col_name not in existing: - try: - c.execute(f"ALTER TABLE {table} ADD COLUMN {col_name} {col_def}") - logger.info(f"Coluna '{col_name}' adicionada em '{table}'") - except Exception as e: - logger.warning(f"Erro ao adicionar coluna {col_name}: {e}") - - conn.commit() - - except Exception as e: - logger.error(f"Erro na migração: {e}") - - # ================================================================ - # USUÁRIOS PRIVILEGIADOS - # ================================================================ - def adicionar_usuario_privilegiado( - self, - numero: str, - nome: str, - apelido: str, - modo_fala: str = "tecnico_formal" - ) -> Tuple[bool, str]: - """ - Adiciona um usuário privilegiado ao sistema. - - Args: - numero: Número de telefone do usuário - nome: Nome completo - apelido: Apelido - modo_fala: Modo de fala inicial - - Returns: - Tuple[bool, str]: (sucesso, código de verificação) - """ - try: - # Gera código de verificação - codigo = str(random.randint(100000, 999999)) - - self._execute_with_retry( - """INSERT OR REPLACE INTO usuarios_privilegiados - (numero, nome, apelido, modo_fala, codigo_verificacao) - VALUES (?, ?, ?, ?, ?)""", - (numero, nome, apelido, modo_fala, codigo), - commit=True - ) - - logger.info(f"Usuário privilegiado adicionado: {numero} ({nome})") - return True, codigo - - except Exception as e: - logger.error(f"Erro ao adicionar usuário privilegiado: {e}") - return False, str(e) - - def eh_privilegiado(self, numero: str) -> bool: - """ - Verifica se um número é de usuário privilegiado. - - Args: - numero: Número de telefone a verificar - - Returns: - bool: True se for privilegiado - """ - try: - rows = self._execute_with_retry( - "SELECT ativo FROM usuarios_privilegiados WHERE numero = ? AND ativo = 1", - (numero,) - ) - # Verificação segura para evitar "List[Row] | None cannot be assigned to len()" - return rows is not None and len(rows) > 0 - except Exception as e: - logger.error(f"Erro ao verificar privilégios: {e}") - return False - - def verificar_privilegios_usuario(self, numero: str) -> Dict[str, Any]: - """ - Verifica privilégios detalhados do usuário no database com suporte a temporários. - - Args: - numero: Número do usuário - - Returns: - Dict: Dicionário com flags de privilégio - """ - try: - rows = self._execute_with_retry( - "SELECT ativo, privilegio_temporario_ativo, expira_em FROM usuarios_privilegiados WHERE numero = ?", - (numero,) - ) - if rows: - row = rows[0] - return { - "privilegiado": bool(row[0]), - "privilegio_temporario_ativo": bool(row[1]), - "expira_em": row[2] - } - return { - "privilegiado": False, - "privilegio_temporario_ativo": False, - "expira_em": None - } - except Exception as e: - logger.error(f"Erro em verificar_privilegios_usuario: {e}") - return {"privilegiado": False, "privilegio_temporario_ativo": False} - - def verificar_codigo(self, numero: str, codigo: str) -> bool: - """ - Verifica o código de um usuário privilegiado. - - Args: - numero: Número de telefone - codigo: Código de verificação - - Returns: - bool: True se o código for válido - """ - try: - rows = self._execute_with_retry( - "SELECT codigo_verificacao FROM usuarios_privilegiados WHERE numero = ?", - (numero,) - ) - if rows and rows[0][0] == codigo: - # Gera novo código para próxima verificação - novo_codigo = str(random.randint(100000, 999999)) - self._execute_with_retry( - "UPDATE usuarios_privilegiados SET codigo_verificacao = ? WHERE numero = ?", - (novo_codigo, numero), - commit=True - ) - return True - return False - except Exception as e: - logger.error(f"Erro ao verificar código: {e}") - return False - - def obter_modo_fala_privilegiado(self, numero: str) -> Optional[str]: - """Obtém o modo de fala de um usuário privilegiado.""" - try: - rows = self._execute_with_retry( - "SELECT modo_fala FROM usuarios_privilegiados WHERE numero = ?", - (numero,) - ) - return rows[0][0] if rows else None - except Exception as e: - logger.error(f"Erro ao obter modo de fala: {e}") - return None - - # ================================================================ - # MENSAGENS - # ================================================================ - def salvar_mensagem( - self, - usuario: str, - mensagem: str, - resposta: str, - numero: Optional[str] = None, - is_reply: bool = False, - mensagem_original: Optional[str] = None, - humor: str = "neutro", - modo_resposta: str = "normal", - nivel_transicao: int = 1, - usuario_privilegiado: bool = False, - modelo_usado: str = "desconhecido", - **kwargs - ) -> bool: - """ - Salva uma mensagem no banco de dados. - """ - try: - cols = ['usuario', 'mensagem', 'resposta', 'humor', 'modo_resposta', - 'nivel_transicao', 'usuario_privilegiado', 'is_reply', 'modelo_usado'] - vals: List[Any] = [usuario, mensagem, resposta, humor, modo_resposta, - nivel_transicao, usuario_privilegiado, is_reply, modelo_usado] - - # ✅ message_id via kwargs ou parâmetro opcional (compatibilidade) - message_id = kwargs.get('message_id') - if message_id: - cols.append('message_id') - vals.append(message_id) - - # ✅ conversation_id (ESSENCIAL para resumos precisos e isolamento) - conversation_id = kwargs.get('conversation_id') - if conversation_id: - cols.append('conversation_id') - vals.append(conversation_id) - - if numero: - cols.append('numero') - vals.append(numero) - - if mensagem_original: - cols.append('mensagem_original') - vals.append(mensagem_original) - - placeholders = ', '.join(['?' for _ in cols]) - query = f"INSERT INTO mensagens ({', '.join(cols)}) VALUES ({placeholders})" - - self._execute_with_retry(query, tuple(vals), commit=True) - return True - - except Exception as e: - logger.warning(f"Erro salvar_mensagem: {e}") - return False - - def recuperar_mensagens( - self, - usuario: str, - limite: int = 5 - ) -> List[Tuple[str, str]]: - """Recupera mensagens de um usuário.""" - try: - result = self._execute_with_retry( - """SELECT mensagem, resposta FROM mensagens - WHERE usuario=? OR numero=? - ORDER BY id DESC LIMIT ?""", - (usuario, usuario, limite) - ) - if not result: - return [] - # Converte sqlite3.Row para tuplas - return [(row[0], row[1]) for row in result] - except Exception as e: - logger.error(f"Erro ao recuperar mensagens: {e}") - return [] - - def recuperar_mensagens_por_contexto(self, context_id: str, limite: int = 50) -> List[Dict[str, Any]]: - """Recupera mensagens de um contexto específico (grupo ou PV).""" - try: - rows = self._execute_with_retry( - "SELECT usuario, mensagem, resposta, created_at FROM mensagens WHERE conversation_id = ? ORDER BY id DESC LIMIT ?", - (context_id, limite) - ) - if not rows: - return [] - return [dict(row) for row in rows] - except Exception as e: - logger.error(f"Erro ao recuperar mensagens por contexto: {e}") - return [] - - def recuperar_humor(self, numero_usuario: str) -> str: - """ - Recupera o humor atual de um usuário. - - Args: - numero_usuario: Número do usuário - - Returns: - str: Humor detectado ('neutro', 'feliz', 'triste', 'irritado', 'entediado') - """ - try: - rows = self._execute_with_retry( - """SELECT humor FROM tom_usuario - WHERE numero_usuario=? - ORDER BY created_at DESC LIMIT 1""", - (numero_usuario,) - ) - return rows[0][0] if rows else "neutro" - except Exception as e: - logger.error(f"Erro ao recuperar humor: {e}") - return "neutro" - - # ================================================================ - # CONTEXTO - # ================================================================ - def salvar_contexto( - self, - user_key: str, - historico: Optional[str] = None, - emocao_atual: str = "neutra", - humor_atual: str = "neutro", - modo_resposta: str = "normal", - nivel_transicao: int = 1, - usuario_privilegiado: bool = False, - termos: Optional[str] = None, - girias: Optional[str] = None, - tom: Optional[str] = None - ) -> bool: - """ - Salva o contexto de um usuário. - - Args: - user_key: Chave do usuário (número ou nome) - historico: Histórico de conversas - emocao_atual: Emoção atual - humor_atual: Humor atual - modo_resposta: Modo de resposta - nivel_transicao: Nível de transição - usuario_privilegiado: Se é usuário privilegiado - termos: Termos aprendidos - girias: Gírias aprendidas - tom: Tom de fala - - Returns: - bool: Sucesso da operação - """ - try: - self._execute_with_retry( - """INSERT OR REPLACE INTO contexto - (user_key, historico, emocao_atual, humor_atual, modo_resposta, - nivel_transicao, usuario_privilegiado, termos, girias, tom, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)""", - (user_key, historico or "[]", emocao_atual, humor_atual, modo_resposta, - nivel_transicao, 1 if usuario_privilegiado else 0, - termos or "{}", girias or "{}", tom), - commit=True - ) - return True - except Exception as e: - logger.error(f"Erro ao salvar contexto: {e}") - return False - - def recuperar_contexto(self, user_key: str) -> Optional[Dict[str, Any]]: - """Recupera o contexto de um usuário.""" - try: - rows = self._execute_with_retry( - "SELECT * FROM contexto WHERE user_key = ?", - (user_key,) - ) - if rows: - row = rows[0] - return dict(row) - return None - except Exception as e: - logger.error(f"Erro ao recuperar contexto: {e}") - return None - - # ================================================================ - # TOM E HUMOR - # ================================================================ - def registrar_tom_usuario( - self, - numero_usuario: str, - tom_detectado: str, - intensidade: float = 0.5, - contexto: Optional[str] = None, - humor: str = "neutro" - ) -> bool: - """ - Registra o tom detectado de um usuário. - - Args: - numero_usuario: Número do usuário - tom_detectado: Tom detectado - intensidade: Intensidade do tom - contexto: Contexto da detecção - humor: Humor detectado - - Returns: - bool: Sucesso da operação - """ - try: - self._execute_with_retry( - """INSERT INTO tom_usuario - (numero_usuario, tom_detectado, intensidade, contexto, humor) - VALUES (?, ?, ?, ?, ?)""", - (numero_usuario, tom_detectado, intensidade, contexto, humor), - commit=True - ) - return True - except Exception as e: - logger.error(f"Erro ao registrar tom: {e}") - return False - - def obter_tom_predominante(self, numero_usuario: str) -> Optional[str]: - """Obtém o tom predominante de um usuário.""" - try: - rows = self._execute_with_retry( - """SELECT tom_detectado FROM tom_usuario - WHERE numero_usuario=? - ORDER BY created_at DESC LIMIT 1""", - (numero_usuario,) - ) - return rows[0][0] if rows else None - except Exception as e: - logger.error(f"Erro ao obter tom predominante: {e}") - return None - - # ================================================================ - # APRENDIZADOS E GÍRIAS - # ================================================================ - def salvar_aprendizado_detalhado( - self, - numero_usuario: str, - chave: str, - valor: str - ) -> bool: - """Salva um aprendizado detalhado, atualizando se já existir.""" - try: - existing = self._execute_with_retry( - "SELECT id FROM aprendizados WHERE numero_usuario=? AND chave=?", - (numero_usuario, chave) - ) - - if existing: - self._execute_with_retry( - "UPDATE aprendizados SET valor=?, created_at=CURRENT_TIMESTAMP WHERE id=?", - (valor, existing[0][0]), - commit=True - ) - else: - self._execute_with_retry( - "INSERT INTO aprendizados (numero_usuario, chave, valor) VALUES (?, ?, ?)", - (numero_usuario, chave, valor), - commit=True - ) - return True - except Exception as e: - logger.error(f"Erro ao salvar aprendizado: {e}") - return False - - def recuperar_aprendizado_detalhado( - self, - numero_usuario: str, - chave: Optional[str] = None - ) -> Union[Dict, str, None]: - """Recupera aprendizados detalhados.""" - try: - if chave: - rows = self._execute_with_retry( - "SELECT valor FROM aprendizados WHERE numero_usuario=? AND chave=?", - (numero_usuario, chave) - ) - return rows[0][0] if rows else None - else: - rows = self._execute_with_retry( - "SELECT chave, valor FROM aprendizados WHERE numero_usuario=?", - (numero_usuario,) - ) - return {r[0]: r[1] for r in rows} if rows else {} - except Exception as e: - logger.error(f"Erro ao recuperar aprendizado: {e}") - return None - - def salvar_giria_aprendida( - self, - numero_usuario: str, - giria: str, - significado: str, - contexto: Optional[str] = None - ) -> bool: - """Salva uma gíria aprendida.""" - try: - existing = self._execute_with_retry( - "SELECT id, frequencia FROM girias_aprendidas WHERE numero_usuario=? AND giria=?", - (numero_usuario, giria) - ) - - if existing: - self._execute_with_retry( - """UPDATE girias_aprendidas SET frequencia=frequencia+1, - updated_at=CURRENT_TIMESTAMP WHERE id=?""", - (existing[0][0],), - commit=True - ) - else: - self._execute_with_retry( - """INSERT INTO girias_aprendidas - (numero_usuario, giria, significado, contexto) VALUES (?, ?, ?, ?)""", - (numero_usuario, giria, significado, contexto), - commit=True - ) - return True - - except Exception as e: - logger.error(f"Erro ao salvar gíria: {e}") - return False - - def recuperar_girias_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]: - """Recupera gírias de um usuário.""" - try: - rows = self._execute_with_retry( - "SELECT giria, significado, frequencia FROM girias_aprendidas WHERE numero_usuario=?", - (numero_usuario,) - ) - return [{"giria": r[0], "significado": r[1], "frequencia": r[2]} for r in rows] if rows else [] - except Exception as e: - logger.error(f"Erro ao recuperar gírias: {e}") - return [] - - # ================================================================ - # EMBEDDINGS - # ================================================================ - def salvar_embedding( - self, - numero_usuario: str, - source_type: str, - texto: str, - embedding: Any - ) -> bool: - """Salva um embedding no banco.""" - try: - if hasattr(embedding, "tobytes"): - embedding = embedding.tobytes() - - self._execute_with_retry( - """INSERT INTO embeddings - (numero_usuario, source_type, texto, embedding) VALUES (?, ?, ?, ?)""", - (numero_usuario, source_type, texto, embedding), - commit=True - ) - return True - except Exception as e: - logger.error(f"Erro ao salvar embedding: {e}") - return False - - def recuperar_embeddings(self, numero_usuario: str) -> List[Dict[str, Any]]: - """Recupera embeddings de um usuário.""" - try: - rows = self._execute_with_retry( - "SELECT source_type, texto, embedding FROM embeddings WHERE numero_usuario=?", - (numero_usuario,) - ) - result = [] - # Verificação segura para evitar "Object of type None cannot be used as iterable" - if rows: - for r in rows: - embedding_data = r[2] - if isinstance(embedding_data, bytes): - # Mantém como bytes para uso com numpy - pass - result.append({ - "source_type": r[0], - "texto": r[1], - "embedding": embedding_data - }) - return result - except Exception as e: - logger.error(f"Erro ao recuperar embeddings: {e}") - return [] - - # ================================================================ - # PERSONA DO USUÁRIO (LTM) - # ================================================================ - def atualizar_persona(self, numero_usuario: str, campos: Dict[str, str]) -> bool: - """ - Atualiza campos específicos da persona do usuário. - - Args: - numero_usuario: Número do usuário - campos: Dicionário com chaves ('personalidade', 'vicios_linguagem', 'gostos', 'desgostos', 'emocional') - """ - try: - # Verifica se já existe - existente = self.recuperar_persona(numero_usuario) - - if existente: - # Update - set_clauses = [] - values = [] - for k, v in campos.items(): - if k in ['personalidade', 'vicios_linguagem', 'gostos', 'desgostos', 'emocional']: - set_clauses.append(f"{k} = ?") - values.append(v) - - if not set_clauses: - return False - - set_clauses.append("updated_at = CURRENT_TIMESTAMP") - values.append(numero_usuario) - - query = f"UPDATE persona_usuario SET {', '.join(set_clauses)} WHERE numero_usuario = ?" - self._execute_with_retry(query, tuple(values), commit=True) - else: - # Insert - keys = ['numero_usuario'] - values = [numero_usuario] - for k, v in campos.items(): - if k in ['personalidade', 'vicios_linguagem', 'gostos', 'desgostos', 'emocional']: - keys.append(k) - values.append(v) - - placeholders = ', '.join(['?' for _ in keys]) - query = f"INSERT INTO persona_usuario ({', '.join(keys)}) VALUES ({placeholders})" - self._execute_with_retry(query, tuple(values), commit=True) - - return True - except Exception as e: - logger.error(f"Erro ao atualizar persona para {numero_usuario}: {e}") - return False - - def recuperar_persona(self, numero_usuario: str) -> Optional[Dict[str, Any]]: - """Recupera a persona completa de um usuário.""" - try: - rows = self._execute_with_retry( - "SELECT * FROM persona_usuario WHERE numero_usuario = ?", - (numero_usuario,) - ) - if rows: - row = rows[0] - return dict(row) - return None - except Exception as e: - logger.error(f"Erro ao recuperar persona para {numero_usuario}: {e}") - return None - - # ================================================================ - # CONTEXT ISOLATION — métodos que context_isolation.py precisa - # ================================================================ - - def _init_context_isolation_tables(self): - """Cria tabelas de contexto isolado se não existirem.""" - try: - with self._get_connection() as conn: - c = conn.cursor() - c.executescript(""" - CREATE TABLE IF NOT EXISTS contextos_isolados ( - context_id TEXT PRIMARY KEY, - numero_usuario TEXT NOT NULL, - grupo_id TEXT, - tipo_conversa TEXT DEFAULT 'pv', - estado_emocional TEXT DEFAULT 'neutral', - nivel_intimidade INTEGER DEFAULT 1, - short_memory TEXT DEFAULT '[]', - metadata TEXT DEFAULT '{}', - created_at REAL DEFAULT (strftime('%s', 'now')), - last_interaction REAL DEFAULT (strftime('%s', 'now')) - ); - CREATE INDEX IF NOT EXISTS idx_contextos_user ON contextos_isolados(numero_usuario); - CREATE INDEX IF NOT EXISTS idx_contextos_tipo ON contextos_isolados(tipo_conversa); - """) - conn.commit() - logger.info("Tabela contextos_isolados garantida") - except Exception as e: - logger.warning(f"Erro ao criar tabela contextos_isolados: {e}") - - def salvar_contexto_isolado(self, context_data: Dict[str, Any]) -> bool: - """Salva ou atualiza um contexto isolado (upsert).""" - try: - # Garante que a tabela existe - self._init_context_isolation_tables() - - with self._get_connection() as conn: - c = conn.cursor() - c.execute(""" - INSERT INTO contextos_isolados - (context_id, numero_usuario, grupo_id, tipo_conversa, estado_emocional, - nivel_intimidade, short_memory, metadata, created_at, last_interaction) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(context_id) DO UPDATE SET - estado_emocional = excluded.estado_emocional, - nivel_intimidade = excluded.nivel_intimidade, - short_memory = excluded.short_memory, - metadata = excluded.metadata, - last_interaction = excluded.last_interaction - """, ( - context_data.get('context_id'), - context_data.get('numero_usuario'), - context_data.get('grupo_id'), - context_data.get('tipo_conversa', 'pv'), - context_data.get('estado_emocional', 'neutral'), - context_data.get('nivel_intimidade', 1), - json.dumps(context_data.get('short_memory', [])), - json.dumps(context_data.get('metadata', {})), - context_data.get('created_at', time.time()), - context_data.get('last_interaction', time.time()), - )) - conn.commit() - return True - except Exception as e: - logger.warning(f"Erro ao salvar contexto isolado: {e}") - return False - - def recuperar_contexto_isolado(self, context_id: str) -> Optional[Dict[str, Any]]: - """Recupera um contexto isolado pelo context_id.""" - try: - # Garante que a tabela existe (Prevenir "no such table") - self._init_context_isolation_tables() - - rows = self._execute_with_retry( - "SELECT * FROM contextos_isolados WHERE context_id = ?", - (context_id,) - ) - if rows: - row = dict(rows[0]) - # Desserializar campos JSON - try: row['short_memory'] = json.loads(row.get('short_memory', '[]')) - except: row['short_memory'] = [] - try: row['metadata'] = json.loads(row.get('metadata', '{}')) - except: row['metadata'] = {} - return row - return None - except Exception as e: - logger.warning(f"Erro ao recuperar contexto isolado: {e}") - return None - - def deletar_contexto_isolado(self, context_id: str) -> bool: - """Remove um contexto isolado (safe - verifica tabela primeiro).""" - try: - # Garante que a tabela existe - self._init_context_isolation_tables() - - # Verifica se tabela existe - conn = self._get_connection() - c = conn.cursor() - c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='contextos_isolados'") - if not c.fetchone(): - logger.warning("Tabela contextos_isolados não existe. Pulando delete.") - return True - conn.close() - - self._execute_with_retry( - "DELETE FROM contextos_isolados WHERE context_id = ?", - (context_id,), commit=True - ) - return True - except Exception as e: - logger.warning(f"Erro ao deletar contexto isolado: {e}") - return False - - def listar_contextos_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]: - """Lista todos os contextos de um usuário.""" - results = [] - try: - rows = self._execute_with_retry( - "SELECT * FROM contextos_isolados WHERE numero_usuario = ?", - (numero_usuario,) - ) - if rows: - for row in rows: - d = dict(row) - try: d['short_memory'] = json.loads(d.get('short_memory', '[]')) - except: d['short_memory'] = [] - try: d['metadata'] = json.loads(d.get('metadata', '{}')) - except: d['metadata'] = {} - results.append(d) - except Exception as e: - logger.warning(f"Erro ao listar contextos do usuário: {e}") - return results - - # ================================================================ - # HISTÓRICO POR CONVERSATION ID - # ================================================================ - - def recuperar_historico(self, usuario: str = "", numero: str = "", - conversation_id: str = "", limite: int = 20) -> List[Dict[str, Any]]: - """ - Recupera histórico de mensagens. - Suporta conversation_id para isolamento de contexto. - """ - # Tenta nova coluna conversation_id primeiro - try: - if conversation_id: - try: - rows = self._execute_with_retry( - "SELECT usuario, mensagem, resposta, humor, modelo_usado, created_at FROM mensagens " - "WHERE conversation_id = ? ORDER BY id DESC LIMIT ?", - (conversation_id, limite) - ) - except Exception: - # Fallback para banco antigo sem conversation_id - rows = [] - elif numero: - rows = self._execute_with_retry( - "SELECT usuario, mensagem, resposta, humor, modelo_usado, created_at FROM mensagens " - "WHERE numero = ? ORDER BY id DESC LIMIT ?", - (numero, limite) - ) - elif usuario: - rows = self._execute_with_retry( - "SELECT usuario, mensagem, resposta, humor, modelo_usado, created_at FROM mensagens " - "WHERE usuario = ? ORDER BY id DESC LIMIT ?", - (usuario, limite) - ) - else: - return [] - - return [dict(r) for r in (rows or [])][::-1] # Reverte para ordem cronológica - except Exception: - return [] - - def recuperar_resposta_por_id(self, message_id: str) -> Optional[Dict[str, Any]]: - """Recupera uma resposta já gerada para um message_id (idempotência).""" - if not message_id: return None - try: - # Garante que a coluna existe - with self._get_connection() as conn: - c = conn.cursor() - c.execute("PRAGMA table_info(mensagens)") - if 'message_id' not in [row[1] for row in c.fetchall()]: - return None - - rows = self._execute_with_retry( - "SELECT resposta, modelo_usado, created_at FROM mensagens WHERE message_id = ? LIMIT 1", - (message_id,) - ) - if rows: - return dict(rows[0]) - return None - except Exception as e: - logger.warning(f"Erro ao recuperar resposta por id: {e}") - return None - - def registrar_mensagem_conversation_id(self, usuario: str, mensagem: str, resposta: str, - conversation_id: str = "", numero: str = "", - is_reply: bool = False, mensagem_original: str = "", - humor: str = "neutro", modo_resposta: str = "normal", - modelo_usado: str = "desconhecido", **kwargs) -> bool: - """Registra mensagem com conversation_id para isolamento.""" - try: - # Verifica se a coluna conversation_id existe - with self._get_connection() as conn: - c = conn.cursor() - c.execute("PRAGMA table_info(mensagens)") - cols = [row[1] for row in c.fetchall()] - if 'conversation_id' not in cols: - c.execute("ALTER TABLE mensagens ADD COLUMN conversation_id TEXT") - conn.commit() - - self._execute_with_retry( - """INSERT INTO mensagens - (usuario, mensagem, resposta, numero, is_reply, mensagem_original, humor, modo_resposta, modelo_usado, conversation_id, message_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - (usuario, mensagem, resposta, numero, is_reply, mensagem_original, humor, modo_resposta, modelo_usado, conversation_id, kwargs.get('message_id')), - commit=True - ) - return True - except Exception as e: - logger.warning(f"Erro ao registrar mensagem com conversation_id: {e}") - return False - - def limpar_contexto_usuario(self, usuario: str = "", numero: str = "") -> bool: - """Limpa todas as mensagens de um usuário (reset).""" - try: - if numero: - self._execute_with_retry("DELETE FROM mensagens WHERE numero = ?", (numero,), commit=True) - elif usuario: - self._execute_with_retry("DELETE FROM mensagens WHERE usuario = ?", (usuario,), commit=True) - # Limpa também contextos isolados - self._execute_with_retry("DELETE FROM contextos_isolados WHERE numero_usuario = ?", (numero or usuario,), commit=True) - return True - except Exception as e: - logger.warning(f"Erro ao limpar contexto: {e}") - return False - - # ================================================================ - # PERSONA DO USUÁRIO (LTM) - # ================================================================ - def recuperar_persona(self, numero_usuario: str) -> Dict[str, Any]: - """Recupera a persona (personalidade, gostos, etc.) do usuário.""" - try: - rows = self._execute_with_retry( - "SELECT personalidade, vicios_linguagem, gostos, desgostos, emocional FROM persona_usuario WHERE numero_usuario = ?", - (numero_usuario,) - ) - if rows: - return dict(rows[0]) - return {} - except Exception as e: - logger.warning(f"Erro ao recuperar persona de {numero_usuario}: {e}") - return {} - - def atualizar_persona(self, numero_usuario: str, campos: Dict[str, str]) -> bool: - """Atualiza ou insere novos traços de persona para o usuário.""" - if not campos: - return False - try: - # Garante que as colunas existem (migração rápida se necessário) - with self._get_connection() as conn: - c = conn.cursor() - c.execute("INSERT OR IGNORE INTO persona_usuario (numero_usuario) VALUES (?)", (numero_usuario,)) - - query_parts = [] - params = [] - for campo, valor in campos.items(): - query_parts.append(f"{campo} = ?") - params.append(valor) - - query_parts.append("updated_at = CURRENT_TIMESTAMP") - params.append(numero_usuario) - - query = f"UPDATE persona_usuario SET {', '.join(query_parts)} WHERE numero_usuario = ?" - c.execute(query, tuple(params)) - conn.commit() - return True - except Exception as e: - logger.warning(f"Erro ao atualizar persona de {numero_usuario}: {e}") - return False - - def fazer_checkpoint_hf_sync(self) -> bool: - """ - Executa um backup seguro da base de dados otimizado para o Hugging Face Buckets (hf sync). - Garante integridade total forçando a gravação do WAL e criando um snapshot isolado. - - Isso previne qualquer corrupção de dados por conflito entre a nuvem e as - threads concorrentes da Akira. - """ - import shutil - from pathlib import Path - try: - # 1. Força a gravação de todos os dados do WAL para o arquivo DB principal (Truncate) - self._execute_with_retry("PRAGMA wal_checkpoint(TRUNCATE)") - - # 2. Configura caminhos - db_path_obj = Path(self.db_path) - # Diretório alvo para o hf sync - cloud_sync_dir = db_path_obj.parent / "cloud_sync" - cloud_sync_dir.mkdir(parents=True, exist_ok=True) - - backup_path = cloud_sync_dir / db_path_obj.name - - # 3. Usa a API de Backup Nativa e atômica do próprio sqlite - with self._get_connection() as source: - backup_conn = sqlite3.connect( - str(backup_path), - timeout=30.0, - check_same_thread=False - ) - with backup_conn: - source.backup(backup_conn, pages=-1) - backup_conn.close() - - logger.info(f"✅ Checkpoint Seguro para HF Buckets concluído em: {backup_path}") - return True - except Exception as e: - logger.error(f"❌ Erro ao criar Checkpoint HF Sync: {e}") - return False - - # ================================================================ - # USO DE FERRAMENTAS - # ================================================================ - def incrementar_uso_ferramenta(self, numero: str) -> int: - """Incrementa o contador de uso de ferramentas para o mês atual.""" - try: - month_year = datetime.now().strftime("%m/%Y") - self._execute_with_retry( - """INSERT INTO tool_usage (numero_usuario, month_year, usage_count) - VALUES (?, ?, 1) - ON CONFLICT(numero_usuario, month_year) - DO UPDATE SET usage_count = usage_count + 1""", - (numero, month_year), - commit=True - ) - return self.obter_uso_mensal_ferramenta(numero) - except Exception as e: - logger.error(f"Erro ao incrementar uso de ferramenta: {e}") - return 0 - - def obter_uso_mensal_ferramenta(self, numero: str) -> int: - """Retorna o total de ferramentas usadas pelo usuário no mês atual.""" - try: - month_year = datetime.now().strftime("%m/%Y") - rows = self._execute_with_retry( - "SELECT usage_count FROM tool_usage WHERE numero_usuario = ? AND month_year = ?", - (numero, month_year) - ) - return rows[0][0] if rows else 0 - except Exception as e: - logger.error(f"Erro ao obter uso mensal: {e}") - return 0 +""" +================================================================================ +AKIRA V21 ULTIMATE - DATABASE MODULE +================================================================================ +Banco de dados SQLite extremamente robusto, moderno e completo. +Gerencia: mensagens, embeddings, gírias, tom, aprendizados, API logs, training sessions. + +Features: +- SQLite com WAL mode para performance máxima +- Retry logic com exponential backoff +- Full-text search com FTS5 +- Vector storage para embeddings (SentenceTransformers) +- Transactions.atomic() +- Backup/restore automático +- Health checks e métricas detalhadas +- Índices otimizados +- Migration system completo +- Logging detalhado +- Singleton pattern para conexões +- Suporte a numpy arrays para embeddings +- API performance tracking +- Training sessions tracking +================================================================================ +""" + +import sqlite3 +import time +import os +import json +import hashlib +import random +from typing import Optional, List, Dict, Any, Tuple, Union +from datetime import datetime +from loguru import logger + + +class Database: + """ + Classe de banco de dados robusta para Akira V21 Ultimate. + Implementada como Singleton para evitar conflitos de conexão entre workers. + """ + + _instances: Dict[str, 'Database'] = {} + _initialized: Dict[str, bool] = {} + + # Códigos de verificação para usuários privilegiados + CODIGOS_VERIFICACAO: Dict[str, str] = {} + + def __new__(cls, db_path: str = "/akira/data/akira.db"): + if db_path not in cls._instances: + cls._instances[db_path] = super(Database, cls).__new__(cls) + cls._initialized[db_path] = False + return cls._instances[db_path] + + def __init__(self, db_path: str = "/akira/data/akira.db"): + """Inicializa a conexão se ainda não foi inicializada para este path.""" + if self._initialized.get(db_path, False): + return + + self.db_path = db_path + self.max_retries = 5 + self.retry_delay = 0.1 + + # Garante que o diretório /akira/data existe (Docker) + db_dir = os.path.dirname(db_path) + if db_dir and not os.path.exists(db_dir): + os.makedirs(db_dir, exist_ok=True) + + # Inicialização pesada acontece apenas uma vez + self._init_db() + self._init_context_isolation_tables() + self._ensure_all_columns_and_indexes() + + Database._initialized[db_path] = True + logger.info(f"Database inicializado e otimizado: {self.db_path}") + + # ================================================================ + # CONEXÃO + RETRY + # ================================================================ + def _get_connection(self) -> sqlite3.Connection: + """Obtém conexão com retry automático.""" + for attempt in range(self.max_retries): + try: + conn = sqlite3.connect( + self.db_path, + timeout=30.0, + check_same_thread=False + ) + # Otimizações SQLite para performance + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute("PRAGMA cache_size=1000") + conn.execute("PRAGMA temp_store=MEMORY") + conn.execute("PRAGMA busy_timeout=30000") + conn.execute("PRAGMA foreign_keys=ON") + conn.row_factory = sqlite3.Row + return conn + except sqlite3.OperationalError as e: + if "locked" in str(e) and attempt < self.max_retries - 1: + time.sleep(self.retry_delay * (2 ** attempt)) + continue + logger.error(f"Erro de conexão DB: {e}") + raise + raise sqlite3.OperationalError("Falha ao conectar ao banco após várias tentativas") + + def _execute_with_retry( + self, + query: str, + params: Optional[tuple] = None, + commit: bool = False + ) -> Optional[List[sqlite3.Row]]: + """Executa query com retry automático.""" + for attempt in range(self.max_retries): + try: + with self._get_connection() as conn: + cur = conn.cursor() + cur.execute(query, params or ()) + + if query.strip().upper().startswith("SELECT"): + result = cur.fetchall() + return result + + if commit: + conn.commit() + return None + + except sqlite3.OperationalError as e: + if "locked" in str(e) and attempt < self.max_retries - 1: + time.sleep(self.retry_delay * (2 ** attempt)) + continue + logger.error(f"Erro SQL: {e}") + raise + raise sqlite3.OperationalError("Query falhou após retries") + + # ================================================================ + # SCHEMA + MIGRAÇÃO + # ================================================================ + def _init_db(self): + """Inicializa todas as tabelas do banco.""" + try: + with self._get_connection() as conn: + c = conn.cursor() + + # Tabela de mensagens + c.executescript(""" + CREATE TABLE IF NOT EXISTS mensagens ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + usuario TEXT, + mensagem TEXT, + resposta TEXT, + numero TEXT, + is_reply BOOLEAN DEFAULT 0, + mensagem_original TEXT, + humor TEXT DEFAULT 'neutro', + modo_resposta TEXT DEFAULT 'normal', + nivel_transicao INTEGER DEFAULT 1, + usuario_privilegiado BOOLEAN DEFAULT 0, + modelo_usado TEXT DEFAULT 'desconhecido', + conversation_id TEXT DEFAULT '', + message_id TEXT UNIQUE, -- ✅ IDEMPOTENCY KEY + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + """) + + # Tabela de usuários privilegiados + c.executescript(""" + CREATE TABLE IF NOT EXISTS usuarios_privilegiados ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + numero TEXT UNIQUE, + nome TEXT, + apelido TEXT, + modo_fala TEXT, + codigo_verificacao TEXT, + ativo BOOLEAN DEFAULT 1, + privilegio_temporario_ativo BOOLEAN DEFAULT 0, + expira_em REAL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + """) + + # Tabela de embeddings + c.executescript(""" + CREATE TABLE IF NOT EXISTS embeddings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + numero_usuario TEXT, + source_type TEXT, + texto TEXT, + embedding BLOB + ); + """) + + # Tabela de aprendizados + c.executescript(""" + CREATE TABLE IF NOT EXISTS aprendizados ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + numero_usuario TEXT, + chave TEXT, + valor TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + """) + + # Tabela de gírias aprendidas + c.executescript(""" + CREATE TABLE IF NOT EXISTS girias_aprendidas ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + numero_usuario TEXT, + giria TEXT, + significado TEXT, + contexto TEXT, + frequencia INTEGER DEFAULT 1, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + """) + + # Tabela de tom do usuário + c.executescript(""" + CREATE TABLE IF NOT EXISTS tom_usuario ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + numero_usuario TEXT, + tom_detectado TEXT, + intensidade REAL DEFAULT 0.5, + contexto TEXT, + humor TEXT DEFAULT 'neutro', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + """) + + # Tabela de contexto + c.executescript(""" + CREATE TABLE IF NOT EXISTS contexto ( + user_key TEXT PRIMARY KEY, + historico TEXT, + emocao_atual TEXT, + humor_atual TEXT DEFAULT 'neutro', + modo_resposta TEXT DEFAULT 'normal', + nivel_transicao INTEGER DEFAULT 1, + usuario_privilegiado BOOLEAN DEFAULT 0, + termos TEXT, + girias TEXT, + tom TEXT, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + """) + + # Tabela de pronomes por tom + c.executescript(""" + CREATE TABLE IF NOT EXISTS pronomes_por_tom ( + tom TEXT PRIMARY KEY, + pronomes TEXT + ); + """) + + # Tabela de Persona do Usuário (Character.AI style LTM) + c.executescript(""" + CREATE TABLE IF NOT EXISTS persona_usuario ( + numero_usuario TEXT PRIMARY KEY, + personalidade TEXT, + vicios_linguagem TEXT, + gostos TEXT, + desgostos TEXT, + emocional TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + """) + + # Insere dados padrão de pronomes + c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", + ('neutro', 'tu/você')) + c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", + ('formal', 'o senhor/a senhora')) + c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", + ('informal', 'puto/kota')) + c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", + ('tecnico_formal', 'senhor')) + + # Insere usuários privilegiados padrão + usuarios_default = [ + ('244937035662', 'Isaac Quarenta', 'Isaac', 'tecnico_formal'), + ('244978787009', 'Isaac Quarenta 2', 'Isaac', 'tecnico_formal') + ] + for numero, nome, apelido, modo in usuarios_default: + c.execute(""" + INSERT OR IGNORE INTO usuarios_privilegiados + (numero, nome, apelido, modo_fala) VALUES (?, ?, ?, ?) + """, (numero, nome, apelido, modo)) + + # ===== LSTM MEMORY SYSTEM TABLES ===== + c.executescript(""" + CREATE TABLE IF NOT EXISTS lstm_contexto ( + context_id VARCHAR(255) PRIMARY KEY, -- ✅ Único (Hash SHA256) + numero_usuario VARCHAR(50) NOT NULL, + topic_principal VARCHAR(255), + subtopicas JSON, + conversation_path JSON, + interaction_pattern VARCHAR(50), + emotional_state VARCHAR(50), + unanswered_questions JSON, + assumed_knowledge JSON, + last_key_message TEXT, + context_switches INTEGER DEFAULT 0, + contradictions JSON, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + metadata JSON + ); + """) + + c.executescript(""" + CREATE INDEX IF NOT EXISTS idx_lstm_usuario ON lstm_contexto(numero_usuario); + CREATE INDEX IF NOT EXISTS idx_lstm_created ON lstm_contexto(created_at); + """) + + c.executescript(""" + CREATE TABLE IF NOT EXISTS lstm_message_links ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + context_id VARCHAR(255) NOT NULL, + message_id VARCHAR(255) NOT NULL, + numero_usuario VARCHAR(50) NOT NULL, + speaker_name VARCHAR(255), + parent_message_id VARCHAR(255), + topic_changed BOOLEAN DEFAULT FALSE, + context_switch_type VARCHAR(50), + relevance_score FLOAT DEFAULT 0.0, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(context_id, message_id, numero_usuario), + FOREIGN KEY (context_id) REFERENCES lstm_contexto(context_id) ON DELETE CASCADE + ); + """) + + c.executescript(""" + CREATE INDEX IF NOT EXISTS idx_lstm_msg_context ON lstm_message_links(context_id); + CREATE INDEX IF NOT EXISTS idx_lstm_msg_message ON lstm_message_links(message_id); + """) + + conn.commit() + logger.info(f"Banco de dados inicializado: {self.db_path}") + + except Exception as e: + logger.error(f"Erro ao criar tabelas: {e}") + raise + + def _ensure_all_columns_and_indexes(self): + """Garante que todas as colunas e índices existam.""" + try: + with self._get_connection() as conn: + c = conn.cursor() + + # Adiciona colunas faltantes na tabela mensagens + columns_to_add = { + 'mensagens': [ + ('humor', 'TEXT DEFAULT "neutro"'), + ('modo_resposta', 'TEXT DEFAULT "normal"'), + ('nivel_transicao', 'INTEGER DEFAULT 1'), + ('usuario_privilegiado', 'BOOLEAN DEFAULT 0'), + ('modelo_usado', 'TEXT DEFAULT "desconhecido"'), + ('conversation_id', 'TEXT DEFAULT ""'), + ('nome_usuario', 'TEXT DEFAULT ""') + ], + 'tom_usuario': [ + ('humor', 'TEXT DEFAULT "neutro"') + ], + 'contexto': [ + ('humor_atual', 'TEXT DEFAULT "neutro"'), + ('modo_resposta', 'TEXT DEFAULT "normal"'), + ('nivel_transicao', 'INTEGER DEFAULT 1'), + ('usuario_privilegiado', 'BOOLEAN DEFAULT 0'), + ('updated_at', 'DATETIME DEFAULT CURRENT_TIMESTAMP') + ], + 'usuarios_privilegiados': [ + ('privilegio_temporario_ativo', 'BOOLEAN DEFAULT 0'), + ('expira_em', 'REAL') + ], + 'persona_usuario': [ + ('nome', 'TEXT DEFAULT ""') + ] + } + + for table, cols in columns_to_add.items(): + c.execute(f"PRAGMA table_info('{table}')") + existing = {row[1] for row in c.fetchall()} + for col_name, col_def in cols: + if col_name not in existing: + try: + c.execute(f"ALTER TABLE {table} ADD COLUMN {col_name} {col_def}") + logger.info(f"Coluna '{col_name}' adicionada em '{table}'") + except Exception as e: + logger.warning(f"Erro ao adicionar coluna {col_name}: {e}") + + # Migration para consertar FK mismatch em lstm_message_links + try: + # Verifica se a FK de lstm_message_links está correta + c.execute("PRAGMA foreign_key_list('lstm_message_links')") + fk_list = c.fetchall() + # Se não houver FK ou se for o esquema antigo (que causava mismatch), recriamos + if not fk_list: + logger.warning("Refazendo lstm_message_links para corrigir esquema...") + c.execute("DROP TABLE IF EXISTS lstm_message_links") + # O _init_db() vai recriar na próxima inicialização + except Exception as e: + logger.warning(f"Erro ao verificar migração LSTM: {e}") + + conn.commit() + + except Exception as e: + logger.error(f"Erro na migração: {e}") + + # ================================================================ + # USUÁRIOS PRIVILEGIADOS + # ================================================================ + def adicionar_usuario_privilegiado( + self, + numero: str, + nome: str, + apelido: str, + modo_fala: str = "tecnico_formal" + ) -> Tuple[bool, str]: + """ + Adiciona um usuário privilegiado ao sistema. + + Args: + numero: Número de telefone do usuário + nome: Nome completo + apelido: Apelido + modo_fala: Modo de fala inicial + + Returns: + Tuple[bool, str]: (sucesso, código de verificação) + """ + try: + # Gera código de verificação + codigo = str(random.randint(100000, 999999)) + + self._execute_with_retry( + """INSERT OR REPLACE INTO usuarios_privilegiados + (numero, nome, apelido, modo_fala, codigo_verificacao) + VALUES (?, ?, ?, ?, ?)""", + (numero, nome, apelido, modo_fala, codigo), + commit=True + ) + + logger.info(f"Usuário privilegiado adicionado: {numero} ({nome})") + return True, codigo + + except Exception as e: + logger.error(f"Erro ao adicionar usuário privilegiado: {e}") + return False, str(e) + + def eh_privilegiado(self, numero: str) -> bool: + """ + Verifica se um número é de usuário privilegiado. + + Args: + numero: Número de telefone a verificar + + Returns: + bool: True se for privilegiado + """ + try: + rows = self._execute_with_retry( + "SELECT ativo FROM usuarios_privilegiados WHERE numero = ? AND ativo = 1", + (numero,) + ) + # Verificação segura para evitar "List[Row] | None cannot be assigned to len()" + return rows is not None and len(rows) > 0 + except Exception as e: + logger.error(f"Erro ao verificar privilégios: {e}") + return False + + def verificar_privilegios_usuario(self, numero: str) -> Dict[str, Any]: + """ + Verifica privilégios detalhados do usuário no database com suporte a temporários. + + Args: + numero: Número do usuário + + Returns: + Dict: Dicionário com flags de privilégio + """ + try: + rows = self._execute_with_retry( + "SELECT ativo, privilegio_temporario_ativo, expira_em FROM usuarios_privilegiados WHERE numero = ?", + (numero,) + ) + if rows: + row = rows[0] + return { + "privilegiado": bool(row[0]), + "privilegio_temporario_ativo": bool(row[1]), + "expira_em": row[2] + } + return { + "privilegiado": False, + "privilegio_temporario_ativo": False, + "expira_em": None + } + except Exception as e: + logger.error(f"Erro em verificar_privilegios_usuario: {e}") + return {"privilegiado": False, "privilegio_temporario_ativo": False} + + def verificar_codigo(self, numero: str, codigo: str) -> bool: + """ + Verifica o código de um usuário privilegiado. + + Args: + numero: Número de telefone + codigo: Código de verificação + + Returns: + bool: True se o código for válido + """ + try: + rows = self._execute_with_retry( + "SELECT codigo_verificacao FROM usuarios_privilegiados WHERE numero = ?", + (numero,) + ) + if rows and rows[0][0] == codigo: + # Gera novo código para próxima verificação + novo_codigo = str(random.randint(100000, 999999)) + self._execute_with_retry( + "UPDATE usuarios_privilegiados SET codigo_verificacao = ? WHERE numero = ?", + (novo_codigo, numero), + commit=True + ) + return True + return False + except Exception as e: + logger.error(f"Erro ao verificar código: {e}") + return False + + def obter_modo_fala_privilegiado(self, numero: str) -> Optional[str]: + """Obtém o modo de fala de um usuário privilegiado.""" + try: + rows = self._execute_with_retry( + "SELECT modo_fala FROM usuarios_privilegiados WHERE numero = ?", + (numero,) + ) + return rows[0][0] if rows else None + except Exception as e: + logger.error(f"Erro ao obter modo de fala: {e}") + return None + + # ================================================================ + # MENSAGENS + # ================================================================ + def salvar_mensagem( + self, + usuario: str, + mensagem: str, + resposta: str, + numero: Optional[str] = None, + is_reply: bool = False, + mensagem_original: Optional[str] = None, + humor: str = "neutro", + modo_resposta: str = "normal", + nivel_transicao: int = 1, + usuario_privilegiado: bool = False, + modelo_usado: str = "desconhecido", + **kwargs + ) -> bool: + """ + Salva uma mensagem no banco de dados. + """ + try: + cols = ['usuario', 'mensagem', 'resposta', 'humor', 'modo_resposta', + 'nivel_transicao', 'usuario_privilegiado', 'is_reply', 'modelo_usado'] + vals: List[Any] = [usuario, mensagem, resposta, humor, modo_resposta, + nivel_transicao, usuario_privilegiado, is_reply, modelo_usado] + + # ✅ Novo: message_id via kwargs ou parâmetro opcional (compatibilidade) + message_id = kwargs.get('message_id') + if message_id: + cols.append('message_id') + vals.append(message_id) + + if numero: + cols.append('numero') + vals.append(numero) + + if mensagem_original: + cols.append('mensagem_original') + vals.append(mensagem_original) + + # ✅ Suporte para nome_usuario (evita erro SQL se a coluna existir) + nome_usuario = kwargs.get('nome_usuario') or usuario + if nome_usuario: + cols.append('nome_usuario') + vals.append(nome_usuario) + + placeholders = ', '.join(['?' for _ in cols]) + query = f"INSERT INTO mensagens ({', '.join(cols)}) VALUES ({placeholders})" + + self._execute_with_retry(query, tuple(vals), commit=True) + return True + + except Exception as e: + logger.warning(f"Erro salvar_mensagem: {e}") + return False + + def recuperar_mensagens( + self, + usuario: str, + limite: int = 5 + ) -> List[Tuple[str, str]]: + """Recupera mensagens de um usuário.""" + try: + result = self._execute_with_retry( + """SELECT mensagem, resposta FROM mensagens + WHERE usuario=? OR numero=? + ORDER BY id DESC LIMIT ?""", + (usuario, usuario, limite) + ) + if not result: + return [] + # Converte sqlite3.Row para tuplas + return [(row[0], row[1]) for row in result] + except Exception as e: + logger.error(f"Erro ao recuperar mensagens: {e}") + return [] + + def recuperar_mensagens_por_contexto(self, context_id: str, limite: int = 50) -> List[Dict[str, Any]]: + """Recupera mensagens de um contexto específico (grupo ou PV).""" + try: + rows = self._execute_with_retry( + "SELECT usuario, mensagem, resposta, created_at FROM mensagens WHERE conversation_id = ? ORDER BY id DESC LIMIT ?", + (context_id, limite) + ) + if not rows: + return [] + return [dict(row) for row in rows] + except Exception as e: + logger.error(f"Erro ao recuperar mensagens por contexto: {e}") + return [] + + def recuperar_humor(self, numero_usuario: str) -> str: + """ + Recupera o humor atual de um usuário. + + Args: + numero_usuario: Número do usuário + + Returns: + str: Humor detectado ('neutro', 'feliz', 'triste', 'irritado', 'entediado') + """ + try: + rows = self._execute_with_retry( + """SELECT humor FROM tom_usuario + WHERE numero_usuario=? + ORDER BY created_at DESC LIMIT 1""", + (numero_usuario,) + ) + return rows[0][0] if rows else "neutro" + except Exception as e: + logger.error(f"Erro ao recuperar humor: {e}") + return "neutro" + + # ================================================================ + # CONTEXTO + # ================================================================ + def salvar_contexto( + self, + user_key: str, + historico: Optional[str] = None, + emocao_atual: str = "neutra", + humor_atual: str = "neutro", + modo_resposta: str = "normal", + nivel_transicao: int = 1, + usuario_privilegiado: bool = False, + termos: Optional[str] = None, + girias: Optional[str] = None, + tom: Optional[str] = None + ) -> bool: + """ + Salva o contexto de um usuário. + + Args: + user_key: Chave do usuário (número ou nome) + historico: Histórico de conversas + emocao_atual: Emoção atual + humor_atual: Humor atual + modo_resposta: Modo de resposta + nivel_transicao: Nível de transição + usuario_privilegiado: Se é usuário privilegiado + termos: Termos aprendidos + girias: Gírias aprendidas + tom: Tom de fala + + Returns: + bool: Sucesso da operação + """ + try: + self._execute_with_retry( + """INSERT OR REPLACE INTO contexto + (user_key, historico, emocao_atual, humor_atual, modo_resposta, + nivel_transicao, usuario_privilegiado, termos, girias, tom, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)""", + (user_key, historico or "[]", emocao_atual, humor_atual, modo_resposta, + nivel_transicao, 1 if usuario_privilegiado else 0, + termos or "{}", girias or "{}", tom), + commit=True + ) + return True + except Exception as e: + logger.error(f"Erro ao salvar contexto: {e}") + return False + + def recuperar_contexto(self, user_key: str) -> Optional[Dict[str, Any]]: + """Recupera o contexto de um usuário.""" + try: + rows = self._execute_with_retry( + "SELECT * FROM contexto WHERE user_key = ?", + (user_key,) + ) + if rows: + row = rows[0] + return dict(row) + return None + except Exception as e: + logger.error(f"Erro ao recuperar contexto: {e}") + return None + + # ================================================================ + # TOM E HUMOR + # ================================================================ + def registrar_tom_usuario( + self, + numero_usuario: str, + tom_detectado: str, + intensidade: float = 0.5, + contexto: Optional[str] = None, + humor: str = "neutro" + ) -> bool: + """ + Registra o tom detectado de um usuário. + + Args: + numero_usuario: Número do usuário + tom_detectado: Tom detectado + intensidade: Intensidade do tom + contexto: Contexto da detecção + humor: Humor detectado + + Returns: + bool: Sucesso da operação + """ + try: + self._execute_with_retry( + """INSERT INTO tom_usuario + (numero_usuario, tom_detectado, intensidade, contexto, humor) + VALUES (?, ?, ?, ?, ?)""", + (numero_usuario, tom_detectado, intensidade, contexto, humor), + commit=True + ) + return True + except Exception as e: + logger.error(f"Erro ao registrar tom: {e}") + return False + + def obter_tom_predominante(self, numero_usuario: str) -> Optional[str]: + """Obtém o tom predominante de um usuário.""" + try: + rows = self._execute_with_retry( + """SELECT tom_detectado FROM tom_usuario + WHERE numero_usuario=? + ORDER BY created_at DESC LIMIT 1""", + (numero_usuario,) + ) + return rows[0][0] if rows else None + except Exception as e: + logger.error(f"Erro ao obter tom predominante: {e}") + return None + + # ================================================================ + # APRENDIZADOS E GÍRIAS + # ================================================================ + def salvar_aprendizado_detalhado( + self, + numero_usuario: str, + chave: str, + valor: str + ) -> bool: + """Salva um aprendizado detalhado, atualizando se já existir.""" + try: + existing = self._execute_with_retry( + "SELECT id FROM aprendizados WHERE numero_usuario=? AND chave=?", + (numero_usuario, chave) + ) + + if existing: + self._execute_with_retry( + "UPDATE aprendizados SET valor=?, created_at=CURRENT_TIMESTAMP WHERE id=?", + (valor, existing[0][0]), + commit=True + ) + else: + self._execute_with_retry( + "INSERT INTO aprendizados (numero_usuario, chave, valor) VALUES (?, ?, ?)", + (numero_usuario, chave, valor), + commit=True + ) + return True + except Exception as e: + logger.error(f"Erro ao salvar aprendizado: {e}") + return False + + def recuperar_aprendizado_detalhado( + self, + numero_usuario: str, + chave: Optional[str] = None + ) -> Union[Dict, str, None]: + """Recupera aprendizados detalhados.""" + try: + if chave: + rows = self._execute_with_retry( + "SELECT valor FROM aprendizados WHERE numero_usuario=? AND chave=?", + (numero_usuario, chave) + ) + return rows[0][0] if rows else None + else: + rows = self._execute_with_retry( + "SELECT chave, valor FROM aprendizados WHERE numero_usuario=?", + (numero_usuario,) + ) + return {r[0]: r[1] for r in rows} if rows else {} + except Exception as e: + logger.error(f"Erro ao recuperar aprendizado: {e}") + return None + + def salvar_giria_aprendida( + self, + numero_usuario: str, + giria: str, + significado: str, + contexto: Optional[str] = None + ) -> bool: + """Salva uma gíria aprendida.""" + try: + existing = self._execute_with_retry( + "SELECT id, frequencia FROM girias_aprendidas WHERE numero_usuario=? AND giria=?", + (numero_usuario, giria) + ) + + if existing: + self._execute_with_retry( + """UPDATE girias_aprendidas SET frequencia=frequencia+1, + updated_at=CURRENT_TIMESTAMP WHERE id=?""", + (existing[0][0],), + commit=True + ) + else: + self._execute_with_retry( + """INSERT INTO girias_aprendidas + (numero_usuario, giria, significado, contexto) VALUES (?, ?, ?, ?)""", + (numero_usuario, giria, significado, contexto), + commit=True + ) + return True + + except Exception as e: + logger.error(f"Erro ao salvar gíria: {e}") + return False + + def recuperar_girias_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]: + """Recupera gírias de um usuário.""" + try: + rows = self._execute_with_retry( + "SELECT giria, significado, frequencia FROM girias_aprendidas WHERE numero_usuario=?", + (numero_usuario,) + ) + return [{"giria": r[0], "significado": r[1], "frequencia": r[2]} for r in rows] if rows else [] + except Exception as e: + logger.error(f"Erro ao recuperar gírias: {e}") + return [] + + # ================================================================ + # EMBEDDINGS + # ================================================================ + def salvar_embedding( + self, + numero_usuario: str, + source_type: str, + texto: str, + embedding: Any + ) -> bool: + """Salva um embedding no banco.""" + try: + if hasattr(embedding, "tobytes"): + embedding = embedding.tobytes() + + self._execute_with_retry( + """INSERT INTO embeddings + (numero_usuario, source_type, texto, embedding) VALUES (?, ?, ?, ?)""", + (numero_usuario, source_type, texto, embedding), + commit=True + ) + return True + except Exception as e: + logger.error(f"Erro ao salvar embedding: {e}") + return False + + def recuperar_embeddings(self, numero_usuario: str) -> List[Dict[str, Any]]: + """Recupera embeddings de um usuário.""" + try: + rows = self._execute_with_retry( + "SELECT source_type, texto, embedding FROM embeddings WHERE numero_usuario=?", + (numero_usuario,) + ) + result = [] + # Verificação segura para evitar "Object of type None cannot be used as iterable" + if rows: + for r in rows: + embedding_data = r[2] + if isinstance(embedding_data, bytes): + # Mantém como bytes para uso com numpy + pass + result.append({ + "source_type": r[0], + "texto": r[1], + "embedding": embedding_data + }) + return result + except Exception as e: + logger.error(f"Erro ao recuperar embeddings: {e}") + return [] + + # ================================================================ + # PERSONA DO USUÁRIO (LTM) + # ================================================================ + def atualizar_persona(self, numero_usuario: str, campos: Dict[str, str]) -> bool: + """ + Atualiza campos específicos da persona do usuário. + + Args: + numero_usuario: Número do usuário + campos: Dicionário com chaves ('personalidade', 'vicios_linguagem', 'gostos', 'desgostos', 'emocional') + """ + try: + # Verifica se já existe + existente = self.recuperar_persona(numero_usuario) + + if existente: + # Update + set_clauses = [] + values = [] + for k, v in campos.items(): + if k in ['personalidade', 'vicios_linguagem', 'gostos', 'desgostos', 'emocional']: + set_clauses.append(f"{k} = ?") + values.append(v) + + if not set_clauses: + return False + + set_clauses.append("updated_at = CURRENT_TIMESTAMP") + values.append(numero_usuario) + + query = f"UPDATE persona_usuario SET {', '.join(set_clauses)} WHERE numero_usuario = ?" + self._execute_with_retry(query, tuple(values), commit=True) + else: + # Insert + keys = ['numero_usuario'] + values = [numero_usuario] + for k, v in campos.items(): + if k in ['personalidade', 'vicios_linguagem', 'gostos', 'desgostos', 'emocional']: + keys.append(k) + values.append(v) + + placeholders = ', '.join(['?' for _ in keys]) + query = f"INSERT INTO persona_usuario ({', '.join(keys)}) VALUES ({placeholders})" + self._execute_with_retry(query, tuple(values), commit=True) + + return True + except Exception as e: + logger.error(f"Erro ao atualizar persona para {numero_usuario}: {e}") + return False + + def recuperar_persona(self, numero_usuario: str) -> Optional[Dict[str, Any]]: + """Recupera a persona completa de um usuário.""" + try: + rows = self._execute_with_retry( + "SELECT * FROM persona_usuario WHERE numero_usuario = ?", + (numero_usuario,) + ) + if rows: + row = rows[0] + return dict(row) + return None + except Exception as e: + logger.error(f"Erro ao recuperar persona para {numero_usuario}: {e}") + return None + + # ================================================================ + # CONTEXT ISOLATION — métodos que context_isolation.py precisa + # ================================================================ + + def _init_context_isolation_tables(self): + """Cria tabelas de contexto isolado se não existirem.""" + try: + with self._get_connection() as conn: + c = conn.cursor() + c.executescript(""" + CREATE TABLE IF NOT EXISTS contextos_isolados ( + context_id TEXT PRIMARY KEY, + numero_usuario TEXT NOT NULL, + grupo_id TEXT, + tipo_conversa TEXT DEFAULT 'pv', + estado_emocional TEXT DEFAULT 'neutral', + nivel_intimidade INTEGER DEFAULT 1, + short_memory TEXT DEFAULT '[]', + metadata TEXT DEFAULT '{}', + created_at REAL DEFAULT (strftime('%s', 'now')), + last_interaction REAL DEFAULT (strftime('%s', 'now')) + ); + CREATE INDEX IF NOT EXISTS idx_contextos_user ON contextos_isolados(numero_usuario); + CREATE INDEX IF NOT EXISTS idx_contextos_tipo ON contextos_isolados(tipo_conversa); + """) + conn.commit() + logger.info("Tabela contextos_isolados garantida") + except Exception as e: + logger.warning(f"Erro ao criar tabela contextos_isolados: {e}") + + def salvar_contexto_isolado(self, context_data: Dict[str, Any]) -> bool: + """Salva ou atualiza um contexto isolado (upsert).""" + try: + # Garante que a tabela existe + self._init_context_isolation_tables() + + with self._get_connection() as conn: + c = conn.cursor() + c.execute(""" + INSERT INTO contextos_isolados + (context_id, numero_usuario, grupo_id, tipo_conversa, estado_emocional, + nivel_intimidade, short_memory, metadata, created_at, last_interaction) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(context_id) DO UPDATE SET + estado_emocional = excluded.estado_emocional, + nivel_intimidade = excluded.nivel_intimidade, + short_memory = excluded.short_memory, + metadata = excluded.metadata, + last_interaction = excluded.last_interaction + """, ( + context_data.get('context_id'), + context_data.get('numero_usuario'), + context_data.get('grupo_id'), + context_data.get('tipo_conversa', 'pv'), + context_data.get('estado_emocional', 'neutral'), + context_data.get('nivel_intimidade', 1), + json.dumps(context_data.get('short_memory', [])), + json.dumps(context_data.get('metadata', {})), + context_data.get('created_at', time.time()), + context_data.get('last_interaction', time.time()), + )) + conn.commit() + return True + except Exception as e: + logger.warning(f"Erro ao salvar contexto isolado: {e}") + return False + + def recuperar_contexto_isolado(self, context_id: str) -> Optional[Dict[str, Any]]: + """Recupera um contexto isolado pelo context_id.""" + try: + # Garante que a tabela existe (Prevenir "no such table") + self._init_context_isolation_tables() + + rows = self._execute_with_retry( + "SELECT * FROM contextos_isolados WHERE context_id = ?", + (context_id,) + ) + if rows: + row = dict(rows[0]) + # Desserializar campos JSON + try: row['short_memory'] = json.loads(row.get('short_memory', '[]')) + except: row['short_memory'] = [] + try: row['metadata'] = json.loads(row.get('metadata', '{}')) + except: row['metadata'] = {} + return row + return None + except Exception as e: + logger.warning(f"Erro ao recuperar contexto isolado: {e}") + return None + + def deletar_contexto_isolado(self, context_id: str) -> bool: + """Remove um contexto isolado (safe - verifica tabela primeiro).""" + try: + # Garante que a tabela existe + self._init_context_isolation_tables() + + # Verifica se tabela existe + conn = self._get_connection() + c = conn.cursor() + c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='contextos_isolados'") + if not c.fetchone(): + logger.warning("Tabela contextos_isolados não existe. Pulando delete.") + return True + conn.close() + + self._execute_with_retry( + "DELETE FROM contextos_isolados WHERE context_id = ?", + (context_id,), commit=True + ) + return True + except Exception as e: + logger.warning(f"Erro ao deletar contexto isolado: {e}") + return False + + def listar_contextos_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]: + """Lista todos os contextos de um usuário.""" + results = [] + try: + rows = self._execute_with_retry( + "SELECT * FROM contextos_isolados WHERE numero_usuario = ?", + (numero_usuario,) + ) + if rows: + for row in rows: + d = dict(row) + try: d['short_memory'] = json.loads(d.get('short_memory', '[]')) + except: d['short_memory'] = [] + try: d['metadata'] = json.loads(d.get('metadata', '{}')) + except: d['metadata'] = {} + results.append(d) + except Exception as e: + logger.warning(f"Erro ao listar contextos do usuário: {e}") + return results + + # ================================================================ + # HISTÓRICO POR CONVERSATION ID + # ================================================================ + + def recuperar_historico(self, usuario: str = "", numero: str = "", + conversation_id: str = "", limite: int = 20) -> List[Dict[str, Any]]: + """ + Recupera histórico de mensagens. + Suporta conversation_id para isolamento de contexto. + """ + # Tenta nova coluna conversation_id primeiro + try: + if conversation_id: + try: + rows = self._execute_with_retry( + "SELECT usuario, mensagem, resposta, humor, modelo_usado, created_at FROM mensagens " + "WHERE conversation_id = ? ORDER BY id DESC LIMIT ?", + (conversation_id, limite) + ) + except Exception: + # Fallback para banco antigo sem conversation_id + rows = [] + elif numero: + rows = self._execute_with_retry( + "SELECT usuario, mensagem, resposta, humor, modelo_usado, created_at FROM mensagens " + "WHERE numero = ? ORDER BY id DESC LIMIT ?", + (numero, limite) + ) + elif usuario: + rows = self._execute_with_retry( + "SELECT usuario, mensagem, resposta, humor, modelo_usado, created_at FROM mensagens " + "WHERE usuario = ? ORDER BY id DESC LIMIT ?", + (usuario, limite) + ) + else: + return [] + + return [dict(r) for r in (rows or [])][::-1] # Reverte para ordem cronológica + except Exception: + return [] + + def recuperar_resposta_por_id(self, message_id: str) -> Optional[Dict[str, Any]]: + """Recupera uma resposta já gerada para um message_id (idempotência).""" + if not message_id: return None + try: + # Garante que a coluna existe + with self._get_connection() as conn: + c = conn.cursor() + c.execute("PRAGMA table_info(mensagens)") + if 'message_id' not in [row[1] for row in c.fetchall()]: + return None + + rows = self._execute_with_retry( + "SELECT resposta, modelo_usado, created_at FROM mensagens WHERE message_id = ? LIMIT 1", + (message_id,) + ) + if rows: + return dict(rows[0]) + return None + except Exception as e: + logger.warning(f"Erro ao recuperar resposta por id: {e}") + return None + + def registrar_mensagem_conversation_id(self, usuario: str, mensagem: str, resposta: str, + conversation_id: str = "", numero: str = "", + is_reply: bool = False, mensagem_original: str = "", + humor: str = "neutro", modo_resposta: str = "normal", + modelo_usado: str = "desconhecido", **kwargs) -> bool: + """Registra mensagem com conversation_id para isolamento.""" + try: + # Verifica se a coluna conversation_id existe + with self._get_connection() as conn: + c = conn.cursor() + c.execute("PRAGMA table_info(mensagens)") + cols = [row[1] for row in c.fetchall()] + if 'conversation_id' not in cols: + c.execute("ALTER TABLE mensagens ADD COLUMN conversation_id TEXT") + conn.commit() + + self._execute_with_retry( + """INSERT INTO mensagens + (usuario, mensagem, resposta, numero, is_reply, mensagem_original, humor, modo_resposta, modelo_usado, conversation_id, message_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + (usuario, mensagem, resposta, numero, is_reply, mensagem_original, humor, modo_resposta, modelo_usado, conversation_id, kwargs.get('message_id')), + commit=True + ) + return True + except Exception as e: + logger.warning(f"Erro ao registrar mensagem com conversation_id: {e}") + return False + + def limpar_contexto_usuario(self, usuario: str = "", numero: str = "") -> bool: + """Limpa todas as mensagens de um usuário (reset).""" + try: + if numero: + self._execute_with_retry("DELETE FROM mensagens WHERE numero = ?", (numero,), commit=True) + elif usuario: + self._execute_with_retry("DELETE FROM mensagens WHERE usuario = ?", (usuario,), commit=True) + # Limpa também contextos isolados + self._execute_with_retry("DELETE FROM contextos_isolados WHERE numero_usuario = ?", (numero or usuario,), commit=True) + return True + except Exception as e: + logger.warning(f"Erro ao limpar contexto: {e}") + return False + + # ================================================================ + # PERSONA DO USUÁRIO (LTM) + # ================================================================ + def recuperar_persona(self, numero_usuario: str) -> Dict[str, Any]: + """Recupera a persona (personalidade, gostos, etc.) do usuário.""" + try: + rows = self._execute_with_retry( + "SELECT personalidade, vicios_linguagem, gostos, desgostos, emocional FROM persona_usuario WHERE numero_usuario = ?", + (numero_usuario,) + ) + if rows: + return dict(rows[0]) + return {} + except Exception as e: + logger.warning(f"Erro ao recuperar persona de {numero_usuario}: {e}") + return {} + + def atualizar_persona(self, numero_usuario: str, campos: Dict[str, str]) -> bool: + """Atualiza ou insere novos traços de persona para o usuário.""" + if not campos: + return False + try: + # Garante que as colunas existem (migração rápida se necessário) + with self._get_connection() as conn: + c = conn.cursor() + c.execute("INSERT OR IGNORE INTO persona_usuario (numero_usuario) VALUES (?)", (numero_usuario,)) + + query_parts = [] + params = [] + for campo, valor in campos.items(): + query_parts.append(f"{campo} = ?") + params.append(valor) + + query_parts.append("updated_at = CURRENT_TIMESTAMP") + params.append(numero_usuario) + + query = f"UPDATE persona_usuario SET {', '.join(query_parts)} WHERE numero_usuario = ?" + c.execute(query, tuple(params)) + conn.commit() + return True + except Exception as e: + logger.warning(f"Erro ao atualizar persona de {numero_usuario}: {e}") + return False + + def fazer_checkpoint_hf_sync(self) -> bool: + """ + Executa um backup seguro da base de dados otimizado para o Hugging Face Buckets (hf sync). + Garante integridade total forçando a gravação do WAL e criando um snapshot isolado. + + Isso previne qualquer corrupção de dados por conflito entre a nuvem e as + threads concorrentes da Akira. + """ + import shutil + from pathlib import Path + try: + # 1. Força a gravação de todos os dados do WAL para o arquivo DB principal (Truncate) + self._execute_with_retry("PRAGMA wal_checkpoint(TRUNCATE)") + + # 2. Configura caminhos + db_path_obj = Path(self.db_path) + # Diretório alvo para o hf sync + cloud_sync_dir = db_path_obj.parent / "cloud_sync" + cloud_sync_dir.mkdir(parents=True, exist_ok=True) + + backup_path = cloud_sync_dir / db_path_obj.name + + # 3. Usa a API de Backup Nativa e atômica do próprio sqlite + with self._get_connection() as source: + backup_conn = sqlite3.connect( + str(backup_path), + timeout=30.0, + check_same_thread=False + ) + with backup_conn: + source.backup(backup_conn, pages=-1) + backup_conn.close() + + logger.info(f"✅ Checkpoint Seguro para HF Buckets concluído em: {backup_path}") + return True + except Exception as e: + logger.error(f"❌ Erro ao criar Checkpoint HF Sync: {e}") + return False + def check_idempotency(self, message_id: str, context: str = "general") -> bool: + """ + Verifica se um message_id já foi processado recentemente. + Retorna True se for DUPLICADO, False se for NOVO. + """ + if not message_id: + return False + + try: + # Tenta inserir na tabela de mensagens. Se falhar por UNIQUE constraint, é duplicado. + # Mas como a tabela mensagens pode ter muitos campos, vamos usar um cache rápido ou + # verificar apenas o message_id. + query = "SELECT id FROM mensagens WHERE message_id = ? LIMIT 1" + res = self._execute_with_retry(query, (message_id,)) + return len(res) > 0 if res else False + except Exception: + return False + +def get_database(db_path: Optional[str] = None) -> Database: + """Factory function para obter a instância singleton do database.""" + from . import config + path = db_path or getattr(config, 'DB_PATH', '/akira/data/akira.db') + return Database(path)