teste / docs /developer-guide.md
torxyton's picture
Initial commit: Complete Fibonacci analysis application with Gradio interface
7f335a2

A newer version of the Gradio SDK is available: 6.6.0

Upgrade

👨‍💻 Guia do Desenvolvedor

Visão Geral

Este guia fornece informações essenciais para desenvolvedores que desejam contribuir ou estender o Vampire Trading Bot. Inclui padrões de código, arquitetura, fluxos de desenvolvimento e melhores práticas.

🏗️ Arquitetura do Sistema

Estrutura Modular

vampire-trading-bot/
├── app.py                 # Ponto de entrada principal
├── config.py             # Configurações centralizadas
├── ui.py                 # Interface Gradio
├── market_analysis.py    # Análise técnica
├── sentiment_analysis.py # Análise de sentimento
├── fibonacci_analysis.py # Análise de Fibonacci
├── log_parser.py        # Parser de logs
├── real_time_integration.py # Integração tempo real
├── performance_monitor.py   # Monitoramento
├── utils.py             # Utilitários
├── docs/                # Documentação
└── tests/               # Testes automatizados

Princípios de Design

  1. Separação de Responsabilidades: Cada módulo tem uma função específica
  2. Baixo Acoplamento: Módulos independentes com interfaces bem definidas
  3. Alta Coesão: Funcionalidades relacionadas agrupadas
  4. Extensibilidade: Fácil adição de novos recursos
  5. Testabilidade: Código facilmente testável

🔧 Configuração do Ambiente de Desenvolvimento

Pré-requisitos

# Python 3.8+
python --version

# Git
git --version

# Editor recomendado: VS Code com extensões Python

Setup Inicial

# 1. Clonar repositório
git clone <repository-url>
cd vampire-trading-bot

# 2. Criar ambiente virtual
python -m venv venv

# 3. Ativar ambiente
# Windows
venv\Scripts\activate
# Linux/macOS
source venv/bin/activate

# 4. Instalar dependências de desenvolvimento
pip install -r requirements-dev.txt

# 5. Instalar hooks de pre-commit
pre-commit install

Dependências de Desenvolvimento

# requirements-dev.txt
pytest>=7.0.0
pytest-cov>=4.0.0
black>=22.0.0
flake8>=5.0.0
mypy>=1.0.0
pre-commit>=2.20.0
bandit>=1.7.0
isort>=5.10.0
memory-profiler>=0.60.0
line-profiler>=4.0.0

📝 Padrões de Código

Estilo de Código

Formatação com Black:

# Formatar código
black .

# Verificar formatação
black --check .

Linting com Flake8:

# Verificar código
flake8 .

# Configuração em .flake8
[flake8]
max-line-length = 88
ignore = E203, W503
exclude = venv, __pycache__

Ordenação de Imports com isort:

# Ordenar imports
isort .

# Configuração em pyproject.toml
[tool.isort]
profile = "black"
line_length = 88

Convenções de Nomenclatura

# Classes: PascalCase
class TechnicalAnalysisEngine:
    pass

# Funções e variáveis: snake_case
def calculate_rsi_value(prices: List[float]) -> float:
    pass

# Constantes: UPPER_SNAKE_CASE
DEFAULT_RSI_PERIOD = 14
MAX_RETRY_ATTEMPTS = 3

# Arquivos: snake_case
# market_analysis.py
# sentiment_analysis.py

Documentação de Código

def analyze_market_data(
    self, 
    text: str, 
    include_fibonacci: bool = True
) -> Dict[str, Any]:
    """
    Analisa dados de mercado extraindo indicadores técnicos.
    
    Args:
        text: Texto contendo dados de mercado
        include_fibonacci: Se deve incluir análise de Fibonacci
        
    Returns:
        Dict contendo:
        - technical_signals: Lista de sinais técnicos
        - market_data: Dados extraídos do mercado
        - fibonacci_analysis: Análise de Fibonacci (se habilitada)
        
    Raises:
        ValueError: Se o texto não contém dados válidos
        RuntimeError: Se falha na análise técnica
        
    Example:
        >>> engine = TechnicalAnalysisEngine()
        >>> result = engine.analyze_market_data("WINV25: 140135")
        >>> print(result['technical_signals'])
    """
    pass

Type Hints

from typing import List, Dict, Optional, Union, Tuple, Any
from dataclasses import dataclass
from enum import Enum

# Usar type hints em todas as funções
def calculate_ema(
    prices: List[float], 
    period: int
) -> List[float]:
    pass

# Dataclasses para estruturas de dados
@dataclass
class MarketData:
    symbol: str
    price: float
    volume: int
    timestamp: Optional[str] = None

