Spaces:
Sleeping
Sleeping
| # modules/contexto.py — AKIRA V21 FINAL CORRIGIDO (Dezembro 2025) | |
| """ | |
| ✅ TOTALMENTE ADAPTADO ao database.py correto | |
| ✅ Usa métodos corretos do database | |
| ✅ Processa reply_metadata do index.js | |
| ✅ Sistema emocional DistilBERT | |
| """ | |
| import logging | |
| import re | |
| import random | |
| import time | |
| import json | |
| from typing import Optional, List, Dict, Tuple, Any | |
| from collections import deque | |
| logger = logging.getLogger(__name__) | |
| # Modelo de emoções | |
| try: | |
| from transformers import pipeline | |
| EMOTION_CLASSIFIER = pipeline( | |
| "text-classification", | |
| model="j-hartmann/emotion-english-distilroberta-base", | |
| top_k=3, | |
| device=-1, | |
| truncation=True | |
| ) | |
| logger.info("✅ Modelo DistilBERT carregado") | |
| EMOTION_CACHE = {} | |
| except Exception as e: | |
| logger.warning(f"⚠️ DistilBERT não disponível: {e}") | |
| EMOTION_CLASSIFIER = None | |
| EMOTION_CACHE = {} | |
| # Mapeamento emoção → humor | |
| EMOTION_TO_HUMOR = { | |
| "joy": "feliz_ironica", | |
| "sadness": "triste_ironica", | |
| "anger": "irritada_ironica", | |
| "fear": "preocupada_ironica", | |
| "surprise": "curiosa_ironica", | |
| "disgust": "irritada_ironica", | |
| "neutral": "normal_ironico", | |
| "love": "romantico_carinhoso" | |
| } | |
| class MemoriaEmocional: | |
| def __init__(self, max_size=50): | |
| self.historico = deque(maxlen=max_size) | |
| self.tendencia_emocional = "neutral" | |
| self.volatilidade = 0.5 | |
| def adicionar_interacao(self, mensagem: str, emocao: str, confianca: float): | |
| self.historico.append({ | |
| "mensagem": mensagem[:100], | |
| "emocao": emocao, | |
| "confianca": confianca, | |
| "timestamp": time.time() | |
| }) | |
| self._atualizar_tendencia() | |
| def _atualizar_tendencia(self): | |
| if not self.historico: | |
| return | |
| recentes = list(self.historico)[-10:] | |
| contagem = {} | |
| for entry in recentes: | |
| emocao = entry["emocao"] | |
| contagem[emocao] = contagem.get(emocao, 0) + entry["confianca"] | |
| if contagem: | |
| self.tendencia_emocional = max(contagem, key=contagem.get) | |
| class Contexto: | |
| def __init__(self, db: Any, usuario: str = "anonimo"): | |
| self.db = db | |
| self.usuario = usuario | |
| # Estado | |
| self.humor_atual = "normal_ironico" | |
| self.modo_resposta_atual = "normal_ironico" | |
| self.memoria_emocional = MemoriaEmocional(max_size=50) | |
| # Transição | |
| self.nivel_transicao = 0 | |
| self.humor_alvo = "normal_ironico" | |
| self.ultima_transicao = time.time() | |
| # Conversa | |
| self.ultima_mensagem_akira = None | |
| self.tipo_conversa = "pv" | |
| self.is_grupo = False | |
| # Usuário | |
| self.numero_usuario = "" | |
| self.nome_usuario = "Anônimo" | |
| self.grupo_id = "" | |
| self.grupo_nome = "" | |
| # Histórico | |
| self.historico_mensagens = [] | |
| self._carregar_estado_inicial() | |
| logger.info(f"✅ Contexto inicializado: {self.usuario}") | |
| def _carregar_estado_inicial(self): | |
| """Carrega estado do banco""" | |
| try: | |
| if hasattr(self.db, 'recuperar_humor_atual'): | |
| self.humor_atual = self.db.recuperar_humor_atual(self.usuario) | |
| if hasattr(self.db, 'recuperar_modo_resposta'): | |
| self.modo_resposta_atual = self.db.recuperar_modo_resposta(self.usuario) | |
| if hasattr(self.db, 'recuperar_mensagens'): | |
| try: | |
| mensagens_db = self.db.recuperar_mensagens(self.usuario, limite=10) | |
| for msg in mensagens_db: | |
| if isinstance(msg, tuple) and len(msg) >= 2: | |
| if msg[0]: # mensagem | |
| self.historico_mensagens.append({ | |
| "role": "user", | |
| "content": msg[0], | |
| "timestamp": msg[7] if len(msg) > 7 else time.time() | |
| }) | |
| if len(msg) > 1 and msg[1]: # resposta | |
| self.historico_mensagens.append({ | |
| "role": "assistant", | |
| "content": msg[1], | |
| "timestamp": msg[7] if len(msg) > 7 else time.time() | |
| }) | |
| except Exception as e: | |
| logger.warning(f"Falha ao carregar histórico: {e}") | |
| self.historico_mensagens.sort(key=lambda x: x.get('timestamp', 0)) | |
| except Exception as e: | |
| logger.warning(f"Erro ao carregar estado: {e}") | |
| def detectar_emocao_avancada(self, mensagem: str) -> Tuple[str, float, Dict]: | |
| """Detecta emoção usando DistilBERT""" | |
| mensagem_limpa = mensagem.strip() | |
| cache_key = mensagem_limpa[:100].lower() | |
| if cache_key in EMOTION_CACHE: | |
| return EMOTION_CACHE[cache_key] | |
| if not EMOTION_CLASSIFIER: | |
| return self._detectar_emocao_fallback(mensagem_limpa) | |
| try: | |
| resultados = EMOTION_CLASSIFIER(mensagem_limpa[:256], truncation=True) | |
| emocao_primaria = resultados[0][0]['label'] | |
| confianca_primaria = resultados[0][0]['score'] | |
| detalhes = { | |
| "primaria": {"emocao": emocao_primaria, "confianca": confianca_primaria}, | |
| "polaridade": "positiva" if emocao_primaria in ["joy", "love"] else "negativa" if emocao_primaria in ["anger", "sadness"] else "neutra" | |
| } | |
| self.memoria_emocional.adicionar_interacao(mensagem_limpa, emocao_primaria, confianca_primaria) | |
| resultado = (emocao_primaria, confianca_primaria, detalhes) | |
| EMOTION_CACHE[cache_key] = resultado | |
| return resultado | |
| except Exception as e: | |
| logger.warning(f"Erro no DistilBERT: {e}") | |
| return self._detectar_emocao_fallback(mensagem_limpa) | |
| def _detectar_emocao_fallback(self, mensagem: str) -> Tuple[str, float, Dict]: | |
| """Fallback para detecção de emoção""" | |
| mensagem_lower = mensagem.lower() | |
| positivas = ['bom', 'ótimo', 'feliz', 'adorei'] | |
| negativas = ['ruim', 'péssimo', 'triste', 'raiva'] | |
| pos = sum(1 for p in positivas if p in mensagem_lower) | |
| neg = sum(1 for n in negativas if n in mensagem_lower) | |
| if pos > neg and pos >= 2: | |
| return ("joy", 0.7, {"primaria": {"emocao": "joy", "confianca": 0.7}}) | |
| elif neg > pos and neg >= 2: | |
| return ("anger", 0.7, {"primaria": {"emocao": "anger", "confianca": 0.7}}) | |
| else: | |
| return ("neutral", 0.5, {"primaria": {"emocao": "neutral", "confianca": 0.5}}) | |
| def atualizar_humor_gradual(self, emocao: str, confianca: float, tom_usuario: str, | |
| usuario_privilegiado: bool = False) -> str: | |
| """Atualiza humor gradualmente""" | |
| humor_anterior = self.humor_atual | |
| # Sugere humor | |
| humor_sugerido = EMOTION_TO_HUMOR.get(emocao, "normal_ironico") | |
| if usuario_privilegiado and tom_usuario == "formal": | |
| humor_sugerido = "tecnico_formal" | |
| # Inicia transição | |
| if self.humor_alvo != humor_sugerido: | |
| self.humor_alvo = humor_sugerido | |
| self.nivel_transicao = 0 | |
| # Transição | |
| taxa = 0.5 | |
| if confianca > 0.8: | |
| taxa += 0.3 | |
| if tom_usuario == "rude": | |
| taxa += 0.4 | |
| self.nivel_transicao = min(3, self.nivel_transicao + taxa) | |
| # Novo humor | |
| if self.nivel_transicao >= 3: | |
| novo_humor = self.humor_alvo | |
| else: | |
| novo_humor = self.humor_atual | |
| # Salva transição se mudou | |
| if novo_humor != humor_anterior and hasattr(self.db, 'salvar_transicao_humor'): | |
| try: | |
| self.db.salvar_transicao_humor( | |
| self.usuario, | |
| humor_anterior, | |
| novo_humor, | |
| emocao, | |
| confianca, | |
| self.nivel_transicao, | |
| f"Emoção: {emocao} ({confianca:.2f})" | |
| ) | |
| except Exception as e: | |
| logger.warning(f"Erro ao salvar transição: {e}") | |
| self.humor_atual = novo_humor | |
| return novo_humor | |
| def detectar_tom_usuario(self, mensagem: str) -> Tuple[str, float]: | |
| """Detecta tom do usuário""" | |
| mensagem_lower = mensagem.lower() | |
| # Formal | |
| if any(x in mensagem_lower for x in ["senhor", "doutor", "por favor"]): | |
| return ("formal", 0.8) | |
| # Rude | |
| rudes = ['burro', 'idiota', 'merda', 'caralho'] | |
| if any(x in mensagem_lower for x in rudes): | |
| return ("rude", 0.9) | |
| # Informal | |
| if any(x in mensagem_lower for x in ['puto', 'mano', 'fixe']): | |
| return ("informal", 0.7) | |
| return ("neutro", 0.5) | |
| def detectar_modo_resposta(self, mensagem: str, tom_usuario: str, | |
| usuario_privilegiado: bool = False) -> str: | |
| """Detecta modo de resposta""" | |
| mensagem_lower = mensagem.lower() | |
| if usuario_privilegiado and tom_usuario == "formal": | |
| return "tecnico_formal" | |
| if tom_usuario == "rude": | |
| return "agressivo_direto" | |
| if '?' in mensagem and len(mensagem) > 100: | |
| return "filosofico_ironico" | |
| palavras_romanticas = ['amor', 'paixão', 'gosto de ti'] | |
| if any(p in mensagem_lower for p in palavras_romanticas): | |
| return "romantico_carinhoso" | |
| return "normal_ironico" | |
| def analisar_intencao_e_normalizar(self, mensagem: str, historico: List[Dict] = None, | |
| mensagem_citada: str = None, | |
| reply_metadata: Dict = None) -> Dict[str, Any]: | |
| """Análise principal - COMPATÍVEL COM INDEX.JS""" | |
| if not isinstance(mensagem, str): | |
| mensagem = str(mensagem) | |
| if historico is None: | |
| historico = self.obter_historico_para_llm() | |
| # Verifica privilégio | |
| usuario_privilegiado = False | |
| if self.numero_usuario and hasattr(self.db, 'is_usuario_privilegiado'): | |
| try: | |
| usuario_privilegiado = self.db.is_usuario_privilegiado(self.numero_usuario) | |
| except: | |
| pass | |
| # Detecta emoção | |
| emocao, confianca, detalhes_emocao = self.detectar_emocao_avancada(mensagem) | |
| # Detecta tom | |
| tom_usuario, intensidade_tom = self.detectar_tom_usuario(mensagem) | |
| # Atualiza humor | |
| humor_atualizado = self.atualizar_humor_gradual( | |
| emocao, confianca, tom_usuario, usuario_privilegiado | |
| ) | |
| # Detecta modo | |
| modo_resposta = self.detectar_modo_resposta(mensagem, tom_usuario, usuario_privilegiado) | |
| self.modo_resposta_atual = modo_resposta | |
| # Analisa reply | |
| reply_analysis = self._analisar_reply_context(mensagem_citada, reply_metadata) | |
| # Resultado | |
| resultado = { | |
| "tom_usuario": tom_usuario, | |
| "tom_intensidade": intensidade_tom, | |
| "emocao_primaria": emocao, | |
| "confianca_emocao": confianca, | |
| "detalhes_emocao": detalhes_emocao, | |
| "modo_resposta": modo_resposta, | |
| "humor_atualizado": humor_atualizado, | |
| "nivel_transicao": self.nivel_transicao, | |
| "humor_alvo": self.humor_alvo, | |
| "usuario_privilegiado": usuario_privilegiado, | |
| "nome_usuario": self.nome_usuario, | |
| "numero_usuario": self.numero_usuario, | |
| "eh_resposta": reply_analysis.get("is_reply", False), | |
| "eh_resposta_ao_bot": reply_analysis.get("reply_to_bot", False), | |
| "mensagem_citada_limpa": mensagem_citada or "", | |
| "reply_analysis": reply_analysis, | |
| "reply_metadata": reply_metadata, | |
| "tipo_conversa": self.tipo_conversa, | |
| "is_grupo": self.is_grupo, | |
| "tendencia_emocional": self.memoria_emocional.tendencia_emocional, | |
| "volatilidade_usuario": self.memoria_emocional.volatilidade | |
| } | |
| return resultado | |
| def _analisar_reply_context(self, mensagem_citada: str, reply_metadata: Dict) -> Dict[str, Any]: | |
| """Analisa contexto de reply""" | |
| if reply_metadata: | |
| return { | |
| "is_reply": reply_metadata.get('is_reply', False), | |
| "reply_to_bot": reply_metadata.get('reply_to_bot', False), | |
| "quoted_author_name": reply_metadata.get('quoted_author_name', ''), | |
| "texto_citado_completo": reply_metadata.get('texto_mensagem_citada', ''), | |
| "context_hint": reply_metadata.get('context_hint', ''), | |
| "source": "reply_metadata" | |
| } | |
| if mensagem_citada: | |
| reply_to_bot = "AKIRA" in mensagem_citada.upper() | |
| return { | |
| "is_reply": True, | |
| "reply_to_bot": reply_to_bot, | |
| "quoted_author_name": "Akira" if reply_to_bot else "desconhecido", | |
| "texto_citado_completo": mensagem_citada, | |
| "context_hint": f"Citando {'Akira' if reply_to_bot else 'outra pessoa'}", | |
| "source": "mensagem_citada" | |
| } | |
| return { | |
| "is_reply": False, | |
| "reply_to_bot": False, | |
| "quoted_author_name": "", | |
| "texto_citado_completo": "", | |
| "context_hint": "", | |
| "source": "nenhum" | |
| } | |
| def obter_historico_para_llm(self) -> List[Dict]: | |
| """Retorna histórico formatado""" | |
| return [ | |
| {"role": msg["role"], "content": msg["content"][:500]} | |
| for msg in self.historico_mensagens[-10:] | |
| ] | |
| def atualizar_contexto(self, mensagem: str, resposta: str, numero: str, | |
| is_reply: bool = False, mensagem_original: str = None, | |
| reply_to_bot: bool = False): | |
| """Atualiza contexto após interação""" | |
| try: | |
| timestamp = time.time() | |
| # Adiciona ao histórico | |
| self.historico_mensagens.append({ | |
| "role": "user", | |
| "content": mensagem, | |
| "timestamp": timestamp, | |
| "is_reply": is_reply, | |
| "reply_to_bot": reply_to_bot | |
| }) | |
| self.historico_mensagens.append({ | |
| "role": "assistant", | |
| "content": resposta, | |
| "timestamp": timestamp | |
| }) | |
| # Limita | |
| if len(self.historico_mensagens) > 20: | |
| self.historico_mensagens = self.historico_mensagens[-20:] | |
| self.ultima_mensagem_akira = resposta | |
| # Salva no banco | |
| if hasattr(self.db, 'salvar_mensagem'): | |
| try: | |
| self.db.salvar_mensagem( | |
| usuario=self.nome_usuario, | |
| mensagem=mensagem, | |
| resposta=resposta, | |
| numero=numero, | |
| is_reply=is_reply, | |
| mensagem_original=mensagem_original or '', | |
| reply_to_bot=reply_to_bot, | |
| humor=self.humor_atual, | |
| modo_resposta=self.modo_resposta_atual, | |
| usuario_nome=self.nome_usuario, | |
| tipo_conversa=self.tipo_conversa | |
| ) | |
| except Exception as e: | |
| logger.warning(f"Erro ao salvar mensagem: {e}") | |
| except Exception as e: | |
| logger.error(f"Erro ao atualizar contexto: {e}") | |
| def atualizar_informacoes_usuario(self, nome: str, numero: str, | |
| grupo_id: str = "", grupo_nome: str = "", | |
| tipo_conversa: str = "pv"): | |
| """Atualiza informações do usuário""" | |
| self.nome_usuario = nome or self.nome_usuario | |
| self.numero_usuario = numero or self.numero_usuario | |
| self.grupo_id = grupo_id or self.grupo_id | |
| self.grupo_nome = grupo_nome or self.grupo_nome | |
| self.tipo_conversa = tipo_conversa | |
| self.is_grupo = tipo_conversa == "grupo" | |
| def criar_contexto(db: Any, identificador: str, tipo: str = "pv") -> Contexto: | |
| """Cria contexto isolado""" | |
| try: | |
| if tipo == "grupo": | |
| usuario_id = f"grupo_{identificador}" | |
| else: | |
| usuario_id = f"pv_{identificador}" | |
| contexto = Contexto(db, usuario_id) | |
| contexto.tipo_conversa = tipo | |
| contexto.is_grupo = (tipo == "grupo") | |
| return contexto | |
| except Exception as e: | |
| logger.error(f"Erro ao criar contexto: {e}") | |
| return Contexto(db, "fallback") |