"""Advanced Fibonacci analysis: retracements, extensions, projections,
confluence zones, simplified harmonic patterns and a derived trading signal."""

import logging
from dataclasses import dataclass, asdict
from typing import Dict, List, Tuple, Optional, Any

import numpy as np
import pandas as pd

try:
    from src.core.log_parser import FibonacciAnalysis, BotAnalysis
except ImportError:
    # The parser types are referenced only in type annotations below, so the
    # engine stays usable (e.g. the demo at the bottom of this file) when the
    # project package is not importable.
    FibonacciAnalysis = Any
    BotAnalysis = Any

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class FibonacciLevel:
    """A single Fibonacci price level."""

    level: float
    price: float
    # One of 'retracement', 'extension_up', 'extension_down',
    # 'projection_up', 'projection_down' (as produced by the engine).
    type: str
    ratio: float
    # Absolute price distance between this level and the current price.
    distance_from_current: float
    strength: float


@dataclass
class SwingPoint:
    """A swing point."""

    price: float
    timestamp: str
    type: str  # 'high' or 'low'
    strength: float


@dataclass
class ConfluenceZone:
    """A price zone where several Fibonacci levels cluster."""

    price_range: Tuple[float, float]  # (lowest price, highest price) in the zone
    levels_count: int
    strength: float
    types: List[str]


@dataclass
class HarmonicPattern:
    """A detected harmonic pattern."""

    name: str
    completion_point: float
    confidence: float
    target_levels: List[float]
    stop_loss: float


@dataclass
class AdvancedFibonacciAnalysis:
    """Complete result of an advanced Fibonacci analysis."""

    swing_high: float
    swing_low: float
    current_price: float
    swing_range: float
    retracement_levels: List[FibonacciLevel]
    extension_levels: List[FibonacciLevel]
    projection_levels: List[FibonacciLevel]
    confluence_zones: List[ConfluenceZone]
    harmonic_patterns: List[HarmonicPattern]
    key_support: float
    key_resistance: float
    trend_direction: str
    fibonacci_zone: str
    overall_strength: float
    trading_signal: str
    alerts_count: int