# Enums para constantes
class SignalType(Enum):
    BUY = "buy"
    SELL = "sell"
    HOLD = "hold"

🧪 Testes

Estrutura de Testes

tests/
├── __init__.py
├── conftest.py              # Fixtures compartilhadas
├── test_market_analysis.py  # Testes análise técnica
├── test_sentiment_analysis.py # Testes análise sentimento
├── test_fibonacci_analysis.py # Testes Fibonacci
├── test_integration.py      # Testes integração
├── test_performance.py      # Testes performance
└── fixtures/               # Dados de teste
    ├── sample_logs.txt
    └── market_data.json

Escrevendo Testes

# test_market_analysis.py
import pytest
from unittest.mock import Mock, patch
from market_analysis import TechnicalAnalysisEngine, MarketData

class TestTechnicalAnalysisEngine:
    
    @pytest.fixture
    def engine(self):
        """Fixture para engine de análise."""
        return TechnicalAnalysisEngine()
    
    @pytest.fixture
    def sample_market_data(self):
        """Fixture para dados de mercado."""
        return MarketData(
            symbol="WINV25",
            price=140135.0,
            volume=5023
        )
    
    def test_analyze_market_data_success(self, engine, sample_market_data):
        """Testa análise bem-sucedida de dados de mercado."""
        text = "WINV25: 140135, Volume: 5023"
        
        result = engine.analyze_market_data(text)
        
        assert "technical_signals" in result
        assert "market_data" in result
        assert len(result["technical_signals"]) > 0
    
    def test_analyze_market_data_invalid_input(self, engine):
        """Testa comportamento com entrada inválida."""
        with pytest.raises(ValueError, match="Dados inválidos"):
            engine.analyze_market_data("")
    
    @patch('market_analysis.calculate_rsi')
    def test_rsi_calculation_called(self, mock_rsi, engine):
        """Testa se cálculo RSI é chamado."""
        mock_rsi.return_value = 65.5
        text = "WINV25: 140135"
        
        engine.analyze_market_data(text)
        
        mock_rsi.assert_called_once()
    
    @pytest.mark.parametrize("price,expected_signal", [
        (140000, "buy"),
        (145000, "sell"),
        (142500, "hold")
    ])
    def test_signal_generation(self, engine, price, expected_signal):
        """Testa geração de sinais para diferentes preços."""
        text = f"WINV25: {price}"
        
        result = engine.analyze_market_data(text)
        signal = result["technical_signals"][0]
        
        assert signal["action"] == expected_signal

Executando Testes

# Executar todos os testes
pytest

# Executar com cobertura
pytest --cov=. --cov-report=html

# Executar testes específicos
pytest tests/test_market_analysis.py::TestTechnicalAnalysisEngine::test_analyze_market_data_success

# Executar testes em paralelo
pytest -n auto

# Executar apenas testes rápidos
pytest -m "not slow"

Fixtures Compartilhadas

# conftest.py
import pytest
from pathlib import Path

@pytest.fixture
def sample_log_content():
    """Conteúdo de log de exemplo."""
    return """
    ⏰ Análise #1 - 09:46:58
    📊 DADOS DE MERCADO - WINV25
    Preço Atual: 140135.00000 ↗
    Variação: +5 (+0.00%)
    Volume: 5023
    """

@pytest.fixture
def temp_log_file(tmp_path, sample_log_content):
    """Arquivo de log temporário."""
    log_file = tmp_path / "test_log.txt"
    log_file.write_text(sample_log_content)
    return log_file

@pytest.fixture
def mock_ai_model():
    """Mock do modelo de IA."""
    from unittest.mock import Mock
    
    model = Mock()
    model.predict.return_value = [
        {"label": "POSITIVE", "score": 0.85}
    ]
    return model

🔄 Fluxo de Desenvolvimento

Git Workflow

# 1. Criar branch para feature
git checkout -b feature/nova-funcionalidade

# 2. Fazer commits pequenos e frequentes
git add .
git commit -m "feat: adiciona cálculo de MACD"

# 3. Push da branch
git push origin feature/nova-funcionalidade

# 4. Criar Pull Request
# 5. Code Review
# 6. Merge após aprovação

Convenções de Commit

# Formato: tipo(escopo): descrição

# Tipos:
feat: nova funcionalidade
fix: correção de bug
docs: documentação
style: formatação
refactor: refatoração
test: testes
chore: tarefas de manutenção

# Exemplos:
feat(analysis): adiciona indicador MACD
fix(ui): corrige erro na interface
docs(api): atualiza documentação da API
test(fibonacci): adiciona testes para análise

