Spaces:
Running
Running
| # type: ignore | |
| """ | |
| ================================================================================ | |
| AKIRA V21 ULTIMATE - CONTEXT ISOLATION MODULE | |
| ================================================================================ | |
| Sistema de isolamento de contexto entre conversas (PV e Grupos). | |
| Garante que contexto de um grupo não vaze para outro ou para PVs. | |
| Features: | |
| - Context ID único por combinação (usuário + tipo + grupo) | |
| - Salt criptográfico para prevenir guessing | |
| - CRUD completo para contextos isolados | |
| - Integração com Database para persistência | |
| - Suporte a migração de dados existentes | |
| ================================================================================ | |
| """ | |
| import os | |
| import sys | |
| import hashlib | |
| import time | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional, Dict, Any, List, Tuple | |
| from dataclasses import dataclass, field, asdict | |
| from datetime import datetime | |
| # Imports robustos com fallback - CORRIGIDO para usar modules. | |
| try: | |
| import modules.config as config | |
| from .database import Database | |
| CONTEXT_ISOLATION_AVAILABLE = True | |
| except ImportError: | |
| try: | |
| from . import config | |
| from .database import Database | |
| CONTEXT_ISOLATION_AVAILABLE = True | |
| except ImportError: | |
| CONTEXT_ISOLATION_AVAILABLE = False | |
| config = None | |
| Database = None | |
| logger = logging.getLogger(__name__) | |
| # ============================================================ | |
| # CONFIGURAÇÃO DE ISOLAMENTO | |
| # ============================================================ | |
| # Salt para geração de context_id (muda a cada deployment) | |
| CONTEXT_SALT: str = os.getenv("CONTEXT_SALT", "AKIRA_V21_CONTEXT_ISOLATION_v1") | |
| # Versão do esquema de isolamento (para migrações) | |
| SCHEMA_VERSION: int = 1 | |
| class ConversationContext: | |
| """ | |
| Contexto isolado para uma conversa específica (PV ou Grupo). | |
| Attributes: | |
| context_id: Identificador único (hash de tipo + numero + grupo) | |
| numero_usuario: Número do usuário | |
| grupo_id: ID do grupo (None para PV) | |
| tipo_conversa: "pv" ou "grupo" | |
| short_memory: Lista de mensagens de curto prazo (max 100) | |
| estado_emocional: Estado emocional atual | |
| nivel_intimidade: Nível de intimidade (1-3) | |
| created_at: Timestamp de criação | |
| last_interaction: Timestamp da última interação | |
| metadata: Metadados adicionais | |
| """ | |
| context_id: str | |
| numero_usuario: str | |
| grupo_id: Optional[str] = None | |
| tipo_conversa: str = "pv" | |
| short_memory: List[Dict[str, Any]] = field(default_factory=list) | |
| estado_emocional: str = "neutral" | |
| nivel_intimidade: int = 1 | |
| created_at: float = field(default_factory=time.time) | |
| last_interaction: float = field(default_factory=time.time) | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |
| def to_dict(self) -> Dict[str, Any]: | |
| """Converte para dicionário serializável.""" | |
| return asdict(self) | |
| def from_dict(cls, data: Dict[str, Any]) -> 'ConversationContext': | |
| """Cria instância a partir de dicionário.""" | |
| return cls(**data) | |
| def is_grupo(self) -> bool: | |
| """Retorna True se for conversa em grupo.""" | |
| return self.tipo_conversa == "grupo" | |
| def display_name(self) -> str: | |
| """Nome de exibição do contexto.""" | |
| if self.is_grupo: | |
| return f"Grupo {self.grupo_id or 'desconhecido'}" | |
| return f"PV {self.numero_usuario}" | |
| # ============================================================ | |
| # FUNÇÕES DE GERAÇÃO DE CONTEXT ID | |
| # ============================================================ | |
| def generate_context_id( | |
| numero_usuario: str, | |
| tipo_conversa: str, | |
| grupo_id: Optional[str] = None | |
| ) -> str: | |
| """ | |
| Gera ID único e criptográfico para uma conversa. | |
| Args: | |
| numero_usuario: Número de telefone do usuário | |
| tipo_conversa: "pv" ou "grupo" | |
| grupo_id: ID do grupo (opcional) | |
| Returns: | |
| String de 64 caracteres (SHA256 hash) | |
| """ | |
| # Limpa inputs | |
| numero_clean = ''.join(filter(str.isdigit, str(numero_usuario))) or "unknown" | |
| tipo_clean = str(tipo_conversa).lower().strip() | |
| grupo_clean = ''.join(filter(str.isdigit, str(grupo_id))) if grupo_id else "pv" | |
| # Monta raw string | |
| raw = f"{CONTEXT_SALT}:{tipo_clean}:{numero_clean}:{grupo_clean}:{int(time.time() // 86400)}" | |
| # Gera hash | |
| hash_obj = hashlib.sha256(raw.encode('utf-8')) | |
| return hash_obj.hexdigest() | |
| def validate_context_id(context_id: str) -> bool: | |
| """ | |
| Valida formato de context_id. | |
| Args: | |
| context_id: ID a ser validado | |
| Returns: | |
| True se formato válido | |
| """ | |
| if not context_id or not isinstance(context_id, str): | |
| return False | |
| # SHA256 hex = 64 caracteres | |
| return len(context_id) == 64 and all(c in '0123456789abcdef' for c in context_id) | |
| # ============================================================ | |
| # CLASSE PRINCIPAL DE ISOLAMENTO | |
| # ============================================================ | |
| class ContextIsolationManager: | |
| """ | |
| Gerenciador de isolamento de contexto. | |
| Provides: | |
| - Criação e gestão de contextos isolados | |
| - Persistência em banco de dados | |
| - Migração de dados legados | |
| - Estatísticas e debugging | |
| """ | |
| _instance = None | |
| _lock = None | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._lock = __import__('threading').Lock() | |
| with cls._lock: | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| cls._instance._initialized = False | |
| return cls._instance | |
| def __init__(self): | |
| if self._initialized: | |
| return | |
| self._db: Optional[Database] = None | |
| self._contexts_cache: Dict[str, ConversationContext] = {} | |
| self._initialized = True | |
| # Logger | |
| if CONTEXT_ISOLATION_AVAILABLE and config: | |
| logger.info("✅ ContextIsolationManager inicializado") | |
| else: | |
| print("[WARN] ContextIsolationManager: config/database não disponíveis") | |
| def _get_db(self) -> Database: | |
| """Obtém instância do banco de dados.""" | |
| if self._db is None: | |
| if Database: | |
| try: | |
| from .config import DB_PATH | |
| self._db = Database(DB_PATH) | |
| except ImportError: | |
| self._db = Database() | |
| else: | |
| raise RuntimeError("Database não disponível") | |
| return self._db | |
| # ============================================================ | |
| # CRIAÇÃO E GESTÃO DE CONTEXTOS | |
| # ============================================================ | |
| def get_or_create_context( | |
| self, | |
| numero_usuario: str, | |
| tipo_conversa: str, | |
| grupo_id: Optional[str] = None, | |
| metadata: Optional[Dict[str, Any]] = None | |
| ) -> ConversationContext: | |
| """ | |
| Obtém contexto existente ou cria novo. | |
| Args: | |
| numero_usuario: Número do usuário | |
| tipo_conversa: "pv" ou "grupo" | |
| grupo_id: ID do grupo (None para PV) | |
| metadata: Metadados opcionais para novo contexto | |
| Returns: | |
| ConversationContext instance | |
| """ | |
| context_id = generate_context_id(numero_usuario, tipo_conversa, grupo_id) | |
| # Verifica cache | |
| if context_id in self._contexts_cache: | |
| ctx = self._contexts_cache[context_id] | |
| ctx.last_interaction = time.time() | |
| return ctx | |
| # Tenta carregar do banco | |
| db = self._get_db() | |
| ctx_data = db.recuperar_contexto_isolado(context_id) | |
| if ctx_data: | |
| ctx = ConversationContext.from_dict(ctx_data) | |
| else: | |
| # Cria novo contexto | |
| ctx = ConversationContext( | |
| context_id=context_id, | |
| numero_usuario=numero_usuario, | |
| grupo_id=grupo_id, | |
| tipo_conversa=tipo_conversa, | |
| metadata=metadata or {} | |
| ) | |
| # Salva no banco | |
| self._save_context(ctx) | |
| # Atualiza cache | |
| ctx.last_interaction = time.time() | |
| self._contexts_cache[context_id] = ctx | |
| return ctx | |
| def get_context( | |
| self, | |
| numero_usuario: str, | |
| tipo_conversa: str, | |
| grupo_id: Optional[str] = None | |
| ) -> Optional[ConversationContext]: | |
| """ | |
| Obtém contexto existente (não cria novo). | |
| Args: | |
| numero_usuario: Número do usuário | |
| tipo_conversa: "pv" ou "grupo" | |
| grupo_id: ID do grupo | |
| Returns: | |
| ConversationContext ou None se não existir | |
| """ | |
| context_id = generate_context_id(numero_usuario, tipo_conversa, grupo_id) | |
| # Verifica cache | |
| if context_id in self._contexts_cache: | |
| return self._contexts_cache[context_id] | |
| # Busca no banco | |
| db = self._get_db() | |
| ctx_data = db.recuperar_contexto_isolado(context_id) | |
| if ctx_data: | |
| ctx = ConversationContext.from_dict(ctx_data) | |
| self._contexts_cache[context_id] = ctx | |
| return ctx | |
| return None | |
| def _save_context(self, context: ConversationContext) -> bool: | |
| """Salva contexto no banco de dados.""" | |
| try: | |
| db = self._get_db() | |
| return db.salvar_contexto_isolado(context.to_dict()) | |
| except Exception as e: | |
| logger.warning(f"Falha ao salvar contexto: {e}") | |
| return False | |
| def save_context(self, context: ConversationContext) -> bool: | |
| """Salva contexto e atualiza cache.""" | |
| context.last_interaction = time.time() | |
| self._contexts_cache[context.context_id] = context | |
| return self._save_context(context) | |
| def delete_context(self, context_id: str) -> bool: | |
| """ | |
| Remove contexto isolado. | |
| Args: | |
| context_id: ID do contexto a remover | |
| Returns: | |
| True se removido com sucesso | |
| """ | |
| if not validate_context_id(context_id): | |
| logger.warning(f"Context ID inválido: {context_id}") | |
| return False | |
| # Remove do cache | |
| if context_id in self._contexts_cache: | |
| del self._contexts_cache[context_id] | |
| # Remove do banco | |
| try: | |
| db = self._get_db() | |
| return db.deletar_contexto_isolado(context_id) | |
| except Exception as e: | |
| logger.warning(f"Falha ao deletar contexto: {e}") | |
| return False | |
| # ============================================================ | |
| # GESTÃO DE MEMÓRIA DE CURTO PRAZO | |
| # ============================================================ | |
| def add_message_to_context( | |
| self, | |
| context: ConversationContext, | |
| role: str, | |
| content: str, | |
| importancia: float = 1.0, | |
| emocao: str = "neutral", | |
| reply_info: Optional[Dict[str, Any]] = None | |
| ) -> None: | |
| """ | |
| Adiciona mensagem à memória de curto prazo do contexto. | |
| Args: | |
| context: ConversationContext | |
| role: "user" ou "assistant" | |
| content: Texto da mensagem | |
| importancia: Peso da mensagem (1.0 = normal, >1.0 = reply) | |
| emocao: Emoção detectada | |
| reply_info: Info adicional se for reply | |
| """ | |
| MAX_MESSAGES = 100 # Configurado pelo usuário | |
| message_entry = { | |
| "role": role, | |
| "content": content, | |
| "timestamp": time.time(), | |
| "importancia": importancia, | |
| "emocao": emocao, | |
| "reply_info": reply_info or {} | |
| } | |
| # Adiciona à lista | |
| context.short_memory.append(message_entry) | |
| # Sliding window - remove mensagens antigas | |
| if len(context.short_memory) > MAX_MESSAGES: | |
| context.short_memory = context.short_memory[-MAX_MESSAGES:] | |
| # Atualiza timestamp | |
| context.last_interaction = time.time() | |
| # Salva no banco | |
| self.save_context(context) | |
| def get_context_window( | |
| self, | |
| context: ConversationContext, | |
| include_replies: bool = True, | |
| prioritize_replies: bool = True, | |
| max_messages: int = 100 | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Obtém janela de contexto com prioridade para replies. | |
| Args: | |
| context: ConversationContext | |
| include_replies: Se deve incluir mensagens de reply | |
| prioritize_replies: Se deve dar prioridade a replies | |
| max_messages: Máximo de mensagens a retornar | |
| Returns: | |
| Lista de mensagens ordenadas por importância | |
| """ | |
| messages = context.short_memory.copy() | |
| if not messages: | |
| return [] | |
| # Filtra replies se necessário | |
| if not include_replies: | |
| messages = [m for m in messages if not m.get('reply_info', {})] | |
| # Ordena por importância (replies primeiro) | |
| if prioritize_replies: | |
| messages.sort(key=lambda x: x.get('importancia', 1.0), reverse=True) | |
| # Limita quantidade | |
| return messages[:max_messages] | |
| def clear_context_memory(self, context: ConversationContext) -> bool: | |
| """ | |
| Limpa memória de curto prazo do contexto. | |
| Args: | |
| context: ConversationContext | |
| Returns: | |
| True se limpo com sucesso | |
| """ | |
| context.short_memory = [] | |
| context.last_interaction = time.time() | |
| return self.save_context(context) | |
| # ============================================================ | |
| # LISTAGEM E ESTATÍSTICAS | |
| # ============================================================ | |
| def list_user_contexts(self, numero_usuario: str) -> List[ConversationContext]: | |
| """ | |
| Lista todos os contextos de um usuário. | |
| Args: | |
| numero_usuario: Número do usuário | |
| Returns: | |
| Lista de ConversationContext | |
| """ | |
| try: | |
| db = self._get_db() | |
| contexts_data = db.listar_contextos_usuario(numero_usuario) | |
| contexts = [] | |
| for data in contexts_data: | |
| ctx = ConversationContext.from_dict(data) | |
| # Atualiza cache | |
| self._contexts_cache[ctx.context_id] = ctx | |
| contexts.append(ctx) | |
| return contexts | |
| except Exception as e: | |
| logger.warning(f"Erro ao listar contextos: {e}") | |
| return [] | |
| def get_stats(self) -> Dict[str, Any]: | |
| """ | |
| Retorna estatísticas do sistema de isolamento. | |
| Returns: | |
| Dicionário com estatísticas | |
| """ | |
| return { | |
| "cached_contexts": len(self._contexts_cache), | |
| "schema_version": SCHEMA_VERSION, | |
| "context_salt_set": bool(os.getenv("CONTEXT_SALT")), | |
| "max_messages_per_context": 100 | |
| } | |
| # ============================================================ | |
| # MIGRAÇÃO DE DADOS LEGADOS | |
| # ============================================================ | |
| def migrate_legacy_context( | |
| self, | |
| numero_usuario: str, | |
| grupo_id: Optional[str] = None, | |
| tipo_conversa: str = "pv" | |
| ) -> Optional[ConversationContext]: | |
| """ | |
| Migra contexto legado para novo sistema isolado. | |
| Args: | |
| numero_usuario: Número do usuário | |
| grupo_id: ID do grupo | |
| tipo_conversa: Tipo da conversa | |
| Returns: | |
| ConversationContext migrado ou None | |
| """ | |
| # Verifica se contexto já existe | |
| existing = self.get_context(numero_usuario, tipo_conversa, grupo_id) | |
| if existing: | |
| return existing # Já migrado | |
| # Cria novo contexto | |
| context = self.get_or_create_context(numero_usuario, tipo_conversa, grupo_id) | |
| logger.info(f"📦 Contexto migrado: {context.display_name}") | |
| return context | |
| # ============================================================ | |
| # FUNÇÕES DE COMPATIBILIDADE | |
| # ============================================================ | |
| def get_isolation_manager() -> ContextIsolationManager: | |
| """Obtém instância singleton do gerenciador.""" | |
| return ContextIsolationManager() | |
| def criar_contexto_isolado( | |
| numero_usuario: str, | |
| tipo_conversa: str, | |
| grupo_id: Optional[str] = None | |
| ) -> ConversationContext: | |
| """ | |
| Factory function para criar contexto isolado. | |
| Args: | |
| numero_usuario: Número do usuário | |
| tipo_conversa: "pv" ou "grupo" | |
| grupo_id: ID do grupo (None para PV) | |
| Returns: | |
| ConversationContext instance | |
| """ | |
| manager = get_isolation_manager() | |
| return manager.get_or_create_context(numero_usuario, tipo_conversa, grupo_id) | |
| # ============================================================ | |
| # HELPER PARA API | |
| # ============================================================ | |
| def extrair_conversation_id_do_request(data: Dict[str, Any]) -> Tuple[str, str, Optional[str]]: | |
| """ | |
| Extrai parâmetros para conversation_id de um request da API. | |
| Args: | |
| data: Payload do request (dict) | |
| Returns: | |
| Tupla (numero_usuario, tipo_conversa, grupo_id) | |
| """ | |
| numero_usuario = data.get('numero', 'anonimo') or 'anonimo' | |
| tipo_conversa = data.get('tipo_conversa', 'pv') | |
| # Para mensagens de grupo, grupo_id vem em campos diferentes | |
| grupo_id = data.get('grupo_id') or data.get('contexto_grupo') | |
| return numero_usuario, tipo_conversa, grupo_id | |
| # type: ignore | |