Spaces:
Sleeping
Sleeping
| """ | |
| Technical Indicators Module | |
| Calculates various technical indicators for trading analysis. | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from typing import Dict, List, Any, Optional | |
| def calculate_sma(data: pd.DataFrame, period: int = 20) -> List[float]: | |
| """Simple Moving Average""" | |
| return data['close'].rolling(window=period).mean().tolist() | |
| def calculate_ema(data: pd.DataFrame, period: int = 20) -> List[float]: | |
| """Exponential Moving Average""" | |
| return data['close'].ewm(span=period, adjust=False).mean().tolist() | |
| def calculate_bollinger_bands(data: pd.DataFrame, period: int = 20, std_dev: int = 2) -> Dict[str, List[float]]: | |
| """Bollinger Bands with middle, upper, and lower bands""" | |
| middle = data['close'].rolling(window=period).mean() | |
| std = data['close'].rolling(window=period).std() | |
| upper = middle + (std * std_dev) | |
| lower = middle - (std * std_dev) | |
| return { | |
| 'middle': middle.tolist(), | |
| 'upper': upper.tolist(), | |
| 'lower': lower.tolist() | |
| } | |
| def calculate_rsi(data: pd.DataFrame, period: int = 14) -> List[float]: | |
| """Relative Strength Index""" | |
| delta = data['close'].diff() | |
| gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() | |
| loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() | |
| rs = gain / loss | |
| rsi = 100 - (100 / (1 + rs)) | |
| return rsi.tolist() | |
| def calculate_macd(data: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> Dict[str, List[float]]: | |
| """MACD with signal line and histogram""" | |
| ema_fast = data['close'].ewm(span=fast, adjust=False).mean() | |
| ema_slow = data['close'].ewm(span=slow, adjust=False).mean() | |
| macd_line = ema_fast - ema_slow | |
| signal_line = macd_line.ewm(span=signal, adjust=False).mean() | |
| histogram = macd_line - signal_line | |
| return { | |
| 'macd': macd_line.tolist(), | |
| 'signal': signal_line.tolist(), | |
| 'histogram': histogram.tolist() | |
| } | |
| def calculate_vwap(data: pd.DataFrame) -> List[float]: | |
| """Volume Weighted Average Price""" | |
| typical_price = (data['high'] + data['low'] + data['close']) / 3 | |
| vwap = (typical_price * data['volume']).cumsum() / data['volume'].cumsum() | |
| return vwap.tolist() | |
| def calculate_williams_r(data: pd.DataFrame, period: int = 14) -> List[float]: | |
| """Williams %R oscillator""" | |
| highest_high = data['high'].rolling(window=period).max() | |
| lowest_low = data['low'].rolling(window=period).min() | |
| williams_r = -100 * (highest_high - data['close']) / (highest_high - lowest_low) | |
| return williams_r.tolist() | |
| def calculate_volume_profile(data: pd.DataFrame, num_bins: int = 50) -> Dict[str, Any]: | |
| """ | |
| Volume Profile - distribution of volume across price levels | |
| Returns POC (Point of Control), Value Area, and histogram data | |
| """ | |
| price_min = data['low'].min() | |
| price_max = data['high'].max() | |
| if pd.isna(price_min) or pd.isna(price_max) or price_min == price_max: | |
| return {'poc': 0, 'value_area_high': 0, 'value_area_low': 0, 'histogram': []} | |
| price_range = price_max - price_min | |
| bin_size = price_range / num_bins | |
| bins = np.linspace(price_min, price_max, num_bins + 1) | |
| volumes = [] | |
| for i in range(num_bins): | |
| mask = (data['close'] >= bins[i]) & (data['close'] < bins[i + 1]) | |
| volumes.append(float(data.loc[mask, 'volume'].sum())) | |
| # Find POC (price level with highest volume) | |
| poc_idx = int(np.argmax(volumes)) | |
| poc = float((bins[poc_idx] + bins[poc_idx + 1]) / 2) | |
| # Calculate Value Area (70% of volume) | |
| total_volume = sum(volumes) | |
| sorted_indices = np.argsort(volumes)[::-1] | |
| cumulative_volume = 0 | |
| value_area_indices = [] | |
| for idx in sorted_indices: | |
| cumulative_volume += volumes[idx] | |
| value_area_indices.append(idx) | |
| if cumulative_volume >= total_volume * 0.7: | |
| break | |
| value_area_high = float(bins[max(value_area_indices) + 1]) if value_area_indices else price_max | |
| value_area_low = float(bins[min(value_area_indices)]) if value_area_indices else price_min | |
| histogram = [] | |
| for i in range(num_bins): | |
| histogram.append({ | |
| 'price': float((bins[i] + bins[i + 1]) / 2), | |
| 'volume': volumes[i], | |
| 'price_low': float(bins[i]), | |
| 'price_high': float(bins[i + 1]) | |
| }) | |
| return { | |
| 'poc': poc, | |
| 'value_area_high': value_area_high, | |
| 'value_area_low': value_area_low, | |
| 'histogram': histogram | |
| } | |
| def calculate_fair_value_gaps(data: pd.DataFrame) -> List[Dict[str, Any]]: | |
| """ | |
| Fair Value Gaps - identifies three-candle imbalance patterns | |
| Bullish FVG: Low > High[i-2] | |
| Bearish FVG: High < Low[i-2] | |
| """ | |
| fvg_list = [] | |
| for i in range(2, len(data)): | |
| prev_idx = i - 2 | |
| curr_idx = i | |
| # Bullish FVG | |
| if data['low'].iloc[curr_idx] > data['high'].iloc[prev_idx]: | |
| fvg_list.append({ | |
| 'type': 'bullish', | |
| 'start_time': int(data.index[curr_idx].timestamp() * 1000), | |
| 'end_time': int(data.index[curr_idx].timestamp() * 1000) + 86400000, | |
| 'top': float(data['low'].iloc[curr_idx]), | |
| 'bottom': float(data['high'].iloc[prev_idx]), | |
| 'start_index': prev_idx, | |
| 'end_index': curr_idx | |
| }) | |
| # Bearish FVG | |
| elif data['high'].iloc[curr_idx] < data['low'].iloc[prev_idx]: | |
| fvg_list.append({ | |
| 'type': 'bearish', | |
| 'start_time': int(data.index[curr_idx].timestamp() * 1000), | |
| 'end_time': int(data.index[curr_idx].timestamp() * 1000) + 86400000, | |
| 'top': float(data['low'].iloc[prev_idx]), | |
| 'bottom': float(data['high'].iloc[curr_idx]), | |
| 'start_index': prev_idx, | |
| 'end_index': curr_idx | |
| }) | |
| return fvg_list | |
| def calculate_all_indicators(data: pd.DataFrame, indicator_config: Dict[str, Any]) -> Dict[str, Any]: | |
| """Calculate all requested indicators based on config""" | |
| results = {} | |
| for indicator in indicator_config.get('indicators', []): | |
| name = indicator.get('name', '') | |
| params = indicator.get('params', {}) | |
| try: | |
| if name == 'sma': | |
| period = params.get('period', 20) | |
| results[f'sma_{period}'] = calculate_sma(data, period) | |
| elif name == 'ema': | |
| period = params.get('period', 20) | |
| results[f'ema_{period}'] = calculate_ema(data, period) | |
| elif name == 'bollinger': | |
| bb = calculate_bollinger_bands(data, params.get('period', 20), params.get('std_dev', 2)) | |
| results['bollinger'] = bb | |
| elif name == 'rsi': | |
| period = params.get('period', 14) | |
| results[f'rsi_{period}'] = calculate_rsi(data, period) | |
| elif name == 'macd': | |
| macd = calculate_macd(data, params.get('fast', 12), params.get('slow', 26), params.get('signal', 9)) | |
| results['macd'] = macd | |
| elif name == 'vwap': | |
| results['vwap'] = calculate_vwap(data) | |
| elif name == 'williams_r': | |
| period = params.get('period', 14) | |
| results[f'williams_r_{period}'] = calculate_williams_r(data, period) | |
| elif name == 'volume_profile': | |
| results['volume_profile'] = calculate_volume_profile(data) | |
| elif name == 'fvg': | |
| results['fvg'] = calculate_fair_value_gaps(data) | |
| except Exception as e: | |
| print(f"Error calculating {name}: {e}") | |
| continue | |
| return results | |
| def resample_ohlcv(data: pd.DataFrame, timeframe: str) -> pd.DataFrame: | |
| """Resample OHLCV data to a different timeframe""" | |
| resampled = data.resample(timeframe).agg({ | |
| 'open': 'first', | |
| 'high': 'max', | |
| 'low': 'min', | |
| 'close': 'last', | |
| 'volume': 'sum' | |
| }).dropna() | |
| return resampled | |