Pre-commit Hooks

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/psf/black
    rev: 22.10.0
    hooks:
      - id: black
        language_version: python3

  - repo: https://github.com/pycqa/flake8
    rev: 5.0.4
    hooks:
      - id: flake8

  - repo: https://github.com/pycqa/isort
    rev: 5.10.1
    hooks:
      - id: isort

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v0.991
    hooks:
      - id: mypy
        additional_dependencies: [types-all]

  - repo: https://github.com/PyCQA/bandit
    rev: 1.7.4
    hooks:
      - id: bandit
        args: ['-r', '.', '-f', 'json', '-o', 'bandit-report.json']

🏗️ Adicionando Novas Funcionalidades

1. Novo Indicador Técnico

# Em market_analysis.py

class MACDAnalyzer(BaseAnalyzer):
    """Analisador MACD (Moving Average Convergence Divergence)."""
    
    def __init__(self, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9):
        self.fast_period = fast_period
        self.slow_period = slow_period
        self.signal_period = signal_period
    
    def analyze(self, prices: List[float]) -> List[TechnicalSignal]:
        """Calcula MACD e gera sinais."""
        if len(prices) < self.slow_period:
            return []
        
        # Calcular EMAs
        ema_fast = self._calculate_ema(prices, self.fast_period)
        ema_slow = self._calculate_ema(prices, self.slow_period)
        
        # Calcular MACD
        macd_line = [fast - slow for fast, slow in zip(ema_fast, ema_slow)]
        signal_line = self._calculate_ema(macd_line, self.signal_period)
        
        # Gerar sinais
        signals = []
        for i in range(1, len(macd_line)):
            if macd_line[i] > signal_line[i] and macd_line[i-1] <= signal_line[i-1]:
                signals.append(TechnicalSignal(
                    indicator="MACD",
                    signal_type="buy",
                    strength=0.7,
                    value=macd_line[i],
                    timestamp=datetime.now()
                ))
        
        return signals

# Registrar no engine
class TechnicalAnalysisEngine:
    def __init__(self):
        # ... outros analisadores
        self.macd_analyzer = MACDAnalyzer()
    
    def analyze_market_data(self, text: str) -> Dict[str, Any]:
        # ... análise existente
        macd_signals = self.macd_analyzer.analyze(prices)
        all_signals.extend(macd_signals)

2. Novo Modelo de IA

# Em sentiment_analysis.py

class CustomFinancialModel:
    """Modelo personalizado para análise financeira."""
    
    def __init__(self, model_path: str):
        self.model_path = model_path
        self.model = None
        self.tokenizer = None
    
    def load(self):
        """Carrega modelo personalizado."""
        from transformers import AutoTokenizer, AutoModelForSequenceClassification
        
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
        self.model = AutoModelForSequenceClassification.from_pretrained(self.model_path)
    
    def predict(self, text: str) -> Dict[str, float]:
        """Faz predição de sentimento."""
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        
        with torch.no_grad():
            outputs = self.model(**inputs)
            probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
        
        return {
            "bullish": probabilities[0][0].item(),
            "bearish": probabilities[0][1].item(),
            "neutral": probabilities[0][2].item()
        }

# Registrar no ModelManager
class ModelManager:
    def __init__(self):
        # ... modelos existentes
        self.custom_models = {
            "financial-bert": CustomFinancialModel,
            "trading-roberta": CustomFinancialModel
        }

3. Nova Interface de Usuário

# Em ui.py

def create_advanced_analysis_tab():
    """Cria aba de análise avançada."""
    with gr.Tab("🔬 Análise Avançada"):
        with gr.Row():
            with gr.Column(scale=1):
                # Controles
                timeframe_dropdown = gr.Dropdown(
                    choices=["1m", "5m", "15m", "1h", "4h", "1d"],
                    value="5m",
                    label="📊 Timeframe"
                )
                
                indicators_checklist = gr.CheckboxGroup(
                    choices=["RSI", "MACD", "Bollinger", "EMA", "Fibonacci"],
                    value=["RSI", "MACD"],
                    label="📈 Indicadores"
                )
                
                analyze_btn = gr.Button("🚀 Analisar", variant="primary")
            
            with gr.Column(scale=2):
                # Resultados
                analysis_output = gr.JSON(label="📊 Resultado da Análise")
                chart_output = gr.Plot(label="📈 Gráfico")
        
        # Conectar eventos
        analyze_btn.click(
            fn=advanced_analysis,
            inputs=[timeframe_dropdown, indicators_checklist],
            outputs=[analysis_output, chart_output]
        )
    
    return timeframe_dropdown, indicators_checklist, analyze_btn

