# Source path: teste/src/analysis/fibonacci_analysis.py
# Uploaded by: torxyton
# Commit message: Initial commit: Complete Fibonacci analysis application with Gradio interface
# Commit hash: 7f335a2
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, asdict
import logging
from src.core.log_parser import FibonacciAnalysis, BotAnalysis
# Configure logging at import time (level INFO) and create this module's logger,
# which the engine uses to report failures in perform_advanced_analysis.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class FibonacciLevel:
    """A single Fibonacci price level relative to the current price."""

    # Ratio the level was built from (the engine stores the same value in
    # both ``level`` and ``ratio``).
    level: float
    # Price at which the level sits.
    price: float
    # Level family tag. The engine in this file emits 'retracement',
    # 'extension_up', 'extension_down', 'projection_up' and 'projection_down'.
    type: str
    # Fibonacci ratio used to compute ``price``.
    ratio: float
    # Absolute distance between ``price`` and the current price.
    distance_from_current: float
    # Relative weight of the level; higher means more significant.
    strength: float
@dataclass
class SwingPoint:
    """A swing point (local extreme) of the price series."""

    # Price at the swing point.
    price: float
    # Timestamp of the swing point, kept as a string.
    timestamp: str
    # Either 'high' or 'low'.
    type: str
    # Relative weight of the swing point.
    strength: float
@dataclass
class ConfluenceZone:
    """A price band in which several Fibonacci levels cluster together."""

    # (lowest price, highest price) of the levels in the zone.
    price_range: Tuple[float, float]
    # Number of levels that fall inside the zone.
    levels_count: int
    # Aggregate weight of the zone.
    strength: float
    # Level type tags present in the zone.
    types: List[str]
@dataclass
class HarmonicPattern:
    """A detected harmonic pattern together with its trade levels."""

    # Pattern name (e.g. 'Gartley', 'Butterfly', 'Bat', 'Crab').
    name: str
    # Price at which the pattern completes.
    completion_point: float
    # Detection confidence.
    confidence: float
    # Price targets derived from the pattern.
    target_levels: List[float]
    # Suggested stop-loss price.
    stop_loss: float
@dataclass
class AdvancedFibonacciAnalysis:
    """Complete result of an advanced Fibonacci analysis run."""

    # --- Inputs echoed back -------------------------------------------------
    swing_high: float
    swing_low: float
    current_price: float
    # swing_high - swing_low
    swing_range: float

    # --- Computed levels, each list ordered by distance to current price ----
    retracement_levels: List[FibonacciLevel]
    extension_levels: List[FibonacciLevel]
    projection_levels: List[FibonacciLevel]

    # --- Derived structures -------------------------------------------------
    confluence_zones: List[ConfluenceZone]
    harmonic_patterns: List[HarmonicPattern]

    # --- Summary ------------------------------------------------------------
    key_support: float
    key_resistance: float
    # 'ALTA', 'BAIXA' or 'LATERAL'
    trend_direction: str
    # One of the 'ZONA_*' labels
    fibonacci_zone: str
    # Capped at 1.0
    overall_strength: float
    # 'BUY', 'SELL' or 'HOLD'
    trading_signal: str
    alerts_count: int
