teste / src /analysis /sentiment_analysis.py
torxyton's picture
feat: Implementa sistema ensemble avançado de IA com múltiplos modelos
215b833
"""Módulo de análise de sentimento usando IA financeira com sistema Ensemble."""
import re
import asyncio
from typing import Dict, Optional, Any
from dataclasses import dataclass
from config.config import FINANCIAL_MODELS, AIConfig, AppConfig
# Importações do sistema Ensemble
try:
from src.ai.ensemble_ai import ensemble_ai, EnsembleResult
from src.ai.voting_system import intelligent_vote, VotingStrategy
ENSEMBLE_AVAILABLE = True
except ImportError:
ENSEMBLE_AVAILABLE = False
print("Sistema Ensemble não disponível, usando fallback...")
# Importações opcionais para IA (fallback)
try:
from transformers import pipeline
import torch
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
print(AppConfig.STATUS_MESSAGES['AI_UNAVAILABLE'])
@dataclass
class SentimentResult:
"""Classe para representar resultado de análise de sentimento."""
sentiment: str # 'positive', 'negative', 'neutral'
confidence: float # 0.0 - 1.0
label: str # 'POSITIVO', 'NEGATIVO', 'NEUTRO'
model_used: Optional[str] = None
class ModelManager:
"""Gerenciador de modelos de IA."""
def __init__(self):
self.sentiment_pipeline = None
self.current_model_info = None
self.is_available = TRANSFORMERS_AVAILABLE
if self.is_available:
self._load_models()
def _load_models(self) -> None:
"""Tenta carregar modelos em ordem de prioridade."""
for model_config in FINANCIAL_MODELS:
try:
print(AppConfig.STATUS_MESSAGES['AI_LOADING'].format(
model_config['description']
))
self.sentiment_pipeline = pipeline(
AIConfig.PIPELINE_CONFIG['task'],
model=model_config["name"],
return_all_scores=AIConfig.PIPELINE_CONFIG['return_all_scores']
)
self.current_model_info = model_config
print(AppConfig.STATUS_MESSAGES['AI_SUCCESS'].format(
model_config['description']
))
break
except Exception as e:
print(AppConfig.STATUS_MESSAGES['AI_FAILED'].format(
model_config['name'], str(e)
))
continue
if self.sentiment_pipeline is None:
print(AppConfig.STATUS_MESSAGES['NO_MODEL_LOADED'])
self.is_available = False
def get_model_info(self) -> Optional[Dict[str, Any]]:
"""Retorna informações do modelo atual."""
return self.current_model_info
def is_model_available(self) -> bool:
"""Verifica se há modelo disponível."""
return self.is_available and self.sentiment_pipeline is not None
class TextPreprocessor:
"""Pré-processador de texto para análise de sentimento."""
@staticmethod
def clean_text(text: str) -> str:
"""Limpa e prepara texto para análise."""
if not text:
return ""
# Remover caracteres especiais, manter apenas palavras, espaços e alguns símbolos
clean_text = re.sub(r'[^\w\s\+\-\%\.]', ' ', text)
# Limitar tamanho para o modelo
clean_text = clean_text[:AIConfig.MAX_TEXT_LENGTH]
# Remover espaços extras
clean_text = ' '.join(clean_text.split())
return clean_text
@staticmethod
def extract_financial_keywords(text: str) -> Dict[str, int]:
"""Extrai palavras-chave financeiras do texto."""
financial_keywords = {
'positive': ['alta', 'subida', 'ganho', 'lucro', 'crescimento', 'otimista', 'positivo'],
'negative': ['baixa', 'queda', 'perda', 'prejuízo', 'declínio', 'pessimista', 'negativo'],
'neutral': ['estável', 'neutro', 'lateral', 'consolidação']
}
text_lower = text.lower()
keyword_counts = {'positive': 0, 'negative': 0, 'neutral': 0}
for category, keywords in financial_keywords.items():
for keyword in keywords:
keyword_counts[category] += text_lower.count(keyword)
return keyword_counts
class SentimentAnalyzer:
"""Analisador de sentimento principal."""
def __init__(self, model_manager: ModelManager):
self.model_manager = model_manager
self.preprocessor = TextPreprocessor()
def analyze(self, text: str) -> SentimentResult:
"""Analisa o sentimento do texto."""
if not self.model_manager.is_model_available():
return self._get_fallback_sentiment(text)
try:
# Pré-processar texto
clean_text = self.preprocessor.clean_text(text)
if not clean_text.strip():
return SentimentResult(
sentiment='neutral',
confidence=0.5,
label='NEUTRO',
model_used='fallback'
)
# Executar análise de sentimento
result = self.model_manager.sentiment_pipeline(clean_text)
# Processar resultado
return self._process_model_result(result)
except Exception as e:
print(f"Erro na análise de sentimento: {e}")
return self._get_fallback_sentiment(text)
def _process_model_result(self, result: Any) -> SentimentResult:
"""Processa resultado do modelo de IA."""
try:
# Processar resultado baseado no formato
if isinstance(result, list) and len(result) > 0:
# Se return_all_scores=True, pegar o resultado com maior score
if isinstance(result[0], list):
predictions = result[0]
best_prediction = max(predictions, key=lambda x: x['score'])
else:
best_prediction = result[0]
# Mapear label usando o mapeamento do modelo atual
label = best_prediction['label']
confidence = best_prediction['score']
# Usar mapeamento específico do modelo ou fallback genérico
model_info = self.model_manager.get_model_info()
if model_info and label in model_info['labels']:
sentiment_label = model_info['labels'][label]
else:
# Fallback para mapeamento genérico
sentiment_label = self._map_generic_label(label)
return SentimentResult(
sentiment=label.lower(),
confidence=confidence,
label=sentiment_label,
model_used=model_info['name'] if model_info else 'unknown'
)
# Fallback se resultado não esperado
return SentimentResult(
sentiment='neutral',
confidence=0.5,
label='NEUTRO',
model_used='fallback'
)
except Exception as e:
print(f"Erro ao processar resultado do modelo: {e}")
return SentimentResult(
sentiment='neutral',
confidence=0.5,
label='NEUTRO',
model_used='error_fallback'
)
def _map_generic_label(self, label: str) -> str:
"""Mapeia labels genéricos para formato padrão."""
label_lower = label.lower()
if 'neg' in label_lower or 'bad' in label_lower:
return 'NEGATIVO'
elif 'pos' in label_lower or 'good' in label_lower:
return 'POSITIVO'
else:
return 'NEUTRO'
def _get_fallback_sentiment(self, text: str) -> SentimentResult:
"""Análise de sentimento baseada em palavras-chave (fallback)."""
if not text:
return SentimentResult(
sentiment='neutral',
confidence=0.5,
label='NEUTRO',
model_used='keyword_fallback'
)
# Análise baseada em palavras-chave
keyword_counts = self.preprocessor.extract_financial_keywords(text)
total_keywords = sum(keyword_counts.values())
if total_keywords == 0:
return SentimentResult(
sentiment='neutral',
confidence=0.5,
label='NEUTRO',
model_used='keyword_fallback'
)
# Determinar sentimento dominante
max_category = max(keyword_counts, key=keyword_counts.get)
max_count = keyword_counts[max_category]
confidence = min(0.8, max_count / total_keywords) # Máximo 80% de confiança
sentiment_mapping = {
'positive': ('positive', 'POSITIVO'),
'negative': ('negative', 'NEGATIVO'),
'neutral': ('neutral', 'NEUTRO')
}
sentiment, label = sentiment_mapping[max_category]
return SentimentResult(
sentiment=sentiment,
confidence=confidence,
label=label,
model_used='keyword_fallback'
)
class SentimentScorer:
"""Calculador de pontuação baseada em sentimento."""
@staticmethod
def calculate_sentiment_score(sentiment_result: SentimentResult) -> int:
"""Calcula pontuação de confiança baseada no sentimento."""
from config import ScoringConfig
base_score = int(sentiment_result.confidence * ScoringConfig.SENTIMENT_MAX_SCORE)
# Bonificação por modelo de IA vs fallback
if sentiment_result.model_used and 'fallback' not in sentiment_result.model_used:
base_score = int(base_score * 1.2) # 20% de bonificação para modelos de IA
return min(base_score, ScoringConfig.SENTIMENT_MAX_SCORE)
@staticmethod
def get_sentiment_signal_description(sentiment_result: SentimentResult) -> str:
"""Gera descrição do sinal de sentimento."""
confidence_pct = sentiment_result.confidence * 100
if sentiment_result.label == 'POSITIVO':
bias = "viés de COMPRA"
elif sentiment_result.label == 'NEGATIVO':
bias = "viés de VENDA"
else:
bias = "sem viés claro"
model_indicator = "🤖 IA" if 'fallback' not in (sentiment_result.model_used or '') else "📝 Palavras-chave"
return f"{model_indicator} Sentimento: {sentiment_result.label} ({confidence_pct:.1f}%): {bias}"
class SentimentAnalysisEngine:
"""Engine principal de análise de sentimento com sistema Ensemble."""
def __init__(self):
# Sistema Ensemble (preferido)
self.ensemble_available = ENSEMBLE_AVAILABLE
# Sistema tradicional (fallback)
self.model_manager = ModelManager()
self.analyzer = SentimentAnalyzer(self.model_manager)
self.scorer = SentimentScorer()
# Configurações do ensemble
self.voting_strategy = VotingStrategy.ADAPTIVE_ENSEMBLE
self.use_ensemble = self.ensemble_available
def analyze_text(self, text: str) -> Dict[str, Any]:
"""Executa análise completa de sentimento usando sistema Ensemble ou fallback."""
if not text:
return self._get_empty_result()
# Usar sistema Ensemble se disponível
if self.use_ensemble and self.ensemble_available:
try:
return self._analyze_with_ensemble(text)
except Exception as e:
print(f"Erro no sistema Ensemble, usando fallback: {e}")
# Continuar com sistema tradicional
# Sistema tradicional (fallback)
sentiment_result = self.analyzer.analyze(text)
score = self.scorer.calculate_sentiment_score(sentiment_result)
description = self.scorer.get_sentiment_signal_description(sentiment_result)
return {
'result': sentiment_result,
'score': score,
'description': description,
'ensemble_used': False
}
def get_model_status(self) -> Dict[str, Any]:
"""Retorna status dos modelos de IA (Ensemble + Fallback)."""
if self.use_ensemble and self.ensemble_available:
# Status do sistema Ensemble
try:
ensemble_stats = ensemble_ai.get_performance_stats()
active_models = len([m for m in ensemble_ai.models if m.is_available])
return {
'available': True,
'model_name': f'Ensemble AI ({active_models} modelos)',
'description': f'Sistema Ensemble com {active_models} modelos ativos',
'status': 'active',
'ensemble_stats': ensemble_stats,
'voting_strategy': self.voting_strategy.value
}
except Exception as e:
print(f"Erro ao obter status do Ensemble: {e}")
# Status do sistema tradicional
if self.model_manager.is_model_available():
model_info = self.model_manager.get_model_info()
return {
'available': True,
'model_name': model_info['name'] if model_info else 'Unknown',
'description': model_info['description'] if model_info else 'Unknown Model',
'status': 'active'
}
else:
return {
'available': False,
'model_name': None,
'description': 'IA indisponível - usando análise por palavras-chave',
'status': 'fallback'
}
def is_available(self) -> bool:
"""Verifica se análise de IA está disponível."""
return (self.use_ensemble and self.ensemble_available) or self.model_manager.is_model_available()
def _get_empty_result(self) -> Dict[str, Any]:
"""Retorna resultado vazio para texto inválido."""
from dataclasses import asdict
empty_result = SentimentResult(
sentiment='neutral',
confidence=0.5,
label='NEUTRO',
model_used='empty_input'
)
return {
'result': empty_result,
'score': 0,
'description': 'Texto vazio ou inválido',
'ensemble_used': False
}
def _analyze_with_ensemble(self, text: str) -> Dict[str, Any]:
"""Analisa texto usando sistema Ensemble."""
# Executar análise ensemble de forma síncrona
loop = None
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if loop.is_running():
# Se já há um loop rodando, criar uma task
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(asyncio.run, ensemble_ai.analyze_sentiment(text))
ensemble_result = future.result()
else:
# Executar diretamente
ensemble_result = loop.run_until_complete(ensemble_ai.analyze_sentiment(text))
# Converter resultado do ensemble para formato compatível
sentiment_result = SentimentResult(
sentiment=ensemble_result.final_prediction.lower(),
confidence=ensemble_result.confidence,
label=ensemble_result.final_prediction,
model_used=f'Ensemble ({len(ensemble_result.individual_predictions)} modelos)'
)
sentiment_score = self._convert_sentiment_to_score(ensemble_result.sentiment_score)
description = self.scorer.get_sentiment_signal_description(sentiment_result)
return {
'result': sentiment_result,
'score': sentiment_score,
'description': description,
'ensemble_used': True,
'ensemble_details': {
'consensus_strength': ensemble_result.consensus_strength,
'processing_time': ensemble_result.processing_time,
'individual_predictions': ensemble_result.individual_predictions,
'model_weights': ensemble_result.model_weights
}
}
def _convert_sentiment_to_score(self, sentiment_score: float) -> int:
"""Converte score de sentimento (-1 a 1) para escala de pontos."""
# Converter de [-1, 1] para [0, 100]
normalized_score = (sentiment_score + 1) / 2
return int(normalized_score * 100)
def set_voting_strategy(self, strategy):
"""Define estratégia de votação do ensemble."""
self.voting_strategy = strategy
def toggle_ensemble(self, use_ensemble: bool):
"""Ativa/desativa uso do sistema Ensemble."""
self.use_ensemble = use_ensemble and self.ensemble_available