def advanced_analysis(timeframe: str, indicators: List[str]) -> Tuple[Dict, Any]:
    """Executa análise avançada."""
    # Implementar lógica de análise
    result = {
        "timeframe": timeframe,
        "indicators": indicators,
        "signals": [],
        "confidence": 0.0
    }
    
    # Gerar gráfico
    import plotly.graph_objects as go
    fig = go.Figure()
    # ... adicionar dados ao gráfico
    
    return result, fig

🔍 Debugging e Profiling

Logging para Debug

import logging
from utils import LogUtils

# Configurar logging detalhado
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('debug.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

def analyze_market_data(self, text: str) -> Dict[str, Any]:
    logger.debug(f"Iniciando análise para texto: {text[:100]}...")
    
    try:
        # Análise
        result = self._perform_analysis(text)
        logger.debug(f"Análise concluída: {len(result.get('signals', []))} sinais")
        return result
        
    except Exception as e:
        logger.error(f"Erro na análise: {e}", exc_info=True)
        raise

Profiling de Performance

# performance_profiler.py
import cProfile
import pstats
from functools import wraps
from memory_profiler import profile
from line_profiler import LineProfiler

def profile_time(func):
    """Decorator para profiling de tempo."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        
        result = func(*args, **kwargs)
        
        profiler.disable()
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats(10)  # Top 10 funções
        
        return result
    return wrapper

@profile_time
@profile  # memory_profiler
def analyze_large_dataset(data):
    """Análise de dataset grande."""
    # Implementação
    pass

# Usar line_profiler
def profile_line_by_line():
    profiler = LineProfiler()
    profiler.add_function(analyze_market_data)
    profiler.enable_by_count()
    
    # Executar função
    analyze_market_data(sample_text)
    
    profiler.print_stats()

Debugging com pdb

import pdb

def problematic_function(data):
    # Ponto de breakpoint
    pdb.set_trace()
    
    # Código para debug
    processed_data = process_data(data)
    
    # Breakpoint condicional
    if len(processed_data) == 0:
        pdb.set_trace()
    
    return processed_data

# Comandos úteis no pdb:
# l - listar código
# n - próxima linha
# s - step into
# c - continuar
# p variable - imprimir variável
# pp variable - pretty print
# h - ajuda

📊 Monitoramento e Métricas

Métricas Customizadas

# custom_metrics.py
from dataclasses import dataclass
from typing import Dict, List
import time
from collections import defaultdict

@dataclass
class AnalysisMetrics:
    """Métricas de análise."""
    total_analyses: int = 0
    successful_analyses: int = 0
    failed_analyses: int = 0
    average_processing_time: float = 0.0
    error_types: Dict[str, int] = None
    
    def __post_init__(self):
        if self.error_types is None:
            self.error_types = defaultdict(int)

class MetricsCollector:
    """Coletor de métricas personalizado."""
    
    def __init__(self):
        self.metrics = AnalysisMetrics()
        self.processing_times = []
    
    def record_analysis_start(self) -> float:
        """Registra início de análise."""
        return time.time()
    
    def record_analysis_success(self, start_time: float):
        """Registra análise bem-sucedida."""
        processing_time = time.time() - start_time
        
        self.metrics.total_analyses += 1
        self.metrics.successful_analyses += 1
        self.processing_times.append(processing_time)
        
        # Calcular média móvel
        if len(self.processing_times) > 100:
            self.processing_times = self.processing_times[-100:]
        
        self.metrics.average_processing_time = sum(self.processing_times) / len(self.processing_times)
    
    def record_analysis_failure(self, start_time: float, error_type: str):
        """Registra falha na análise."""
        self.metrics.total_analyses += 1
        self.metrics.failed_analyses += 1
        self.metrics.error_types[error_type] += 1
    
    def get_success_rate(self) -> float:
        """Calcula taxa de sucesso."""
        if self.metrics.total_analyses == 0:
            return 0.0
        return self.metrics.successful_analyses / self.metrics.total_analyses
    
    def export_metrics(self) -> Dict:
        """Exporta métricas para análise."""
        return {
            "total_analyses": self.metrics.total_analyses,
            "success_rate": self.get_success_rate(),
            "average_processing_time": self.metrics.average_processing_time,
            "error_distribution": dict(self.metrics.error_types)
        }

🚀 Deploy e Produção

Configuração para Produção

# production_config.py
import os
from dataclasses import dataclass

@dataclass
class ProductionConfig:
    """Configurações para produção."""
    
    # Servidor
    HOST: str = os.getenv("HOST", "0.0.0.0")
    PORT: int = int(os.getenv("PORT", "7860"))
    WORKERS: int = int(os.getenv("WORKERS", "4"))
    
    # Logging
    LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
    LOG_FILE: str = os.getenv("LOG_FILE", "app.log")
    
    # Performance
    MAX_CONCURRENT_ANALYSES: int = int(os.getenv("MAX_CONCURRENT_ANALYSES", "10"))
    ANALYSIS_TIMEOUT: int = int(os.getenv("ANALYSIS_TIMEOUT", "30"))
    
    # Segurança
    ENABLE_AUTH: bool = os.getenv("ENABLE_AUTH", "false").lower() == "true"
    SECRET_KEY: str = os.getenv("SECRET_KEY", "")
    
    # Cache
    ENABLE_REDIS: bool = os.getenv("ENABLE_REDIS", "false").lower() == "true"
    REDIS_URL: str = os.getenv("REDIS_URL", "redis://localhost:6379")

Docker

# Dockerfile
FROM python:3.9-slim

# Instalar dependências do sistema
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Criar usuário não-root
RUN useradd --create-home --shell /bin/bash app
USER app
WORKDIR /home/app

# Copiar requirements
COPY --chown=app:app requirements.txt .

# Instalar dependências Python
RUN pip install --user --no-cache-dir -r requirements.txt

# Copiar código
COPY --chown=app:app . .

# Expor porta
EXPOSE 7860

# Comando de inicialização
CMD ["python", "app.py", "--host", "0.0.0.0", "--port", "7860"]
# docker-compose.yml
version: '3.8'

services:
  vampire-bot:
    build: .
    ports:
      - "7860:7860"
    environment:
      - LOG_LEVEL=INFO
      - MAX_CONCURRENT_ANALYSES=5
    volumes:
      - ./logs:/home/app/logs
      - ./models:/home/app/models
    restart: unless-stopped
    
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  redis_data:

📚 Recursos Adicionais

Ferramentas Recomendadas

  • IDE: VS Code com extensões Python
  • Debugging: pdb, ipdb, VS Code debugger
  • Profiling: cProfile, memory_profiler, line_profiler
  • Testing: pytest, coverage.py
  • Linting: flake8, pylint, mypy
  • Formatting: black, isort
  • Documentation: Sphinx, mkdocs

Bibliotecas Úteis

# Análise de dados
import pandas as pd
import numpy as np
from scipy import stats

# Visualização
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import seaborn as sns

# Machine Learning
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split

# Async/Concorrência
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Monitoramento
import psutil
from prometheus_client import Counter, Histogram, Gauge

Padrões de Design Úteis

# Strategy Pattern para diferentes análises
from abc import ABC, abstractmethod

class AnalysisStrategy(ABC):
    @abstractmethod
    def analyze(self, data) -> Dict:
        pass

class RSIStrategy(AnalysisStrategy):
    def analyze(self, data) -> Dict:
        # Implementar RSI
        pass

class MACDStrategy(AnalysisStrategy):
    def analyze(self, data) -> Dict:
        # Implementar MACD
        pass

# Factory Pattern para criação de analisadores
class AnalyzerFactory:
    _strategies = {
        "rsi": RSIStrategy,
        "macd": MACDStrategy
    }
    
    @classmethod
    def create_analyzer(cls, analyzer_type: str) -> AnalysisStrategy:
        if analyzer_type not in cls._strategies:
            raise ValueError(f"Analisador {analyzer_type} não suportado")
        return cls._strategies[analyzer_type]()

# Observer Pattern para notificações
class Observable:
    def __init__(self):
        self._observers = []
    
    def attach(self, observer):
        self._observers.append(observer)
    
    def notify(self, event):
        for observer in self._observers:
            observer.update(event)

🤝 Contribuindo

Processo de Contribuição

  1. Fork do repositório
  2. Clone do seu fork
  3. Criar branch para feature/bugfix
  4. Implementar mudanças com testes
  5. Executar testes e linting
  6. Commit seguindo convenções
  7. Push da branch
  8. Criar Pull Request
  9. Code Review
  10. Merge após aprovação

Checklist para Pull Requests

  • Código segue padrões de estilo
  • Testes adicionados/atualizados
  • Documentação atualizada
  • Todos os testes passam
  • Sem warnings de linting
  • Performance não degradada
  • Compatibilidade mantida
  • Changelog atualizado

🎯 Objetivo: Manter código limpo, testável e bem documentado para facilitar manutenção e evolução do projeto!