class AdvancedFibonacciEngine:
    """Engine that derives Fibonacci levels, confluence zones and a trading
    signal from a swing high, a swing low and the current price.

    All tuning knobs (ratio lists, confluence tolerance, harmonic pattern
    ratios) are plain instance attributes set in ``__init__``.
    """

    # Base weight per level family used by ``_calculate_level_strength``.
    _TYPE_WEIGHTS = {
        'retracement': 1.0,
        'extension': 0.8,
        'projection': 0.6,
    }
    # Weight applied to any family not listed in ``_TYPE_WEIGHTS``.
    _DEFAULT_TYPE_WEIGHT = 0.5

    def __init__(self):
        """Set the default ratios, confluence settings and harmonic patterns."""
        # Standard Fibonacci ratios
        self.retracement_ratios = [0.236, 0.382, 0.5, 0.618, 0.786]
        self.extension_ratios = [1.272, 1.414, 1.618, 2.0, 2.618]
        self.projection_ratios = [0.618, 1.0, 1.272, 1.618]

        # Confluence settings
        self.confluence_threshold = 0.001  # 0.1% tolerance
        self.min_confluence_levels = 2

        # Harmonic patterns (only the 'CD' ratio is read by the detector below)
        self.harmonic_patterns = {
            'Gartley': {'XA': 0.618, 'AB': 0.618, 'BC': 0.786, 'CD': 1.272},
            'Butterfly': {'XA': 0.786, 'AB': 0.618, 'BC': 0.886, 'CD': 1.618},
            'Bat': {'XA': 0.382, 'AB': 0.618, 'BC': 0.886, 'CD': 2.618},
            'Crab': {'XA': 0.618, 'AB': 0.618, 'BC': 0.886, 'CD': 3.618}
        }

    def analyze_from_bot_data(self, bot_analysis: BotAnalysis) -> AdvancedFibonacciAnalysis:
        """Run the advanced analysis on data produced by the external bot.

        Reads ``swing_high``, ``swing_low`` and ``current_price`` from
        ``bot_analysis.fibonacci_analysis`` and forwards that same object as
        ``bot_fibonacci_data`` so its ``alerts`` value is reused.
        """
        fib_data = bot_analysis.fibonacci_analysis
        return self.perform_advanced_analysis(
            swing_high=fib_data.swing_high,
            swing_low=fib_data.swing_low,
            current_price=fib_data.current_price,
            historical_data=None,  # May be expanded in the future
            bot_fibonacci_data=fib_data
        )

    def perform_advanced_analysis(
        self,
        swing_high: float,
        swing_low: float,
        current_price: float,
        historical_data: Optional[pd.DataFrame] = None,
        bot_fibonacci_data: Optional[FibonacciAnalysis] = None
    ) -> AdvancedFibonacciAnalysis:
        """Perform the full Fibonacci analysis.

        Args:
            swing_high: Price of the swing high.
            swing_low: Price of the swing low.
            current_price: Latest price; used as the divisor when computing
                relative distances.
            historical_data: Forwarded to the harmonic pattern detector,
                which currently does not read it.
            bot_fibonacci_data: When given, its ``alerts`` attribute becomes
                ``alerts_count``; otherwise the number of confluence zones
                is used.

        Returns:
            A populated ``AdvancedFibonacciAnalysis``.

        Raises:
            Exception: Any error raised by a step is logged and re-raised
                unchanged (e.g. ``ZeroDivisionError`` when ``current_price``
                is 0 or when ``swing_high == swing_low``).
        """
        try:
            swing_range = swing_high - swing_low

            retracement_levels = self._calculate_retracement_levels(
                swing_high, swing_low, current_price
            )
            extension_levels = self._calculate_extension_levels(
                swing_high, swing_low, current_price
            )
            projection_levels = self._calculate_projection_levels(
                swing_high, swing_low, current_price
            )

            # Confluence is searched across every level family at once.
            all_levels = retracement_levels + extension_levels + projection_levels
            confluence_zones = self._identify_confluence_zones(all_levels)

            harmonic_patterns = self._detect_harmonic_patterns(
                swing_high, swing_low, current_price, historical_data
            )

            key_support, key_resistance = self._determine_key_levels(
                all_levels, current_price
            )
            trend_direction = self._determine_trend_direction(
                swing_high, swing_low, current_price
            )
            fibonacci_zone = self._determine_fibonacci_zone(
                swing_high, swing_low, current_price
            )
            overall_strength = self._calculate_overall_strength(
                retracement_levels, extension_levels, confluence_zones, harmonic_patterns
            )
            trading_signal = self._generate_trading_signal(
                current_price, key_support, key_resistance, trend_direction, overall_strength
            )

            # Prefer the bot's own alert count when bot data was supplied.
            if bot_fibonacci_data is not None:
                alerts_count = bot_fibonacci_data.alerts
            else:
                alerts_count = len(confluence_zones)

            return AdvancedFibonacciAnalysis(
                swing_high=swing_high,
                swing_low=swing_low,
                current_price=current_price,
                swing_range=swing_range,
                retracement_levels=retracement_levels,
                extension_levels=extension_levels,
                projection_levels=projection_levels,
                confluence_zones=confluence_zones,
                harmonic_patterns=harmonic_patterns,
                key_support=key_support,
                key_resistance=key_resistance,
                trend_direction=trend_direction,
                fibonacci_zone=fibonacci_zone,
                overall_strength=overall_strength,
                trading_signal=trading_signal,
                alerts_count=alerts_count
            )
        except Exception as e:
            logger.error(f"Erro na análise avançada de Fibonacci: {e}")
            raise

    def _build_level(
        self, ratio: float, price: float, level_type: str,
        weight_family: str, current_price: float
    ) -> FibonacciLevel:
        """Create one ``FibonacciLevel``.

        ``level_type`` is the tag stored on the level; ``weight_family`` is
        the key used to look up the base weight for its strength.
        """
        return FibonacciLevel(
            level=ratio,
            price=price,
            type=level_type,
            ratio=ratio,
            distance_from_current=abs(current_price - price),
            strength=self._calculate_level_strength(price, current_price, weight_family)
        )

    def _calculate_retracement_levels(
        self, swing_high: float, swing_low: float, current_price: float
    ) -> List[FibonacciLevel]:
        """Compute retracement levels, measured down from the swing high.

        Returns the levels sorted by distance to ``current_price``.
        """
        swing_range = swing_high - swing_low
        levels = [
            self._build_level(
                ratio, swing_high - (swing_range * ratio),
                'retracement', 'retracement', current_price
            )
            for ratio in self.retracement_ratios
        ]
        return sorted(levels, key=lambda lvl: lvl.distance_from_current)

    def _calculate_extension_levels(
        self, swing_high: float, swing_low: float, current_price: float
    ) -> List[FibonacciLevel]:
        """Compute extension levels above the high and below the low.

        Each ratio yields an 'extension_up' and an 'extension_down' level.
        Returns the levels sorted by distance to ``current_price``.
        """
        levels = []
        swing_range = swing_high - swing_low
        for ratio in self.extension_ratios:
            # Only the part of the ratio beyond 1.0 lies outside the swing.
            offset = swing_range * (ratio - 1)
            levels.append(self._build_level(
                ratio, swing_high + offset, 'extension_up', 'extension', current_price
            ))
            levels.append(self._build_level(
                ratio, swing_low - offset, 'extension_down', 'extension', current_price
            ))
        return sorted(levels, key=lambda lvl: lvl.distance_from_current)

    def _calculate_projection_levels(
        self, swing_high: float, swing_low: float, current_price: float
    ) -> List[FibonacciLevel]:
        """Compute projection levels starting from the current price.

        Levels are projected upwards when the price is above the swing
        midpoint and downwards otherwise. Returns the levels sorted by
        distance to ``current_price``.
        """
        swing_range = swing_high - swing_low
        # The direction does not depend on the ratio, so decide it once.
        if current_price > (swing_high + swing_low) / 2:
            sign, direction = 1, 'projection_up'
        else:
            sign, direction = -1, 'projection_down'

        levels = [
            self._build_level(
                ratio, current_price + sign * (swing_range * ratio),
                direction, 'projection', current_price
            )
            for ratio in self.projection_ratios
        ]
        return sorted(levels, key=lambda lvl: lvl.distance_from_current)

    def _calculate_level_strength(
        self, level_price: float, current_price: float, level_type: str
    ) -> float:
        """Return the strength of a level.

        The base weight of ``level_type`` is scaled by
        ``1 / (1 + relative distance to current_price)``, so levels closer
        to the price score higher.

        Raises:
            ZeroDivisionError: If ``current_price`` is 0.
        """
        distance_factor = 1 / (1 + abs(level_price - current_price) / current_price)
        base_strength = self._TYPE_WEIGHTS.get(level_type, self._DEFAULT_TYPE_WEIGHT)
        return base_strength * distance_factor

    def _identify_confluence_zones(
        self, all_levels: List[FibonacciLevel]
    ) -> List[ConfluenceZone]:
        """Group levels whose prices lie within the confluence tolerance.

        Levels are scanned in ascending price order. Each group is anchored
        at its lowest level and takes every following level whose distance
        to the anchor is at most ``confluence_threshold`` times the anchor's
        absolute price. Groups with at least ``min_confluence_levels``
        members become zones.

        Returns the zones sorted by strength, strongest first.
        """
        zones = []
        ordered = sorted(all_levels, key=lambda lvl: lvl.price)
        total = len(ordered)

        start = 0
        while start < total:
            anchor = ordered[start]
            # Compare absolute distances against a tolerance scaled by the
            # anchor's magnitude: this avoids dividing by a zero price and
            # keeps the test meaningful when the anchor price is negative.
            tolerance = self.confluence_threshold * abs(anchor.price)

            end = start + 1
            while end < total and abs(ordered[end].price - anchor.price) <= tolerance:
                end += 1

            members = ordered[start:end]
            if len(members) >= self.min_confluence_levels:
                avg_strength = sum(lvl.strength for lvl in members) / len(members)
                zones.append(ConfluenceZone(
                    # ``members`` is price-sorted, so the ends are min and max.
                    price_range=(members[0].price, members[-1].price),
                    levels_count=len(members),
                    # Average strength multiplied by the number of levels
                    strength=avg_strength * len(members),
                    # De-duplicate while keeping a deterministic order.
                    types=list(dict.fromkeys(lvl.type for lvl in members))
                ))

            # Continue after the last level consumed by this group.
            start = end

        return sorted(zones, key=lambda zone: zone.strength, reverse=True)

    def _detect_harmonic_patterns(
        self, swing_high: float, swing_low: float, current_price: float,
        historical_data: Optional[pd.DataFrame] = None
    ) -> List[HarmonicPattern]:
        """Detect harmonic patterns (basic implementation).

        A pattern matches when ``current_price`` is within 2% of
        ``swing_low + swing_range * CD``. ``historical_data`` is accepted
        but not used by this simplified implementation.

        Raises:
            ZeroDivisionError: If ``current_price`` is 0.
        """
        patterns = []
        swing_range = swing_high - swing_low

        for pattern_name, ratios in self.harmonic_patterns.items():
            completion_point = swing_low + (swing_range * ratios['CD'])

            # 2% tolerance around the completion point
            if abs(current_price - completion_point) / current_price >= 0.02:
                continue

            patterns.append(HarmonicPattern(
                name=pattern_name,
                completion_point=completion_point,
                confidence=0.7,  # Fixed baseline confidence
                target_levels=[
                    completion_point + (swing_range * 0.382),
                    completion_point + (swing_range * 0.618),
                    completion_point + (swing_range * 1.0)
                ],
                stop_loss=completion_point - (swing_range * 0.236)
            ))

        return sorted(patterns, key=lambda pattern: pattern.confidence, reverse=True)

    def _determine_key_levels(
        self, all_levels: List[FibonacciLevel], current_price: float
    ) -> Tuple[float, float]:
        """Return ``(key_support, key_resistance)``.

        Support is the strongest level strictly below ``current_price`` and
        resistance the strongest level strictly above it. When a side has no
        level, ``current_price`` itself is returned for that side.
        """
        below = [lvl for lvl in all_levels if lvl.price < current_price]
        above = [lvl for lvl in all_levels if lvl.price > current_price]

        key_support = current_price
        if below:
            key_support = max(below, key=lambda lvl: lvl.strength).price

        # Strongest level on the resistance side as well, mirroring support.
        key_resistance = current_price
        if above:
            key_resistance = max(above, key=lambda lvl: lvl.strength).price

        return key_support, key_resistance

    def _determine_trend_direction(
        self, swing_high: float, swing_low: float, current_price: float
    ) -> str:
        """Classify the trend as 'ALTA', 'BAIXA' or 'LATERAL'.

        The price must be more than 10% of the swing range away from the
        swing midpoint to count as a trend; otherwise it is 'LATERAL'.
        """
        mid_point = (swing_high + swing_low) / 2
        if current_price > mid_point + (swing_high - swing_low) * 0.1:
            return 'ALTA'
        if current_price < mid_point - (swing_high - swing_low) * 0.1:
            return 'BAIXA'
        return 'LATERAL'

    def _determine_fibonacci_zone(
        self, swing_high: float, swing_low: float, current_price: float
    ) -> str:
        """Return the label of the Fibonacci zone the price currently sits in.

        The position is ``(current_price - swing_low) / swing_range``.

        Raises:
            ZeroDivisionError: If ``swing_high == swing_low``.
        """
        swing_range = swing_high - swing_low
        position = (current_price - swing_low) / swing_range

        # Lower bounds checked from the highest zone downwards.
        zone_floors = (
            (0.786, 'ZONA_ALTA'),
            (0.618, 'ZONA_MEDIA_ALTA'),
            (0.382, 'ZONA_MEDIA'),
            (0.236, 'ZONA_MEDIA_BAIXA'),
        )
        for floor, label in zone_floors:
            if position >= floor:
                return label
        return 'ZONA_BAIXA'

    def _calculate_overall_strength(
        self, retracement_levels: List[FibonacciLevel],
        extension_levels: List[FibonacciLevel],
        confluence_zones: List[ConfluenceZone],
        harmonic_patterns: List[HarmonicPattern]
    ) -> float:
        """Return an overall strength score, capped at 1.0.

        Adds the strengths of the first three retracement and first three
        extension levels, the strength of every confluence zone and the
        confidence of every harmonic pattern, then divides by 10.
        """
        # First three entries of each list (the lists arrive distance-sorted
        # from the level calculators).
        level_strength = sum(level.strength for level in retracement_levels[:3])
        level_strength += sum(level.strength for level in extension_levels[:3])

        confluence_strength = sum(zone.strength for zone in confluence_zones)
        harmonic_strength = sum(pattern.confidence for pattern in harmonic_patterns)

        total_strength = (level_strength + confluence_strength + harmonic_strength) / 10
        return min(total_strength, 1.0)

    def _generate_trading_signal(
        self, current_price: float, key_support: float, key_resistance: float,
        trend_direction: str, overall_strength: float
    ) -> str:
        """Return 'BUY', 'SELL' or 'HOLD'.

        Rules, in order:
          1. Strength below 0.3 -> 'HOLD'.
          2. Trend 'ALTA' within 2% of support -> 'BUY'.
          3. Trend 'BAIXA' within 2% of resistance -> 'SELL'.
          4. Strength above 0.6 -> side of the nearer key level.
          5. Otherwise 'HOLD'.

        Raises:
            ZeroDivisionError: If ``current_price`` is 0.
        """
        support_distance = abs(current_price - key_support) / current_price
        resistance_distance = abs(current_price - key_resistance) / current_price

        if overall_strength < 0.3:
            return 'HOLD'
        if trend_direction == 'ALTA' and support_distance < 0.02:
            return 'BUY'
        if trend_direction == 'BAIXA' and resistance_distance < 0.02:
            return 'SELL'
        if overall_strength > 0.6:
            if support_distance < resistance_distance:
                return 'BUY'
            if resistance_distance < support_distance:
                return 'SELL'
        return 'HOLD'

    def format_analysis_report(self, analysis: AdvancedFibonacciAnalysis) -> str:
        """Render ``analysis`` as a multi-line text report.

        Shows at most five retracement levels, five extension levels and
        three confluence zones. The confluence and harmonic sections are
        omitted when their lists are empty.
        """
        # Collect lines and join once at the end. The leading empty entry
        # yields the report's opening newline.
        lines = [
            "",
            "🔮 ANÁLISE AVANÇADA DE FIBONACCI",
            f"{'='*50}",
            "📊 DADOS BÁSICOS:",
            f"Swing Alto: {analysis.swing_high:,.2f}",
            f"Swing Baixo: {analysis.swing_low:,.2f}",
            f"Preço Atual: {analysis.current_price:,.2f}",
            f"Range: {analysis.swing_range:,.2f}",
            f"📈 NÍVEIS DE RETRACEMENT ({len(analysis.retracement_levels)}):",
        ]
        lines.extend(
            f" {level.ratio:.1%}: {level.price:,.2f} (Força: {level.strength:.2f})"
            for level in analysis.retracement_levels[:5]
        )

        lines.append("")
        lines.append(f"📊 NÍVEIS DE EXTENSÃO ({len(analysis.extension_levels)}):")
        lines.extend(
            f" {level.ratio:.1%}: {level.price:,.2f} ({level.type})"
            for level in analysis.extension_levels[:5]
        )

        if analysis.confluence_zones:
            lines.append("")
            lines.append(f"🎯 ZONAS DE CONFLUÊNCIA ({len(analysis.confluence_zones)}):")
            lines.extend(
                f" {zone.price_range[0]:,.2f} - {zone.price_range[1]:,.2f} ({zone.levels_count} níveis)"
                for zone in analysis.confluence_zones[:3]
            )

        if analysis.harmonic_patterns:
            lines.append("")
            lines.append(f"🎼 PADRÕES HARMÔNICOS ({len(analysis.harmonic_patterns)}):")
            lines.extend(
                f" {pattern.name}: {pattern.confidence:.1%} confiança"
                for pattern in analysis.harmonic_patterns
            )

        lines.extend([
            "",
            "🎯 NÍVEIS CHAVE:",
            f"Suporte: {analysis.key_support:,.2f}",
            f"Resistência: {analysis.key_resistance:,.2f}",
            "📊 ANÁLISE GERAL:",
            f"Tendência: {analysis.trend_direction}",
            f"Zona Fibonacci: {analysis.fibonacci_zone}",
            f"Força da Análise: {analysis.overall_strength:.1%}",
            f"Sinal: {analysis.trading_signal}",
            f"Alertas: {analysis.alerts_count}",
        ])
        return "\n".join(lines) + "\n"
# Usage example
if __name__ == "__main__":
    demo_engine = AdvancedFibonacciEngine()
    # Run the analysis on hard-coded sample swing values and print the report
    sample_inputs = {
        "swing_high": 140570.0,
        "swing_low": 139540.0,
        "current_price": 140135.0,
    }
    demo_result = demo_engine.perform_advanced_analysis(**sample_inputs)
    print(demo_engine.format_analysis_report(demo_result))