Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Sistema de Votação Inteligente para Ensemble AI | |
| Otimiza decisões através de algoritmos avançados de consenso | |
| """ | |
| import numpy as np | |
| import logging | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from datetime import datetime, timedelta | |
| import json | |
| from collections import defaultdict, deque | |
| import statistics | |
class VotingStrategy(Enum):
    """Names of the voting strategies the ensemble can use.

    Each member's value is the lowercase string identifier of the strategy.
    """

    # Every prediction counts as one vote; the most frequent label wins.
    SIMPLE_MAJORITY = "simple_majority"
    # Sentiment scores averaged with adaptive model weights and confidences.
    WEIGHTED_AVERAGE = "weighted_average"
    # Votes weighted by each prediction's confidence raised to a power.
    CONFIDENCE_WEIGHTED = "confidence_weighted"
    # Largest/most confident group wins only above a consensus threshold.
    DYNAMIC_CONSENSUS = "dynamic_consensus"
    # Per-class posterior built by multiplying per-prediction likelihoods.
    BAYESIAN_FUSION = "bayesian_fusion"
    # Placeholder that is resolved to one of the strategies above at vote time.
    ADAPTIVE_ENSEMBLE = "adaptive_ensemble"
@dataclass
class VoteResult:
    """Outcome of a single ensemble vote.

    The ``@dataclass`` decorator is required: the voting code builds results
    with keyword arguments (``VoteResult(decision=..., confidence=..., ...)``),
    which a plain class with bare annotations would reject with ``TypeError``.
    The instance stays mutable because ``processing_time`` is filled in after
    the strategy has run.
    """

    # Winning class label (e.g. "POSITIVO", "NEGATIVO", "NEUTRO").
    decision: str
    # Confidence attached to the decision; its exact meaning depends on the strategy.
    confidence: float
    # Strategy-specific measure of how strongly the predictions agree.
    consensus_strength: float
    # Strategy that actually produced this result.
    strategy_used: VotingStrategy
    # One summary dict per input prediction (label, confidence, optional weight).
    individual_votes: List[Dict[str, Any]]
    # Strategy-specific diagnostics (vote counts, weights, posteriors, ...).
    metadata: Dict[str, Any]
    # Wall-clock duration of the vote in seconds.
    processing_time: float
@dataclass
class ModelPerformance:
    """Rolling performance metrics tracked for one model.

    The ``@dataclass`` decorator is required: records are created with
    keyword arguments, which a plain class with bare annotations would
    reject with ``TypeError``. Fields are mutable and updated in place as
    new observations arrive.
    """

    # Recent accuracy observations (a bounded deque when built by the tracker).
    accuracy_history: deque
    # Mean of the most recent accuracy observations.
    recent_accuracy: float
    # Mean of every accuracy observation still held in the history.
    long_term_accuracy: float
    # Higher means less spread in the accuracy history.
    consistency_score: float
    # Smoothed average of the model's response time.
    response_time_avg: float
    # Timestamp of the last update to this record.
    last_updated: datetime
class AdaptiveWeightCalculator:
    """Computes per-model voting weights from tracked performance and market state."""

    # Multiplier used for a model when the matching market condition is "high".
    # Models that are not listed fall back to _NEUTRAL_PREFERENCES.
    _MODEL_PREFERENCES = {
        'FinBERT': {
            'high_volatility': 1.2,
            'strong_trend': 1.1,
            'high_volume': 1.0,
        },
        'DistilBERT-Financial': {
            'high_volatility': 1.0,
            'strong_trend': 1.2,
            'high_volume': 1.1,
        },
        'RoBERTa-Sentiment': {
            'high_volatility': 0.9,
            'strong_trend': 1.0,
            'high_volume': 1.2,
        },
        'BERT-Base': {
            'high_volatility': 1.0,
            'strong_trend': 1.0,
            'high_volume': 1.0,
        },
    }
    _NEUTRAL_PREFERENCES = {
        'high_volatility': 1.0,
        'strong_trend': 1.0,
        'high_volume': 1.0,
    }
    # (market condition key, preference key) pairs, in evaluation order.
    _CONDITION_TO_PREFERENCE = (
        ('volatility', 'high_volatility'),
        ('trend_strength', 'strong_trend'),
        ('volume_profile', 'high_volume'),
    )

    def __init__(self, window_size: int = 100):
        """Create an empty tracker.

        Args:
            window_size: Maximum number of accuracy samples kept per model.
        """
        self.window_size = window_size

        def _fresh_record():
            # Neutral starting point for a model that has not been seen yet.
            return ModelPerformance(
                accuracy_history=deque(maxlen=window_size),
                recent_accuracy=0.5,
                long_term_accuracy=0.5,
                consistency_score=0.5,
                response_time_avg=1.0,
                last_updated=datetime.now(),
            )

        # Looking up an unknown model name creates its record on the fly.
        self.performance_tracker = defaultdict(_fresh_record)
        self.market_conditions = {
            'volatility': 0.5,
            'trend_strength': 0.5,
            'volume_profile': 0.5,
        }

    def update_performance(self, model_name: str, accuracy: float, response_time: float):
        """Record one accuracy / response-time observation for ``model_name``.

        Args:
            model_name: Key of the model in the performance tracker.
            accuracy: Newly observed accuracy value.
            response_time: Newly observed response time.
        """
        record = self.performance_tracker[model_name]
        record.accuracy_history.append(accuracy)
        samples = list(record.accuracy_history)

        # Recent accuracy covers at most the last 10 samples; slicing a
        # shorter history simply yields all of it.
        record.recent_accuracy = np.mean(samples[-10:])
        record.long_term_accuracy = np.mean(samples)

        # Consistency is 1 - standard deviation (floored at 0.1), and is only
        # refreshed once at least 5 samples are available.
        if len(samples) >= 5:
            record.consistency_score = max(0.1, 1.0 - np.std(samples))

        # Exponential moving average of the response time.
        smoothing = 0.1
        record.response_time_avg = (
            smoothing * response_time + (1 - smoothing) * record.response_time_avg
        )
        record.last_updated = datetime.now()

    def calculate_adaptive_weights(self, model_names: List[str],
                                   market_context: Optional[Dict[str, float]] = None) -> Dict[str, float]:
        """Return normalized weights for ``model_names``.

        Args:
            model_names: Models to weight; unknown names get a default record.
            market_context: Optional values merged into ``market_conditions``
                before weighting (the merge persists on the instance).

        Returns:
            Mapping of model name to weight. Weights are divided by their sum
            whenever that sum is positive.
        """
        if market_context:
            self.market_conditions.update(market_context)

        raw_weights = {}
        for name in model_names:
            record = self.performance_tracker[name]
            # Faster models are favoured; the factor is capped at 2.0 and the
            # response time is floored at 0.1 to avoid division blow-ups.
            speed_factor = min(2.0, 2.0 / max(0.1, record.response_time_avg))
            score = (
                record.recent_accuracy
                * record.consistency_score
                * speed_factor
                * self._calculate_market_adjustment(name)
            )
            # Clamp every raw weight to [0.1, 2.0] before normalizing.
            raw_weights[name] = max(0.1, min(2.0, score))

        total = sum(raw_weights.values())
        if total > 0:
            return {name: value / total for name, value in raw_weights.items()}
        return raw_weights

    def _calculate_market_adjustment(self, model_name: str) -> float:
        """Return the market-condition multiplier for ``model_name``.

        A model's preference value is used for each market condition that is
        above 0.7, and 1.0 otherwise; the result is the mean of the three
        factors.
        """
        preferences = self._MODEL_PREFERENCES.get(model_name, self._NEUTRAL_PREFERENCES)
        conditions = self.market_conditions
        factors = [
            preferences[preference_key] if conditions[condition_key] > 0.7 else 1.0
            for condition_key, preference_key in self._CONDITION_TO_PREFERENCE
        ]
        return sum(factors) / 3.0
class IntelligentVotingSystem:
    """Combines model predictions into one decision using selectable strategies.

    Each prediction is a dict that may carry ``model_name``, ``prediction``
    (the class label), ``confidence`` and ``sentiment_score``. Missing keys
    fall back to defaults chosen by each strategy ('NEUTRO' for the label,
    0.5 for the confidence, 0.0 for the sentiment score).
    """

    def __init__(self):
        """Set up the weight calculator, history buffers and strategy settings."""
        self.weight_calculator = AdaptiveWeightCalculator()
        # Bounded buffers so long-running processes do not grow without limit.
        self.voting_history = deque(maxlen=1000)
        self.strategy_performance = defaultdict(lambda: deque(maxlen=100))
        self.logger = logging.getLogger(__name__)
        # Per-strategy tuning parameters.
        self.strategy_configs = {
            VotingStrategy.SIMPLE_MAJORITY: {'threshold': 0.5},
            VotingStrategy.WEIGHTED_AVERAGE: {'min_confidence': 0.3},
            VotingStrategy.CONFIDENCE_WEIGHTED: {'confidence_power': 2.0},
            VotingStrategy.DYNAMIC_CONSENSUS: {'consensus_threshold': 0.7},
            VotingStrategy.BAYESIAN_FUSION: {'prior_strength': 0.1},
            VotingStrategy.ADAPTIVE_ENSEMBLE: {'adaptation_rate': 0.1},
        }

    @staticmethod
    def _count_labels(predictions: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count predictions per label, treating a missing label as 'NEUTRO'."""
        counts = defaultdict(int)
        for pred in predictions:
            counts[pred.get('prediction', 'NEUTRO')] += 1
        return counts

    @staticmethod
    def _summarize_votes(predictions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return the label/confidence pair of every prediction, in input order."""
        return [
            {'prediction': p.get('prediction'), 'confidence': p.get('confidence')}
            for p in predictions
        ]

    def vote(self, predictions: List[Dict[str, Any]],
             strategy: VotingStrategy = VotingStrategy.ADAPTIVE_ENSEMBLE,
             market_context: Optional[Dict[str, float]] = None) -> VoteResult:
        """Run a vote over ``predictions`` and record it in the history.

        Args:
            predictions: Prediction dicts, one per model.
            strategy: Strategy to apply. ``ADAPTIVE_ENSEMBLE`` picks a concrete
                strategy from the characteristics of the predictions.
            market_context: Optional market conditions forwarded to the
                weighted-average strategy.

        Returns:
            The vote result with ``processing_time`` filled in. An empty
            ``predictions`` list yields a neutral result that is not added
            to the history.
        """
        start_time = datetime.now()
        if not predictions:
            return self._empty_vote_result(strategy, start_time)

        if strategy == VotingStrategy.ADAPTIVE_ENSEMBLE:
            strategy = self._select_best_strategy(predictions, market_context)

        result = self._execute_voting_strategy(predictions, strategy, market_context)
        result.processing_time = (datetime.now() - start_time).total_seconds()

        self.voting_history.append({
            'timestamp': datetime.now(),
            'strategy': strategy,
            'result': result,
            'num_predictions': len(predictions),
        })
        return result

    def _select_best_strategy(self, predictions: List[Dict[str, Any]],
                              market_context: Optional[Dict[str, float]]) -> VotingStrategy:
        """Pick a concrete strategy from agreement and confidence statistics.

        ``market_context`` is accepted for interface symmetry but is not used
        by the current selection rules.
        """
        confidences = [p.get('confidence', 0.5) for p in predictions]
        avg_confidence = np.mean(confidences)
        confidence_variance = np.var(confidences)
        # Share of predictions that agree on the most common label.
        max_agreement = self._calculate_consensus_strength(predictions)

        if max_agreement > 0.8:  # strong agreement
            return VotingStrategy.SIMPLE_MAJORITY
        if avg_confidence > 0.8:  # confident models
            return VotingStrategy.CONFIDENCE_WEIGHTED
        if confidence_variance > 0.1:  # widely spread confidences
            return VotingStrategy.WEIGHTED_AVERAGE
        if len(predictions) >= 4:  # many models
            return VotingStrategy.BAYESIAN_FUSION
        return VotingStrategy.DYNAMIC_CONSENSUS

    def _execute_voting_strategy(self, predictions: List[Dict[str, Any]],
                                 strategy: VotingStrategy,
                                 market_context: Optional[Dict[str, float]]) -> VoteResult:
        """Dispatch to the implementation of ``strategy``.

        Any strategy without a dedicated implementation (including
        ``WEIGHTED_AVERAGE`` itself) is handled by the weighted average.
        """
        context_free_handlers = {
            VotingStrategy.SIMPLE_MAJORITY: self._simple_majority_vote,
            VotingStrategy.CONFIDENCE_WEIGHTED: self._confidence_weighted_vote,
            VotingStrategy.DYNAMIC_CONSENSUS: self._dynamic_consensus_vote,
            VotingStrategy.BAYESIAN_FUSION: self._bayesian_fusion_vote,
        }
        handler = context_free_handlers.get(strategy)
        if handler is not None:
            return handler(predictions)
        return self._weighted_average_vote(predictions, market_context)

    def _simple_majority_vote(self, predictions: List[Dict[str, Any]]) -> VoteResult:
        """Pick the most frequent label; confidence is its share of the votes."""
        vote_counts = self._count_labels(predictions)
        # On ties the label that appeared first wins.
        winner = max(vote_counts, key=vote_counts.get)
        share = vote_counts[winner] / len(predictions)
        return VoteResult(
            decision=winner,
            confidence=share,
            consensus_strength=share,
            strategy_used=VotingStrategy.SIMPLE_MAJORITY,
            individual_votes=self._summarize_votes(predictions),
            metadata={'vote_counts': dict(vote_counts)},
            processing_time=0.0,
        )

    def _weighted_average_vote(self, predictions: List[Dict[str, Any]],
                               market_context: Optional[Dict[str, float]]) -> VoteResult:
        """Average sentiment scores using adaptive model weights and confidences.

        The decision is "POSITIVO" above +0.1, "NEGATIVO" below -0.1 and
        "NEUTRO" in between.
        """
        model_names = [p.get('model_name', f'model_{i}') for i, p in enumerate(predictions)]
        weights = self.weight_calculator.calculate_adaptive_weights(model_names, market_context)
        # Weight actually applied to each prediction, aligned with `predictions`.
        applied_weights = [weights.get(name, 1.0) for name in model_names]
        confidences = [p.get('confidence', 0.5) for p in predictions]

        sentiment_scores = []
        total_weight = 0
        for pred, weight, confidence in zip(predictions, applied_weights, confidences):
            sentiment_score = pred.get('sentiment_score', 0.0)
            sentiment_scores.append(sentiment_score * weight * confidence)
            total_weight += weight * confidence

        final_sentiment = sum(sentiment_scores) / total_weight if total_weight > 0 else 0.0

        if final_sentiment > 0.1:
            decision = "POSITIVO"
        elif final_sentiment < -0.1:
            decision = "NEGATIVO"
        else:
            decision = "NEUTRO"

        # Normalize by the weights that were actually applied. `weights` holds
        # one entry per unique model name, so dividing by sum(weights.values())
        # would push the confidence above 1.0 whenever two predictions share a
        # model name.
        applied_total = sum(applied_weights)
        if applied_total > 0:
            confidence = sum(
                conf * weight for conf, weight in zip(confidences, applied_weights)
            ) / applied_total
        else:
            confidence = 0.5

        return VoteResult(
            decision=decision,
            confidence=confidence,
            consensus_strength=self._calculate_consensus_strength(predictions),
            strategy_used=VotingStrategy.WEIGHTED_AVERAGE,
            individual_votes=[
                {'prediction': p.get('prediction'), 'confidence': p.get('confidence'),
                 'weight': weight}
                for p, weight in zip(predictions, applied_weights)
            ],
            metadata={'final_sentiment': final_sentiment, 'weights': weights},
            processing_time=0.0,
        )

    def _confidence_weighted_vote(self, predictions: List[Dict[str, Any]]) -> VoteResult:
        """Weight each vote by ``confidence ** confidence_power`` and pick the top label."""
        power = self.strategy_configs[VotingStrategy.CONFIDENCE_WEIGHTED]['confidence_power']

        weighted_votes = defaultdict(float)
        total_weight = 0
        for pred in predictions:
            weight = pred.get('confidence', 0.5) ** power
            weighted_votes[pred.get('prediction', 'NEUTRO')] += weight
            total_weight += weight

        # Turn the raw sums into shares; skipped when every weight is zero.
        if total_weight > 0:
            weighted_votes = {k: v / total_weight for k, v in weighted_votes.items()}

        winner = max(weighted_votes, key=weighted_votes.get)
        share = weighted_votes[winner]
        return VoteResult(
            decision=winner,
            confidence=share,
            consensus_strength=share,
            strategy_used=VotingStrategy.CONFIDENCE_WEIGHTED,
            individual_votes=self._summarize_votes(predictions),
            metadata={'weighted_votes': dict(weighted_votes)},
            processing_time=0.0,
        )

    def _dynamic_consensus_vote(self, predictions: List[Dict[str, Any]]) -> VoteResult:
        """Accept the strongest group only if it clears the consensus threshold.

        A group's consensus is its share of the predictions multiplied by its
        mean confidence. Below the threshold the decision falls back to
        "NEUTRO" with confidence 0.5.
        """
        threshold = self.strategy_configs[VotingStrategy.DYNAMIC_CONSENSUS]['consensus_threshold']

        groups = defaultdict(list)
        for pred in predictions:
            groups[pred.get('prediction', 'NEUTRO')].append(pred)

        best_label = None
        best_consensus = 0
        for label, members in groups.items():
            size_factor = len(members) / len(predictions)
            avg_confidence = np.mean([p.get('confidence', 0.5) for p in members])
            consensus = size_factor * avg_confidence
            if consensus > best_consensus:
                best_consensus = consensus
                best_label = label

        if best_label is not None and best_consensus >= threshold:
            decision = best_label
            confidence = best_consensus
        else:
            decision = "NEUTRO"
            confidence = 0.5

        return VoteResult(
            decision=decision,
            confidence=confidence,
            consensus_strength=best_consensus,
            strategy_used=VotingStrategy.DYNAMIC_CONSENSUS,
            individual_votes=self._summarize_votes(predictions),
            metadata={'threshold': threshold,
                      'groups': {k: len(v) for k, v in groups.items()}},
            processing_time=0.0,
        )

    def _bayesian_fusion_vote(self, predictions: List[Dict[str, Any]]) -> VoteResult:
        """Fuse predictions into a posterior over POSITIVO / NEUTRO / NEGATIVO.

        Starting from a uniform prior, every prediction multiplies each
        class by ``prior_strength * prior + likelihood``, where the likelihood
        is the confidence for the predicted class and the remaining
        probability split evenly across the other classes.
        """
        prior_strength = self.strategy_configs[VotingStrategy.BAYESIAN_FUSION]['prior_strength']
        classes = ['POSITIVO', 'NEUTRO', 'NEGATIVO']
        prior = {cls: 1.0 / len(classes) for cls in classes}

        posteriors = dict(prior)
        for pred in predictions:
            predicted = pred.get('prediction', 'NEUTRO')
            confidence = pred.get('confidence', 0.5)
            for cls in classes:
                if cls == predicted:
                    likelihood = confidence
                else:
                    likelihood = (1 - confidence) / (len(classes) - 1)
                posteriors[cls] *= (prior_strength * prior[cls] + likelihood)

        total = sum(posteriors.values())
        if total > 0:
            posteriors = {k: v / total for k, v in posteriors.items()}

        winner = max(posteriors, key=posteriors.get)
        confidence = posteriors[winner]

        # Consensus is 1 minus the normalized entropy of the posterior; the
        # small epsilon keeps log() finite for zero probabilities.
        entropy = -sum(p * np.log(p + 1e-10) for p in posteriors.values())
        max_entropy = np.log(len(classes))
        consensus_strength = 1 - (entropy / max_entropy)

        return VoteResult(
            decision=winner,
            confidence=confidence,
            consensus_strength=consensus_strength,
            strategy_used=VotingStrategy.BAYESIAN_FUSION,
            individual_votes=self._summarize_votes(predictions),
            metadata={'posteriors': posteriors, 'entropy': entropy},
            processing_time=0.0,
        )

    def _calculate_consensus_strength(self, predictions: List[Dict[str, Any]]) -> float:
        """Return the share of predictions that agree on the most common label.

        Returns 0.0 for an empty list.
        """
        if not predictions:
            return 0.0
        counts = self._count_labels(predictions)
        return max(counts.values()) / len(predictions)

    def _empty_vote_result(self, strategy: VotingStrategy, start_time: datetime) -> VoteResult:
        """Build the neutral, zero-confidence result used when there are no predictions."""
        return VoteResult(
            decision="NEUTRO",
            confidence=0.0,
            consensus_strength=0.0,
            strategy_used=strategy,
            individual_votes=[],
            metadata={'error': 'no_predictions'},
            processing_time=(datetime.now() - start_time).total_seconds(),
        )

    def update_strategy_performance(self, strategy: VotingStrategy, accuracy: float):
        """Append an observed accuracy to the history of ``strategy``."""
        self.strategy_performance[strategy].append(accuracy)

    def get_best_strategy(self) -> VotingStrategy:
        """Return the strategy with the best recent average accuracy.

        Only strategies with at least 5 recorded accuracies are considered,
        averaged over their last 10 samples. ``ADAPTIVE_ENSEMBLE`` is returned
        when no strategy qualifies or none has a positive average.
        """
        if not self.strategy_performance:
            return VotingStrategy.ADAPTIVE_ENSEMBLE

        best_strategy = VotingStrategy.ADAPTIVE_ENSEMBLE
        best_performance = 0.0
        for strategy, performances in self.strategy_performance.items():
            if len(performances) < 5:
                continue
            avg_performance = np.mean(list(performances)[-10:])
            if avg_performance > best_performance:
                best_performance = avg_performance
                best_strategy = strategy
        return best_strategy

    def get_voting_stats(self) -> Dict[str, Any]:
        """Return aggregate statistics over the recorded votes.

        Returns:
            A dict with ``total_votes``, ``strategy_usage`` (plain dict of
            strategy value to count), ``avg_processing_time``,
            ``avg_consensus_strength`` and ``strategy_performance`` (per
            strategy: ``avg_accuracy``, ``recent_accuracy``, ``sample_count``).
        """
        usage = defaultdict(int)
        avg_processing_time = 0.0
        avg_consensus_strength = 0.0

        if self.voting_history:
            for vote in self.voting_history:
                usage[vote['strategy'].value] += 1
            avg_processing_time = np.mean(
                [vote['result'].processing_time for vote in self.voting_history]
            )
            avg_consensus_strength = np.mean(
                [vote['result'].consensus_strength for vote in self.voting_history]
            )

        performance_summary = {}
        for strategy, performances in self.strategy_performance.items():
            if not performances:
                continue
            samples = list(performances)
            performance_summary[strategy.value] = {
                'avg_accuracy': np.mean(samples),
                # Slicing a short history yields all of it, so no length check is needed.
                'recent_accuracy': np.mean(samples[-10:]),
                'sample_count': len(samples),
            }

        return {
            'total_votes': len(self.voting_history),
            # Plain dict: callers reading an unknown key must not mutate the stats.
            'strategy_usage': dict(usage),
            'avg_processing_time': avg_processing_time,
            'avg_consensus_strength': avg_consensus_strength,
            'strategy_performance': performance_summary,
        }
# Shared module-level voting system instance, created at import time.
voting_system = IntelligentVotingSystem()
# Convenience wrapper around the shared voting system.
def intelligent_vote(predictions: List[Dict[str, Any]],
                     strategy: VotingStrategy = VotingStrategy.ADAPTIVE_ENSEMBLE,
                     market_context: Optional[Dict[str, float]] = None) -> VoteResult:
    """Run a vote on the module-level ``voting_system``.

    Args:
        predictions: Prediction dicts, one per model.
        strategy: Voting strategy to apply.
        market_context: Optional market conditions passed through to the vote.

    Returns:
        Whatever ``voting_system.vote`` returns for these arguments.
    """
    outcome = voting_system.vote(
        predictions,
        strategy=strategy,
        market_context=market_context,
    )
    return outcome
if __name__ == "__main__":
    # Manual smoke test: run every strategy over one fixed set of predictions
    # and print the outcome of each.
    fields = ('model_name', 'prediction', 'confidence', 'sentiment_score')
    rows = (
        ('FinBERT', 'POSITIVO', 0.8, 0.6),
        ('DistilBERT', 'POSITIVO', 0.7, 0.4),
        ('RoBERTa', 'NEUTRO', 0.6, 0.1),
        ('BERT', 'POSITIVO', 0.9, 0.7),
    )
    test_predictions = [dict(zip(fields, row)) for row in rows]

    print("Testando sistema de votação inteligente...")
    for strategy in VotingStrategy:
        result = intelligent_vote(test_predictions, strategy)
        print(f"\nEstratégia: {strategy.value}")
        print(f"Decisão: {result.decision}")
        print(f"Confiança: {result.confidence:.3f}")
        print(f"Consenso: {result.consensus_strength:.3f}")
        print(f"Tempo: {result.processing_time:.3f}s")