# VSA (volume spread analysis) scan for a single MetaTrader 5 symbol:
# fetch bars, derive candle/volume features, label bars with rule-based
# buy/sell signals, chart them with Plotly and print the latest signals.
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from datetime import datetime, timedelta, timezone

# --- Terminal connection -------------------------------------------------
if not mt5.initialize():
    print("MT5 initialization failed")
    mt5.shutdown()
    # Nothing below can work without a terminal connection, so stop here.
    raise SystemExit(1)

# --- Request parameters --------------------------------------------------
symbol = "XAUUSDc"
timeframe = mt5.TIMEFRAME_M3
n_bars = 500
# Timezone-aware UTC, as the MetaTrader5 documentation asks for time arguments.
# NOTE(review): copy_rates_from appears to return the n_bars bars that end at
# this date, so the sample stops three days ago - confirm that is intended.
utc_from = datetime.now(timezone.utc) - timedelta(days=3)

rates = mt5.copy_rates_from(symbol, timeframe, utc_from, n_bars)
fetch_error = mt5.last_error()  # read while the connection is still open
mt5.shutdown()

# A failed or empty request would otherwise surface as a KeyError on 'time'.
if rates is None or len(rates) == 0:
    print(f"No rates returned for {symbol}: {fetch_error}")
    raise SystemExit(1)

# --- Bars -> DataFrame indexed by bar time -------------------------------
df = pd.DataFrame(rates)
df['time'] = pd.to_datetime(df['time'], unit='s')
df.set_index('time', inplace=True)

# Keep Monday-Friday bars only. The explicit copy makes the column
# assignments below operate on an independent frame rather than a slice.
df = df[df.index.dayofweek < 5].copy()

