# type: ignore """ ================================================================================ AKIRA V21 ULTIMATE - CONTEXT ISOLATION MODULE ================================================================================ Sistema de isolamento de contexto entre conversas (PV e Grupos). Garante que contexto de um grupo não vaze para outro ou para PVs. Features: - Context ID único por combinação (usuário + tipo + grupo) - Salt criptográfico para prevenir guessing - CRUD completo para contextos isolados - Integração com Database para persistência - Suporte a migração de dados existentes ================================================================================ """ import os import sys import hashlib import time import json import logging from pathlib import Path from typing import Optional, Dict, Any, List, Tuple from dataclasses import dataclass, field, asdict from datetime import datetime # Imports robustos com fallback - CORRIGIDO para usar modules. try: import modules.config as config from .database import Database CONTEXT_ISOLATION_AVAILABLE = True except ImportError: try: from . import config from .database import Database CONTEXT_ISOLATION_AVAILABLE = True except ImportError: CONTEXT_ISOLATION_AVAILABLE = False config = None Database = None logger = logging.getLogger(__name__) # ============================================================ # CONFIGURAÇÃO DE ISOLAMENTO # ============================================================ # Salt para geração de context_id (muda a cada deployment) CONTEXT_SALT: str = os.getenv("CONTEXT_SALT", "AKIRA_V21_CONTEXT_ISOLATION_v1") # Versão do esquema de isolamento (para migrações) SCHEMA_VERSION: int = 1 @dataclass class ConversationContext: """ Contexto isolado para uma conversa específica (PV ou Grupo). Attributes: context_id: Identificador único (hash de tipo + numero + grupo) numero_usuario: Número do usuário grupo_id: ID do grupo (None para PV) tipo_conversa: "pv" ou "grupo" short_memory: Lista de mensagens de curto prazo (max 100) estado_emocional: Estado emocional atual nivel_intimidade: Nível de intimidade (1-3) created_at: Timestamp de criação last_interaction: Timestamp da última interação metadata: Metadados adicionais """ context_id: str numero_usuario: str grupo_id: Optional[str] = None tipo_conversa: str = "pv" short_memory: List[Dict[str, Any]] = field(default_factory=list) estado_emocional: str = "neutral" nivel_intimidade: int = 1 created_at: float = field(default_factory=time.time) last_interaction: float = field(default_factory=time.time) metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """Converte para dicionário serializável.""" return asdict(self) @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'ConversationContext': """Cria instância a partir de dicionário.""" return cls(**data) @property def is_grupo(self) -> bool: """Retorna True se for conversa em grupo.""" return self.tipo_conversa == "grupo" @property def display_name(self) -> str: """Nome de exibição do contexto.""" if self.is_grupo: return f"Grupo {self.grupo_id or 'desconhecido'}" return f"PV {self.numero_usuario}" # ============================================================ # FUNÇÕES DE GERAÇÃO DE CONTEXT ID # ============================================================ def generate_context_id( numero_usuario: str, tipo_conversa: str, grupo_id: Optional[str] = None ) -> str: """ Gera ID único e criptográfico para uma conversa. Args: numero_usuario: Número de telefone do usuário tipo_conversa: "pv" ou "grupo" grupo_id: ID do grupo (opcional) Returns: String de 64 caracteres (SHA256 hash) """ # Limpa inputs numero_clean = ''.join(filter(str.isdigit, str(numero_usuario))) or "unknown" tipo_clean = str(tipo_conversa).lower().strip() grupo_clean = ''.join(filter(str.isdigit, str(grupo_id))) if grupo_id else "pv" # Monta raw string raw = f"{CONTEXT_SALT}:{tipo_clean}:{numero_clean}:{grupo_clean}:{int(time.time() // 86400)}" # Gera hash hash_obj = hashlib.sha256(raw.encode('utf-8')) return hash_obj.hexdigest() def validate_context_id(context_id: str) -> bool: """ Valida formato de context_id. Args: context_id: ID a ser validado Returns: True se formato válido """ if not context_id or not isinstance(context_id, str): return False # SHA256 hex = 64 caracteres return len(context_id) == 64 and all(c in '0123456789abcdef' for c in context_id) # ============================================================ # CLASSE PRINCIPAL DE ISOLAMENTO # ============================================================ class ContextIsolationManager: """ Gerenciador de isolamento de contexto. Provides: - Criação e gestão de contextos isolados - Persistência em banco de dados - Migração de dados legados - Estatísticas e debugging """ _instance = None _lock = None def __new__(cls): if cls._instance is None: cls._lock = __import__('threading').Lock() with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self._db: Optional[Database] = None self._contexts_cache: Dict[str, ConversationContext] = {} self._initialized = True # Logger if CONTEXT_ISOLATION_AVAILABLE and config: logger.info("✅ ContextIsolationManager inicializado") else: print("[WARN] ContextIsolationManager: config/database não disponíveis") def _get_db(self) -> Database: """Obtém instância do banco de dados.""" if self._db is None: if Database: try: from .config import DB_PATH self._db = Database(DB_PATH) except ImportError: self._db = Database() else: raise RuntimeError("Database não disponível") return self._db # ============================================================ # CRIAÇÃO E GESTÃO DE CONTEXTOS # ============================================================ def get_or_create_context( self, numero_usuario: str, tipo_conversa: str, grupo_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> ConversationContext: """ Obtém contexto existente ou cria novo. Args: numero_usuario: Número do usuário tipo_conversa: "pv" ou "grupo" grupo_id: ID do grupo (None para PV) metadata: Metadados opcionais para novo contexto Returns: ConversationContext instance """ context_id = generate_context_id(numero_usuario, tipo_conversa, grupo_id) # Verifica cache if context_id in self._contexts_cache: ctx = self._contexts_cache[context_id] ctx.last_interaction = time.time() return ctx # Tenta carregar do banco db = self._get_db() ctx_data = db.recuperar_contexto_isolado(context_id) if ctx_data: ctx = ConversationContext.from_dict(ctx_data) else: # Cria novo contexto ctx = ConversationContext( context_id=context_id, numero_usuario=numero_usuario, grupo_id=grupo_id, tipo_conversa=tipo_conversa, metadata=metadata or {} ) # Salva no banco self._save_context(ctx) # Atualiza cache ctx.last_interaction = time.time() self._contexts_cache[context_id] = ctx return ctx def get_context( self, numero_usuario: str, tipo_conversa: str, grupo_id: Optional[str] = None ) -> Optional[ConversationContext]: """ Obtém contexto existente (não cria novo). Args: numero_usuario: Número do usuário tipo_conversa: "pv" ou "grupo" grupo_id: ID do grupo Returns: ConversationContext ou None se não existir """ context_id = generate_context_id(numero_usuario, tipo_conversa, grupo_id) # Verifica cache if context_id in self._contexts_cache: return self._contexts_cache[context_id] # Busca no banco db = self._get_db() ctx_data = db.recuperar_contexto_isolado(context_id) if ctx_data: ctx = ConversationContext.from_dict(ctx_data) self._contexts_cache[context_id] = ctx return ctx return None def _save_context(self, context: ConversationContext) -> bool: """Salva contexto no banco de dados.""" try: db = self._get_db() return db.salvar_contexto_isolado(context.to_dict()) except Exception as e: logger.warning(f"Falha ao salvar contexto: {e}") return False def save_context(self, context: ConversationContext) -> bool: """Salva contexto e atualiza cache.""" context.last_interaction = time.time() self._contexts_cache[context.context_id] = context return self._save_context(context) def delete_context(self, context_id: str) -> bool: """ Remove contexto isolado. Args: context_id: ID do contexto a remover Returns: True se removido com sucesso """ if not validate_context_id(context_id): logger.warning(f"Context ID inválido: {context_id}") return False # Remove do cache if context_id in self._contexts_cache: del self._contexts_cache[context_id] # Remove do banco try: db = self._get_db() return db.deletar_contexto_isolado(context_id) except Exception as e: logger.warning(f"Falha ao deletar contexto: {e}") return False # ============================================================ # GESTÃO DE MEMÓRIA DE CURTO PRAZO # ============================================================ def add_message_to_context( self, context: ConversationContext, role: str, content: str, importancia: float = 1.0, emocao: str = "neutral", reply_info: Optional[Dict[str, Any]] = None ) -> None: """ Adiciona mensagem à memória de curto prazo do contexto. Args: context: ConversationContext role: "user" ou "assistant" content: Texto da mensagem importancia: Peso da mensagem (1.0 = normal, >1.0 = reply) emocao: Emoção detectada reply_info: Info adicional se for reply """ MAX_MESSAGES = 100 # Configurado pelo usuário message_entry = { "role": role, "content": content, "timestamp": time.time(), "importancia": importancia, "emocao": emocao, "reply_info": reply_info or {} } # Adiciona à lista context.short_memory.append(message_entry) # Sliding window - remove mensagens antigas if len(context.short_memory) > MAX_MESSAGES: context.short_memory = context.short_memory[-MAX_MESSAGES:] # Atualiza timestamp context.last_interaction = time.time() # Salva no banco self.save_context(context) def get_context_window( self, context: ConversationContext, include_replies: bool = True, prioritize_replies: bool = True, max_messages: int = 100 ) -> List[Dict[str, Any]]: """ Obtém janela de contexto com prioridade para replies. Args: context: ConversationContext include_replies: Se deve incluir mensagens de reply prioritize_replies: Se deve dar prioridade a replies max_messages: Máximo de mensagens a retornar Returns: Lista de mensagens ordenadas por importância """ messages = context.short_memory.copy() if not messages: return [] # Filtra replies se necessário if not include_replies: messages = [m for m in messages if not m.get('reply_info', {})] # Ordena por importância (replies primeiro) if prioritize_replies: messages.sort(key=lambda x: x.get('importancia', 1.0), reverse=True) # Limita quantidade return messages[:max_messages] def clear_context_memory(self, context: ConversationContext) -> bool: """ Limpa memória de curto prazo do contexto. Args: context: ConversationContext Returns: True se limpo com sucesso """ context.short_memory = [] context.last_interaction = time.time() return self.save_context(context) # ============================================================ # LISTAGEM E ESTATÍSTICAS # ============================================================ def list_user_contexts(self, numero_usuario: str) -> List[ConversationContext]: """ Lista todos os contextos de um usuário. Args: numero_usuario: Número do usuário Returns: Lista de ConversationContext """ try: db = self._get_db() contexts_data = db.listar_contextos_usuario(numero_usuario) contexts = [] for data in contexts_data: ctx = ConversationContext.from_dict(data) # Atualiza cache self._contexts_cache[ctx.context_id] = ctx contexts.append(ctx) return contexts except Exception as e: logger.warning(f"Erro ao listar contextos: {e}") return [] def get_stats(self) -> Dict[str, Any]: """ Retorna estatísticas do sistema de isolamento. Returns: Dicionário com estatísticas """ return { "cached_contexts": len(self._contexts_cache), "schema_version": SCHEMA_VERSION, "context_salt_set": bool(os.getenv("CONTEXT_SALT")), "max_messages_per_context": 100 } # ============================================================ # MIGRAÇÃO DE DADOS LEGADOS # ============================================================ def migrate_legacy_context( self, numero_usuario: str, grupo_id: Optional[str] = None, tipo_conversa: str = "pv" ) -> Optional[ConversationContext]: """ Migra contexto legado para novo sistema isolado. Args: numero_usuario: Número do usuário grupo_id: ID do grupo tipo_conversa: Tipo da conversa Returns: ConversationContext migrado ou None """ # Verifica se contexto já existe existing = self.get_context(numero_usuario, tipo_conversa, grupo_id) if existing: return existing # Já migrado # Cria novo contexto context = self.get_or_create_context(numero_usuario, tipo_conversa, grupo_id) logger.info(f"📦 Contexto migrado: {context.display_name}") return context # ============================================================ # FUNÇÕES DE COMPATIBILIDADE # ============================================================ def get_isolation_manager() -> ContextIsolationManager: """Obtém instância singleton do gerenciador.""" return ContextIsolationManager() def criar_contexto_isolado( numero_usuario: str, tipo_conversa: str, grupo_id: Optional[str] = None ) -> ConversationContext: """ Factory function para criar contexto isolado. Args: numero_usuario: Número do usuário tipo_conversa: "pv" ou "grupo" grupo_id: ID do grupo (None para PV) Returns: ConversationContext instance """ manager = get_isolation_manager() return manager.get_or_create_context(numero_usuario, tipo_conversa, grupo_id) # ============================================================ # HELPER PARA API # ============================================================ def extrair_conversation_id_do_request(data: Dict[str, Any]) -> Tuple[str, str, Optional[str]]: """ Extrai parâmetros para conversation_id de um request da API. Args: data: Payload do request (dict) Returns: Tupla (numero_usuario, tipo_conversa, grupo_id) """ numero_usuario = data.get('numero', 'anonimo') or 'anonimo' tipo_conversa = data.get('tipo_conversa', 'pv') # Para mensagens de grupo, grupo_id vem em campos diferentes grupo_id = data.get('grupo_id') or data.get('contexto_grupo') return numero_usuario, tipo_conversa, grupo_id # type: ignore