Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| from typing import Dict, List, Tuple, Optional, Any | |
| from dataclasses import dataclass, asdict | |
| import logging | |
| from src.core.log_parser import FibonacciAnalysis, BotAnalysis | |
| # Configurar logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
@dataclass
class FibonacciLevel:
    """A single Fibonacci price level measured against the current price.

    Declared as a dataclass so it can be built with keyword arguments,
    which is how AdvancedFibonacciEngine constructs every level.
    """

    level: float  # the engine stores the same value here as in ``ratio``
    price: float  # absolute price of the level
    # Values assigned by the engine: 'retracement', 'extension_up',
    # 'extension_down', 'projection_up', 'projection_down'.
    type: str
    ratio: float  # Fibonacci ratio used to derive ``price``
    distance_from_current: float  # abs(current_price - price)
    strength: float  # type weight scaled by proximity to the current price
@dataclass
class SwingPoint:
    """A swing point (local extreme) in a price series.

    Declared as a dataclass so instances can be created with keyword
    arguments and compared by value.
    """

    price: float
    timestamp: str
    type: str  # 'high' or 'low'
    strength: float
@dataclass
class ConfluenceZone:
    """A price band in which several Fibonacci levels cluster together.

    Declared as a dataclass so it can be built with keyword arguments,
    which is how AdvancedFibonacciEngine constructs each zone.
    """

    price_range: Tuple[float, float]  # (lowest, highest) level price in the zone
    levels_count: int  # number of levels grouped into the zone
    strength: float  # average level strength multiplied by ``levels_count``
    types: List[str]  # distinct level types present in the zone
@dataclass
class HarmonicPattern:
    """A detected harmonic pattern together with its trade levels.

    Declared as a dataclass so it can be built with keyword arguments,
    which is how AdvancedFibonacciEngine constructs each pattern.
    """

    name: str  # pattern name, e.g. 'Gartley'
    completion_point: float  # price at which the pattern completes
    confidence: float
    target_levels: List[float]  # profit targets derived from the completion point
    stop_loss: float
@dataclass
class AdvancedFibonacciAnalysis:
    """Complete result of an advanced Fibonacci analysis.

    Declared as a dataclass so it can be built with keyword arguments,
    which is how AdvancedFibonacciEngine.perform_advanced_analysis
    constructs its return value.
    """

    # Inputs of the analysis
    swing_high: float
    swing_low: float
    current_price: float
    swing_range: float  # swing_high - swing_low
    # Computed levels, each list ordered by distance from the current price
    retracement_levels: List[FibonacciLevel]
    extension_levels: List[FibonacciLevel]
    projection_levels: List[FibonacciLevel]
    # Derived structures
    confluence_zones: List[ConfluenceZone]
    harmonic_patterns: List[HarmonicPattern]
    key_support: float
    key_resistance: float
    # Summary fields
    trend_direction: str  # 'ALTA', 'BAIXA' or 'LATERAL'
    fibonacci_zone: str  # one of the 'ZONA_*' labels
    overall_strength: float  # capped at 1.0
    trading_signal: str  # 'BUY', 'SELL' or 'HOLD'
    alerts_count: int