# --- Candle anatomy ------------------------------------------------------
df['range'] = df['high'] - df['low']
df['body'] = (df['close'] - df['open']).abs()
df['upper_wick'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_wick'] = df[['open', 'close']].min(axis=1) - df['low']
# +1 for an up bar, -1 for a down bar, 0 when open == close.
df['direction'] = np.where(df['close'] > df['open'], 1, np.where(df['close'] < df['open'], -1, 0))

# --- Rolling baselines (min_periods=1 so early bars are not NaN) ---------
vol_ma_len = 20
df['vol_ma'] = df['tick_volume'].rolling(vol_ma_len, min_periods=1).mean()
df['vol_std'] = df['tick_volume'].rolling(vol_ma_len, min_periods=1).std().fillna(0)
df['body_ma'] = df['body'].rolling(vol_ma_len, min_periods=1).mean()
df['range_ma'] = df['range'].rolling(vol_ma_len, min_periods=1).mean()

# The 1e-9 terms guard against division by zero.
df['vol_z'] = (df['tick_volume'] - df['vol_ma']) / (df['vol_std'] + 1e-9)
df['body_pct'] = df['body'] / (df['range_ma'] + 1e-9)

# --- Signal thresholds ---------------------------------------------------
climax_multiplier = 3.0
high_vol_multiplier = 1.5
low_vol_multiplier = 0.5
small_body_thresh = 0.35
large_body_thresh = 0.6

# Mean close-to-close change over the last 6 bars, used as a trend proxy.
df['close_slope'] = df['close'].diff().rolling(6, min_periods=1).mean()

# --- Rule evaluation -----------------------------------------------------
# Signals are computed as whole arrays and assigned once. Element-wise writes
# of the form df[col].iat[i] = value go through a temporary Series and are not
# guaranteed to reach df (they are a no-op under pandas Copy-on-Write).
tick_vol = df['tick_volume']
avg_vol = df['vol_ma']
bar_dir = df['direction']

climax_vol = (tick_vol > avg_vol * climax_multiplier).to_numpy()
high_vol = (tick_vol > avg_vol * high_vol_multiplier).to_numpy()
low_vol = (tick_vol < avg_vol * low_vol_multiplier).to_numpy()
up_bar = (bar_dir == 1).to_numpy()
down_bar = (bar_dir == -1).to_numpy()
large_body = (df['body_pct'] > large_body_thresh).to_numpy()
small_body = (df['body_pct'] < small_body_thresh).to_numpy()

# Trend is judged on the slope as of the previous bar.
prior_slope = df['close_slope'].shift(1)
prior_rising = (prior_slope > 0).to_numpy()
prior_falling = (prior_slope < 0).to_numpy()

# Previous bar was a climax-volume down bar.
prior_selling_climax = (
    ((tick_vol > avg_vol * climax_multiplier) & (bar_dir == -1))
    .shift(1, fill_value=False)
    .to_numpy()
)

# The first two bars are never labelled, and a non-positive volume average
# disables every rule for that bar.
eligible = (np.arange(len(df)) >= 2) & (avg_vol > 0).to_numpy()

# Ordered by priority: the first matching rule wins for each bar.
rules = [
    (climax_vol & up_bar & large_body, -1,
     "Climax Up (high vol & large up bar) -> possible distribution"),
    (climax_vol & down_bar & large_body, 1,
     "Climax Down (high vol & large down bar) -> possible selling exhaustion"),
    (high_vol & down_bar & prior_rising, -1,
     "High vol down after uptrend -> weakness (sell)"),
    (high_vol & up_bar & prior_falling, 1,
     "High vol up after downtrend -> strength (buy)"),
    (low_vol & up_bar & small_body & prior_rising, -1,
     "No Demand (low vol narrow up bar after advance) -> probable weak continuation"),
    (low_vol & down_bar & small_body & prior_falling, 1,
     "No Supply (low vol narrow down bar after decline) -> probable test, buy"),
    (prior_selling_climax & low_vol & ~down_bar, 1,
     "Test after selling climax -> buy"),
]

# rule_idx is 0 where nothing matched, otherwise the 1-based rule position.
rule_idx = np.select(
    [eligible & condition for condition, _, _ in rules],
    list(range(1, len(rules) + 1)),
    default=0,
)
signal_by_rule = np.array([0] + [value for _, value, _ in rules])
reason_by_rule = np.array([""] + [text for _, _, text in rules], dtype=object)

df['signal'] = signal_by_rule[rule_idx]
df['signal_reason'] = reason_by_rule[rule_idx]

# --- Marker positions ----------------------------------------------------
buy_mask = df['signal'] == 1
sell_mask = df['signal'] == -1

# Offset price markers by 15% of the bar range so they sit clear of the candle.
price_buy_y = df['low'] - (df['range'] * 0.15)
price_sell_y = df['high'] + (df['range'] * 0.15)

vol_buy_y = df['tick_volume'][buy_mask]
vol_sell_y = df['tick_volume'][sell_mask]

# --- Chart: price panel on top, tick volume panel (y2) below -------------
fig = go.Figure()

fig.add_trace(go.Candlestick(
    x=df.index,
    open=df['open'],
    high=df['high'],
    low=df['low'],
    close=df['close'],
    name=symbol,
    increasing_line_color='green',
    decreasing_line_color='red'
))

fig.add_trace(go.Scatter(
    x=df.index[buy_mask],
    y=price_buy_y[buy_mask],
    mode='markers',
    marker=dict(symbol='triangle-up', size=10),
    name='Buy Signal',
    hovertext=df['signal_reason'][buy_mask],
    hoverinfo='text'
))

fig.add_trace(go.Scatter(
    x=df.index[sell_mask],
    y=price_sell_y[sell_mask],
    mode='markers',
    marker=dict(symbol='triangle-down', size=10),
    name='Sell Signal',
    hovertext=df['signal_reason'][sell_mask],
    hoverinfo='text'
))

fig.add_trace(go.Bar(
    x=df.index,
    y=df['tick_volume'],
    name='Tick Volume',
    marker_color='gray',
    yaxis='y2'
))

fig.add_trace(go.Scatter(
    x=df.index[buy_mask],
    y=vol_buy_y,
    mode='markers',
    marker=dict(symbol='circle', size=8),
    name='Volume Buy Marker',
    yaxis='y2',
    hovertext=df['signal_reason'][buy_mask],
    hoverinfo='text'
))
fig.add_trace(go.Scatter(
    x=df.index[sell_mask],
    y=vol_sell_y,
    mode='markers',
    marker=dict(symbol='circle', size=8),
    name='Volume Sell Marker',
    yaxis='y2',
    hovertext=df['signal_reason'][sell_mask],
    hoverinfo='text'
))

fig.update_layout(
    template='plotly_white',
    title=f'{symbol} 3-Minute Candlestick with VSA Signals',
    xaxis=dict(title='Time', rangeslider=dict(visible=False)),
    yaxis=dict(title='Price', domain=[0.35, 1]),
    yaxis2=dict(title='Tick Volume', domain=[0.15, 0.33], showgrid=False, anchor="x"),
    legend=dict(orientation='h', y=1.02, x=0),
    height=900
)

fig.show()

# --- Console summary: up to 20 latest signals, newest first --------------
recent_signals = df[df['signal'] != 0].copy().tail(20)
if not recent_signals.empty:
    print("Recent VSA signals (most recent first):")
    for idx, row in recent_signals[::-1].iterrows():
        ts = idx.strftime("%Y-%m-%d %H:%M")
        print(f"{ts} | Signal: {'BUY' if row['signal']==1 else 'SELL'} | Vol: {int(row['tick_volume'])} | Reason: {row['signal_reason']}")
else:
    print("No VSA signals found in the dataset.")
# VSA (volume spread analysis) scan for a single MetaTrader 5 symbol:
# fetch bars, derive candle/volume features, label bars with rule-based
# buy/sell signals, chart them with Plotly and print the latest signals.
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from datetime import datetime, timedelta, timezone

# --- Terminal connection -------------------------------------------------
if not mt5.initialize():
    print("MT5 initialization failed")
    mt5.shutdown()
    # Nothing below can work without a terminal connection, so stop here.
    raise SystemExit(1)

# --- Request parameters --------------------------------------------------
symbol = "XAUUSDc"
timeframe = mt5.TIMEFRAME_M3
n_bars = 500
# Timezone-aware UTC, as the MetaTrader5 documentation asks for time arguments.
# NOTE(review): copy_rates_from appears to return the n_bars bars that end at
# this date, so the sample stops three days ago - confirm that is intended.
utc_from = datetime.now(timezone.utc) - timedelta(days=3)

rates = mt5.copy_rates_from(symbol, timeframe, utc_from, n_bars)
fetch_error = mt5.last_error()  # read while the connection is still open
mt5.shutdown()

# A failed or empty request would otherwise surface as a KeyError on 'time'.
if rates is None or len(rates) == 0:
    print(f"No rates returned for {symbol}: {fetch_error}")
    raise SystemExit(1)

# --- Bars -> DataFrame indexed by bar time -------------------------------
df = pd.DataFrame(rates)
df['time'] = pd.to_datetime(df['time'], unit='s')
df.set_index('time', inplace=True)

# Keep Monday-Friday bars only. The explicit copy makes the column
# assignments below operate on an independent frame rather than a slice.
df = df[df.index.dayofweek < 5].copy()

# --- Candle anatomy ------------------------------------------------------
df['range'] = df['high'] - df['low']
df['body'] = (df['close'] - df['open']).abs()
df['upper_wick'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_wick'] = df[['open', 'close']].min(axis=1) - df['low']
# +1 for an up bar, -1 for a down bar, 0 when open == close.
df['direction'] = np.where(df['close'] > df['open'], 1, np.where(df['close'] < df['open'], -1, 0))

# --- Rolling baselines (min_periods=1 so early bars are not NaN) ---------
vol_ma_len = 20
df['vol_ma'] = df['tick_volume'].rolling(vol_ma_len, min_periods=1).mean()
df['vol_std'] = df['tick_volume'].rolling(vol_ma_len, min_periods=1).std().fillna(0)
df['body_ma'] = df['body'].rolling(vol_ma_len, min_periods=1).mean()
df['range_ma'] = df['range'].rolling(vol_ma_len, min_periods=1).mean()

# The 1e-9 terms guard against division by zero.
df['vol_z'] = (df['tick_volume'] - df['vol_ma']) / (df['vol_std'] + 1e-9)
df['body_pct'] = df['body'] / (df['range_ma'] + 1e-9)

# --- Signal thresholds ---------------------------------------------------
climax_multiplier = 3.0
high_vol_multiplier = 1.5
low_vol_multiplier = 0.5
small_body_thresh = 0.35
large_body_thresh = 0.6

# Mean close-to-close change over the last 6 bars, used as a trend proxy.
df['close_slope'] = df['close'].diff().rolling(6, min_periods=1).mean()

# --- Rule evaluation -----------------------------------------------------
# Signals are computed as whole arrays and assigned once. Element-wise writes
# of the form df[col].iat[i] = value go through a temporary Series and are not
# guaranteed to reach df (they are a no-op under pandas Copy-on-Write).
tick_vol = df['tick_volume']
avg_vol = df['vol_ma']
bar_dir = df['direction']

climax_vol = (tick_vol > avg_vol * climax_multiplier).to_numpy()
high_vol = (tick_vol > avg_vol * high_vol_multiplier).to_numpy()
low_vol = (tick_vol < avg_vol * low_vol_multiplier).to_numpy()
up_bar = (bar_dir == 1).to_numpy()
down_bar = (bar_dir == -1).to_numpy()
large_body = (df['body_pct'] > large_body_thresh).to_numpy()
small_body = (df['body_pct'] < small_body_thresh).to_numpy()

# Trend is judged on the slope as of the previous bar.
prior_slope = df['close_slope'].shift(1)
prior_rising = (prior_slope > 0).to_numpy()
prior_falling = (prior_slope < 0).to_numpy()

# Previous bar was a climax-volume down bar.
prior_selling_climax = (
    ((tick_vol > avg_vol * climax_multiplier) & (bar_dir == -1))
    .shift(1, fill_value=False)
    .to_numpy()
)

# The first two bars are never labelled, and a non-positive volume average
# disables every rule for that bar.
eligible = (np.arange(len(df)) >= 2) & (avg_vol > 0).to_numpy()

# Ordered by priority: the first matching rule wins for each bar.
rules = [
    (climax_vol & up_bar & large_body, -1,
     "Climax Up (high vol & large up bar) -> possible distribution"),
    (climax_vol & down_bar & large_body, 1,
     "Climax Down (high vol & large down bar) -> possible selling exhaustion"),
    (high_vol & down_bar & prior_rising, -1,
     "High vol down after uptrend -> weakness (sell)"),
    (high_vol & up_bar & prior_falling, 1,
     "High vol up after downtrend -> strength (buy)"),
    (low_vol & up_bar & small_body & prior_rising, -1,
     "No Demand (low vol narrow up bar after advance) -> probable weak continuation"),
    (low_vol & down_bar & small_body & prior_falling, 1,
     "No Supply (low vol narrow down bar after decline) -> probable test, buy"),
    (prior_selling_climax & low_vol & ~down_bar, 1,
     "Test after selling climax -> buy"),
]

# rule_idx is 0 where nothing matched, otherwise the 1-based rule position.
rule_idx = np.select(
    [eligible & condition for condition, _, _ in rules],
    list(range(1, len(rules) + 1)),
    default=0,
)
signal_by_rule = np.array([0] + [value for _, value, _ in rules])
reason_by_rule = np.array([""] + [text for _, _, text in rules], dtype=object)

df['signal'] = signal_by_rule[rule_idx]
df['signal_reason'] = reason_by_rule[rule_idx]

# --- Marker positions ----------------------------------------------------
buy_mask = df['signal'] == 1
sell_mask = df['signal'] == -1

# Offset price markers by 15% of the bar range so they sit clear of the candle.
price_buy_y = df['low'] - (df['range'] * 0.15)
price_sell_y = df['high'] + (df['range'] * 0.15)

vol_buy_y = df['tick_volume'][buy_mask]
vol_sell_y = df['tick_volume'][sell_mask]

# --- Chart: price panel on top, tick volume panel (y2) below -------------
fig = go.Figure()

fig.add_trace(go.Candlestick(
    x=df.index,
    open=df['open'],
    high=df['high'],
    low=df['low'],
    close=df['close'],
    name=symbol,
    increasing_line_color='green',
    decreasing_line_color='red'
))

fig.add_trace(go.Scatter(
    x=df.index[buy_mask],
    y=price_buy_y[buy_mask],
    mode='markers',
    marker=dict(symbol='triangle-up', size=10),
    name='Buy Signal',
    hovertext=df['signal_reason'][buy_mask],
    hoverinfo='text'
))

fig.add_trace(go.Scatter(
    x=df.index[sell_mask],
    y=price_sell_y[sell_mask],
    mode='markers',
    marker=dict(symbol='triangle-down', size=10),
    name='Sell Signal',
    hovertext=df['signal_reason'][sell_mask],
    hoverinfo='text'
))

fig.add_trace(go.Bar(
    x=df.index,
    y=df['tick_volume'],
    name='Tick Volume',
    marker_color='gray',
    yaxis='y2'
))

fig.add_trace(go.Scatter(
    x=df.index[buy_mask],
    y=vol_buy_y,
    mode='markers',
    marker=dict(symbol='circle', size=8),
    name='Volume Buy Marker',
    yaxis='y2',
    hovertext=df['signal_reason'][buy_mask],
    hoverinfo='text'
))
fig.add_trace(go.Scatter(
    x=df.index[sell_mask],
    y=vol_sell_y,
    mode='markers',
    marker=dict(symbol='circle', size=8),
    name='Volume Sell Marker',
    yaxis='y2',
    hovertext=df['signal_reason'][sell_mask],
    hoverinfo='text'
))

fig.update_layout(
    template='plotly_white',
    title=f'{symbol} 3-Minute Candlestick with VSA Signals',
    xaxis=dict(title='Time', rangeslider=dict(visible=False)),
    yaxis=dict(title='Price', domain=[0.35, 1]),
    yaxis2=dict(title='Tick Volume', domain=[0.15, 0.33], showgrid=False, anchor="x"),
    legend=dict(orientation='h', y=1.02, x=0),
    height=900
)

fig.show()

# --- Console summary: up to 20 latest signals, newest first --------------
recent_signals = df[df['signal'] != 0].copy().tail(20)
if not recent_signals.empty:
    print("Recent VSA signals (most recent first):")
    for idx, row in recent_signals[::-1].iterrows():
        ts = idx.strftime("%Y-%m-%d %H:%M")
        print(f"{ts} | Signal: {'BUY' if row['signal']==1 else 'SELL'} | Vol: {int(row['tick_volume'])} | Reason: {row['signal_reason']}")
else:
    print("No VSA signals found in the dataset.")
# VSA (volume spread analysis) scan for a single MetaTrader 5 symbol:
# fetch bars, derive candle/volume features, label bars with rule-based
# buy/sell signals, chart them with Plotly and print the latest signals.
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from datetime import datetime, timedelta, timezone

# --- Terminal connection -------------------------------------------------
if not mt5.initialize():
    print("MT5 initialization failed")
    mt5.shutdown()
    # Nothing below can work without a terminal connection, so stop here.
    raise SystemExit(1)

# --- Request parameters --------------------------------------------------
symbol = "XAUUSDc"
timeframe = mt5.TIMEFRAME_M3
n_bars = 500
# Timezone-aware UTC, as the MetaTrader5 documentation asks for time arguments.
# NOTE(review): copy_rates_from appears to return the n_bars bars that end at
# this date, so the sample stops three days ago - confirm that is intended.
utc_from = datetime.now(timezone.utc) - timedelta(days=3)

rates = mt5.copy_rates_from(symbol, timeframe, utc_from, n_bars)
fetch_error = mt5.last_error()  # read while the connection is still open
mt5.shutdown()

# A failed or empty request would otherwise surface as a KeyError on 'time'.
if rates is None or len(rates) == 0:
    print(f"No rates returned for {symbol}: {fetch_error}")
    raise SystemExit(1)

# --- Bars -> DataFrame indexed by bar time -------------------------------
df = pd.DataFrame(rates)
df['time'] = pd.to_datetime(df['time'], unit='s')
df.set_index('time', inplace=True)

# Keep Monday-Friday bars only. The explicit copy makes the column
# assignments below operate on an independent frame rather than a slice.
df = df[df.index.dayofweek < 5].copy()

# --- Candle anatomy ------------------------------------------------------
df['range'] = df['high'] - df['low']
df['body'] = (df['close'] - df['open']).abs()
df['upper_wick'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_wick'] = df[['open', 'close']].min(axis=1) - df['low']
# +1 for an up bar, -1 for a down bar, 0 when open == close.
df['direction'] = np.where(df['close'] > df['open'], 1, np.where(df['close'] < df['open'], -1, 0))

# --- Rolling baselines (min_periods=1 so early bars are not NaN) ---------
vol_ma_len = 20
df['vol_ma'] = df['tick_volume'].rolling(vol_ma_len, min_periods=1).mean()
df['vol_std'] = df['tick_volume'].rolling(vol_ma_len, min_periods=1).std().fillna(0)
df['body_ma'] = df['body'].rolling(vol_ma_len, min_periods=1).mean()
df['range_ma'] = df['range'].rolling(vol_ma_len, min_periods=1).mean()

# The 1e-9 terms guard against division by zero.
df['vol_z'] = (df['tick_volume'] - df['vol_ma']) / (df['vol_std'] + 1e-9)
df['body_pct'] = df['body'] / (df['range_ma'] + 1e-9)

# --- Signal thresholds ---------------------------------------------------
climax_multiplier = 3.0
high_vol_multiplier = 1.5
low_vol_multiplier = 0.5
small_body_thresh = 0.35
large_body_thresh = 0.6

# Mean close-to-close change over the last 6 bars, used as a trend proxy.
df['close_slope'] = df['close'].diff().rolling(6, min_periods=1).mean()

# --- Rule evaluation -----------------------------------------------------
# Signals are computed as whole arrays and assigned once. Element-wise writes
# of the form df[col].iat[i] = value go through a temporary Series and are not
# guaranteed to reach df (they are a no-op under pandas Copy-on-Write).
tick_vol = df['tick_volume']
avg_vol = df['vol_ma']
bar_dir = df['direction']

climax_vol = (tick_vol > avg_vol * climax_multiplier).to_numpy()
high_vol = (tick_vol > avg_vol * high_vol_multiplier).to_numpy()
low_vol = (tick_vol < avg_vol * low_vol_multiplier).to_numpy()
up_bar = (bar_dir == 1).to_numpy()
down_bar = (bar_dir == -1).to_numpy()
large_body = (df['body_pct'] > large_body_thresh).to_numpy()
small_body = (df['body_pct'] < small_body_thresh).to_numpy()

# Trend is judged on the slope as of the previous bar.
prior_slope = df['close_slope'].shift(1)
prior_rising = (prior_slope > 0).to_numpy()
prior_falling = (prior_slope < 0).to_numpy()

# Previous bar was a climax-volume down bar.
prior_selling_climax = (
    ((tick_vol > avg_vol * climax_multiplier) & (bar_dir == -1))
    .shift(1, fill_value=False)
    .to_numpy()
)

# The first two bars are never labelled, and a non-positive volume average
# disables every rule for that bar.
eligible = (np.arange(len(df)) >= 2) & (avg_vol > 0).to_numpy()

# Ordered by priority: the first matching rule wins for each bar.
rules = [
    (climax_vol & up_bar & large_body, -1,
     "Climax Up (high vol & large up bar) -> possible distribution"),
    (climax_vol & down_bar & large_body, 1,
     "Climax Down (high vol & large down bar) -> possible selling exhaustion"),
    (high_vol & down_bar & prior_rising, -1,
     "High vol down after uptrend -> weakness (sell)"),
    (high_vol & up_bar & prior_falling, 1,
     "High vol up after downtrend -> strength (buy)"),
    (low_vol & up_bar & small_body & prior_rising, -1,
     "No Demand (low vol narrow up bar after advance) -> probable weak continuation"),
    (low_vol & down_bar & small_body & prior_falling, 1,
     "No Supply (low vol narrow down bar after decline) -> probable test, buy"),
    (prior_selling_climax & low_vol & ~down_bar, 1,
     "Test after selling climax -> buy"),
]

# rule_idx is 0 where nothing matched, otherwise the 1-based rule position.
rule_idx = np.select(
    [eligible & condition for condition, _, _ in rules],
    list(range(1, len(rules) + 1)),
    default=0,
)
signal_by_rule = np.array([0] + [value for _, value, _ in rules])
reason_by_rule = np.array([""] + [text for _, _, text in rules], dtype=object)

df['signal'] = signal_by_rule[rule_idx]
df['signal_reason'] = reason_by_rule[rule_idx]

# --- Marker positions ----------------------------------------------------
buy_mask = df['signal'] == 1
sell_mask = df['signal'] == -1

# Offset price markers by 15% of the bar range so they sit clear of the candle.
price_buy_y = df['low'] - (df['range'] * 0.15)
price_sell_y = df['high'] + (df['range'] * 0.15)

vol_buy_y = df['tick_volume'][buy_mask]
vol_sell_y = df['tick_volume'][sell_mask]

# --- Chart: price panel on top, tick volume panel (y2) below -------------
fig = go.Figure()

fig.add_trace(go.Candlestick(
    x=df.index,
    open=df['open'],
    high=df['high'],
    low=df['low'],
    close=df['close'],
    name=symbol,
    increasing_line_color='green',
    decreasing_line_color='red'
))

fig.add_trace(go.Scatter(
    x=df.index[buy_mask],
    y=price_buy_y[buy_mask],
    mode='markers',
    marker=dict(symbol='triangle-up', size=10),
    name='Buy Signal',
    hovertext=df['signal_reason'][buy_mask],
    hoverinfo='text'
))

fig.add_trace(go.Scatter(
    x=df.index[sell_mask],
    y=price_sell_y[sell_mask],
    mode='markers',
    marker=dict(symbol='triangle-down', size=10),
    name='Sell Signal',
    hovertext=df['signal_reason'][sell_mask],
    hoverinfo='text'
))

fig.add_trace(go.Bar(
    x=df.index,
    y=df['tick_volume'],
    name='Tick Volume',
    marker_color='gray',
    yaxis='y2'
))

fig.add_trace(go.Scatter(
    x=df.index[buy_mask],
    y=vol_buy_y,
    mode='markers',
    marker=dict(symbol='circle', size=8),
    name='Volume Buy Marker',
    yaxis='y2',
    hovertext=df['signal_reason'][buy_mask],
    hoverinfo='text'
))
fig.add_trace(go.Scatter(
    x=df.index[sell_mask],
    y=vol_sell_y,
    mode='markers',
    marker=dict(symbol='circle', size=8),
    name='Volume Sell Marker',
    yaxis='y2',
    hovertext=df['signal_reason'][sell_mask],
    hoverinfo='text'
))

fig.update_layout(
    template='plotly_white',
    title=f'{symbol} 3-Minute Candlestick with VSA Signals',
    xaxis=dict(title='Time', rangeslider=dict(visible=False)),
    yaxis=dict(title='Price', domain=[0.35, 1]),
    yaxis2=dict(title='Tick Volume', domain=[0.15, 0.33], showgrid=False, anchor="x"),
    legend=dict(orientation='h', y=1.02, x=0),
    height=900
)

fig.show()

# --- Console summary: up to 20 latest signals, newest first --------------
recent_signals = df[df['signal'] != 0].copy().tail(20)
if not recent_signals.empty:
    print("Recent VSA signals (most recent first):")
    for idx, row in recent_signals[::-1].iterrows():
        ts = idx.strftime("%Y-%m-%d %H:%M")
        print(f"{ts} | Signal: {'BUY' if row['signal']==1 else 'SELL'} | Vol: {int(row['tick_volume'])} | Reason: {row['signal_reason']}")
else:
    print("No VSA signals found in the dataset.")
# VSA (volume spread analysis) scan for a single MetaTrader 5 symbol:
# fetch bars, derive candle/volume features, label bars with rule-based
# buy/sell signals, chart them with Plotly and print the latest signals.
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from datetime import datetime, timedelta, timezone

# --- Terminal connection -------------------------------------------------
if not mt5.initialize():
    print("MT5 initialization failed")
    mt5.shutdown()
    # Nothing below can work without a terminal connection, so stop here.
    raise SystemExit(1)

# --- Request parameters --------------------------------------------------
symbol = "XAUUSDc"
timeframe = mt5.TIMEFRAME_M3
n_bars = 500
# Timezone-aware UTC, as the MetaTrader5 documentation asks for time arguments.
# NOTE(review): copy_rates_from appears to return the n_bars bars that end at
# this date, so the sample stops three days ago - confirm that is intended.
utc_from = datetime.now(timezone.utc) - timedelta(days=3)

rates = mt5.copy_rates_from(symbol, timeframe, utc_from, n_bars)
fetch_error = mt5.last_error()  # read while the connection is still open
mt5.shutdown()

# A failed or empty request would otherwise surface as a KeyError on 'time'.
if rates is None or len(rates) == 0:
    print(f"No rates returned for {symbol}: {fetch_error}")
    raise SystemExit(1)

# --- Bars -> DataFrame indexed by bar time -------------------------------
df = pd.DataFrame(rates)
df['time'] = pd.to_datetime(df['time'], unit='s')
df.set_index('time', inplace=True)

# Keep Monday-Friday bars only. The explicit copy makes the column
# assignments below operate on an independent frame rather than a slice.
df = df[df.index.dayofweek < 5].copy()

# --- Candle anatomy ------------------------------------------------------
df['range'] = df['high'] - df['low']
df['body'] = (df['close'] - df['open']).abs()
df['upper_wick'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_wick'] = df[['open', 'close']].min(axis=1) - df['low']
# +1 for an up bar, -1 for a down bar, 0 when open == close.
df['direction'] = np.where(df['close'] > df['open'], 1, np.where(df['close'] < df['open'], -1, 0))

# --- Rolling baselines (min_periods=1 so early bars are not NaN) ---------
vol_ma_len = 20
df['vol_ma'] = df['tick_volume'].rolling(vol_ma_len, min_periods=1).mean()
df['vol_std'] = df['tick_volume'].rolling(vol_ma_len, min_periods=1).std().fillna(0)
df['body_ma'] = df['body'].rolling(vol_ma_len, min_periods=1).mean()
df['range_ma'] = df['range'].rolling(vol_ma_len, min_periods=1).mean()

# The 1e-9 terms guard against division by zero.
df['vol_z'] = (df['tick_volume'] - df['vol_ma']) / (df['vol_std'] + 1e-9)
df['body_pct'] = df['body'] / (df['range_ma'] + 1e-9)

# --- Signal thresholds ---------------------------------------------------
climax_multiplier = 3.0
high_vol_multiplier = 1.5
low_vol_multiplier = 0.5
small_body_thresh = 0.35
large_body_thresh = 0.6

# Mean close-to-close change over the last 6 bars, used as a trend proxy.
df['close_slope'] = df['close'].diff().rolling(6, min_periods=1).mean()

# --- Rule evaluation -----------------------------------------------------
# Signals are computed as whole arrays and assigned once. Element-wise writes
# of the form df[col].iat[i] = value go through a temporary Series and are not
# guaranteed to reach df (they are a no-op under pandas Copy-on-Write).
tick_vol = df['tick_volume']
avg_vol = df['vol_ma']
bar_dir = df['direction']

climax_vol = (tick_vol > avg_vol * climax_multiplier).to_numpy()
high_vol = (tick_vol > avg_vol * high_vol_multiplier).to_numpy()
low_vol = (tick_vol < avg_vol * low_vol_multiplier).to_numpy()
up_bar = (bar_dir == 1).to_numpy()
down_bar = (bar_dir == -1).to_numpy()
large_body = (df['body_pct'] > large_body_thresh).to_numpy()
small_body = (df['body_pct'] < small_body_thresh).to_numpy()

# Trend is judged on the slope as of the previous bar.
prior_slope = df['close_slope'].shift(1)
prior_rising = (prior_slope > 0).to_numpy()
prior_falling = (prior_slope < 0).to_numpy()

# Previous bar was a climax-volume down bar.
prior_selling_climax = (
    ((tick_vol > avg_vol * climax_multiplier) & (bar_dir == -1))
    .shift(1, fill_value=False)
    .to_numpy()
)

# The first two bars are never labelled, and a non-positive volume average
# disables every rule for that bar.
eligible = (np.arange(len(df)) >= 2) & (avg_vol > 0).to_numpy()

# Ordered by priority: the first matching rule wins for each bar.
rules = [
    (climax_vol & up_bar & large_body, -1,
     "Climax Up (high vol & large up bar) -> possible distribution"),
    (climax_vol & down_bar & large_body, 1,
     "Climax Down (high vol & large down bar) -> possible selling exhaustion"),
    (high_vol & down_bar & prior_rising, -1,
     "High vol down after uptrend -> weakness (sell)"),
    (high_vol & up_bar & prior_falling, 1,
     "High vol up after downtrend -> strength (buy)"),
    (low_vol & up_bar & small_body & prior_rising, -1,
     "No Demand (low vol narrow up bar after advance) -> probable weak continuation"),
    (low_vol & down_bar & small_body & prior_falling, 1,
     "No Supply (low vol narrow down bar after decline) -> probable test, buy"),
    (prior_selling_climax & low_vol & ~down_bar, 1,
     "Test after selling climax -> buy"),
]

# rule_idx is 0 where nothing matched, otherwise the 1-based rule position.
rule_idx = np.select(
    [eligible & condition for condition, _, _ in rules],
    list(range(1, len(rules) + 1)),
    default=0,
)
signal_by_rule = np.array([0] + [value for _, value, _ in rules])
reason_by_rule = np.array([""] + [text for _, _, text in rules], dtype=object)

df['signal'] = signal_by_rule[rule_idx]
df['signal_reason'] = reason_by_rule[rule_idx]

# --- Marker positions ----------------------------------------------------
buy_mask = df['signal'] == 1
sell_mask = df['signal'] == -1

# Offset price markers by 15% of the bar range so they sit clear of the candle.
price_buy_y = df['low'] - (df['range'] * 0.15)
price_sell_y = df['high'] + (df['range'] * 0.15)

vol_buy_y = df['tick_volume'][buy_mask]
vol_sell_y = df['tick_volume'][sell_mask]

# --- Chart: price panel on top, tick volume panel (y2) below -------------
fig = go.Figure()

fig.add_trace(go.Candlestick(
    x=df.index,
    open=df['open'],
    high=df['high'],
    low=df['low'],
    close=df['close'],
    name=symbol,
    increasing_line_color='green',
    decreasing_line_color='red'
))

fig.add_trace(go.Scatter(
    x=df.index[buy_mask],
    y=price_buy_y[buy_mask],
    mode='markers',
    marker=dict(symbol='triangle-up', size=10),
    name='Buy Signal',
    hovertext=df['signal_reason'][buy_mask],
    hoverinfo='text'
))

fig.add_trace(go.Scatter(
    x=df.index[sell_mask],
    y=price_sell_y[sell_mask],
    mode='markers',
    marker=dict(symbol='triangle-down', size=10),
    name='Sell Signal',
    hovertext=df['signal_reason'][sell_mask],
    hoverinfo='text'
))

fig.add_trace(go.Bar(
    x=df.index,
    y=df['tick_volume'],
    name='Tick Volume',
    marker_color='gray',
    yaxis='y2'
))

fig.add_trace(go.Scatter(
    x=df.index[buy_mask],
    y=vol_buy_y,
    mode='markers',
    marker=dict(symbol='circle', size=8),
    name='Volume Buy Marker',
    yaxis='y2',
    hovertext=df['signal_reason'][buy_mask],
    hoverinfo='text'
))
fig.add_trace(go.Scatter(
    x=df.index[sell_mask],
    y=vol_sell_y,
    mode='markers',
    marker=dict(symbol='circle', size=8),
    name='Volume Sell Marker',
    yaxis='y2',
    hovertext=df['signal_reason'][sell_mask],
    hoverinfo='text'
))

fig.update_layout(
    template='plotly_white',
    title=f'{symbol} 3-Minute Candlestick with VSA Signals',
    xaxis=dict(title='Time', rangeslider=dict(visible=False)),
    yaxis=dict(title='Price', domain=[0.35, 1]),
    yaxis2=dict(title='Tick Volume', domain=[0.15, 0.33], showgrid=False, anchor="x"),
    legend=dict(orientation='h', y=1.02, x=0),
    height=900
)

fig.show()

# --- Console summary: up to 20 latest signals, newest first --------------
recent_signals = df[df['signal'] != 0].copy().tail(20)
if not recent_signals.empty:
    print("Recent VSA signals (most recent first):")
    for idx, row in recent_signals[::-1].iterrows():
        ts = idx.strftime("%Y-%m-%d %H:%M")
        print(f"{ts} | Signal: {'BUY' if row['signal']==1 else 'SELL'} | Vol: {int(row['tick_volume'])} | Reason: {row['signal_reason']}")
else:
    print("No VSA signals found in the dataset.")