SmartTrade / backend /indicators.py
Sanyam400's picture
Upload 4 files
bac132b verified
"""
Technical Indicators Module
Calculates various technical indicators for trading analysis.
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Any, Optional
def calculate_sma(data: pd.DataFrame, period: int = 20) -> List[float]:
    """Return the simple moving average of the ``close`` column.

    Args:
        data: Frame containing a ``close`` column.
        period: Number of rows in each averaging window.

    Returns:
        One value per input row; rows that do not yet have a full window
        are NaN.
    """
    closes = data['close']
    windowed = closes.rolling(window=period)
    return windowed.mean().tolist()
def calculate_ema(data: pd.DataFrame, period: int = 20) -> List[float]:
    """Return the exponential moving average of the ``close`` column.

    Args:
        data: Frame containing a ``close`` column.
        period: Span of the exponential window.

    Returns:
        One smoothed value per input row, computed recursively
        (``adjust=False``).
    """
    closes = data['close']
    smoothed = closes.ewm(span=period, adjust=False).mean()
    return smoothed.tolist()
def calculate_bollinger_bands(data: pd.DataFrame, period: int = 20, std_dev: int = 2) -> Dict[str, List[float]]:
    """Return Bollinger Bands computed from the ``close`` column.

    Args:
        data: Frame containing a ``close`` column.
        period: Rolling window length for both the mean and the
            standard deviation.
        std_dev: Multiplier applied to the rolling standard deviation.

    Returns:
        Dict with ``middle``, ``upper`` and ``lower`` lists, one value
        per input row.
    """
    window = data['close'].rolling(window=period)
    center = window.mean()
    band_width = window.std() * std_dev
    bands = {
        'middle': center,
        'upper': center + band_width,
        'lower': center - band_width,
    }
    return {label: series.tolist() for label, series in bands.items()}
def calculate_rsi(data: pd.DataFrame, period: int = 14) -> List[float]:
    """Return the Relative Strength Index of the ``close`` column.

    Gains and losses are averaged with a plain rolling mean over
    ``period`` rows.

    Args:
        data: Frame containing a ``close`` column.
        period: Rolling window length for the average gain and loss.

    Returns:
        One RSI value per input row; rows without a full window are NaN.
    """
    change = data['close'].diff()

    # Split the price change into its upward and downward components.
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)

    avg_gain = ups.rolling(window=period).mean()
    avg_loss = downs.rolling(window=period).mean()

    relative_strength = avg_gain / avg_loss
    return (100 - (100 / (1 + relative_strength))).tolist()
def calculate_macd(data: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> Dict[str, List[float]]:
    """Return the MACD line, its signal line and the histogram.

    Args:
        data: Frame containing a ``close`` column.
        fast: Span of the fast EMA.
        slow: Span of the slow EMA.
        signal: Span of the EMA applied to the MACD line.

    Returns:
        Dict with ``macd``, ``signal`` and ``histogram`` lists, one value
        per input row.
    """
    def _ema(series: pd.Series, span: int) -> pd.Series:
        return series.ewm(span=span, adjust=False).mean()

    closes = data['close']
    macd_values = _ema(closes, fast) - _ema(closes, slow)
    signal_values = _ema(macd_values, signal)
    spread = macd_values - signal_values

    return {
        'macd': macd_values.tolist(),
        'signal': signal_values.tolist(),
        'histogram': spread.tolist(),
    }
def calculate_vwap(data: pd.DataFrame) -> List[float]:
    """Return the cumulative Volume Weighted Average Price.

    The typical price ``(high + low + close) / 3`` of each row is weighted
    by its volume and accumulated from the first row onward.

    Args:
        data: Frame containing ``high``, ``low``, ``close`` and ``volume``
            columns.

    Returns:
        One VWAP value per input row.
    """
    typical = (data['high'] + data['low'] + data['close']) / 3
    traded_value = typical * data['volume']
    running_value = traded_value.cumsum()
    running_volume = data['volume'].cumsum()
    return (running_value / running_volume).tolist()
def calculate_williams_r(data: pd.DataFrame, period: int = 14) -> List[float]:
    """Return the Williams %R oscillator.

    Args:
        data: Frame containing ``high``, ``low`` and ``close`` columns.
        period: Look-back window for the highest high and lowest low.

    Returns:
        One value per input row; rows without a full window are NaN.
    """
    window_high = data['high'].rolling(window=period).max()
    window_low = data['low'].rolling(window=period).min()

    distance_from_high = window_high - data['close']
    window_range = window_high - window_low
    return (-100 * distance_from_high / window_range).tolist()
def calculate_volume_profile(data: pd.DataFrame, num_bins: int = 50) -> Dict[str, Any]:
    """
    Volume Profile - distribution of volume across price levels.

    The price range ``[min(low), max(high)]`` is split into ``num_bins``
    equal-width bins and each row's volume is assigned to the bin that
    contains its ``close``.

    Args:
        data: Frame containing ``high``, ``low``, ``close`` and ``volume``
            columns.
        num_bins: Number of price bins; must be at least 1.

    Returns:
        Dict with:
          * ``poc`` - midpoint of the bin with the highest volume
            (Point of Control),
          * ``value_area_high`` / ``value_area_low`` - outer edges of the
            highest-volume bins that together hold at least 70% of the
            total volume,
          * ``histogram`` - one dict per bin with ``price`` (midpoint),
            ``volume``, ``price_low`` and ``price_high``.
        When the price range is empty or undefined, all numeric fields
        are 0 and ``histogram`` is an empty list.

    Raises:
        ValueError: If ``num_bins`` is less than 1 and the price range is
            non-degenerate.
    """
    price_min = data['low'].min()
    price_max = data['high'].max()
    if pd.isna(price_min) or pd.isna(price_max) or price_min == price_max:
        return {'poc': 0, 'value_area_high': 0, 'value_area_low': 0, 'histogram': []}

    if num_bins < 1:
        raise ValueError("num_bins must be at least 1")

    bins = np.linspace(price_min, price_max, num_bins + 1)
    closes = data['close']
    last_bin = num_bins - 1

    volumes = []
    for i in range(num_bins):
        lower, upper = bins[i], bins[i + 1]
        if i == last_bin:
            # The top bin is closed on the right so that a close exactly at
            # the overall maximum price is counted instead of being dropped.
            mask = (closes >= lower) & (closes <= upper)
        else:
            mask = (closes >= lower) & (closes < upper)
        volumes.append(float(data.loc[mask, 'volume'].sum()))

    # Find POC (price level with highest volume)
    poc_idx = int(np.argmax(volumes))
    poc = float((bins[poc_idx] + bins[poc_idx + 1]) / 2)

    # Calculate Value Area (70% of volume): take bins in descending volume
    # order until the threshold is reached. At least one bin is always taken.
    total_volume = sum(volumes)
    cumulative_volume = 0
    value_area_indices = []
    for idx in np.argsort(volumes)[::-1]:
        cumulative_volume += volumes[idx]
        value_area_indices.append(idx)
        if cumulative_volume >= total_volume * 0.7:
            break

    value_area_high = float(bins[max(value_area_indices) + 1])
    value_area_low = float(bins[min(value_area_indices)])

    histogram = [
        {
            'price': float((bins[i] + bins[i + 1]) / 2),
            'volume': volumes[i],
            'price_low': float(bins[i]),
            'price_high': float(bins[i + 1]),
        }
        for i in range(num_bins)
    ]

    return {
        'poc': poc,
        'value_area_high': value_area_high,
        'value_area_low': value_area_low,
        'histogram': histogram,
    }
def calculate_fair_value_gaps(data: pd.DataFrame) -> List[Dict[str, Any]]:
    """
    Fair Value Gaps - identifies three-candle imbalance patterns.

    For every candle ``i`` (from the third row on) the candle two rows
    earlier is compared against it:
      Bullish FVG: Low[i] > High[i-2]
      Bearish FVG: High[i] < Low[i-2]

    Args:
        data: Frame containing ``high`` and ``low`` columns. The index
            entries must provide ``.timestamp()`` (called only when a gap
            is found).

    Returns:
        One dict per gap, in row order, with ``type``, ``start_time`` and
        ``end_time`` (milliseconds; the end is the start plus 86,400,000),
        ``top``, ``bottom``, ``start_index`` and ``end_index``.
    """
    row_count = len(data)
    if row_count < 3:
        # Fewer than three candles can never form a gap.
        return []

    lows = data['low']
    highs = data['high']
    stamps = data.index

    gaps: List[Dict[str, Any]] = []
    for first in range(row_count - 2):
        third = first + 2

        if lows.iloc[third] > highs.iloc[first]:
            kind = 'bullish'
            top, bottom = lows.iloc[third], highs.iloc[first]
        elif highs.iloc[third] < lows.iloc[first]:
            kind = 'bearish'
            top, bottom = lows.iloc[first], highs.iloc[third]
        else:
            continue

        # Both time bounds are derived from the third candle's timestamp.
        opened_ms = int(stamps[third].timestamp() * 1000)
        gaps.append({
            'type': kind,
            'start_time': opened_ms,
            'end_time': opened_ms + 86400000,
            'top': float(top),
            'bottom': float(bottom),
            'start_index': first,
            'end_index': third,
        })
    return gaps
def calculate_all_indicators(data: pd.DataFrame, indicator_config: Dict[str, Any]) -> Dict[str, Any]:
    """Calculate all requested indicators based on config.

    Args:
        data: OHLCV frame handed unchanged to each indicator function.
        indicator_config: Dict whose ``indicators`` entry is a list of
            ``{'name': ..., 'params': {...}}`` dicts. Recognised names are
            ``sma``, ``ema``, ``bollinger``, ``rsi``, ``macd``, ``vwap``,
            ``williams_r``, ``volume_profile`` and ``fvg``; any other name
            is ignored.

    Returns:
        Dict of results. Period-based indicators are keyed as
        ``<name>_<period>``; the others are keyed by their name. An
        indicator that raises is reported on stdout and left out of the
        result rather than aborting the whole batch.
    """
    results: Dict[str, Any] = {}
    for indicator in indicator_config.get('indicators', []):
        name = indicator.get('name', '')
        params = indicator.get('params', {})
        try:
            if name == 'sma':
                period = params.get('period', 20)
                results[f'sma_{period}'] = calculate_sma(data, period)
            elif name == 'ema':
                period = params.get('period', 20)
                results[f'ema_{period}'] = calculate_ema(data, period)
            elif name == 'bollinger':
                results['bollinger'] = calculate_bollinger_bands(
                    data, params.get('period', 20), params.get('std_dev', 2)
                )
            elif name == 'rsi':
                period = params.get('period', 14)
                results[f'rsi_{period}'] = calculate_rsi(data, period)
            elif name == 'macd':
                results['macd'] = calculate_macd(
                    data,
                    params.get('fast', 12),
                    params.get('slow', 26),
                    params.get('signal', 9),
                )
            elif name == 'vwap':
                results['vwap'] = calculate_vwap(data)
            elif name == 'williams_r':
                period = params.get('period', 14)
                results[f'williams_r_{period}'] = calculate_williams_r(data, period)
            elif name == 'volume_profile':
                # Honour a configured bin count like the other parameterised
                # indicators; the fallback matches the function's own default.
                results['volume_profile'] = calculate_volume_profile(
                    data, params.get('num_bins', 50)
                )
            elif name == 'fvg':
                results['fvg'] = calculate_fair_value_gaps(data)
        except Exception as e:
            # Best effort: one failing indicator must not block the rest.
            print(f"Error calculating {name}: {e}")
            continue
    return results
def resample_ohlcv(data: pd.DataFrame, timeframe: str) -> pd.DataFrame:
    """Resample OHLCV data to a different timeframe.

    Args:
        data: Frame with ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns, indexed so that ``DataFrame.resample``
            can be applied.
        timeframe: Resampling rule passed straight to ``resample``.

    Returns:
        Frame with one row per period: first open, highest high, lowest
        low, last close and summed volume. Rows containing any NaN are
        dropped.
    """
    ohlcv_rules = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
    }
    bars = data.resample(timeframe).agg(ohlcv_rules)
    return bars.dropna()