class AdvancedFibonacciEngine:
    """Engine for advanced Fibonacci analysis.

    From a swing high, a swing low and the current price it derives
    retracement, extension and projection levels, groups nearby levels
    into confluence zones, runs a simplified harmonic-pattern check and
    summarises everything into key levels, a trend label and a trading
    signal.
    """

    def __init__(self) -> None:
        # Standard Fibonacci ratios
        self.retracement_ratios = [0.236, 0.382, 0.5, 0.618, 0.786]
        self.extension_ratios = [1.272, 1.414, 1.618, 2.0, 2.618]
        self.projection_ratios = [0.618, 1.0, 1.272, 1.618]
        # Confluence settings
        self.confluence_threshold = 0.001  # 0.1% tolerance
        self.min_confluence_levels = 2
        # Harmonic patterns (only the 'CD' ratio is read by the current
        # simplified detector)
        self.harmonic_patterns = {
            'Gartley': {'XA': 0.618, 'AB': 0.618, 'BC': 0.786, 'CD': 1.272},
            'Butterfly': {'XA': 0.786, 'AB': 0.618, 'BC': 0.886, 'CD': 1.618},
            'Bat': {'XA': 0.382, 'AB': 0.618, 'BC': 0.886, 'CD': 2.618},
            'Crab': {'XA': 0.618, 'AB': 0.618, 'BC': 0.886, 'CD': 3.618},
        }

    def analyze_from_bot_data(
        self, bot_analysis: "BotAnalysis"
    ) -> "AdvancedFibonacciAnalysis":
        """Run the advanced analysis on data produced by the external bot.

        Args:
            bot_analysis: Parsed bot output; its ``fibonacci_analysis``
                attribute supplies the swing high, swing low and current
                price.

        Returns:
            The full analysis for the bot's swing data.
        """
        fib_data = bot_analysis.fibonacci_analysis
        return self.perform_advanced_analysis(
            swing_high=fib_data.swing_high,
            swing_low=fib_data.swing_low,
            current_price=fib_data.current_price,
            historical_data=None,  # may be expanded in the future
            bot_fibonacci_data=fib_data,
        )

    def perform_advanced_analysis(
        self,
        swing_high: float,
        swing_low: float,
        current_price: float,
        historical_data: Optional[pd.DataFrame] = None,
        bot_fibonacci_data: Optional["FibonacciAnalysis"] = None,
    ) -> "AdvancedFibonacciAnalysis":
        """Perform the advanced Fibonacci analysis.

        Args:
            swing_high: Price of the swing high.
            swing_low: Price of the swing low.
            current_price: Latest price; used as the divisor of relative
                distances, so it must be non-zero.
            historical_data: Optional price history, forwarded to the
                harmonic-pattern detector.
            bot_fibonacci_data: Optional bot data; when given, its
                ``alerts`` value becomes ``alerts_count``.

        Returns:
            The populated AdvancedFibonacciAnalysis.

        Raises:
            Exception: Any error raised by a step is logged and re-raised
                unchanged (e.g. ZeroDivisionError when the swing range or
                the current price is zero).
        """
        try:
            swing_range = swing_high - swing_low
            # Fibonacci levels
            retracement_levels = self._calculate_retracement_levels(
                swing_high, swing_low, current_price
            )
            extension_levels = self._calculate_extension_levels(
                swing_high, swing_low, current_price
            )
            projection_levels = self._calculate_projection_levels(
                swing_high, swing_low, current_price
            )
            # Confluence zones across every level type
            all_levels = retracement_levels + extension_levels + projection_levels
            confluence_zones = self._identify_confluence_zones(all_levels)
            # Harmonic patterns
            harmonic_patterns = self._detect_harmonic_patterns(
                swing_high, swing_low, current_price, historical_data
            )
            # Key support and resistance
            key_support, key_resistance = self._determine_key_levels(
                all_levels, current_price
            )
            # Trend direction and Fibonacci zone
            trend_direction = self._determine_trend_direction(
                swing_high, swing_low, current_price
            )
            fibonacci_zone = self._determine_fibonacci_zone(
                swing_high, swing_low, current_price
            )
            # Overall strength and the resulting trading signal
            overall_strength = self._calculate_overall_strength(
                retracement_levels, extension_levels, confluence_zones,
                harmonic_patterns
            )
            trading_signal = self._generate_trading_signal(
                current_price, key_support, key_resistance, trend_direction,
                overall_strength
            )
            # Alert count: taken from the bot data when available, otherwise
            # the number of confluence zones found.
            if bot_fibonacci_data is not None:
                alerts_count = bot_fibonacci_data.alerts
            else:
                alerts_count = len(confluence_zones)
            return AdvancedFibonacciAnalysis(
                swing_high=swing_high,
                swing_low=swing_low,
                current_price=current_price,
                swing_range=swing_range,
                retracement_levels=retracement_levels,
                extension_levels=extension_levels,
                projection_levels=projection_levels,
                confluence_zones=confluence_zones,
                harmonic_patterns=harmonic_patterns,
                key_support=key_support,
                key_resistance=key_resistance,
                trend_direction=trend_direction,
                fibonacci_zone=fibonacci_zone,
                overall_strength=overall_strength,
                trading_signal=trading_signal,
                alerts_count=alerts_count,
            )
        except Exception as e:
            logger.error("Erro na análise avançada de Fibonacci: %s", e)
            raise

    def _calculate_retracement_levels(
        self, swing_high: float, swing_low: float, current_price: float
    ) -> List["FibonacciLevel"]:
        """Compute the retracement levels, measured down from the swing high.

        Returns:
            Levels sorted by distance from the current price, nearest first.
        """
        levels = []
        swing_range = swing_high - swing_low
        for ratio in self.retracement_ratios:
            price = swing_high - (swing_range * ratio)
            levels.append(FibonacciLevel(
                level=ratio,
                price=price,
                type='retracement',
                ratio=ratio,
                distance_from_current=abs(current_price - price),
                strength=self._calculate_level_strength(
                    price, current_price, 'retracement'
                ),
            ))
        return sorted(levels, key=lambda x: x.distance_from_current)

    def _calculate_extension_levels(
        self, swing_high: float, swing_low: float, current_price: float
    ) -> List["FibonacciLevel"]:
        """Compute extension levels above the swing high and below the swing low.

        Each ratio yields an 'extension_up' and an 'extension_down' level.

        Returns:
            Levels sorted by distance from the current price, nearest first.
        """
        levels = []
        swing_range = swing_high - swing_low
        for ratio in self.extension_ratios:
            # The part of the ratio beyond 1.0 is projected past the swing.
            overshoot = swing_range * (ratio - 1)
            candidates = (
                ('extension_up', swing_high + overshoot),
                ('extension_down', swing_low - overshoot),
            )
            for level_type, price in candidates:
                levels.append(FibonacciLevel(
                    level=ratio,
                    price=price,
                    type=level_type,
                    ratio=ratio,
                    distance_from_current=abs(current_price - price),
                    strength=self._calculate_level_strength(
                        price, current_price, 'extension'
                    ),
                ))
        return sorted(levels, key=lambda x: x.distance_from_current)

    def _calculate_projection_levels(
        self, swing_high: float, swing_low: float, current_price: float
    ) -> List["FibonacciLevel"]:
        """Compute projection levels starting from the current price.

        Levels are projected upward when the price is above the swing
        midpoint and downward otherwise.

        Returns:
            Levels sorted by distance from the current price, nearest first.
        """
        levels = []
        swing_range = swing_high - swing_low
        # The direction does not depend on the ratio, so decide it once.
        projecting_up = current_price > (swing_high + swing_low) / 2
        direction = 'projection_up' if projecting_up else 'projection_down'
        for ratio in self.projection_ratios:
            if projecting_up:
                price = current_price + (swing_range * ratio)
            else:
                price = current_price - (swing_range * ratio)
            levels.append(FibonacciLevel(
                level=ratio,
                price=price,
                type=direction,
                ratio=ratio,
                distance_from_current=abs(current_price - price),
                strength=self._calculate_level_strength(
                    price, current_price, 'projection'
                ),
            ))
        return sorted(levels, key=lambda x: x.distance_from_current)

    def _calculate_level_strength(
        self, level_price: float, current_price: float, level_type: str
    ) -> float:
        """Score a level by its type weight and its proximity to the price.

        Args:
            level_price: Price of the level.
            current_price: Latest price; must be non-zero.
            level_type: 'retracement', 'extension' or 'projection'; any
                other value gets a weight of 0.5.

        Returns:
            The type weight multiplied by ``1 / (1 + relative distance)``.
        """
        distance_factor = 1 / (1 + abs(level_price - current_price) / current_price)
        type_weights = {
            'retracement': 1.0,
            'extension': 0.8,
            'projection': 0.6,
        }
        base_strength = type_weights.get(level_type, 0.5)
        return base_strength * distance_factor

    def _identify_confluence_zones(
        self, all_levels: List["FibonacciLevel"]
    ) -> List["ConfluenceZone"]:
        """Group levels whose prices lie within the confluence tolerance.

        Levels are scanned in ascending price order. Each group is anchored
        on its lowest level and extended while the next level stays within
        ``confluence_threshold`` (relative to the anchor price).

        Returns:
            Zones with at least ``min_confluence_levels`` levels, strongest
            first.
        """
        confluence_zones = []
        sorted_levels = sorted(all_levels, key=lambda x: x.price)
        total = len(sorted_levels)
        i = 0
        while i < total:
            anchor = sorted_levels[i]
            # Extension-down prices can be zero or negative, so the relative
            # gap is measured against the magnitude of the anchor price.
            reference = abs(anchor.price)
            j = i + 1
            while j < total:
                gap = abs(sorted_levels[j].price - anchor.price)
                if reference > 0:
                    is_close = gap / reference <= self.confluence_threshold
                else:
                    # No relative scale at price 0: only identical prices match.
                    is_close = gap == 0
                if not is_close:
                    break
                j += 1
            zone_levels = sorted_levels[i:j]
            if len(zone_levels) >= self.min_confluence_levels:
                prices = [level.price for level in zone_levels]
                avg_strength = (
                    sum(level.strength for level in zone_levels) / len(zone_levels)
                )
                confluence_zones.append(ConfluenceZone(
                    price_range=(min(prices), max(prices)),
                    levels_count=len(zone_levels),
                    # Strength scales with the number of levels in the zone
                    strength=avg_strength * len(zone_levels),
                    types=list({level.type for level in zone_levels}),
                ))
            # j is always at least i + 1, so the scan always advances.
            i = j
        return sorted(confluence_zones, key=lambda x: x.strength, reverse=True)

    def _detect_harmonic_patterns(
        self, swing_high: float, swing_low: float, current_price: float,
        historical_data: Optional[pd.DataFrame] = None
    ) -> List["HarmonicPattern"]:
        """Detect harmonic patterns (basic implementation).

        A pattern is reported when the current price is within 2% of the
        completion point derived from the pattern's 'CD' ratio.
        ``historical_data`` is accepted for future use and is not read.

        Returns:
            Detected patterns sorted by confidence, highest first.
        """
        patterns = []
        swing_range = swing_high - swing_low
        for pattern_name, ratios in self.harmonic_patterns.items():
            completion_point = swing_low + (swing_range * ratios['CD'])
            # 2% tolerance around the completion point
            if abs(current_price - completion_point) / current_price < 0.02:
                patterns.append(HarmonicPattern(
                    name=pattern_name,
                    completion_point=completion_point,
                    confidence=0.7,  # fixed baseline confidence
                    target_levels=[
                        completion_point + (swing_range * 0.382),
                        completion_point + (swing_range * 0.618),
                        completion_point + (swing_range * 1.0),
                    ],
                    stop_loss=completion_point - (swing_range * 0.236),
                ))
        return sorted(patterns, key=lambda x: x.confidence, reverse=True)

    def _determine_key_levels(
        self, all_levels: List["FibonacciLevel"], current_price: float
    ) -> Tuple[float, float]:
        """Pick the key support and resistance prices.

        The key support is the strongest level below the current price and
        the key resistance is the strongest level above it. Either one
        falls back to the current price when no level exists on that side.

        Returns:
            ``(key_support, key_resistance)``.
        """
        support_levels = [lvl for lvl in all_levels if lvl.price < current_price]
        resistance_levels = [lvl for lvl in all_levels if lvl.price > current_price]
        key_support = current_price
        if support_levels:
            key_support = max(support_levels, key=lambda x: x.strength).price
        key_resistance = current_price
        if resistance_levels:
            # Strength already rewards proximity, so the strongest level is
            # wanted here as well (taking the minimum selected the weakest).
            key_resistance = max(resistance_levels, key=lambda x: x.strength).price
        return key_support, key_resistance

    def _determine_trend_direction(
        self, swing_high: float, swing_low: float, current_price: float
    ) -> str:
        """Classify the trend from the price position around the swing midpoint.

        Returns:
            'ALTA' when the price is more than 10% of the range above the
            midpoint, 'BAIXA' when more than 10% below it, else 'LATERAL'.
        """
        mid_point = (swing_high + swing_low) / 2
        band = (swing_high - swing_low) * 0.1
        if current_price > mid_point + band:
            return 'ALTA'
        if current_price < mid_point - band:
            return 'BAIXA'
        return 'LATERAL'

    def _determine_fibonacci_zone(
        self, swing_high: float, swing_low: float, current_price: float
    ) -> str:
        """Label the Fibonacci zone the current price sits in.

        The position is the fraction of the swing range covered from the
        swing low up to the current price.

        Raises:
            ZeroDivisionError: If ``swing_high`` equals ``swing_low``.
        """
        swing_range = swing_high - swing_low
        position = (current_price - swing_low) / swing_range
        if position >= 0.786:
            return 'ZONA_ALTA'
        if position >= 0.618:
            return 'ZONA_MEDIA_ALTA'
        if position >= 0.382:
            return 'ZONA_MEDIA'
        if position >= 0.236:
            return 'ZONA_MEDIA_BAIXA'
        return 'ZONA_BAIXA'

    def _calculate_overall_strength(
        self, retracement_levels: List["FibonacciLevel"],
        extension_levels: List["FibonacciLevel"],
        confluence_zones: List["ConfluenceZone"],
        harmonic_patterns: List["HarmonicPattern"]
    ) -> float:
        """Combine level, confluence and pattern strength into one score.

        Only the first three retracement and extension levels are counted;
        the lists built by this engine are ordered nearest-first.

        Returns:
            The summed strength divided by 10 and capped at 1.0.
        """
        level_strength = sum(level.strength for level in retracement_levels[:3])
        level_strength += sum(level.strength for level in extension_levels[:3])
        confluence_strength = sum(zone.strength for zone in confluence_zones)
        harmonic_strength = sum(pattern.confidence for pattern in harmonic_patterns)
        total_strength = (level_strength + confluence_strength + harmonic_strength) / 10
        return min(total_strength, 1.0)

    def _generate_trading_signal(
        self, current_price: float, key_support: float, key_resistance: float,
        trend_direction: str, overall_strength: float
    ) -> str:
        """Derive a 'BUY', 'SELL' or 'HOLD' signal from the analysis.

        A weak analysis (strength below 0.3) always yields 'HOLD'. Otherwise
        a trend-aligned level within 2% of the price triggers a signal, and
        a strong analysis (above 0.6) signals towards the nearer key level.
        """
        support_distance = abs(current_price - key_support) / current_price
        resistance_distance = abs(current_price - key_resistance) / current_price
        if overall_strength < 0.3:
            return 'HOLD'
        if trend_direction == 'ALTA' and support_distance < 0.02:
            return 'BUY'
        if trend_direction == 'BAIXA' and resistance_distance < 0.02:
            return 'SELL'
        if support_distance < resistance_distance and overall_strength > 0.6:
            return 'BUY'
        if resistance_distance < support_distance and overall_strength > 0.6:
            return 'SELL'
        return 'HOLD'

    def format_analysis_report(self, analysis: "AdvancedFibonacciAnalysis") -> str:
        """Render the analysis as a human-readable text report.

        Lists at most five retracement and five extension levels, at most
        three confluence zones, and every harmonic pattern.
        """
        # Collect the pieces and join once instead of repeated concatenation.
        parts = [f"""
🔮 ANÁLISE AVANÇADA DE FIBONACCI
{'='*50}
📊 DADOS BÁSICOS:
Swing Alto: {analysis.swing_high:,.2f}
Swing Baixo: {analysis.swing_low:,.2f}
Preço Atual: {analysis.current_price:,.2f}
Range: {analysis.swing_range:,.2f}
📈 NÍVEIS DE RETRACEMENT ({len(analysis.retracement_levels)}):
"""]
        parts.extend(
            f" {level.ratio:.1%}: {level.price:,.2f} (Força: {level.strength:.2f})\n"
            for level in analysis.retracement_levels[:5]
        )
        parts.append(f"\n📊 NÍVEIS DE EXTENSÃO ({len(analysis.extension_levels)}):\n")
        parts.extend(
            f" {level.ratio:.1%}: {level.price:,.2f} ({level.type})\n"
            for level in analysis.extension_levels[:5]
        )
        if analysis.confluence_zones:
            parts.append(f"\n🎯 ZONAS DE CONFLUÊNCIA ({len(analysis.confluence_zones)}):\n")
            parts.extend(
                f" {zone.price_range[0]:,.2f} - {zone.price_range[1]:,.2f} ({zone.levels_count} níveis)\n"
                for zone in analysis.confluence_zones[:3]
            )
        if analysis.harmonic_patterns:
            parts.append(f"\n🎼 PADRÕES HARMÔNICOS ({len(analysis.harmonic_patterns)}):\n")
            parts.extend(
                f" {pattern.name}: {pattern.confidence:.1%} confiança\n"
                for pattern in analysis.harmonic_patterns
            )
        parts.append(f"""
🎯 NÍVEIS CHAVE:
Suporte: {analysis.key_support:,.2f}
Resistência: {analysis.key_resistance:,.2f}
📊 ANÁLISE GERAL:
Tendência: {analysis.trend_direction}
Zona Fibonacci: {analysis.fibonacci_zone}
Força da Análise: {analysis.overall_strength:.1%}
Sinal: {analysis.trading_signal}
Alertas: {analysis.alerts_count}
""")
        return "".join(parts)
# Usage example
if __name__ == "__main__":
    # Example with bot data: sample swing values fed straight into the engine
    sample_inputs = {
        "swing_high": 140570.0,
        "swing_low": 139540.0,
        "current_price": 140135.0,
    }
    fib_engine = AdvancedFibonacciEngine()
    sample_result = fib_engine.perform_advanced_analysis(**sample_inputs)
    print(fib_engine.format_analysis_report(sample_result))