akira-index / modules /context_isolation.py
akra35567's picture
Upload 20 files
d3a1a58 verified
# type: ignore
"""
================================================================================
AKIRA V21 ULTIMATE - CONTEXT ISOLATION MODULE
================================================================================
Sistema de isolamento de contexto entre conversas (PV e Grupos).
Garante que contexto de um grupo não vaze para outro ou para PVs.
Features:
- Context ID único por combinação (usuário + tipo + grupo)
- Salt criptográfico para prevenir guessing
- CRUD completo para contextos isolados
- Integração com Database para persistência
- Suporte a migração de dados existentes
================================================================================
"""
import os
import sys
import hashlib
import time
import json
import logging
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass, field, asdict
from datetime import datetime
# Imports robustos com fallback - CORRIGIDO para usar modules.
try:
import modules.config as config
from .database import Database
CONTEXT_ISOLATION_AVAILABLE = True
except ImportError:
try:
from . import config
from .database import Database
CONTEXT_ISOLATION_AVAILABLE = True
except ImportError:
CONTEXT_ISOLATION_AVAILABLE = False
config = None
Database = None
logger = logging.getLogger(__name__)
# ============================================================
# CONFIGURAÇÃO DE ISOLAMENTO
# ============================================================
# Salt para geração de context_id (muda a cada deployment)
CONTEXT_SALT: str = os.getenv("CONTEXT_SALT", "AKIRA_V21_CONTEXT_ISOLATION_v1")
# Versão do esquema de isolamento (para migrações)
SCHEMA_VERSION: int = 1
@dataclass
class ConversationContext:
"""
Contexto isolado para uma conversa específica (PV ou Grupo).
Attributes:
context_id: Identificador único (hash de tipo + numero + grupo)
numero_usuario: Número do usuário
grupo_id: ID do grupo (None para PV)
tipo_conversa: "pv" ou "grupo"
short_memory: Lista de mensagens de curto prazo (max 100)
estado_emocional: Estado emocional atual
nivel_intimidade: Nível de intimidade (1-3)
created_at: Timestamp de criação
last_interaction: Timestamp da última interação
metadata: Metadados adicionais
"""
context_id: str
numero_usuario: str
grupo_id: Optional[str] = None
tipo_conversa: str = "pv"
short_memory: List[Dict[str, Any]] = field(default_factory=list)
estado_emocional: str = "neutral"
nivel_intimidade: int = 1
created_at: float = field(default_factory=time.time)
last_interaction: float = field(default_factory=time.time)
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Converte para dicionário serializável."""
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ConversationContext':
"""Cria instância a partir de dicionário."""
return cls(**data)
@property
def is_grupo(self) -> bool:
"""Retorna True se for conversa em grupo."""
return self.tipo_conversa == "grupo"
@property
def display_name(self) -> str:
"""Nome de exibição do contexto."""
if self.is_grupo:
return f"Grupo {self.grupo_id or 'desconhecido'}"
return f"PV {self.numero_usuario}"
# ============================================================
# FUNÇÕES DE GERAÇÃO DE CONTEXT ID
# ============================================================
def generate_context_id(
numero_usuario: str,
tipo_conversa: str,
grupo_id: Optional[str] = None
) -> str:
"""
Gera ID único e criptográfico para uma conversa.
Args:
numero_usuario: Número de telefone do usuário
tipo_conversa: "pv" ou "grupo"
grupo_id: ID do grupo (opcional)
Returns:
String de 64 caracteres (SHA256 hash)
"""
# Limpa inputs
numero_clean = ''.join(filter(str.isdigit, str(numero_usuario))) or "unknown"
tipo_clean = str(tipo_conversa).lower().strip()
grupo_clean = ''.join(filter(str.isdigit, str(grupo_id))) if grupo_id else "pv"
# Monta raw string
raw = f"{CONTEXT_SALT}:{tipo_clean}:{numero_clean}:{grupo_clean}:{int(time.time() // 86400)}"
# Gera hash
hash_obj = hashlib.sha256(raw.encode('utf-8'))
return hash_obj.hexdigest()
def validate_context_id(context_id: str) -> bool:
"""
Valida formato de context_id.
Args:
context_id: ID a ser validado
Returns:
True se formato válido
"""
if not context_id or not isinstance(context_id, str):
return False
# SHA256 hex = 64 caracteres
return len(context_id) == 64 and all(c in '0123456789abcdef' for c in context_id)
# ============================================================
# CLASSE PRINCIPAL DE ISOLAMENTO
# ============================================================
class ContextIsolationManager:
"""
Gerenciador de isolamento de contexto.
Provides:
- Criação e gestão de contextos isolados
- Persistência em banco de dados
- Migração de dados legados
- Estatísticas e debugging
"""
_instance = None
_lock = None
def __new__(cls):
if cls._instance is None:
cls._lock = __import__('threading').Lock()
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._db: Optional[Database] = None
self._contexts_cache: Dict[str, ConversationContext] = {}
self._initialized = True
# Logger
if CONTEXT_ISOLATION_AVAILABLE and config:
logger.info("✅ ContextIsolationManager inicializado")
else:
print("[WARN] ContextIsolationManager: config/database não disponíveis")
def _get_db(self) -> Database:
"""Obtém instância do banco de dados."""
if self._db is None:
if Database:
try:
from .config import DB_PATH
self._db = Database(DB_PATH)
except ImportError:
self._db = Database()
else:
raise RuntimeError("Database não disponível")
return self._db
# ============================================================
# CRIAÇÃO E GESTÃO DE CONTEXTOS
# ============================================================
def get_or_create_context(
self,
numero_usuario: str,
tipo_conversa: str,
grupo_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> ConversationContext:
"""
Obtém contexto existente ou cria novo.
Args:
numero_usuario: Número do usuário
tipo_conversa: "pv" ou "grupo"
grupo_id: ID do grupo (None para PV)
metadata: Metadados opcionais para novo contexto
Returns:
ConversationContext instance
"""
context_id = generate_context_id(numero_usuario, tipo_conversa, grupo_id)
# Verifica cache
if context_id in self._contexts_cache:
ctx = self._contexts_cache[context_id]
ctx.last_interaction = time.time()
return ctx
# Tenta carregar do banco
db = self._get_db()
ctx_data = db.recuperar_contexto_isolado(context_id)
if ctx_data:
ctx = ConversationContext.from_dict(ctx_data)
else:
# Cria novo contexto
ctx = ConversationContext(
context_id=context_id,
numero_usuario=numero_usuario,
grupo_id=grupo_id,
tipo_conversa=tipo_conversa,
metadata=metadata or {}
)
# Salva no banco
self._save_context(ctx)
# Atualiza cache
ctx.last_interaction = time.time()
self._contexts_cache[context_id] = ctx
return ctx
def get_context(
self,
numero_usuario: str,
tipo_conversa: str,
grupo_id: Optional[str] = None
) -> Optional[ConversationContext]:
"""
Obtém contexto existente (não cria novo).
Args:
numero_usuario: Número do usuário
tipo_conversa: "pv" ou "grupo"
grupo_id: ID do grupo
Returns:
ConversationContext ou None se não existir
"""
context_id = generate_context_id(numero_usuario, tipo_conversa, grupo_id)
# Verifica cache
if context_id in self._contexts_cache:
return self._contexts_cache[context_id]
# Busca no banco
db = self._get_db()
ctx_data = db.recuperar_contexto_isolado(context_id)
if ctx_data:
ctx = ConversationContext.from_dict(ctx_data)
self._contexts_cache[context_id] = ctx
return ctx
return None
def _save_context(self, context: ConversationContext) -> bool:
"""Salva contexto no banco de dados."""
try:
db = self._get_db()
return db.salvar_contexto_isolado(context.to_dict())
except Exception as e:
logger.warning(f"Falha ao salvar contexto: {e}")
return False
def save_context(self, context: ConversationContext) -> bool:
"""Salva contexto e atualiza cache."""
context.last_interaction = time.time()
self._contexts_cache[context.context_id] = context
return self._save_context(context)
def delete_context(self, context_id: str) -> bool:
"""
Remove contexto isolado.
Args:
context_id: ID do contexto a remover
Returns:
True se removido com sucesso
"""
if not validate_context_id(context_id):
logger.warning(f"Context ID inválido: {context_id}")
return False
# Remove do cache
if context_id in self._contexts_cache:
del self._contexts_cache[context_id]
# Remove do banco
try:
db = self._get_db()
return db.deletar_contexto_isolado(context_id)
except Exception as e:
logger.warning(f"Falha ao deletar contexto: {e}")
return False
# ============================================================
# GESTÃO DE MEMÓRIA DE CURTO PRAZO
# ============================================================
def add_message_to_context(
self,
context: ConversationContext,
role: str,
content: str,
importancia: float = 1.0,
emocao: str = "neutral",
reply_info: Optional[Dict[str, Any]] = None
) -> None:
"""
Adiciona mensagem à memória de curto prazo do contexto.
Args:
context: ConversationContext
role: "user" ou "assistant"
content: Texto da mensagem
importancia: Peso da mensagem (1.0 = normal, >1.0 = reply)
emocao: Emoção detectada
reply_info: Info adicional se for reply
"""
MAX_MESSAGES = 100 # Configurado pelo usuário
message_entry = {
"role": role,
"content": content,
"timestamp": time.time(),
"importancia": importancia,
"emocao": emocao,
"reply_info": reply_info or {}
}
# Adiciona à lista
context.short_memory.append(message_entry)
# Sliding window - remove mensagens antigas
if len(context.short_memory) > MAX_MESSAGES:
context.short_memory = context.short_memory[-MAX_MESSAGES:]
# Atualiza timestamp
context.last_interaction = time.time()
# Salva no banco
self.save_context(context)
def get_context_window(
self,
context: ConversationContext,
include_replies: bool = True,
prioritize_replies: bool = True,
max_messages: int = 100
) -> List[Dict[str, Any]]:
"""
Obtém janela de contexto com prioridade para replies.
Args:
context: ConversationContext
include_replies: Se deve incluir mensagens de reply
prioritize_replies: Se deve dar prioridade a replies
max_messages: Máximo de mensagens a retornar
Returns:
Lista de mensagens ordenadas por importância
"""
messages = context.short_memory.copy()
if not messages:
return []
# Filtra replies se necessário
if not include_replies:
messages = [m for m in messages if not m.get('reply_info', {})]
# Ordena por importância (replies primeiro)
if prioritize_replies:
messages.sort(key=lambda x: x.get('importancia', 1.0), reverse=True)
# Limita quantidade
return messages[:max_messages]
def clear_context_memory(self, context: ConversationContext) -> bool:
"""
Limpa memória de curto prazo do contexto.
Args:
context: ConversationContext
Returns:
True se limpo com sucesso
"""
context.short_memory = []
context.last_interaction = time.time()
return self.save_context(context)
# ============================================================
# LISTAGEM E ESTATÍSTICAS
# ============================================================
def list_user_contexts(self, numero_usuario: str) -> List[ConversationContext]:
"""
Lista todos os contextos de um usuário.
Args:
numero_usuario: Número do usuário
Returns:
Lista de ConversationContext
"""
try:
db = self._get_db()
contexts_data = db.listar_contextos_usuario(numero_usuario)
contexts = []
for data in contexts_data:
ctx = ConversationContext.from_dict(data)
# Atualiza cache
self._contexts_cache[ctx.context_id] = ctx
contexts.append(ctx)
return contexts
except Exception as e:
logger.warning(f"Erro ao listar contextos: {e}")
return []
def get_stats(self) -> Dict[str, Any]:
"""
Retorna estatísticas do sistema de isolamento.
Returns:
Dicionário com estatísticas
"""
return {
"cached_contexts": len(self._contexts_cache),
"schema_version": SCHEMA_VERSION,
"context_salt_set": bool(os.getenv("CONTEXT_SALT")),
"max_messages_per_context": 100
}
# ============================================================
# MIGRAÇÃO DE DADOS LEGADOS
# ============================================================
def migrate_legacy_context(
self,
numero_usuario: str,
grupo_id: Optional[str] = None,
tipo_conversa: str = "pv"
) -> Optional[ConversationContext]:
"""
Migra contexto legado para novo sistema isolado.
Args:
numero_usuario: Número do usuário
grupo_id: ID do grupo
tipo_conversa: Tipo da conversa
Returns:
ConversationContext migrado ou None
"""
# Verifica se contexto já existe
existing = self.get_context(numero_usuario, tipo_conversa, grupo_id)
if existing:
return existing # Já migrado
# Cria novo contexto
context = self.get_or_create_context(numero_usuario, tipo_conversa, grupo_id)
logger.info(f"📦 Contexto migrado: {context.display_name}")
return context
# ============================================================
# FUNÇÕES DE COMPATIBILIDADE
# ============================================================
def get_isolation_manager() -> ContextIsolationManager:
"""Obtém instância singleton do gerenciador."""
return ContextIsolationManager()
def criar_contexto_isolado(
numero_usuario: str,
tipo_conversa: str,
grupo_id: Optional[str] = None
) -> ConversationContext:
"""
Factory function para criar contexto isolado.
Args:
numero_usuario: Número do usuário
tipo_conversa: "pv" ou "grupo"
grupo_id: ID do grupo (None para PV)
Returns:
ConversationContext instance
"""
manager = get_isolation_manager()
return manager.get_or_create_context(numero_usuario, tipo_conversa, grupo_id)
# ============================================================
# HELPER PARA API
# ============================================================
def extrair_conversation_id_do_request(data: Dict[str, Any]) -> Tuple[str, str, Optional[str]]:
"""
Extrai parâmetros para conversation_id de um request da API.
Args:
data: Payload do request (dict)
Returns:
Tupla (numero_usuario, tipo_conversa, grupo_id)
"""
numero_usuario = data.get('numero', 'anonimo') or 'anonimo'
tipo_conversa = data.get('tipo_conversa', 'pv')
# Para mensagens de grupo, grupo_id vem em campos diferentes
grupo_id = data.get('grupo_id') or data.get('contexto_grupo')
return numero_usuario, tipo_conversa, grupo_id
# type: ignore