""" Technical Indicators Module Calculates various technical indicators for trading analysis. """ import numpy as np import pandas as pd from typing import Dict, List, Any, Optional def calculate_sma(data: pd.DataFrame, period: int = 20) -> List[float]: """Simple Moving Average""" return data['close'].rolling(window=period).mean().tolist() def calculate_ema(data: pd.DataFrame, period: int = 20) -> List[float]: """Exponential Moving Average""" return data['close'].ewm(span=period, adjust=False).mean().tolist() def calculate_bollinger_bands(data: pd.DataFrame, period: int = 20, std_dev: int = 2) -> Dict[str, List[float]]: """Bollinger Bands with middle, upper, and lower bands""" middle = data['close'].rolling(window=period).mean() std = data['close'].rolling(window=period).std() upper = middle + (std * std_dev) lower = middle - (std * std_dev) return { 'middle': middle.tolist(), 'upper': upper.tolist(), 'lower': lower.tolist() } def calculate_rsi(data: pd.DataFrame, period: int = 14) -> List[float]: """Relative Strength Index""" delta = data['close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) return rsi.tolist() def calculate_macd(data: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> Dict[str, List[float]]: """MACD with signal line and histogram""" ema_fast = data['close'].ewm(span=fast, adjust=False).mean() ema_slow = data['close'].ewm(span=slow, adjust=False).mean() macd_line = ema_fast - ema_slow signal_line = macd_line.ewm(span=signal, adjust=False).mean() histogram = macd_line - signal_line return { 'macd': macd_line.tolist(), 'signal': signal_line.tolist(), 'histogram': histogram.tolist() } def calculate_vwap(data: pd.DataFrame) -> List[float]: """Volume Weighted Average Price""" typical_price = (data['high'] + data['low'] + data['close']) / 3 vwap = (typical_price * data['volume']).cumsum() / data['volume'].cumsum() return vwap.tolist() def calculate_williams_r(data: pd.DataFrame, period: int = 14) -> List[float]: """Williams %R oscillator""" highest_high = data['high'].rolling(window=period).max() lowest_low = data['low'].rolling(window=period).min() williams_r = -100 * (highest_high - data['close']) / (highest_high - lowest_low) return williams_r.tolist() def calculate_volume_profile(data: pd.DataFrame, num_bins: int = 50) -> Dict[str, Any]: """ Volume Profile - distribution of volume across price levels Returns POC (Point of Control), Value Area, and histogram data """ price_min = data['low'].min() price_max = data['high'].max() if pd.isna(price_min) or pd.isna(price_max) or price_min == price_max: return {'poc': 0, 'value_area_high': 0, 'value_area_low': 0, 'histogram': []} price_range = price_max - price_min bin_size = price_range / num_bins bins = np.linspace(price_min, price_max, num_bins + 1) volumes = [] for i in range(num_bins): mask = (data['close'] >= bins[i]) & (data['close'] < bins[i + 1]) volumes.append(float(data.loc[mask, 'volume'].sum())) # Find POC (price level with highest volume) poc_idx = int(np.argmax(volumes)) poc = float((bins[poc_idx] + bins[poc_idx + 1]) / 2) # Calculate Value Area (70% of volume) total_volume = sum(volumes) sorted_indices = np.argsort(volumes)[::-1] cumulative_volume = 0 value_area_indices = [] for idx in sorted_indices: cumulative_volume += volumes[idx] value_area_indices.append(idx) if cumulative_volume >= total_volume * 0.7: break value_area_high = float(bins[max(value_area_indices) + 1]) if value_area_indices else price_max value_area_low = float(bins[min(value_area_indices)]) if value_area_indices else price_min histogram = [] for i in range(num_bins): histogram.append({ 'price': float((bins[i] + bins[i + 1]) / 2), 'volume': volumes[i], 'price_low': float(bins[i]), 'price_high': float(bins[i + 1]) }) return { 'poc': poc, 'value_area_high': value_area_high, 'value_area_low': value_area_low, 'histogram': histogram } def calculate_fair_value_gaps(data: pd.DataFrame) -> List[Dict[str, Any]]: """ Fair Value Gaps - identifies three-candle imbalance patterns Bullish FVG: Low > High[i-2] Bearish FVG: High < Low[i-2] """ fvg_list = [] for i in range(2, len(data)): prev_idx = i - 2 curr_idx = i # Bullish FVG if data['low'].iloc[curr_idx] > data['high'].iloc[prev_idx]: fvg_list.append({ 'type': 'bullish', 'start_time': int(data.index[curr_idx].timestamp() * 1000), 'end_time': int(data.index[curr_idx].timestamp() * 1000) + 86400000, 'top': float(data['low'].iloc[curr_idx]), 'bottom': float(data['high'].iloc[prev_idx]), 'start_index': prev_idx, 'end_index': curr_idx }) # Bearish FVG elif data['high'].iloc[curr_idx] < data['low'].iloc[prev_idx]: fvg_list.append({ 'type': 'bearish', 'start_time': int(data.index[curr_idx].timestamp() * 1000), 'end_time': int(data.index[curr_idx].timestamp() * 1000) + 86400000, 'top': float(data['low'].iloc[prev_idx]), 'bottom': float(data['high'].iloc[curr_idx]), 'start_index': prev_idx, 'end_index': curr_idx }) return fvg_list def calculate_all_indicators(data: pd.DataFrame, indicator_config: Dict[str, Any]) -> Dict[str, Any]: """Calculate all requested indicators based on config""" results = {} for indicator in indicator_config.get('indicators', []): name = indicator.get('name', '') params = indicator.get('params', {}) try: if name == 'sma': period = params.get('period', 20) results[f'sma_{period}'] = calculate_sma(data, period) elif name == 'ema': period = params.get('period', 20) results[f'ema_{period}'] = calculate_ema(data, period) elif name == 'bollinger': bb = calculate_bollinger_bands(data, params.get('period', 20), params.get('std_dev', 2)) results['bollinger'] = bb elif name == 'rsi': period = params.get('period', 14) results[f'rsi_{period}'] = calculate_rsi(data, period) elif name == 'macd': macd = calculate_macd(data, params.get('fast', 12), params.get('slow', 26), params.get('signal', 9)) results['macd'] = macd elif name == 'vwap': results['vwap'] = calculate_vwap(data) elif name == 'williams_r': period = params.get('period', 14) results[f'williams_r_{period}'] = calculate_williams_r(data, period) elif name == 'volume_profile': results['volume_profile'] = calculate_volume_profile(data) elif name == 'fvg': results['fvg'] = calculate_fair_value_gaps(data) except Exception as e: print(f"Error calculating {name}: {e}") continue return results def resample_ohlcv(data: pd.DataFrame, timeframe: str) -> pd.DataFrame: """Resample OHLCV data to a different timeframe""" resampled = data.resample(timeframe).agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum' }).dropna() return resampled