#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Intelligent voting system for an AI ensemble.

Combines the predictions of several models into a single decision using a
set of consensus strategies (majority, weighted average, confidence
weighting, dynamic consensus and Bayesian fusion).
"""

import json
import logging
import statistics
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

import numpy as np


class VotingStrategy(Enum):
    """Available voting strategies."""

    SIMPLE_MAJORITY = "simple_majority"
    WEIGHTED_AVERAGE = "weighted_average"
    CONFIDENCE_WEIGHTED = "confidence_weighted"
    DYNAMIC_CONSENSUS = "dynamic_consensus"
    BAYESIAN_FUSION = "bayesian_fusion"
    ADAPTIVE_ENSEMBLE = "adaptive_ensemble"


@dataclass
class VoteResult:
    """Outcome of a single vote."""

    decision: str  # winning label
    confidence: float  # strategy-specific confidence in the decision
    consensus_strength: float  # strategy-specific agreement measure
    strategy_used: VotingStrategy  # strategy that actually produced the result
    individual_votes: List[Dict[str, Any]]  # per-prediction summary
    metadata: Dict[str, Any]  # strategy-specific diagnostics
    processing_time: float  # seconds; filled in by IntelligentVotingSystem.vote


@dataclass
class ModelPerformance:
    """Rolling performance metrics of one model."""

    accuracy_history: deque  # bounded window of reported accuracies
    recent_accuracy: float  # mean of the last (up to) 10 accuracies
    long_term_accuracy: float  # mean of the whole window
    consistency_score: float  # 1 - std-dev of the window, floored at 0.1
    response_time_avg: float  # exponential moving average of response time
    last_updated: datetime


class AdaptiveWeightCalculator:
    """Computes adaptive per-model weights from tracked performance."""

    # A market condition above this value counts as "high"/"strong".
    _CONDITION_THRESHOLD = 0.7

    # Multipliers applied to unknown models (no preference at all).
    _NEUTRAL_PREFERENCES = {
        'high_volatility': 1.0,
        'strong_trend': 1.0,
        'high_volume': 1.0
    }

    # Per-model multipliers applied when the matching market condition is high.
    _MODEL_PREFERENCES = {
        'FinBERT': {
            'high_volatility': 1.2,
            'strong_trend': 1.1,
            'high_volume': 1.0
        },
        'DistilBERT-Financial': {
            'high_volatility': 1.0,
            'strong_trend': 1.2,
            'high_volume': 1.1
        },
        'RoBERTa-Sentiment': {
            'high_volatility': 0.9,
            'strong_trend': 1.0,
            'high_volume': 1.2
        },
        'BERT-Base': {
            'high_volatility': 1.0,
            'strong_trend': 1.0,
            'high_volume': 1.0
        }
    }

    def __init__(self, window_size: int = 100):
        """Create a calculator keeping ``window_size`` accuracies per model."""
        self.window_size = window_size
        # Unknown models start from neutral defaults on first access.
        self.performance_tracker = defaultdict(lambda: ModelPerformance(
            accuracy_history=deque(maxlen=window_size),
            recent_accuracy=0.5,
            long_term_accuracy=0.5,
            consistency_score=0.5,
            response_time_avg=1.0,
            last_updated=datetime.now()
        ))
        self.market_conditions = {
            'volatility': 0.5,
            'trend_strength': 0.5,
            'volume_profile': 0.5
        }

    def update_performance(self, model_name: str, accuracy: float, response_time: float):
        """Record a new accuracy/response-time sample for ``model_name``."""
        perf = self.performance_tracker[model_name]
        perf.accuracy_history.append(accuracy)

        history = list(perf.accuracy_history)
        # Slicing already covers the "fewer than 10 samples" case.
        perf.recent_accuracy = float(np.mean(history[-10:]))
        perf.long_term_accuracy = float(np.mean(history))

        # Consistency is the complement of the standard deviation; it is only
        # updated once there are enough samples for the spread to be meaningful.
        if len(history) >= 5:
            perf.consistency_score = max(0.1, 1.0 - float(np.std(history)))

        # Exponential moving average of the response time.
        alpha = 0.1
        perf.response_time_avg = alpha * response_time + (1 - alpha) * perf.response_time_avg
        perf.last_updated = datetime.now()

    def calculate_adaptive_weights(self, model_names: List[str],
                                   market_context: Optional[Dict[str, float]] = None) -> Dict[str, float]:
        """Return normalized weights (summing to 1) keyed by model name.

        Each raw weight is the product of recent accuracy, consistency, a
        speed factor and a market adjustment, clamped to [0.1, 2.0].
        ``market_context``, when given, is merged into the stored market
        conditions before weights are computed.
        """
        if market_context:
            self.market_conditions.update(market_context)

        weights = {}
        for model_name in model_names:
            perf = self.performance_tracker[model_name]
            # Faster models get a bonus, capped at 2x.
            speed_factor = min(2.0, 2.0 / max(0.1, perf.response_time_avg))
            market_factor = self._calculate_market_adjustment(model_name)
            raw_weight = (perf.recent_accuracy * perf.consistency_score
                          * speed_factor * market_factor)
            weights[model_name] = max(0.1, min(2.0, raw_weight))

        total_weight = sum(weights.values())
        if total_weight > 0:
            weights = {name: value / total_weight for name, value in weights.items()}
        return weights

    def _calculate_market_adjustment(self, model_name: str) -> float:
        """Return the mean of the model's multipliers for active conditions.

        A multiplier only applies when its market condition exceeds the
        threshold; otherwise that slot contributes a neutral 1.0.
        """
        preferences = self._MODEL_PREFERENCES.get(model_name, self._NEUTRAL_PREFERENCES)
        threshold = self._CONDITION_THRESHOLD
        conditions = self.market_conditions

        volatility_factor = preferences['high_volatility'] if conditions['volatility'] > threshold else 1.0
        trend_factor = preferences['strong_trend'] if conditions['trend_strength'] > threshold else 1.0
        volume_factor = preferences['high_volume'] if conditions['volume_profile'] > threshold else 1.0
        return (volatility_factor + trend_factor + volume_factor) / 3.0


class IntelligentVotingSystem:
    """Voting system offering several strategies for combining predictions.

    Each prediction is a dict that may carry ``prediction`` (label),
    ``confidence``, ``sentiment_score`` and ``model_name``; missing keys fall
    back to 'NEUTRO', 0.5, 0.0 and ``model_<index>`` respectively.
    """

    # Floor for a Bayesian update factor so its logarithm stays finite.
    _MIN_FACTOR = 1e-300

    def __init__(self):
        self.weight_calculator = AdaptiveWeightCalculator()
        self.voting_history = deque(maxlen=1000)
        self.strategy_performance = defaultdict(lambda: deque(maxlen=100))
        self.logger = logging.getLogger(__name__)

        # Per-strategy tuning parameters.
        self.strategy_configs = {
            VotingStrategy.SIMPLE_MAJORITY: {'threshold': 0.5},
            VotingStrategy.WEIGHTED_AVERAGE: {'min_confidence': 0.3},
            VotingStrategy.CONFIDENCE_WEIGHTED: {'confidence_power': 2.0},
            VotingStrategy.DYNAMIC_CONSENSUS: {'consensus_threshold': 0.7},
            VotingStrategy.BAYESIAN_FUSION: {'prior_strength': 0.1},
            VotingStrategy.ADAPTIVE_ENSEMBLE: {'adaptation_rate': 0.1}
        }

    def vote(self, predictions: List[Dict[str, Any]],
             strategy: VotingStrategy = VotingStrategy.ADAPTIVE_ENSEMBLE,
             market_context: Optional[Dict[str, float]] = None) -> VoteResult:
        """Run a vote over ``predictions`` with the given strategy.

        ADAPTIVE_ENSEMBLE picks a concrete strategy from the shape of the
        predictions. An empty prediction list yields a neutral result that is
        not recorded in the history.
        """
        start_time = datetime.now()

        if not predictions:
            return self._empty_vote_result(strategy, start_time)

        if strategy == VotingStrategy.ADAPTIVE_ENSEMBLE:
            strategy = self._select_best_strategy(predictions, market_context)

        result = self._execute_voting_strategy(predictions, strategy, market_context)
        result.processing_time = (datetime.now() - start_time).total_seconds()

        self.voting_history.append({
            'timestamp': datetime.now(),
            'strategy': strategy,
            'result': result,
            'num_predictions': len(predictions)
        })
        return result

    def _select_best_strategy(self, predictions: List[Dict[str, Any]],
                              market_context: Optional[Dict[str, float]]) -> VotingStrategy:
        """Choose a concrete strategy from agreement and confidence statistics."""
        confidences = [p.get('confidence', 0.5) for p in predictions]
        avg_confidence = np.mean(confidences)
        confidence_variance = np.var(confidences)

        label_counts = defaultdict(int)
        for p in predictions:
            label_counts[p.get('prediction', 'NEUTRO')] += 1
        max_agreement = max(label_counts.values()) / len(predictions)

        if max_agreement > 0.8:  # strong agreement
            return VotingStrategy.SIMPLE_MAJORITY
        if avg_confidence > 0.8:  # high confidence overall
            return VotingStrategy.CONFIDENCE_WEIGHTED
        if confidence_variance > 0.1:  # confidences disagree a lot
            return VotingStrategy.WEIGHTED_AVERAGE
        if len(predictions) >= 4:  # many models
            return VotingStrategy.BAYESIAN_FUSION
        return VotingStrategy.DYNAMIC_CONSENSUS

    def _execute_voting_strategy(self, predictions: List[Dict[str, Any]],
                                 strategy: VotingStrategy,
                                 market_context: Optional[Dict[str, float]]) -> VoteResult:
        """Dispatch to the implementation of ``strategy``.

        Any strategy without a dedicated implementation falls back to the
        weighted average.
        """
        if strategy == VotingStrategy.SIMPLE_MAJORITY:
            return self._simple_majority_vote(predictions)
        if strategy == VotingStrategy.CONFIDENCE_WEIGHTED:
            return self._confidence_weighted_vote(predictions)
        if strategy == VotingStrategy.DYNAMIC_CONSENSUS:
            return self._dynamic_consensus_vote(predictions)
        if strategy == VotingStrategy.BAYESIAN_FUSION:
            return self._bayesian_fusion_vote(predictions)
        return self._weighted_average_vote(predictions, market_context)

    def _simple_majority_vote(self, predictions: List[Dict[str, Any]]) -> VoteResult:
        """Pick the most frequent label; confidence is its share of the votes."""
        vote_counts = defaultdict(int)
        for pred in predictions:
            vote_counts[pred.get('prediction', 'NEUTRO')] += 1

        # Ties resolve to the label seen first.
        winner = max(vote_counts, key=vote_counts.get)
        confidence = vote_counts[winner] / len(predictions)

        return VoteResult(
            decision=winner,
            confidence=confidence,
            consensus_strength=confidence,
            strategy_used=VotingStrategy.SIMPLE_MAJORITY,
            individual_votes=[{'prediction': p.get('prediction'),
                               'confidence': p.get('confidence')} for p in predictions],
            metadata={'vote_counts': dict(vote_counts)},
            processing_time=0.0
        )

    def _weighted_average_vote(self, predictions: List[Dict[str, Any]],
                               market_context: Optional[Dict[str, float]]) -> VoteResult:
        """Average sentiment scores weighted by adaptive weight and confidence.

        The decision is 'POSITIVO' above 0.1, 'NEGATIVO' below -0.1 and
        'NEUTRO' otherwise.
        """
        model_names = [p.get('model_name', f'model_{i}') for i, p in enumerate(predictions)]
        weights = self.weight_calculator.calculate_adaptive_weights(model_names, market_context)

        # One weight per prediction, in prediction order.
        applied_weights = [weights.get(name, 1.0) for name in model_names]
        confidences = [p.get('confidence', 0.5) for p in predictions]

        weighted_sum = 0.0
        total_weight = 0.0
        for pred, weight, confidence in zip(predictions, applied_weights, confidences):
            weighted_sum += pred.get('sentiment_score', 0.0) * weight * confidence
            total_weight += weight * confidence

        final_sentiment = weighted_sum / total_weight if total_weight > 0 else 0.0

        if final_sentiment > 0.1:
            decision = "POSITIVO"
        elif final_sentiment < -0.1:
            decision = "NEGATIVO"
        else:
            decision = "NEUTRO"

        # Normalize by the weights actually applied. Several predictions may
        # share one model name, so summing the weight dict would undercount
        # the denominator and let the confidence exceed 1.0.
        applied_total = sum(applied_weights)
        if applied_total > 0:
            confidence = sum(c * w for c, w in zip(confidences, applied_weights)) / applied_total
        else:
            confidence = 0.5

        consensus_strength = self._calculate_consensus_strength(predictions)

        return VoteResult(
            decision=decision,
            confidence=confidence,
            consensus_strength=consensus_strength,
            strategy_used=VotingStrategy.WEIGHTED_AVERAGE,
            individual_votes=[{'prediction': p.get('prediction'),
                               'confidence': p.get('confidence'),
                               'weight': w} for p, w in zip(predictions, applied_weights)],
            metadata={'final_sentiment': final_sentiment, 'weights': weights},
            processing_time=0.0
        )

    def _confidence_weighted_vote(self, predictions: List[Dict[str, Any]]) -> VoteResult:
        """Vote where each prediction counts ``confidence ** power``."""
        power = self.strategy_configs[VotingStrategy.CONFIDENCE_WEIGHTED]['confidence_power']

        weighted_votes = defaultdict(float)
        total_weight = 0
        for pred in predictions:
            weight = pred.get('confidence', 0.5) ** power
            weighted_votes[pred.get('prediction', 'NEUTRO')] += weight
            total_weight += weight

        if total_weight > 0:
            weighted_votes = {k: v / total_weight for k, v in weighted_votes.items()}

        winner = max(weighted_votes, key=weighted_votes.get)
        confidence = weighted_votes[winner]

        return VoteResult(
            decision=winner,
            confidence=confidence,
            consensus_strength=confidence,
            strategy_used=VotingStrategy.CONFIDENCE_WEIGHTED,
            individual_votes=[{'prediction': p.get('prediction'),
                               'confidence': p.get('confidence')} for p in predictions],
            metadata={'weighted_votes': dict(weighted_votes)},
            processing_time=0.0
        )

    def _dynamic_consensus_vote(self, predictions: List[Dict[str, Any]]) -> VoteResult:
        """Accept the strongest label group only if it clears the threshold.

        A group's consensus is its share of the predictions multiplied by its
        mean confidence. Below the threshold the decision is 'NEUTRO' with
        confidence 0.5.
        """
        threshold = self.strategy_configs[VotingStrategy.DYNAMIC_CONSENSUS]['consensus_threshold']

        groups = defaultdict(list)
        for pred in predictions:
            groups[pred.get('prediction', 'NEUTRO')].append(pred)

        best_label = None
        best_consensus = 0
        for label, group in groups.items():
            share = len(group) / len(predictions)
            mean_confidence = float(np.mean([p.get('confidence', 0.5) for p in group]))
            consensus = share * mean_confidence
            if consensus > best_consensus:
                best_consensus = consensus
                best_label = label

        if best_label is not None and best_consensus >= threshold:
            decision = best_label
            confidence = best_consensus
        else:
            # Not enough agreement: stay neutral.
            decision = "NEUTRO"
            confidence = 0.5

        return VoteResult(
            decision=decision,
            confidence=confidence,
            consensus_strength=best_consensus,
            strategy_used=VotingStrategy.DYNAMIC_CONSENSUS,
            individual_votes=[{'prediction': p.get('prediction'),
                               'confidence': p.get('confidence')} for p in predictions],
            metadata={'threshold': threshold,
                      'groups': {k: len(v) for k, v in groups.items()}},
            processing_time=0.0
        )

    def _bayesian_fusion_vote(self, predictions: List[Dict[str, Any]]) -> VoteResult:
        """Fuse predictions with a smoothed naive-Bayes style update.

        Starting from a uniform prior over the three labels, each prediction
        multiplies every class by ``prior_strength * prior + likelihood``,
        where the likelihood is the confidence for the predicted class and
        the remaining mass split evenly over the others.
        """
        prior_strength = self.strategy_configs[VotingStrategy.BAYESIAN_FUSION]['prior_strength']

        classes = ['POSITIVO', 'NEUTRO', 'NEGATIVO']
        prior = {cls: 1.0 / len(classes) for cls in classes}

        # Accumulate in log space: a plain product of many factors below 1
        # underflows to 0.0 for every class, which would make the winner
        # arbitrary and the confidence zero.
        log_posteriors = {cls: float(np.log(prior[cls])) for cls in classes}
        for pred in predictions:
            prediction = pred.get('prediction', 'NEUTRO')
            confidence = pred.get('confidence', 0.5)
            for cls in classes:
                if cls == prediction:
                    likelihood = confidence
                else:
                    likelihood = (1 - confidence) / (len(classes) - 1)
                factor = prior_strength * prior[cls] + likelihood
                log_posteriors[cls] += float(np.log(max(factor, self._MIN_FACTOR)))

        # Shift by the maximum so the largest term is exp(0) == 1 and the
        # normalizing total can never be zero.
        peak = max(log_posteriors.values())
        unnormalized = {cls: float(np.exp(value - peak)) for cls, value in log_posteriors.items()}
        total = sum(unnormalized.values())
        posteriors = {cls: value / total for cls, value in unnormalized.items()}

        # Ties resolve in the order of ``classes``.
        winner = max(posteriors, key=posteriors.get)
        confidence = posteriors[winner]

        # Consensus is one minus the normalized entropy of the posterior.
        entropy = float(-sum(p * np.log(p + 1e-10) for p in posteriors.values()))
        max_entropy = float(np.log(len(classes)))
        # The epsilon inside the log can push the ratio marginally outside
        # [0, 1]; clamp so callers always receive a valid strength.
        consensus_strength = min(1.0, max(0.0, 1 - (entropy / max_entropy)))

        return VoteResult(
            decision=winner,
            confidence=confidence,
            consensus_strength=consensus_strength,
            strategy_used=VotingStrategy.BAYESIAN_FUSION,
            individual_votes=[{'prediction': p.get('prediction'),
                               'confidence': p.get('confidence')} for p in predictions],
            metadata={'posteriors': posteriors, 'entropy': entropy},
            processing_time=0.0
        )

    def _calculate_consensus_strength(self, predictions: List[Dict[str, Any]]) -> float:
        """Return the share of predictions agreeing with the most common label."""
        if not predictions:
            return 0.0

        counts = defaultdict(int)
        for pred in predictions:
            counts[pred.get('prediction', 'NEUTRO')] += 1
        return max(counts.values()) / len(predictions)

    def _empty_vote_result(self, strategy: VotingStrategy, start_time: datetime) -> VoteResult:
        """Build the neutral result returned when there are no predictions."""
        return VoteResult(
            decision="NEUTRO",
            confidence=0.0,
            consensus_strength=0.0,
            strategy_used=strategy,
            individual_votes=[],
            metadata={'error': 'no_predictions'},
            processing_time=(datetime.now() - start_time).total_seconds()
        )

    def update_strategy_performance(self, strategy: VotingStrategy, accuracy: float):
        """Record an observed accuracy for ``strategy``."""
        self.strategy_performance[strategy].append(accuracy)

    def get_best_strategy(self) -> VotingStrategy:
        """Return the strategy with the best mean over its last 10 accuracies.

        Strategies with fewer than 5 samples are ignored; with no qualifying
        strategy the result is ADAPTIVE_ENSEMBLE.
        """
        best_strategy = VotingStrategy.ADAPTIVE_ENSEMBLE
        best_performance = 0.0

        for strategy, performances in self.strategy_performance.items():
            if len(performances) < 5:
                continue
            avg_performance = np.mean(list(performances)[-10:])
            if avg_performance > best_performance:
                best_performance = avg_performance
                best_strategy = strategy

        return best_strategy

    def get_voting_stats(self) -> Dict[str, Any]:
        """Return usage counts, timing/consensus averages and strategy accuracy."""
        stats = {
            'total_votes': len(self.voting_history),
            'strategy_usage': defaultdict(int),
            'avg_processing_time': 0.0,
            'avg_consensus_strength': 0.0,
            'strategy_performance': {}
        }

        if self.voting_history:
            for vote in self.voting_history:
                stats['strategy_usage'][vote['strategy'].value] += 1

            stats['avg_processing_time'] = np.mean(
                [vote['result'].processing_time for vote in self.voting_history])
            stats['avg_consensus_strength'] = np.mean(
                [vote['result'].consensus_strength for vote in self.voting_history])

        for strategy, performances in self.strategy_performance.items():
            if not performances:
                continue
            samples = list(performances)
            stats['strategy_performance'][strategy.value] = {
                'avg_accuracy': np.mean(samples),
                # Slicing yields the whole list when there are fewer than 10.
                'recent_accuracy': np.mean(samples[-10:]),
                'sample_count': len(samples)
            }

        return dict(stats)


# Module-level shared voting system instance.
voting_system = IntelligentVotingSystem()


def intelligent_vote(predictions: List[Dict[str, Any]],
                     strategy: VotingStrategy = VotingStrategy.ADAPTIVE_ENSEMBLE,
                     market_context: Optional[Dict[str, float]] = None) -> VoteResult:
    """Convenience wrapper that votes through the shared ``voting_system``."""
    return voting_system.vote(predictions, strategy, market_context)


def _run_demo() -> None:
    """Run every strategy over a small fixed sample and print the results."""
    test_predictions = [
        {'model_name': 'FinBERT', 'prediction': 'POSITIVO', 'confidence': 0.8, 'sentiment_score': 0.6},
        {'model_name': 'DistilBERT', 'prediction': 'POSITIVO', 'confidence': 0.7, 'sentiment_score': 0.4},
        {'model_name': 'RoBERTa', 'prediction': 'NEUTRO', 'confidence': 0.6, 'sentiment_score': 0.1},
        {'model_name': 'BERT', 'prediction': 'POSITIVO', 'confidence': 0.9, 'sentiment_score': 0.7}
    ]

    print("Testando sistema de votação inteligente...")

    for strategy in VotingStrategy:
        result = intelligent_vote(test_predictions, strategy)
        print(f"\nEstratégia: {strategy.value}")
        print(f"Decisão: {result.decision}")
        print(f"Confiança: {result.confidence:.3f}")
        print(f"Consenso: {result.consensus_strength:.3f}")
        print(f"Tempo: {result.processing_time:.3f}s")


if __name__ == "__main__":
    _run_demo()