class AdvancedFibonacciEngine:
    """Engine that computes an advanced Fibonacci analysis from a swing."""

    def __init__(self):
        """Set up the default ratios, confluence settings and pattern table."""
        # Standard Fibonacci ratios
        self.retracement_ratios = [0.236, 0.382, 0.5, 0.618, 0.786]
        self.extension_ratios = [1.272, 1.414, 1.618, 2.0, 2.618]
        self.projection_ratios = [0.618, 1.0, 1.272, 1.618]

        # Confluence settings
        self.confluence_threshold = 0.001  # 0.1% tolerance
        self.min_confluence_levels = 2

        # Harmonic patterns (only the 'CD' ratio is read by the detector below)
        self.harmonic_patterns = {
            'Gartley': {'XA': 0.618, 'AB': 0.618, 'BC': 0.786, 'CD': 1.272},
            'Butterfly': {'XA': 0.786, 'AB': 0.618, 'BC': 0.886, 'CD': 1.618},
            'Bat': {'XA': 0.382, 'AB': 0.618, 'BC': 0.886, 'CD': 2.618},
            'Crab': {'XA': 0.618, 'AB': 0.618, 'BC': 0.886, 'CD': 3.618}
        }

    @staticmethod
    def _relative_distance(price: float, reference: float) -> float:
        """Return ``|price - reference|`` as a fraction of ``|reference|``.

        Dividing by the absolute reference keeps the result non-negative for
        negative references. A zero reference yields 0.0 when both values are
        zero and infinity otherwise, instead of raising ZeroDivisionError.
        """
        if reference == 0:
            return 0.0 if price == 0 else float("inf")
        return abs(price - reference) / abs(reference)

    def _build_level(
        self,
        ratio: float,
        price: float,
        current_price: float,
        level_type: str,
        strength_type: str
    ) -> FibonacciLevel:
        """Create a FibonacciLevel with its distance and strength filled in."""
        return FibonacciLevel(
            level=ratio,
            price=price,
            type=level_type,
            ratio=ratio,
            distance_from_current=abs(current_price - price),
            strength=self._calculate_level_strength(price, current_price, strength_type)
        )

    def analyze_from_bot_data(self, bot_analysis: BotAnalysis) -> AdvancedFibonacciAnalysis:
        """Run the advanced analysis on data produced by the external bot.

        Args:
            bot_analysis: Object exposing ``fibonacci_analysis`` with
                ``swing_high``, ``swing_low`` and ``current_price``; its
                ``alerts`` attribute is read later as the alert count.

        Returns:
            The analysis built by ``perform_advanced_analysis``.
        """
        fib_data = bot_analysis.fibonacci_analysis

        return self.perform_advanced_analysis(
            swing_high=fib_data.swing_high,
            swing_low=fib_data.swing_low,
            current_price=fib_data.current_price,
            historical_data=None,  # May be expanded in the future
            bot_fibonacci_data=fib_data
        )

    def perform_advanced_analysis(
        self,
        swing_high: float,
        swing_low: float,
        current_price: float,
        historical_data: Optional[pd.DataFrame] = None,
        bot_fibonacci_data: Optional[FibonacciAnalysis] = None
    ) -> AdvancedFibonacciAnalysis:
        """Perform the full advanced Fibonacci analysis.

        Args:
            swing_high: Price of the swing high.
            swing_low: Price of the swing low.
            current_price: Current market price.
            historical_data: Forwarded to the harmonic pattern detector, which
                currently does not read it.
            bot_fibonacci_data: When given, its ``alerts`` attribute is used as
                the alert count; otherwise the number of confluence zones is.

        Returns:
            A populated ``AdvancedFibonacciAnalysis``.

        Raises:
            Exception: Any error raised during the analysis is logged and
                re-raised unchanged.
        """
        try:
            swing_range = swing_high - swing_low

            retracement_levels = self._calculate_retracement_levels(
                swing_high, swing_low, current_price
            )
            extension_levels = self._calculate_extension_levels(
                swing_high, swing_low, current_price
            )
            projection_levels = self._calculate_projection_levels(
                swing_high, swing_low, current_price
            )

            # Confluence zones are searched across every kind of level
            all_levels = retracement_levels + extension_levels + projection_levels
            confluence_zones = self._identify_confluence_zones(all_levels)

            harmonic_patterns = self._detect_harmonic_patterns(
                swing_high, swing_low, current_price, historical_data
            )

            key_support, key_resistance = self._determine_key_levels(
                all_levels, current_price
            )

            trend_direction = self._determine_trend_direction(
                swing_high, swing_low, current_price
            )

            fibonacci_zone = self._determine_fibonacci_zone(
                swing_high, swing_low, current_price
            )

            overall_strength = self._calculate_overall_strength(
                retracement_levels, extension_levels, confluence_zones, harmonic_patterns
            )

            trading_signal = self._generate_trading_signal(
                current_price, key_support, key_resistance, trend_direction, overall_strength
            )

            # Count alerts (taken from the bot data when available)
            if bot_fibonacci_data is not None:
                alerts_count = bot_fibonacci_data.alerts
            else:
                alerts_count = len(confluence_zones)

            return AdvancedFibonacciAnalysis(
                swing_high=swing_high,
                swing_low=swing_low,
                current_price=current_price,
                swing_range=swing_range,
                retracement_levels=retracement_levels,
                extension_levels=extension_levels,
                projection_levels=projection_levels,
                confluence_zones=confluence_zones,
                harmonic_patterns=harmonic_patterns,
                key_support=key_support,
                key_resistance=key_resistance,
                trend_direction=trend_direction,
                fibonacci_zone=fibonacci_zone,
                overall_strength=overall_strength,
                trading_signal=trading_signal,
                alerts_count=alerts_count
            )

        except Exception as e:
            # logger.exception keeps the traceback alongside the message
            logger.exception("Erro na análise avançada de Fibonacci: %s", e)
            raise

    def _calculate_retracement_levels(
        self,
        swing_high: float,
        swing_low: float,
        current_price: float
    ) -> List[FibonacciLevel]:
        """Compute retracement levels, sorted by distance from the current price.

        Each level sits at ``swing_high - swing_range * ratio``.
        """
        swing_range = swing_high - swing_low
        levels = [
            self._build_level(
                ratio,
                swing_high - (swing_range * ratio),
                current_price,
                'retracement',
                'retracement'
            )
            for ratio in self.retracement_ratios
        ]
        return sorted(levels, key=lambda lvl: lvl.distance_from_current)

    def _calculate_extension_levels(
        self,
        swing_high: float,
        swing_low: float,
        current_price: float
    ) -> List[FibonacciLevel]:
        """Compute extension levels, sorted by distance from the current price.

        For every ratio two levels are produced: one above the swing high
        ('extension_up') and one below the swing low ('extension_down'), each
        offset by ``swing_range * (ratio - 1)``.
        """
        swing_range = swing_high - swing_low
        levels = []

        for ratio in self.extension_ratios:
            offset = swing_range * (ratio - 1)
            # Upward extension
            levels.append(self._build_level(
                ratio, swing_high + offset, current_price, 'extension_up', 'extension'
            ))
            # Downward extension
            levels.append(self._build_level(
                ratio, swing_low - offset, current_price, 'extension_down', 'extension'
            ))

        return sorted(levels, key=lambda lvl: lvl.distance_from_current)

    def _calculate_projection_levels(
        self,
        swing_high: float,
        swing_low: float,
        current_price: float
    ) -> List[FibonacciLevel]:
        """Compute projection levels, sorted by distance from the current price.

        Levels are projected upward from the current price when it is above
        the swing midpoint, and downward otherwise.
        """
        swing_range = swing_high - swing_low

        # The direction does not depend on the ratio, so decide it once
        if current_price > (swing_high + swing_low) / 2:
            sign, direction = 1, 'projection_up'
        else:
            sign, direction = -1, 'projection_down'

        levels = [
            self._build_level(
                ratio,
                current_price + sign * swing_range * ratio,
                current_price,
                direction,
                'projection'
            )
            for ratio in self.projection_ratios
        ]
        return sorted(levels, key=lambda lvl: lvl.distance_from_current)

    def _calculate_level_strength(
        self,
        level_price: float,
        current_price: float,
        level_type: str
    ) -> float:
        """Compute the strength of a level.

        The strength is a per-type weight ('retracement' 1.0, 'extension' 0.8,
        'projection' 0.6, anything else 0.5) multiplied by
        ``1 / (1 + relative distance to the current price)``.
        """
        type_weights = {
            'retracement': 1.0,
            'extension': 0.8,
            'projection': 0.6
        }
        base_strength = type_weights.get(level_type, 0.5)

        # An infinite relative distance (zero current price) gives factor 0.0
        distance_factor = 1 / (1 + self._relative_distance(level_price, current_price))
        return base_strength * distance_factor

    def _identify_confluence_zones(
        self,
        all_levels: List[FibonacciLevel]
    ) -> List[ConfluenceZone]:
        """Group nearby levels into confluence zones, strongest zone first.

        Levels are walked in price order. Each zone is anchored on its lowest
        level and collects the following levels whose relative distance to
        that anchor is within ``self.confluence_threshold``. Groups with at
        least ``self.min_confluence_levels`` members become zones.
        """
        ordered = sorted(all_levels, key=lambda lvl: lvl.price)
        zones = []

        start = 0
        while start < len(ordered):
            anchor = ordered[start]

            # Extend the group while the next level stays close to the anchor
            end = start + 1
            while (
                end < len(ordered)
                and self._relative_distance(ordered[end].price, anchor.price)
                <= self.confluence_threshold
            ):
                end += 1

            members = ordered[start:end]
            if len(members) >= self.min_confluence_levels:
                avg_strength = sum(lvl.strength for lvl in members) / len(members)
                zones.append(ConfluenceZone(
                    price_range=(
                        min(lvl.price for lvl in members),
                        max(lvl.price for lvl in members)
                    ),
                    levels_count=len(members),
                    # Average strength scaled by the number of levels
                    strength=avg_strength * len(members),
                    # Sorted so the type list is identical across runs
                    types=sorted({lvl.type for lvl in members})
                ))

            start = end

        return sorted(zones, key=lambda zone: zone.strength, reverse=True)

    def _detect_harmonic_patterns(
        self,
        swing_high: float,
        swing_low: float,
        current_price: float,
        historical_data: Optional[pd.DataFrame] = None
    ) -> List[HarmonicPattern]:
        """Detect harmonic patterns (basic implementation).

        A pattern is reported when the current price is within 2% of its
        completion point, ``swing_low + swing_range * CD``. Every match gets a
        fixed confidence of 0.7. ``historical_data`` is accepted but not read.
        """
        patterns = []
        swing_range = swing_high - swing_low

        for pattern_name, ratios in self.harmonic_patterns.items():
            completion_point = swing_low + (swing_range * ratios['CD'])

            # 2% tolerance around the completion point
            if self._relative_distance(completion_point, current_price) >= 0.02:
                continue

            patterns.append(HarmonicPattern(
                name=pattern_name,
                completion_point=completion_point,
                confidence=0.7,  # Basic confidence
                target_levels=[
                    completion_point + (swing_range * 0.382),
                    completion_point + (swing_range * 0.618),
                    completion_point + (swing_range * 1.0)
                ],
                stop_loss=completion_point - (swing_range * 0.236)
            ))

        return sorted(patterns, key=lambda pattern: pattern.confidence, reverse=True)

    def _determine_key_levels(
        self,
        all_levels: List[FibonacciLevel],
        current_price: float
    ) -> Tuple[float, float]:
        """Pick the key support and resistance prices.

        The key support is the strongest level below the current price and the
        key resistance is the strongest level above it. When no level exists
        on a side, the current price is returned for that side.

        Returns:
            ``(key_support, key_resistance)``.
        """
        support_levels = [lvl for lvl in all_levels if lvl.price < current_price]
        resistance_levels = [lvl for lvl in all_levels if lvl.price > current_price]

        key_support = current_price
        if support_levels:
            key_support = max(support_levels, key=lambda lvl: lvl.strength).price

        key_resistance = current_price
        if resistance_levels:
            # Strongest resistance; selecting the minimum strength would return
            # the weakest level instead.
            key_resistance = max(resistance_levels, key=lambda lvl: lvl.strength).price

        return key_support, key_resistance

    def _determine_trend_direction(
        self,
        swing_high: float,
        swing_low: float,
        current_price: float
    ) -> str:
        """Classify the trend as 'ALTA', 'BAIXA' or 'LATERAL'.

        The price must be more than 10% of the swing range above (below) the
        swing midpoint to count as 'ALTA' ('BAIXA'); otherwise 'LATERAL'.
        """
        mid_point = (swing_high + swing_low) / 2
        band = (swing_high - swing_low) * 0.1

        if current_price > mid_point + band:
            return 'ALTA'
        if current_price < mid_point - band:
            return 'BAIXA'
        return 'LATERAL'

    def _determine_fibonacci_zone(
        self,
        swing_high: float,
        swing_low: float,
        current_price: float
    ) -> str:
        """Name the Fibonacci zone the current price falls in.

        The position of the price inside the swing (0 at the low, 1 at the
        high) is compared against 0.786, 0.618, 0.382 and 0.236.
        """
        swing_range = swing_high - swing_low

        if swing_range == 0:
            # Degenerate swing: the position is undefined, so fall back to the
            # side of the swing price the current price is on.
            if current_price > swing_low:
                return 'ZONA_ALTA'
            if current_price < swing_low:
                return 'ZONA_BAIXA'
            return 'ZONA_MEDIA'

        position = (current_price - swing_low) / swing_range

        if position >= 0.786:
            return 'ZONA_ALTA'
        if position >= 0.618:
            return 'ZONA_MEDIA_ALTA'
        if position >= 0.382:
            return 'ZONA_MEDIA'
        if position >= 0.236:
            return 'ZONA_MEDIA_BAIXA'
        return 'ZONA_BAIXA'

    def _calculate_overall_strength(
        self,
        retracement_levels: List[FibonacciLevel],
        extension_levels: List[FibonacciLevel],
        confluence_zones: List[ConfluenceZone],
        harmonic_patterns: List[HarmonicPattern]
    ) -> float:
        """Compute the overall strength of the analysis, capped at 1.0.

        Sums the strengths of the first three retracement and extension
        levels, of all confluence zones, and the confidences of all harmonic
        patterns, then divides by 10.
        """
        # First three entries of each list as passed in
        level_strength = sum(lvl.strength for lvl in retracement_levels[:3])
        level_strength += sum(lvl.strength for lvl in extension_levels[:3])

        confluence_strength = sum(zone.strength for zone in confluence_zones)
        harmonic_strength = sum(pattern.confidence for pattern in harmonic_patterns)

        total_strength = (level_strength + confluence_strength + harmonic_strength) / 10
        return min(total_strength, 1.0)

    def _generate_trading_signal(
        self,
        current_price: float,
        key_support: float,
        key_resistance: float,
        trend_direction: str,
        overall_strength: float
    ) -> str:
        """Derive a 'BUY', 'SELL' or 'HOLD' signal from the analysis.

        Below an overall strength of 0.3 the signal is always 'HOLD'. A trend
        with the matching key level within 2% gives 'BUY' ('ALTA' near
        support) or 'SELL' ('BAIXA' near resistance). Otherwise, with strength
        above 0.6, the closer key level decides; anything else is 'HOLD'.
        """
        if overall_strength < 0.3:
            return 'HOLD'

        support_distance = self._relative_distance(key_support, current_price)
        resistance_distance = self._relative_distance(key_resistance, current_price)

        if trend_direction == 'ALTA' and support_distance < 0.02:
            return 'BUY'
        if trend_direction == 'BAIXA' and resistance_distance < 0.02:
            return 'SELL'
        if support_distance < resistance_distance and overall_strength > 0.6:
            return 'BUY'
        if resistance_distance < support_distance and overall_strength > 0.6:
            return 'SELL'
        return 'HOLD'

    def format_analysis_report(self, analysis: AdvancedFibonacciAnalysis) -> str:
        """Format the analysis as a human-readable text report.

        Lists up to five retracement and extension levels, up to three
        confluence zones, every harmonic pattern, and the summary fields.
        """
        parts = [
            "\n"
            "🔮 ANÁLISE AVANÇADA DE FIBONACCI\n"
            f"{'=' * 50}\n"
            "\n"
            "📊 DADOS BÁSICOS:\n"
            f" Swing Alto: {analysis.swing_high:,.2f}\n"
            f" Swing Baixo: {analysis.swing_low:,.2f}\n"
            f" Preço Atual: {analysis.current_price:,.2f}\n"
            f" Range: {analysis.swing_range:,.2f}\n"
            "\n"
            f"📈 NÍVEIS DE RETRACEMENT ({len(analysis.retracement_levels)}):\n"
        ]

        parts.extend(
            f" {level.ratio:.1%}: {level.price:,.2f} (Força: {level.strength:.2f})\n"
            for level in analysis.retracement_levels[:5]
        )

        parts.append(f"\n📊 NÍVEIS DE EXTENSÃO ({len(analysis.extension_levels)}):\n")
        parts.extend(
            f" {level.ratio:.1%}: {level.price:,.2f} ({level.type})\n"
            for level in analysis.extension_levels[:5]
        )

        if analysis.confluence_zones:
            parts.append(f"\n🎯 ZONAS DE CONFLUÊNCIA ({len(analysis.confluence_zones)}):\n")
            parts.extend(
                f" {zone.price_range[0]:,.2f} - {zone.price_range[1]:,.2f} ({zone.levels_count} níveis)\n"
                for zone in analysis.confluence_zones[:3]
            )

        if analysis.harmonic_patterns:
            parts.append(f"\n🎼 PADRÕES HARMÔNICOS ({len(analysis.harmonic_patterns)}):\n")
            parts.extend(
                f" {pattern.name}: {pattern.confidence:.1%} confiança\n"
                for pattern in analysis.harmonic_patterns
            )

        parts.append(
            "\n"
            "🎯 NÍVEIS CHAVE:\n"
            f" Suporte: {analysis.key_support:,.2f}\n"
            f" Resistência: {analysis.key_resistance:,.2f}\n"
            "\n"
            "📊 ANÁLISE GERAL:\n"
            f" Tendência: {analysis.trend_direction}\n"
            f" Zona Fibonacci: {analysis.fibonacci_zone}\n"
            f" Força da Análise: {analysis.overall_strength:.1%}\n"
            f" Sinal: {analysis.trading_signal}\n"
            f" Alertas: {analysis.alerts_count}\n"
        )

        return "".join(parts)


# Usage example
if __name__ == "__main__":
    engine = AdvancedFibonacciEngine()

    # Example with the kind of values the bot reports
    analysis = engine.perform_advanced_analysis(
        swing_high=140570.0,
        swing_low=139540.0,
        current_price=140135.0
    )

    print(engine.format_analysis_report(analysis))