akira-index / modules /database.py
akra35567's picture
Upload 20 files
d3a1a58 verified
# type: ignore
"""
================================================================================
AKIRA V21 ULTIMATE - DATABASE MODULE
================================================================================
Banco de dados SQLite extremamente robusto, moderno e completo.
Gerencia: mensagens, embeddings, gírias, tom, aprendizados, API logs, training sessions.
Features:
- SQLite com WAL mode para performance máxima
- Pool de conexões permanente (NÃO fecha a cada operação!)
- Sistema neural de aprendizado contínuo
- 3 níveis de transição de tom gradual
- Full-text search com FTS5
- Vector storage para embeddings
- Tracking de interações para análise de padrões
================================================================================
"""
import sqlite3
import os
import json
import re
import random
import threading
from typing import Optional, List, Dict, Any, Tuple, Union, Callable
from datetime import datetime, timedelta
from loguru import logger
# numpy é opcional - usado apenas para embeddings
try:
import numpy as np
NUMPY_AVAILABLE = True
except ImportError:
NUMPY_AVAILABLE = False
np = None # type: ignore
class DatabasePool:
"""
Pool de conexões SQLite para performance máxima.
Mantém conexões abertas permanentemente para evitar overhead.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls, db_path: str = "akira.db"):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, db_path: str = "akira.db"):
if self._initialized:
return
self.db_path = db_path
self._local = threading.local()
self._initialized = True
db_dir = os.path.dirname(db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
self._init_connection()
def _init_connection(self):
"""Inicializa conexão permanente."""
try:
conn = sqlite3.connect(
self.db_path,
timeout=30.0,
check_same_thread=False
)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("PRAGMA cache_size=-64000")
conn.execute("PRAGMA temp_store=MEMORY")
conn.execute("PRAGMA busy_timeout=60000")
conn.execute("PRAGMA mmap_size=268435456")
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row
self._local.conn = conn
logger.info(f"✅ Conexão DB permanente: {self.db_path}")
except Exception as e:
logger.error(f"❌ Erro ao criar conexão DB: {e}")
raise
def _get_connection(self) -> sqlite3.Connection:
if not hasattr(self._local, 'conn') or self._local.conn is None:
self._init_connection()
return self._local.conn
def cursor(self):
return self._get_connection().cursor()
def close(self):
if hasattr(self._local, 'conn') and self._local.conn:
try:
self._local.conn.close()
self._local.conn = None
except:
pass
_db_pool: Optional[DatabasePool] = None
def get_db_pool() -> DatabasePool:
global _db_pool
if _db_pool is None:
from .config import DB_PATH
_db_pool = DatabasePool(DB_PATH)
return _db_pool
class Database:
"""
Banco de dados Akira V21 Ultimate.
Pool de conexões permanente - NÃO fecha a cada operação!
Sistema neural de aprendizado contínuo com 3 níveis de transição.
"""
# Class variables (used as defaults)
CODIGOS_VERIFICACAO: Dict[str, Dict] = {}
USUARIOS_PRIVILEGIADOS_DEFAULT: List[str] = []
def __init__(self, db_path: str = None):
from .config import DB_PATH as CONFIG_DB_PATH, PRIVILEGED_USERS
self.db_path = db_path or CONFIG_DB_PATH
self._pool = get_db_pool()
self._ensure_tables()
# Instance variables from config
self.USUARIOS_PRIVILEGIADOS = list(PRIVILEGED_USERS)
logger.info("Database Akira V21 inicializado (pool permanente)")
def _ensure_tables(self):
"""Cria todas as tabelas necessárias."""
conn = self._pool._get_connection()
c = conn.cursor()
# Tabela de mensagens
c.execute("""
CREATE TABLE IF NOT EXISTS mensagens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
usuario TEXT,
mensagem TEXT,
resposta TEXT,
numero TEXT,
is_reply INTEGER DEFAULT 0,
mensagem_original TEXT,
humor TEXT DEFAULT 'neutro',
modo_resposta TEXT DEFAULT 'normal',
nivel_transicao INTEGER DEFAULT 1,
usuario_privilegiado INTEGER DEFAULT 0,
tom_detectado TEXT DEFAULT 'neutro',
intensidade_tom REAL DEFAULT 0.5,
reply_contexto TEXT,
conversation_id TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# 🔥 MIGRAÇÃO: Adiciona coluna conversation_id se não existir
try:
c.execute("ALTER TABLE mensagens ADD COLUMN conversation_id TEXT")
logger.info("✅ Coluna conversation_id adicionada à tabela mensagens")
except sqlite3.OperationalError:
# Coluna já existe
pass
# Usuários privilegiados
c.execute("""
CREATE TABLE IF NOT EXISTS usuarios_privilegiados (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero TEXT UNIQUE,
nome TEXT,
apelido TEXT,
modo_fala TEXT,
codigo_verificacao TEXT,
ativo INTEGER DEFAULT 1,
nivel INTEGER DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# Tom do usuário
c.execute("""
CREATE TABLE IF NOT EXISTS tom_usuario (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT,
tom_detectado TEXT,
intensidade REAL DEFAULT 0.5,
contexto TEXT,
humor TEXT DEFAULT 'neutro',
nivel_transicao INTEGER DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# Contexto persistente
c.execute("""
CREATE TABLE IF NOT EXISTS contexto (
user_key TEXT PRIMARY KEY,
historico TEXT,
emocao_atual TEXT,
humor_atual TEXT DEFAULT 'neutro',
modo_resposta TEXT DEFAULT 'normal',
nivel_transicao INTEGER DEFAULT 1,
usuario_privilegiado INTEGER DEFAULT 0,
termos TEXT,
girias TEXT,
tom TEXT,
pesos_neurais TEXT,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# Pesos neurais
c.execute("""
CREATE TABLE IF NOT EXISTS pesos_neurais (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT,
tipo_peso TEXT,
valor REAL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# Histórico de interações
c.execute("""
CREATE TABLE IF NOT EXISTS historico_interacoes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT,
mensagem TEXT,
resposta TEXT,
emocao_detectada TEXT,
tom_usado TEXT,
tom_usuario TEXT,
intensidade REAL,
nivel_transicao INTEGER,
reply_contexto TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# Aprendizados
c.execute("""
CREATE TABLE IF NOT EXISTS aprendizados (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT,
chave TEXT,
valor TEXT,
peso REAL DEFAULT 1.0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# Códigos de verificação
c.execute("""
CREATE TABLE IF NOT EXISTS codigos_verificacao (
numero TEXT PRIMARY KEY,
codigo TEXT,
expira_em DATETIME,
permissoes TEXT,
nivel INTEGER,
ativo INTEGER DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# Embeddings vetoriais
c.execute("""
CREATE TABLE IF NOT EXISTS embeddings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT,
mensagem TEXT,
embedding TEXT,
tipo TEXT DEFAULT 'mensagem',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# Índices
c.execute("CREATE INDEX IF NOT EXISTS idx_mensagens_numero ON mensagens(numero)")
c.execute("CREATE INDEX IF NOT EXISTS idx_mensagens_created ON mensagens(created_at)")
c.execute("CREATE INDEX IF NOT EXISTS idx_mensagens_conversation_id ON mensagens(conversation_id)") # 🔥 FIX
c.execute("CREATE INDEX IF NOT EXISTS idx_tom_usuario ON tom_usuario(numero_usuario, created_at)")
c.execute("CREATE INDEX IF NOT EXISTS idx_historico_interacoes ON historico_interacoes(numero_usuario, created_at)")
c.execute("CREATE INDEX IF NOT EXISTS idx_contexto_key ON contexto(user_key)")
c.execute("CREATE INDEX IF NOT EXISTS idx_pesos_neurais ON pesos_neurais(numero_usuario, tipo_peso)")
# Usuários default
usuarios_default = [
('244937035662', 'Isaac Quarenta', 'Isaac', 'tecnico_formal', 3),
('244978787009', 'Isaac Quarenta 2', 'Isaac', 'tecnico_formal', 3)
]
for numero, nome, apelido, modo, nivel in usuarios_default:
c.execute(
"INSERT OR IGNORE INTO usuarios_privilegiados (numero, nome, apelido, modo_fala, nivel) VALUES (?, ?, ?, ?, ?)",
(numero, nome, apelido, modo, nivel)
)
conn.commit()
def _execute(self, query: str, params: tuple = None) -> Optional[List[sqlite3.Row]]:
"""Executa query na conexão permanente."""
conn = self._pool._get_connection()
c = conn.cursor()
c.execute(query, params or ())
if query.strip().upper().startswith("SELECT"):
return c.fetchall()
conn.commit()
return None
def _execute_with_retry(self, query: str, params: tuple = None, max_retries: int = 3) -> Optional[List[sqlite3.Row]]:
"""Executa query com retry em caso de falha."""
for attempt in range(max_retries):
try:
return self._execute(query, params)
except Exception as e:
if attempt < max_retries - 1:
time.sleep(0.1 * (attempt + 1)) # Exponential backoff
else:
logger.warning(f"Query falhou após {max_retries} tentativas: {e}")
return None
# ================================================================
# SISTEMA NEURAL DE APRENDIZADO 3-NÍVEIS
# ================================================================
def _update_neural_weights(self, numero_usuario: str, interacao: Dict[str, Any]) -> None:
"""Atualiza pesos neurais baseado na interação."""
try:
tom_usuario = interacao.get('tom_detectado', 'neutro')
emocao = interacao.get('emocao', 'neutral')
intensidade = interacao.get('intensidade', 0.5)
nivel = interacao.get('nivel_transicao', 1)
tom_mapping = {
'rude': 'tom_rude', 'grosseiro': 'tom_rude', 'raivoso': 'tom_rude',
'formal': 'tom_neutro', 'neutro': 'tom_neutro',
'debochado': 'tom_proximo', 'proximo': 'tom_proximo', 'amoroso': 'tom_proximo',
'informal': 'tom_neutro'
}
tom_key = tom_mapping.get(tom_usuario, 'tom_neutro')
current_weights = self._get_neural_weights(numero_usuario)
learning_rate = 0.1 * nivel
current_tom = current_weights.get(tom_key, 0.5)
new_tom = current_tom + (intensidade - current_tom) * learning_rate
new_tom = max(0.0, min(1.0, new_tom))
emocao_key = f'emocao_{emocao}'
current_emocao = current_weights.get(emocao_key, 0.5)
new_emocao = current_emocao + (intensidade - current_emocao) * learning_rate * 0.5
new_emocao = max(0.0, min(1.0, new_emocao))
eng_key = 'engajamento'
current_eng = current_weights.get(eng_key, 0.5)
new_eng = current_eng + (0.05 * nivel)
new_eng = max(0.0, min(1.0, new_eng))
self._set_neural_weight(numero_usuario, tom_key, new_tom)
self._set_neural_weight(numero_usuario, emocao_key, new_emocao)
self._set_neural_weight(numero_usuario, eng_key, new_eng)
except Exception as e:
logger.warning(f"Erro pesos neurais: {e}")
def _get_neural_weights(self, numero_usuario: str) -> Dict[str, float]:
try:
rows = self._execute(
"SELECT tipo_peso, valor FROM pesos_neurais WHERE numero_usuario = ?",
(numero_usuario,)
)
return {r['tipo_peso']: r['valor'] for r in rows} if rows else {}
except:
return {}
def _set_neural_weight(self, numero_usuario: str, tipo_peso: str, valor: float) -> None:
self._execute(
"INSERT OR REPLACE INTO pesos_neurais (numero_usuario, tipo_peso, valor, updated_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP)",
(numero_usuario, tipo_peso, valor)
)
def get_next_tom_level(self, numero_usuario: str, user_tom: str) -> Tuple[str, int]:
"""Determina próximo nível de tom baseado no padrão do usuário."""
weights = self._get_neural_weights(numero_usuario)
if user_tom in ['rude', 'grosseiro', 'raivoso']:
rude_weight = weights.get('tom_rude', 0.0)
if rude_weight > 0.7:
return ('rude_forte', 3)
elif rude_weight > 0.4:
return ('rude_moderado', 2)
else:
return ('rude_sutil', 1)
elif user_tom in ['debochado', 'proximo', 'amoroso']:
prox_weight = weights.get('tom_proximo', 0.0)
if prox_weight > 0.7:
return ('proximo_forte', 3)
elif prox_weight > 0.4:
return ('proximo_moderado', 2)
else:
return ('proximo_sutil', 1)
return ('neutro', 1)
# ================================================================
# REGISTRO DE INTERAÇÕES
# ================================================================
def registrar_interacao_completa(
self,
numero_usuario: str,
mensagem: str,
resposta: str,
emocao_detectada: str,
tom_usado: str,
tom_usuario: str,
intensidade: float = 0.5,
nivel_transicao: int = 1,
reply_contexto: Optional[str] = None,
is_reply: bool = False,
mensagem_original: Optional[str] = None,
humor: str = "neutro"
) -> bool:
"""Registra interação completa para aprendizado neural."""
try:
self._execute(
"""INSERT INTO historico_interacoes
(numero_usuario, mensagem, resposta, emocao_detectada, tom_usado,
tom_usuario, intensidade, nivel_transicao, reply_contexto)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(numero_usuario, mensagem[:1000], resposta[:2000], emocao_detectada,
tom_usado, tom_usuario, intensidade, nivel_transicao, reply_contexto)
)
self._update_neural_weights(numero_usuario, {
'tom_detectado': tom_usuario,
'emocao': emocao_detectada,
'intensidade': intensidade,
'nivel_transicao': nivel_transicao
})
self._execute(
"""INSERT INTO tom_usuario
(numero_usuario, tom_detectado, intensidade, contexto, humor, nivel_transicao)
VALUES (?, ?, ?, ?, ?, ?)""",
(numero_usuario, tom_usuario, intensidade, reply_contexto, humor, nivel_transicao)
)
self._execute(
"""INSERT INTO mensagens
(usuario, mensagem, resposta, numero, is_reply, mensagem_original,
humor, modo_resposta, nivel_transicao, tom_detectado, intensidade_tom, reply_contexto)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(numero_usuario, mensagem[:1000], resposta[:2000], numero_usuario,
1 if is_reply else 0, mensagem_original, humor, tom_usado,
nivel_transicao, tom_usuario, intensidade, reply_contexto)
)
pesos = self._get_neural_weights(numero_usuario)
self._execute(
"""INSERT OR REPLACE INTO contexto
(user_key, emocao_atual, humor_atual, modo_resposta, nivel_transicao,
tom, pesos_neurais, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)""",
(numero_usuario, emocao_detectada, humor, tom_usado,
nivel_transicao, tom_usuario, json.dumps(pesos))
)
return True
except Exception as e:
logger.warning(f"Erro ao registrar interação: {e}")
return False
def recuperar_contexto_persistente(self, numero_usuario: str) -> Dict[str, Any]:
try:
rows = self._execute(
"SELECT * FROM contexto WHERE user_key = ?",
(numero_usuario,)
)
if rows:
row = rows[0]
pesos = json.loads(row['pesos_neurais']) if row['pesos_neurais'] else {}
return {
'emocao': row['emocao_atual'],
'humor': row['humor_atual'],
'tom': row['tom'],
'nivel': row['nivel_transicao'],
'pesos_neurais': pesos,
'updated_at': row['updated_at']
}
return {}
except:
return {}
def get_historical_interactions(self, numero_usuario: str, limit: int = 50) -> List[Dict]:
try:
rows = self._execute(
"""SELECT * FROM historico_interacoes
WHERE numero_usuario = ?
ORDER BY created_at DESC LIMIT ?""",
(numero_usuario, limit)
)
return [
{
'mensagem': r['mensagem'],
'resposta': r['resposta'],
'emocao': r['emocao_detectada'],
'tom_usuario': r['tom_usuario'],
'intensidade': r['intensidade'],
'nivel': r['nivel_transicao'],
'reply_contexto': r['reply_contexto'],
'created_at': r['created_at']
}
for r in rows
] if rows else []
except:
return []
# ================================================================
# PRIVILÉGIOS
# ================================================================
def verificar_privilegios_usuario(self, numero: str) -> Dict[str, Any]:
if not numero:
return {"privilegiado": False, "nivel": 0}
numero_limpo = re.sub(r'[^\d]', '', str(numero))
if numero_limpo in self.USUARIOS_PRIVILEGIADOS:
return {"privilegiado": True, "nivel": 3, "permissoes": ["all"]}
try:
rows = self._execute(
"SELECT nivel, permissoes FROM usuarios_privilegiados WHERE numero = ? AND ativo = 1",
(numero_limpo,)
)
if rows:
permissoes = json.loads(rows[0]['permissoes']) if rows[0]['permissoes'] else []
return {"privilegiado": True, "nivel": rows[0]['nivel'], "permissoes": permissoes}
except:
pass
return {"privilegiado": False, "nivel": 0}
def eh_privilegiado(self, numero: str) -> bool:
return self.verificar_privilegios_usuario(numero).get("privilegiado", False)
def obter_modo_fala_privilegiado(self, numero: str) -> Optional[str]:
"""Obtém o modo de fala de um usuário privilegiado."""
try:
rows = self._execute(
"SELECT modo_fala FROM usuarios_privilegiados WHERE numero = ? AND ativo = 1",
(numero,)
)
return rows[0]['modo_fala'] if rows else None
except:
return None
# ================================================================
# OPERAÇÕES BÁSICAS
# ================================================================
def salvar_mensagem(self, usuario=None, mensagem=None, resposta=None, numero=None, is_reply=False, mensagem_original=None, humor='neutro', modo_resposta='normal', nivel_transicao=1, conversation_id=None, **kwargs) -> bool:
"""
Salva mensagem no banco com suporte a isolamento de contexto.
Args:
conversation_id: ID único da conversa (pv:numero ou grupo:grupo_id) 🔥 FIX
"""
try:
self._execute(
"""INSERT INTO mensagens
(usuario, mensagem, resposta, numero, is_reply, mensagem_original,
humor, modo_resposta, nivel_transicao, conversation_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(usuario, mensagem, resposta, numero, 1 if is_reply else 0,
mensagem_original, humor, modo_resposta, nivel_transicao, conversation_id)
)
if conversation_id:
logger.debug(f"💾 Mensagem salva com conversation_id: {conversation_id[:30]}...")
return True
except Exception as e:
logger.warning(f"Erro ao salvar mensagem: {e}")
return False
def recuperar_historico(
self,
usuario: str,
limite: int = 20,
conversation_id: Optional[str] = None # 🔥 CONTEXT ISOLATION FIX
) -> List[Tuple[str, str]]:
"""
Recupera histórico de mensagens.
Args:
usuario: Nome ou número do usuário
limite: Máximo de mensagens a retornar
conversation_id: ID da conversa isolada (PV ou grupo específico)
Returns:
Lista de tuplas (mensagem, resposta)
"""
try:
if conversation_id:
# 🔥 ISOLADO: Filtra por conversation_id (contexto específico)
rows = self._execute(
"""SELECT mensagem, resposta FROM mensagens
WHERE conversation_id = ?
ORDER BY id DESC LIMIT ?""",
(conversation_id, limite)
)
logger.debug(f"📝 Histórico recuperado por conversation_id: {len(rows) if rows else 0} mensagens")
else:
# LEGACY: Filtra por usuário (mantido para retrocompatibilidade)
rows = self._execute(
"""SELECT mensagem, resposta FROM mensagens
WHERE usuario = ? OR numero = ?
ORDER BY id DESC LIMIT ?""",
(usuario, usuario, limite)
)
logger.debug(f"📝 Histórico recuperado por usuário (legacy): {len(rows) if rows else 0} mensagens")
return [(r['mensagem'], r['resposta']) for r in rows] if rows else []
except Exception as e:
logger.warning(f"Erro ao recuperar histórico: {e}")
return []
def recuperar_mensagens(self, usuario: str, limite: int = 20) -> List[Tuple[str, str]]:
"""Alias para recuperar_historico para compatibilidade."""
return self.recuperar_historico(usuario, limite)
def salvar_aprendizado(self, numero_usuario: str, chave: str, valor: str) -> bool:
try:
self._execute(
"INSERT INTO aprendizados (numero_usuario, chave, valor) VALUES (?, ?, ?)",
(numero_usuario, chave, valor)
)
return True
except:
return False
def salvar_aprendizado_detalhado(self, numero_usuario: str, chave: str, valor: str) -> bool:
"""Salva um aprendizado detalhado."""
return self.salvar_aprendizado(numero_usuario, chave, valor)
def salvar_giria_aprendida(self, numero_usuario: str, giria: str, significado: str, contexto: str) -> bool:
"""Salva uma gíria aprendida."""
try:
chave = f"giria_{giria}"
valor = json.dumps({
'significado': significado,
'contexto': contexto,
'frequencia': 1
})
return self.salvar_aprendizado(numero_usuario, chave, valor)
except:
return False
def salvar_contexto(self, user_key: str, historico: str, emocao_atual: str, termos: str, girias: str, tom: str) -> bool:
"""Salva o contexto do usuário."""
try:
self._execute(
"INSERT OR REPLACE INTO contexto (user_key, historico, emocao_atual, humor_atual, termos, girias, tom, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)",
(user_key, historico, emocao_atual, emocao_atual, termos, girias, tom)
)
return True
except:
return False
def salvar_embedding(self, numero_usuario: str, mensagem: str, embedding: List[float], tipo: str = "mensagem") -> bool:
"""Salva embedding vetorial da mensagem."""
try:
# Convert numpy array to list if necessary
if NUMPY_AVAILABLE and isinstance(embedding, np.ndarray):
# Handle both 1D and multi-dimensional arrays
if embedding.ndim == 1:
embedding = embedding.tolist()
else:
# Flatten multi-dimensional arrays
embedding = embedding.flatten().tolist()
# Also handle nested numpy arrays and ensure all elements are JSON serializable
elif isinstance(embedding, (list, tuple)):
embedding = [
float(x) if NUMPY_AVAILABLE and isinstance(x, (np.number, np.ndarray)) else
(x.tolist() if NUMPY_AVAILABLE and isinstance(x, np.ndarray) else x)
for x in embedding
]
# Final check: ensure all elements are JSON serializable (float, int, etc.)
def make_serializable(obj):
if NUMPY_AVAILABLE and isinstance(obj, np.ndarray):
return obj.tolist()
elif NUMPY_AVAILABLE and isinstance(obj, np.number):
return float(obj)
elif isinstance(obj, (list, tuple)):
return [make_serializable(x) for x in obj]
else:
return obj
embedding = make_serializable(embedding)
embedding_json = json.dumps(embedding)
self._execute(
"INSERT INTO embeddings (numero_usuario, mensagem, embedding, tipo) VALUES (?, ?, ?, ?)",
(numero_usuario, mensagem[:500], embedding_json, tipo)
)
return True
except Exception as e:
logger.warning(f"Erro ao salvar embedding: {e}")
return False
def registrar_tom_usuario(self, numero_usuario: str, tom_detectado: str, intensidade: float = 0.5, contexto: str = "", humor: str = "neutro", nivel_transicao: int = 1) -> bool:
"""Registra o tom detectado do usuário."""
try:
self._execute(
"""INSERT INTO tom_usuario
(numero_usuario, tom_detectado, intensidade, contexto, humor, nivel_transicao)
VALUES (?, ?, ?, ?, ?, ?)""",
(numero_usuario, tom_detectado, intensidade, contexto, humor, nivel_transicao)
)
return True
except Exception as e:
logger.warning(f"Erro ao registrar tom do usuário: {e}")
return False
def obter_tom_predominante(self, numero_usuario: str) -> Optional[str]:
"""Obtém o tom predominante do usuário baseado no histórico."""
try:
rows = self._execute(
"""SELECT tom_detectado, COUNT(*) as freq
FROM tom_usuario
WHERE numero_usuario = ?
GROUP BY tom_detectado
ORDER BY freq DESC
LIMIT 1""",
(numero_usuario,)
)
return rows[0]['tom_detectado'] if rows else None
except Exception as e:
logger.warning(f"Erro ao obter tom predominante: {e}")
return None
def recuperar_aprendizados(self, numero_usuario: str) -> Dict[str, str]:
try:
rows = self._execute(
"SELECT chave, valor FROM aprendizados WHERE numero_usuario = ?",
(numero_usuario,)
)
return {r['chave']: r['valor'] for r in rows} if rows else {}
except:
return {}
def recuperar_aprendizado_detalhado(self, numero_usuario: str, chave: str) -> Optional[str]:
"""Recupera um aprendizado detalhado específico."""
try:
rows = self._execute(
"SELECT valor FROM aprendizados WHERE numero_usuario = ? AND chave = ?",
(numero_usuario, chave)
)
return rows[0]['valor'] if rows else None
except:
return None
def recuperar_girias_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]:
"""Recupera gírias aprendidas pelo usuário."""
try:
# Assuming girias are stored in aprendizados with chave starting with 'giria_'
rows = self._execute(
"SELECT chave, valor FROM aprendizados WHERE numero_usuario = ? AND chave LIKE 'giria_%'",
(numero_usuario,)
)
girias = []
for row in rows:
chave = row['chave']
valor = row['valor']
try:
# Try to parse as JSON
data = json.loads(valor)
girias.append({
'giria': chave.replace('giria_', ''),
'significado': data.get('significado', valor),
'frequencia': data.get('frequencia', 1)
})
except:
girias.append({
'giria': chave.replace('giria_', ''),
'significado': valor,
'frequencia': 1
})
return girias
except:
return []
def get_stats(self) -> Dict[str, int]:
try:
total_msgs = self._execute("SELECT COUNT(*) FROM mensagens")
total_users = self._execute("SELECT COUNT(DISTINCT numero) FROM mensagens")
total_interacoes = self._execute("SELECT COUNT(*) FROM historico_interacoes")
return {
'mensagens': total_msgs[0][0] if total_msgs else 0,
'usuarios': total_users[0][0] if total_users else 0,
'interacoes_neurais': total_interacoes[0][0] if total_interacoes else 0
}
except:
return {'mensagens': 0, 'usuarios': 0, 'interacoes_neurais': 0}
# ================================================================
# CONTEXT ISOLATION - NOVOS MÉTODOS
# ================================================================
def _ensure_context_isolation_tables(self):
"""Garante criação das tabelas de isolamento de contexto."""
conn = self._pool._get_connection()
c = conn.cursor()
# Tabela de contextos isolados
c.execute("""
CREATE TABLE IF NOT EXISTS contextos_isolados (
id INTEGER PRIMARY KEY AUTOINCREMENT,
context_id TEXT UNIQUE,
numero_usuario TEXT,
grupo_id TEXT,
tipo_conversa TEXT,
short_memory TEXT,
estado_emocional TEXT DEFAULT 'neutral',
nivel_intimidade INTEGER DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (context_id)
)
""")
# Tabela de embeddings vetoriais (para memória persistente)
c.execute("""
CREATE TABLE IF NOT EXISTS embeddings_vectoriais (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conversation_id TEXT,
mensagem TEXT,
embedding TEXT,
embedding_model TEXT,
tipo TEXT DEFAULT 'mensagem',
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# Índices para performance
c.execute("CREATE INDEX IF NOT EXISTS idx_ci_context_id ON contextos_isolados(context_id)")
c.execute("CREATE INDEX IF NOT EXISTS idx_ci_numero ON contextos_isolados(numero_usuario)")
c.execute("CREATE INDEX IF NOT EXISTS idx_ci_grupo ON contextos_isolados(grupo_id)")
c.execute("CREATE INDEX IF NOT EXISTS idx_ev_conversation ON embeddings_vectoriais(conversation_id)")
c.execute("CREATE INDEX IF NOT EXISTS idx_ev_tipo ON embeddings_vectoriais(tipo)")
conn.commit()
logger.debug("✅ Tabelas de isolamento de contexto criadas/verificadas")
def salvar_contexto_isolado(self, context_data: Dict[str, Any]) -> bool:
"""
Salva contexto isolado no banco.
Args:
context_data: Dicionário com dados do ConversationContext
Returns:
True se salvo com sucesso
"""
try:
self._ensure_context_isolation_tables()
self._execute(
"""INSERT OR REPLACE INTO contextos_isolados
(context_id, numero_usuario, grupo_id, tipo_conversa, short_memory,
estado_emocional, nivel_intimidade, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)""",
(
context_data.get('context_id'),
context_data.get('numero_usuario'),
context_data.get('grupo_id'),
context_data.get('tipo_conversa'),
json.dumps(context_data.get('short_memory', [])),
context_data.get('estado_emocional', 'neutral'),
context_data.get('nivel_intimidade', 1)
)
)
return True
except Exception as e:
logger.warning(f"Erro ao salvar contexto isolado: {e}")
return False
def recuperar_contexto_isolado(self, context_id: str) -> Optional[Dict[str, Any]]:
"""
Recupera contexto isolado pelo ID.
Args:
context_id: ID do contexto
Returns:
Dicionário com dados ou None
"""
try:
self._ensure_context_isolation_tables()
rows = self._execute(
"SELECT * FROM contextos_isolados WHERE context_id = ?",
(context_id,)
)
if rows:
row = rows[0]
return {
'context_id': row['context_id'],
'numero_usuario': row['numero_usuario'],
'grupo_id': row['grupo_id'],
'tipo_conversa': row['tipo_conversa'],
'short_memory': json.loads(row['short_memory']) if row['short_memory'] else [],
'estado_emocional': row['estado_emocional'],
'nivel_intimidade': row['nivel_intimidade'],
'created_at': row['created_at'],
'updated_at': row['updated_at']
}
return None
except Exception as e:
logger.warning(f"Erro ao recuperar contexto isolado: {e}")
return None
def deletar_contexto_isolado(self, context_id: str) -> bool:
"""
Deleta contexto isolado.
Args:
context_id: ID do contexto
Returns:
True se deletado com sucesso
"""
try:
self._ensure_context_isolation_tables()
# Deleta contexto
self._execute(
"DELETE FROM contextos_isolados WHERE context_id = ?",
(context_id,)
)
# Deleta embeddings associados
self._execute(
"DELETE FROM embeddings_vectoriais WHERE conversation_id = ?",
(context_id,)
)
return True
except Exception as e:
logger.warning(f"Erro ao deletar contexto isolado: {e}")
return False
def listar_contextos_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]:
"""
Lista todos os contextos de um usuário.
Args:
numero_usuario: Número do usuário
Returns:
Lista de contextos
"""
try:
self._ensure_context_isolation_tables()
rows = self._execute(
"SELECT * FROM contextos_isolados WHERE numero_usuario = ? ORDER BY updated_at DESC",
(numero_usuario,)
)
contexts = []
for row in rows:
contexts.append({
'context_id': row['context_id'],
'numero_usuario': row['numero_usuario'],
'grupo_id': row['grupo_id'],
'tipo_conversa': row['tipo_conversa'],
'short_memory': json.loads(row['short_memory']) if row['short_memory'] else [],
'estado_emocional': row['estado_emocional'],
'nivel_intimidade': row['nivel_intimidade'],
'created_at': row['created_at'],
'updated_at': row['updated_at']
})
return contexts
except Exception as e:
logger.warning(f"Erro ao listar contextos do usuário: {e}")
return []
def salvar_embedding_vetorial(
self,
conversation_id: str,
mensagem: str,
embedding: List[float],
embedding_model: str = "paraphrase-multilingual-MiniLM-L12-v2",
tipo: str = "mensagem",
metadata: Optional[Dict[str, Any]] = None
) -> bool:
"""
Salva embedding vetorial para memória persistente.
Args:
conversation_id: ID da conversa isolada
mensagem: Texto original
embedding: Vetor de embedding
embedding_model: Modelo usado
tipo: Tipo de embedding
metadata: Metadados adicionais
Returns:
True se salvo com sucesso
"""
try:
self._ensure_context_isolation_tables()
# Convert numpy array to list if necessary
if NUMPY_AVAILABLE and isinstance(embedding, np.ndarray):
embedding = embedding.tolist()
# Also handle nested numpy arrays
elif isinstance(embedding, (list, tuple)):
embedding = [
float(x) if NUMPY_AVAILABLE and isinstance(x, (np.number, np.ndarray)) else x
for x in embedding
]
embedding_json = json.dumps(embedding)
metadata_json = json.dumps(metadata) if metadata else None
self._execute(
"""INSERT INTO embeddings_vectoriais
(conversation_id, mensagem, embedding, embedding_model, tipo, metadata)
VALUES (?, ?, ?, ?, ?, ?)""",
(conversation_id, mensagem[:500], embedding_json, embedding_model, tipo, metadata_json)
)
return True
except Exception as e:
logger.warning(f"Erro ao salvar embedding vetorial: {e}")
return False
def buscar_similaridade_vetorial(
self,
conversation_id: str,
query_embedding: List[float],
top_k: int = 5
) -> List[Dict[str, Any]]:
"""
Busca embeddings similares (similaridade por cosseno).
Args:
conversation_id: ID da conversa
query_embedding: Vetor de query
top_k: Quantidade de resultados
Returns:
Lista de resultados ordenados por similaridade
"""
try:
self._ensure_context_isolation_tables()
# Busca todos os embeddings da conversa
rows = self._execute(
"SELECT * FROM embeddings_vectoriais WHERE conversation_id = ?",
(conversation_id,)
)
if not rows:
return []
# Calcula similaridade por cosseno
results = []
for row in rows:
try:
stored_embedding = json.loads(row['embedding'])
similarity = self._cosine_similarity(query_embedding, stored_embedding)
results.append({
'mensagem': row['mensagem'],
'similarity': similarity,
'tipo': row['tipo'],
'metadata': json.loads(row['metadata']) if row['metadata'] else {},
'created_at': row['created_at']
})
except Exception:
continue
# Ordena por similaridade e limita
results.sort(key=lambda x: x['similarity'], reverse=True)
return results[:top_k]
except Exception as e:
logger.warning(f"Erro ao buscar similaridade vetorial: {e}")
return []
def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
"""Calcula similaridade por cosseno entre dois vetores."""
try:
if not vec1 or not vec2:
return 0.0
import math
# Converte para float
v1 = [float(x) for x in vec1]
v2 = [float(x) for x in vec2]
# Garante mesmo tamanho
min_len = min(len(v1), len(v2))
v1 = v1[:min_len]
v2 = v2[:min_len]
# Produto escalar
dot_product = sum(a * b for a, b in zip(v1, v2))
# Normas
norm1 = math.sqrt(sum(a * a for a in v1))
norm2 = math.sqrt(sum(a * a for a in v2))
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
except Exception:
return 0.0
def atualizar_short_memory_contexto(
self,
context_id: str,
short_memory: List[Dict[str, Any]]
) -> bool:
"""
Atualiza apenas a memória de curto prazo de um contexto.
Args:
context_id: ID do contexto
short_memory: Nova lista de mensagens
Returns:
True se atualizado com sucesso
"""
try:
self._ensure_context_isolation_tables()
self._execute(
"""UPDATE contextos_isolados
SET short_memory = ?, updated_at = CURRENT_TIMESTAMP
WHERE context_id = ?""",
(json.dumps(short_memory), context_id)
)
return True
except Exception as e:
logger.warning(f"Erro ao atualizar short memory: {e}")
return False
def get_conversation_stats(self, conversation_id: str) -> Dict[str, Any]:
"""
Retorna estatísticas de uma conversa isolada.
Args:
conversation_id: ID da conversa
Returns:
Dicionário com estatísticas
"""
try:
self._ensure_context_isolation_tables()
# Dados do contexto
context = self.recuperar_contexto_isolado(conversation_id)
# Contagem de embeddings
embedding_count = self._execute(
"SELECT COUNT(*) FROM embeddings_vectoriais WHERE conversation_id = ?",
(conversation_id,)
)
return {
'context_exists': context is not None,
'numero_usuario': context.get('numero_usuario') if context else None,
'tipo_conversa': context.get('tipo_conversa') if context else None,
'grupo_id': context.get('grupo_id') if context else None,
'short_memory_count': len(context.get('short_memory', [])) if context else 0,
'nivel_intimidade': context.get('nivel_intimidade', 1) if context else 1,
'estado_emocional': context.get('estado_emocional', 'neutral') if context else 'neutral',
'embeddings_count': embedding_count[0][0] if embedding_count else 0,
'updated_at': context.get('updated_at') if context else None
}
except Exception as e:
logger.warning(f"Erro ao obter stats da conversa: {e}")
return {}
def get_database() -> Database:
return Database()
def init_pool():
global _db_pool
if _db_pool is None:
from .config import DB_PATH
_db_pool = DatabasePool(DB_PATH)
logger.info("✅ Database pool inicializado")
return _db_pool