# New version ---
import streamlit as st
import pandas as pd
import os
import numpy as np
from datetime import date, datetime
import plotly.graph_objects as go
import itertools
import json
# --- CORRECTED IMPORTS: Moved MACD ---
from ta.volatility import BollingerBands
from ta.momentum import RSIIndicator # Removed MACD from here
from ta.trend import ADXIndicator, MACD
from ta.volume import MFIIndicator # Added MFI here
# --- [NEW] ADDED ATR FOR INTELLIGENT EXIT ---
from ta.volatility import AverageTrueRange
# --- [END NEW] ---
from multiprocessing import Pool, cpu_count
from functools import partial
from dateutil.relativedelta import relativedelta
from datetime import timedelta
# --- 0. Settings Management Functions ---
# File names used to persist the various user configurations as JSON.
CONFIG_FILE = "config.json"  # main strategy settings (see load_settings/save_settings)
VETO_CONFIG_FILE = "veto_config.json"  # list of veto filter setups
TOP_SETUPS_FILE = "top_setups.json"  # best optimizer setups, keyed per side (long/short)
USER_SETUPS_FILE = "user_advisor_setups.json"  # user-defined advisor table rows
MARKOV_SETUP_FILE = "best_markov.json"  # persisted best Markov transition setup
def save_settings(params_to_save):
    """Persist the given strategy parameters to CONFIG_FILE as pretty-printed JSON.

    Overwrites the file and shows a Streamlit sidebar confirmation.

    Args:
        params_to_save: dict of parameter name -> value to write.
    """
    with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
        json.dump(params_to_save, f, indent=4)
    st.sidebar.success("Settings saved as default!")
# --- Replacement for load_settings function ---
def load_settings():
    """Load persisted strategy settings from CONFIG_FILE, merged over defaults.

    Only keys present in the default structure are accepted from the file, so
    stale/unknown entries are ignored and missing keys keep their default —
    older config files stay compatible with newer defaults.

    Returns:
        dict: the full settings mapping (defaults overridden by saved values).
    """
    default_structure = {
        "large_ma_period": 70,
        "bband_period": 32,
        "bband_std_dev": 1.4,
        "confidence_threshold": 95,
        "long_entry_threshold_pct": 0.01,
        "long_exit_ma_threshold_pct": 0.0,
        "long_trailing_stop_loss_pct": 0.2,
        "long_delay_days": 0,
        "short_entry_threshold_pct": 0.0,
        "short_exit_ma_threshold_pct": 0.0,
        "short_trailing_stop_loss_pct": 0.2,
        "short_delay_days": 0,
        "use_rsi": True, "rsi_w": 1.5,
        "rsi_logic": "Level",
        "primary_driver": "Bollinger Bands",
        "exit_logic_type": "Intelligent (ADX/MACD/ATR)",
        "exit_confidence_threshold": 40,
        "smart_exit_atr_period": 14,
        "smart_exit_atr_multiplier": 3.0,
        "intelligent_tsl_pct": 0.2,
        "norm_lookback_years": 1,
        "use_rolling_benchmark": False,
        "benchmark_rank": 99,
        "use_ma_floor_filter": True,
        "catcher_stop_pct": 0.03,
        "use_vol": False, "vol_w": 0.5,
        "use_trend": False, "trend_w": 2.0,
        "use_volume": False, "volume_w": 0.5,
        "use_adx_filter": False, "adx_threshold": 10.0,
        "adx_period": 14,
        "use_macd": False, "macd_w": 2.0,
        "use_ma_slope": False, "ma_slope_w": 0.5,
        "use_markov": False, "markov_w": 1.0,
        "max_trading_days": 60,
        "max_long_duration": 60,
        "max_short_duration": 3
    }
    if os.path.exists(CONFIG_FILE):
        try:
            with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            # Accept only known keys; ignore anything unexpected in the file.
            for key in default_structure:
                if key in loaded:
                    default_structure[key] = loaded[key]
        except Exception as e:
            # Covers JSON decode errors and I/O errors alike (the original
            # tuple (json.JSONDecodeError, Exception) was redundant).
            print(f"Error loading config.json: {e}. Using default settings.")
    return default_structure
# --- UPDATED: Save a list of veto setups ---
def save_veto_setup(veto_setups_list): # Takes a list now
    """Persist the full list of veto filter setups to VETO_CONFIG_FILE as JSON.

    Args:
        veto_setups_list: list of veto setup dicts (the whole list is written,
            replacing any previous contents).
    """
    with open(VETO_CONFIG_FILE, 'w', encoding='utf-8') as f:
        # Save the list directly
        json.dump(veto_setups_list, f, indent=4)
    st.sidebar.success(f"Saved {len(veto_setups_list)} Veto filter(s) as default!")
# --- UPDATED: Load a list of veto setups, robustly ---
def load_veto_setup():
    """Load the saved list of veto filter setups from VETO_CONFIG_FILE.

    Accepts both the current format (a JSON list of setup dicts) and the
    legacy format (a single setup dict, which is wrapped into a one-item list).

    Returns:
        list: veto setup dicts; empty when the file is missing, unreadable,
        or holds neither a list nor a dict.
    """
    veto_list = []  # default when nothing usable is on disk
    if os.path.exists(VETO_CONFIG_FILE):
        try:
            with open(VETO_CONFIG_FILE, 'r', encoding='utf-8') as f:
                loaded_data = json.load(f)
            if isinstance(loaded_data, list):
                veto_list = loaded_data
            elif isinstance(loaded_data, dict):  # handle old single-dict format
                veto_list = [loaded_data]
        except Exception as e:
            # Covers JSON decode errors and I/O errors; fall back to no vetoes.
            print(f"Error loading veto config: {e}. Using empty list.")
    return veto_list
# --- UPDATED: Includes new factors, num_setups=8, removed infinity filter ---
def save_top_setups(results_df, side, num_setups=8): # Default is now 8
    """Persist the top `num_setups` unique setups for one side into TOP_SETUPS_FILE.

    Sorts by the best available score column ('Norm. Score %', falling back to
    'Strategy Score', then 'Trade G/B Ratio'), preferring simpler setups
    (fewer factors 'On') on score ties, de-duplicates on the result columns,
    and merges the result into the JSON file under the `side` key so the other
    side's saved setups are preserved.

    Args:
        results_df: DataFrame of optimizer result rows.
        side: key used inside the JSON file (e.g. 'long' or 'short').
        num_setups: how many unique setups to keep (default 8).
    """
    df = results_df.copy()
    if df.empty:
        st.sidebar.warning(f"No valid setups found for {side.title()} to save.")
        return
    # Columns whose joint equality defines a "duplicate" result row.
    deduplication_cols = [
        'Conf. Threshold', 'Avg Profit/Trade', 'Ticker G/B Ratio', 'Trade G/B Ratio',
        'Winning Tickers', 'Losing Tickers', 'Avg Entry Conf.',
        'Good Score', 'Bad Score', 'Norm. Score %', 'Total Trades'
    ]
    factor_cols = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope']
    existing_factor_cols = [col for col in factor_cols if col in df.columns]
    # Complexity metric: number of factors switched 'On' (fewer = simpler).
    if existing_factor_cols:
        df['FactorsOn'] = df[existing_factor_cols].apply(lambda row: (row == 'On').sum(), axis=1)
    else:
        df['FactorsOn'] = 0
    # --- CONSISTENCY FIX: Sort by the Weighted Score ("Norm. Score %") ---
    # If Norm. Score % exists, use it. Otherwise fall back to Strategy Score or Trade G/B Ratio.
    sort_col = 'Trade G/B Ratio' # Default fallback
    if 'Norm. Score %' in df.columns:
        sort_col = 'Norm. Score %'
    elif 'Strategy Score' in df.columns:
        sort_col = 'Strategy Score'
    if sort_col in df.columns:
        # Push NaN scores to the bottom by forcing them to -inf before sorting.
        df[sort_col] = df[sort_col].fillna(-np.inf)
        # Sort by Score (Descending), then by Complexity (Ascending - simpler is better)
        df = df.sort_values(
            by=[sort_col, 'FactorsOn'],
            ascending=[False, True]
        )
    else:
        st.sidebar.error(f"Could not sort top setups: '{sort_col}' column missing.")
        return
    existing_cols_for_dedup = [col for col in deduplication_cols if col in df.columns]
    deduplicated_df = df.drop_duplicates(subset=existing_cols_for_dedup, keep='first')
    top_setups = deduplicated_df.head(num_setups).to_dict('records')
    # Merge with any previously saved sides rather than overwriting the file.
    if os.path.exists(TOP_SETUPS_FILE):
        try:
            with open(TOP_SETUPS_FILE, 'r', encoding='utf-8') as f:
                all_top_setups = json.load(f)
        except json.JSONDecodeError:
            # Corrupt file: start over with an empty mapping.
            all_top_setups = {}
    else:
        all_top_setups = {}
    all_top_setups[side] = top_setups
    with open(TOP_SETUPS_FILE, 'w', encoding='utf-8') as f:
        json.dump(all_top_setups, f, indent=4)
    st.sidebar.success(f"Top {len(top_setups)} unique {side.title()} setups saved! (Sorted by {sort_col})")
def load_top_setups():
    """Return the saved top-setups mapping from TOP_SETUPS_FILE, or None.

    None is returned both when the file does not exist and when its JSON is
    corrupt, so callers can treat either case as "nothing saved".
    """
    if not os.path.exists(TOP_SETUPS_FILE):
        return None
    try:
        with open(TOP_SETUPS_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except json.JSONDecodeError:
        # Corrupt file: behave exactly as if nothing had been saved.
        return None
@st.cache_data # Use caching for efficiency
def load_proverbs():
    """Load the list of proverbs from proverbs.json (cached by Streamlit).

    Returns:
        list: proverb strings; a single fallback message when the file is
        missing, unreadable, or does not contain a JSON list.
    """
    proverbs_list = []
    if os.path.exists("proverbs.json"):
        try:
            with open("proverbs.json", 'r', encoding='utf-8') as f:
                loaded_data = json.load(f)
            # Only accept a list; any other JSON shape falls through to the fallback.
            if isinstance(loaded_data, list):
                proverbs_list = loaded_data
        except Exception as e:
            # Covers JSON decode errors and I/O errors alike (the original
            # tuple (json.JSONDecodeError, Exception) was redundant).
            print(f"Error loading proverbs.json: {e}. Using fallback.")
    # Fallback if the list is empty after trying to load.
    if not proverbs_list:
        proverbs_list = ["Have a great trading day!"] # Fallback message
    return proverbs_list
def load_user_setups(max_rows=20):
    """Loads user-defined advisor setups from a JSON file.

    Each saved row is merged over a blank row template, so older files that
    lack newer fields (e.g. "Notes") still load with sensible defaults. The
    result is always padded with blank rows and truncated to `max_rows`.

    Args:
        max_rows: fixed size of the returned setup table (default 20, the
            advisor grid size — kept as a parameter for reuse/testing).

    Returns:
        list[dict]: exactly `max_rows` setup rows.
    """
    # Blank-row template; also defines the full set of known fields.
    default_row_template = {
        "Run": False,
        "Notes": "",  # free-text advice attached to the setup
        "RSI": "Off", "Volatility": "Off", "TREND": "Off", "Volume": "Off",
        "MACD": "Off", "MA Slope": "Off", "Markov": "Off",
        "ADX Filter": "Off",
        "Conf. Threshold": 50,
        "Large MA Period": 50, "Bollinger Band Period": 20, "Bollinger Band Std Dev": 2.0,
        "Catcher Offset (%)": 3.0,
        "Long Entry Threshold (%)": 0.0, "Long Exit Threshold (%)": 0.0, "Long Stop Loss (%)": 8.0, "Long Delay (Days)": 0,
        "Short Entry Threshold (%)": 0.0, "Short Exit Threshold (%)": 0.0, "Short Stop Loss (%)": 8.0, "Short Delay (Days)": 0,
        "Z_Avg_Profit": 0.0, "Z_Num_Trades": 0, "Z_WL_Ratio": 0.0
    }

    def _pad(rows):
        # Pad with fresh blank rows up to max_rows, then truncate.
        rows = rows + [default_row_template.copy() for _ in range(max_rows - len(rows))]
        return rows[:max_rows]

    default_setup = _pad([])
    if os.path.exists(USER_SETUPS_FILE):
        try:
            with open(USER_SETUPS_FILE, 'r', encoding='utf-8') as f:
                user_setups = json.load(f)
            if isinstance(user_setups, list):
                # Merge each saved row over the template so missing keys get defaults.
                merged = []
                for setup in user_setups:
                    full_setup = default_row_template.copy()
                    full_setup.update(setup)
                    merged.append(full_setup)
                return _pad(merged)
            return default_setup
        except Exception as e:
            print(f"Error loading {USER_SETUPS_FILE}: {e}. Using defaults.")
            return default_setup
    return default_setup
# --- UPDATED: Callback to Save Setup (Added Notes) ---
def add_setup_to_user_list():
    """Streamlit callback: snapshot the current sidebar settings as a new advisor setup.

    Reads widget values from st.session_state plus the stats of the last run
    ('last_run_stats'), appends the new setup to the saved user list (capped
    at 20 rows, blank placeholder rows dropped first), persists it via
    save_user_setups, refreshes session state and triggers a rerun.

    NOTE(review): unlike the row template in load_user_setups, this snapshot
    does not include the "Catcher Offset (%)" field — confirm whether the
    template default is intentionally relied upon for auto-saved rows.
    """
    try:
        stats = st.session_state.get('last_run_stats', {})
        if not stats:
            st.toast("⚠️ No stats found. Run analysis first.", icon="⚠️")
            return
        def get_weight_or_off(toggle_key, weight_key):
            # A factor is stored as its rounded weight when enabled, else "Off".
            if st.session_state.get(toggle_key, False):
                return round(st.session_state.get(weight_key, 1.0), 2)
            return "Off"
        adx_val = "Off"
        if st.session_state.get("use_adx_filter", False):
            # Clamp the saved ADX threshold into the [20, 30] band.
            val = st.session_state.get("adx_threshold", 25.0)
            adx_val = max(20.0, min(30.0, val))
        new_setup = {
            "Run": True,
            "Notes": "Auto-Saved Setup", # <--- Default note for auto-saves
            "RSI": get_weight_or_off('use_rsi', 'rsi_w'),
            "Volatility": get_weight_or_off('use_vol', 'vol_w'),
            "TREND": get_weight_or_off('use_trend', 'trend_w'),
            "Volume": get_weight_or_off('use_volume', 'volume_w'),
            "MACD": get_weight_or_off('use_macd', 'macd_w'),
            "MA Slope": get_weight_or_off('use_ma_slope', 'ma_slope_w'),
            "Markov": get_weight_or_off('use_markov', 'markov_w'),
            "ADX Filter": adx_val,
            "Conf. Threshold": st.session_state.confidence_slider,
            "Large MA Period": st.session_state.ma_period,
            "Bollinger Band Period": st.session_state.bb_period,
            "Bollinger Band Std Dev": st.session_state.bb_std,
            "Long Entry Threshold (%)": st.session_state.long_entry,
            "Long Exit Threshold (%)": st.session_state.long_exit,
            "Long Stop Loss (%)": st.session_state.long_sl,
            "Long Delay (Days)": st.session_state.long_delay,
            "Short Entry Threshold (%)": st.session_state.short_entry,
            "Short Exit Threshold (%)": st.session_state.short_exit,
            "Short Stop Loss (%)": st.session_state.short_sl,
            "Short Delay (Days)": st.session_state.short_delay,
            # Fraction -> percent for display; matches Z_Avg_Profit's template unit.
            "Z_Avg_Profit": stats.get("Z_Avg_Profit", 0.0) * 100.0,
            "Z_Num_Trades": stats.get("Z_Num_Trades", 0),
            "Z_WL_Ratio": stats.get("Z_WL_Ratio", 0.0)
        }
        current_setups = st.session_state.get("user_setups_data", [])
        # Drop blank placeholder rows before appending the new setup.
        non_empty_setups = [s for s in current_setups if not is_row_blank(s)]
        # NOTE(review): uses row 0 of the loaded setups as the padding template —
        # this is blank only if that row has not been customized; verify.
        default_row_template = {k:v for k,v in load_user_setups()[0].items()}
        non_empty_setups.append(new_setup)
        # Pad back up to the fixed 20-row table and cap at 20.
        while len(non_empty_setups) < 20: non_empty_setups.append(default_row_template.copy())
        final_setups = non_empty_setups[:20]
        save_user_setups(final_setups)
        # Refresh State
        processed_setups = []
        for s in final_setups:
            processed_setups.append(s.copy()) # Simple copy is enough now
        st.session_state["user_setups_data"] = processed_setups
        st.toast("✅ Setup saved!", icon="✅")
        st.session_state.run_user_advisor_setup = True
        st.rerun()
    except Exception as e:
        st.error(f"Could not save setup: {e}")
def save_user_setups(setups_list):
    """Saves user-defined advisor setups to a JSON file.

    Args:
        setups_list: list of setup dicts; only the first 20 are persisted
            (matching the fixed advisor table size).
    """
    try:
        # Ensure we only save 20
        with open(USER_SETUPS_FILE, 'w', encoding='utf-8') as f:
            json.dump(setups_list[:20], f, indent=4) # <-- CHANGED TO 20
        st.success("User-defined setups saved!")
    except Exception as e:
        st.error(f"Error saving user setups: {e}")
def save_markov_setup(setup_dict):
    """Saves the best Markov setup to a JSON file.

    Args:
        setup_dict: dict describing the Markov setup (e.g. 'Strategy',
            'Run-Up Period' keys as consumed by calculate_confidence_score).
    """
    try:
        with open(MARKOV_SETUP_FILE, 'w', encoding='utf-8') as f:
            json.dump(setup_dict, f, indent=4)
        st.sidebar.success("Best Markov setup saved as default!")
    except Exception as e:
        st.sidebar.error(f"Error saving Markov setup: {e}")
def load_markov_setup():
    """Loads the best Markov setup from a JSON file.

    Returns the parsed dict, or None when the file is absent or unreadable
    (the error is printed to stdout, not surfaced to the UI).
    """
    if not os.path.exists(MARKOV_SETUP_FILE):
        return None
    try:
        with open(MARKOV_SETUP_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error loading {MARKOV_SETUP_FILE}: {e}")
        return None
# --- UPDATED: Helper to accept filename (Replaces the old load_completed_setups) ---
def load_completed_setups(filename):
    """Reads the existing CSV and returns a set of completed configurations.

    Each configuration is keyed as a (toggles, weights) pair of tuples so the
    optimizer can skip work already present in the checkpoint file.

    Args:
        filename: path to the checkpoint CSV.

    Returns:
        set[tuple]: {(toggle_tuple, weight_tuple), ...}; empty when the file
        is missing or unreadable.
    """
    toggle_cols = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov']
    weight_cols = ['RSI W', 'Volatility W', 'TREND W', 'Volume W', 'MACD W', 'MA Slope W', 'Markov W']
    completed_configs = set()
    if os.path.exists(filename):
        try:
            # We only need the config columns: factor toggles (as strings) and weights.
            df_completed = pd.read_csv(
                filename,
                usecols=toggle_cols + weight_cols,
                dtype={col: str for col in toggle_cols},
            )
            # Vectorized row extraction (replaces a slow iterrows loop).
            toggle_rows = df_completed[toggle_cols].itertuples(index=False, name=None)
            weight_rows = df_completed[weight_cols].itertuples(index=False, name=None)
            completed_configs.update(zip(toggle_rows, weight_rows))
        except Exception as e:
            print(f"Warning: Error loading CSV for checkpointing: {e}. Starting from scratch.")
    return completed_configs
# --- 1. Data Loading and Cleaning Functions ---
@st.cache_data(ttl=300) # Added TTL to auto-refresh cache every 5 mins
def load_all_data(folder_path):
    """Load and merge every CSV in folder_path into one date-indexed DataFrame.

    Files are read in sorted filename order (so year-ranged files load
    chronologically), indexed by strictly-ISO (YYYY-MM-DD) dates, merged,
    de-duplicated (the later file wins on overlapping dates), date-sorted,
    and coerced to numeric columns.

    Returns:
        (DataFrame, str) on success, or (None, str | None) on failure.
    """
    if not os.path.exists(folder_path):
        st.error(f"Folder '{folder_path}' not found.")
        return None, None
    all_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]
    all_files.sort() # Ensure we load in chronological order (e.g. 2023, then 2024)
    if not all_files:
        st.error("No CSV files found in the 'csv_data' folder.")
        return None, None
    df_list = []
    error_messages = []
    for file_name in all_files:
        file_path = os.path.join(folder_path, file_name)
        try:
            # 1. Read CSV without parsing dates yet (load as strings to be safe)
            df = pd.read_csv(file_path, header=0, index_col=0, encoding='utf-8')
            # 2. FORCE ISO PARSING (YYYY-MM-DD)
            # We disable 'dayfirst' because ISO puts Year first.
            df.index = pd.to_datetime(df.index, format='%Y-%m-%d', errors='coerce')
            # 3. Drop rows with invalid dates
            df = df[df.index.notna()]
            if df.empty:
                error_messages.append(f"Warning: Skipped {file_name}, no valid dates found.")
                continue
            df_list.append(df)
        except Exception as e:
            # Collect per-file errors; they are surfaced as warnings at the end.
            error_messages.append(f"Could not read {file_name}. Error: {e}")
    if not df_list:
        return None, "No data could be loaded successfully."
    # 4. CONSOLIDATE
    try:
        master_df = pd.concat(df_list)
        # 5. DEDUPLICATE (Keep the newest version of any overlapping date)
        # This prevents "Double Dots" if 2024-2025.csv and 2025-NOW.csv overlap
        if master_df.index.has_duplicates:
            master_df = master_df[~master_df.index.duplicated(keep='last')]
        # 6. FORCE SORT (The Final "Zig-Zag" Killer)
        # Ensures Jan 1st always comes before Jan 2nd
        master_df.sort_index(inplace=True)
        # 7. NUMERIC CONVERSION (From your original code)
        # Ensures all price columns are numbers, not strings
        for col in master_df.columns:
            master_df[col] = pd.to_numeric(master_df[col], errors='coerce')
        for msg in error_messages: st.warning(msg)
        return master_df, f"Successfully combined data from {len(df_list)} files."
    except Exception as e:
        return None, f"Critical Error merging data: {e}"
def clean_data_and_report_outliers(df):
    """
    Cleans data using a 'Rolling Median' filter (User Defined: 1 Month Window).

    For every price column (volume/high/low companion columns are skipped),
    values far below (>80% collapse, e.g. pence-vs-pounds glitches) or far
    above (>5x spike) the centered 20-day rolling median are blanked to NaN
    in place. Returns the mutated frame plus a per-ticker removal report.
    """
    outlier_report = []
    excluded_tags = ['_Volume', '_High', '_Low']
    for ticker in df.columns:
        # Only plain price columns participate in the filter.
        if any(tag in str(ticker) for tag in excluded_tags):
            continue
        series = pd.to_numeric(df[ticker], errors='coerce')
        # Centered 20-day rolling median approximates the local "true" level;
        # min_periods=1 keeps it defined at the edges of the series.
        local_level = series.rolling(window=20, center=True, min_periods=1).median()
        # Flag crashes below 20% of the local level and spikes above 5x it.
        too_low = series < local_level * 0.20
        too_high = series > local_level * 5.0
        flagged_dates = series.index[too_low | too_high]
        if len(flagged_dates) > 0:
            df.loc[flagged_dates, ticker] = np.nan
            outlier_report.append({'Ticker': ticker, 'Type': 'Rolling Filter', 'Count': len(flagged_dates)})
    return df, outlier_report
# --- 2. Custom Backtesting Engine ---
# --- [UPDATED] 9-Factor Version (with MFI & SuperTrend) ---
def calculate_confidence_score(df, primary_driver,
                               use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov,
                               use_mfi=False, use_supertrend=False,
                               rsi_w=1.0, vol_w=1.0, trend_w=1.0, vol_w_val=1.0, macd_w=1.0, ma_slope_w=1.0, markov_w=1.0,
                               mfi_w=0.0, supertrend_w=0.0,
                               bband_params=None,
                               best_markov_setup=None):
    """Accumulate weighted long/short confidence scores across up to 9 factors.

    Each enabled factor adds either a scaled value in [0, weight] or a binary
    0/weight signal to the long and/or short score series. The factor matching
    `primary_driver` is skipped so the primary entry trigger is not
    double-counted in the confidence score.

    Args:
        df: indicator-enriched price DataFrame; each factor is applied only
            when its required columns are present.
        primary_driver: name of the primary entry signal to exclude.
        use_*: per-factor enable flags. MFI/SuperTrend default to off so
            older 7-factor call sites remain compatible.
        *_w: per-factor weights (`vol_w` is Volatility, `vol_w_val` is Volume).
        bband_params: optional dict with 'long_entry_threshold_pct' /
            'short_entry_threshold_pct' (missing keys default to 0.0).
        best_markov_setup: dict with 'Strategy' and 'Run-Up Period' keys,
            required for the Markov factor.

    Returns:
        (long_score, short_score): two float Series aligned to df.index.
    """
    if bband_params is None:
        bband_params = {}
    long_score = pd.Series(0.0, index=df.index)
    short_score = pd.Series(0.0, index=df.index)
    # --- Pre-calculate Markov state once if the caller has not already ---
    if use_markov and best_markov_setup and 'RunUp_State' not in df.columns:
        run_up_period = best_markov_setup.get('Run-Up Period', 10)
        df['RunUp_Return'] = df['Close'].pct_change(periods=run_up_period)
        df['RunUp_State'] = df['RunUp_Return'].apply(lambda x: 'Up' if x > 0 else 'Down')
    # --- BBand Factor (skipped when BBands are the primary entry driver) ---
    if primary_driver != 'Bollinger Bands' and 'bband_lower' in df.columns:
        bb_weight = 1.0
        long_entry_pct = bband_params.get('long_entry_threshold_pct', 0.0)
        short_entry_pct = bband_params.get('short_entry_threshold_pct', 0.0)
        long_bb_trigger_price = df['bband_lower'] * (1 - long_entry_pct)
        short_bb_trigger_price = df['bband_upper'] * (1 + short_entry_pct)
        # Scale the close's distance from the MA toward the trigger into [0, 1].
        long_score_range = (df['large_ma'] - long_bb_trigger_price).replace(0, np.nan)
        short_score_range = (short_bb_trigger_price - df['large_ma']).replace(0, np.nan)
        long_score += ((df['large_ma'] - df['Close']) / long_score_range).clip(0, 1).fillna(0) * bb_weight
        short_score += ((df['Close'] - df['large_ma']) / short_score_range).clip(0, 1).fillna(0) * bb_weight
    # Factor 1: RSI — linear ramp below 30 (long) / above 70 (short).
    if primary_driver != 'RSI Crossover' and use_rsi and 'RSI' in df.columns:
        long_score += ((30 - df['RSI']) / 30).clip(0, 1).fillna(0) * rsi_w
        short_score += ((df['RSI'] - 70) / 30).clip(0, 1).fillna(0) * rsi_w
    # Factor 2: Volatility — binary signal on elevated 14d volatility (both sides).
    if use_volatility and 'Volatility_p' in df.columns:
        vol_signal = (df['Volatility_p'] > 0.025).astype(float) * vol_w
        long_score += vol_signal; short_score += vol_signal
    # Factor 3: Trend — % distance from SMA_200, saturating at +/-10%.
    # Vectorized (replaces a slow row-wise apply that also misbehaved on an
    # empty valid-SMA mask).
    if use_trend and 'SMA_200' in df.columns and 'Close' in df.columns:
        valid_sma = (df['SMA_200'] != 0) & df['SMA_200'].notna()
        pct_dist = ((df['Close'] - df['SMA_200']) / df['SMA_200']).where(valid_sma, 0.0)
        long_score += (pct_dist / 0.10).clip(0, 1).fillna(0) * trend_w
        short_score += (-pct_dist / 0.10).clip(0, 1).fillna(0) * trend_w
    # Factor 4: Volume — spike ratio scaled into [0, 1] (both sides).
    if use_volume and 'Volume_Ratio' in df.columns:
        vol_spike_signal = ((df['Volume_Ratio'] - 1.75) / 2.25).clip(0, 1).fillna(0) * vol_w_val
        long_score += vol_spike_signal; short_score += vol_spike_signal
    # Factor 5: MACD — 60% of weight on the crossover day, 40% on histogram sign.
    if primary_driver != 'MACD Crossover' and use_macd and 'MACD_line' in df.columns:
        macd_cross_long = (df['MACD_line'].shift(1) < df['MACD_signal'].shift(1)) & (df['MACD_line'] >= df['MACD_signal'])
        macd_cross_short = (df['MACD_line'].shift(1) > df['MACD_signal'].shift(1)) & (df['MACD_line'] <= df['MACD_signal'])
        long_score += macd_cross_long.astype(float) * macd_w * 0.6
        short_score += macd_cross_short.astype(float) * macd_w * 0.6
        long_score += (df['MACD_hist'] > 0).astype(float) * macd_w * 0.4
        short_score += (df['MACD_hist'] < 0).astype(float) * macd_w * 0.4
    # Factor 6: MA Slope — binary on the large-MA slope direction.
    if primary_driver != 'MA Slope' and use_ma_slope and 'ma_slope' in df.columns:
        long_score += (df['ma_slope'] > 0).astype(float) * ma_slope_w
        short_score += (df['ma_slope'] < 0).astype(float) * ma_slope_w
    # Factor 7: Markov — score days in the state the chosen transition starts from.
    if primary_driver != 'Markov State' and use_markov and best_markov_setup and 'RunUp_State' in df.columns:
        strategy = best_markov_setup.get('Strategy')
        if strategy == 'Down -> Up': long_score += (df['RunUp_State'] == 'Down').astype(float) * markov_w
        elif strategy == 'Up -> Up': long_score += (df['RunUp_State'] == 'Up').astype(float) * markov_w
        elif strategy == 'Up -> Down': short_score += (df['RunUp_State'] == 'Up').astype(float) * markov_w
        elif strategy == 'Down -> Down': short_score += (df['RunUp_State'] == 'Down').astype(float) * markov_w
    # Factor 8: MFI — binary full weight when MFI < 25 (oversold, long)
    # or MFI > 75 (overbought, short). (Comment previously said 20; code uses 25.)
    if use_mfi and 'MFI' in df.columns:
        long_score += (df['MFI'] < 25).astype(float) * mfi_w
        short_score += (df['MFI'] > 75).astype(float) * mfi_w
    # Factor 9: SuperTrend — binary on which side of the SuperTrend line price sits.
    if use_supertrend and 'SuperTrend' in df.columns:
        long_score += (df['Close'] > df['SuperTrend']).astype(float) * supertrend_w
        short_score += (df['Close'] < df['SuperTrend']).astype(float) * supertrend_w
    return long_score.fillna(0), short_score.fillna(0)
# --- FULL REPLACEMENT FOR run_backtest FUNCTION ---
def run_backtest(data, params,
use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov, use_mfi, use_supertrend, # Added MFI/ST
rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w, mfi_w, supertrend_w, # Added MFI/ST weights
use_adx_filter, adx_threshold, rsi_logic,
adx_period=14,
veto_setups_list=None,
primary_driver='Bollinger Bands',
markov_setup=None,
exit_logic_type='Standard (Price-Based)',
exit_confidence_threshold=50,
smart_trailing_stop_pct=0.05,
long_score_95_percentile=None,
short_score_95_percentile=None,
benchmark_rank=0.95,
smart_exit_atr_period=14,
smart_exit_atr_multiplier=3.0,
intelligent_tsl_pct=1.0,
analysis_start_date=None,
analysis_end_date=None,
benchmark_lookback_years=None,
use_rolling_benchmark=False):
df = data.copy()
required_cols = ['Close']
if 'High' not in df.columns: df['High'] = df['Close']
if 'Low' not in df.columns: df['Low'] = df['Close']
required_cols.extend(['High', 'Low'])
if 'Volume' in df.columns: required_cols.append('Volume')
df['Close'] = pd.to_numeric(df['Close'], errors='coerce').replace(0, np.nan)
df.dropna(subset=['Close'], inplace=True)
min_ma_period = params.get('large_ma_period', 50)
min_lookback_needed = min_ma_period
if (primary_driver == 'Markov State' or use_markov) and markov_setup is not None:
min_lookback_needed = max(min_ma_period, markov_setup.get('Run-Up Period', 10))
if len(df) < min_lookback_needed or len(df) < params.get('bband_period', 20) or len(df) < 30:
return 0, 0, 0.0, 0.0, None, ([], [], [], []), [], (0, 0, 0, 0), ([], []), (None, None, None, None), (0, 0, 0, 0, 0, 0)
# --- Indicator Calculations ---
df['large_ma'] = df['Close'].rolling(window=min_ma_period).mean().ffill()
df['ma_slope'] = df['large_ma'].diff(periods=3).ffill()
bband_period = params.get('bband_period', 20)
if len(df) >= bband_period:
try:
indicator_bb = BollingerBands(close=df['Close'], window=bband_period, window_dev=params['bband_std_dev'])
df['bband_lower'] = indicator_bb.bollinger_lband()
df['bband_upper'] = indicator_bb.bollinger_hband()
except Exception: df['bband_lower'], df['bband_upper'] = np.nan, np.nan
else: df['bband_lower'], df['bband_upper'] = np.nan, np.nan
if len(df) >= 14:
try:
indicator_rsi = RSIIndicator(close=df['Close'], window=14)
df['RSI'] = indicator_rsi.rsi()
except Exception: df['RSI'] = np.nan
df['Volatility_p'] = df['Close'].pct_change().rolling(window=14).std()
try:
indicator_adx = ADXIndicator(high=df['High'], low=df['Low'], close=df['Close'], window=adx_period, fillna=True)
df['ADX'] = indicator_adx.adx().ffill()
except Exception: df['ADX'] = np.nan
try:
atr_window = int(smart_exit_atr_period)
indicator_atr = AverageTrueRange(high=df['High'], low=df['Low'], close=df['Close'], window=atr_window, fillna=True)
df['ATR'] = indicator_atr.average_true_range()
except Exception: df['ATR'] = np.nan
else:
df['RSI'], df['Volatility_p'], df['ADX'], df['ATR'] = np.nan, np.nan, np.nan, np.nan
# --- [NEW] MFI & SUPERTREND CALCULATIONS (Fixed) ---
if len(df) >= 14:
# 1. MFI Calculation
try:
mfi_ind = MFIIndicator(high=df['High'], low=df['Low'], close=df['Close'], volume=df['Volume'], window=14, fillna=True)
df['MFI'] = mfi_ind.money_flow_index()
except Exception: df['MFI'] = 50.0
# 2. Proper Recursive SuperTrend Calculation
try:
st_period = 10
st_multiplier = 3.0
# Calculate ATR
high_low = df['High'] - df['Low']
high_close = np.abs(df['High'] - df['Close'].shift())
low_close = np.abs(df['Low'] - df['Close'].shift())
ranges = pd.concat([high_low, high_close, low_close], axis=1)
true_range = np.max(ranges, axis=1)
atr = true_range.rolling(st_period).mean().fillna(0)
# Basic Bands
hl2 = (df['High'] + df['Low']) / 2
basic_upper = hl2 + (st_multiplier * atr)
basic_lower = hl2 - (st_multiplier * atr)
# Initialize Final Bands
final_upper = basic_upper.copy()
final_lower = basic_lower.copy()
trend = np.zeros(len(df), dtype=int)
supertrend = np.zeros(len(df))
# Recursive Loop (Accurate Logic)
close = df['Close'].values
bu = basic_upper.values
bl = basic_lower.values
fu = final_upper.values
fl = final_lower.values
# 1 = Uptrend, -1 = Downtrend
curr_trend = 1
for i in range(1, len(df)):
# Calculate Final Upper Band
if bu[i] < fu[i-1] or close[i-1] > fu[i-1]:
fu[i] = bu[i]
else:
fu[i] = fu[i-1]
# Calculate Final Lower Band
if bl[i] > fl[i-1] or close[i-1] < fl[i-1]:
fl[i] = bl[i]
else:
fl[i] = fl[i-1]
# Determine Trend
if curr_trend == 1 and close[i] < fl[i]:
curr_trend = -1
elif curr_trend == -1 and close[i] > fu[i]:
curr_trend = 1
trend[i] = curr_trend
supertrend[i] = fl[i] if curr_trend == 1 else fu[i]
df['SuperTrend'] = supertrend
except Exception as e:
# print(f"ST Error: {e}") # Debug if needed
df['SuperTrend'] = np.nan
else:
df['MFI'], df['SuperTrend'] = 50.0, np.nan
df['SMA_200'] = df['Close'].rolling(window=200, min_periods=1).mean()
if 'Volume' in df.columns:
df['Volume'] = pd.to_numeric(df['Volume'], errors='coerce').fillna(0)
df['Volume_MA50'] = df['Volume'].rolling(window=50, min_periods=1).mean()
df['Volume_Ratio'] = df.apply(lambda row: row['Volume'] / row['Volume_MA50'] if row['Volume_MA50'] > 0 else 0, axis=1)
df['Volume_Ratio'] = df['Volume_Ratio'].replace([np.inf, -np.inf], 0).fillna(0)
else: df['Volume_Ratio'] = 0.0
if len(df) >= 26:
indicator_macd = MACD(close=df['Close'], window_slow=26, window_fast=12, window_sign=9, fillna=True)
df['MACD_line'] = indicator_macd.macd().ffill()
df['MACD_signal'] = indicator_macd.macd_signal().ffill()
df['MACD_hist'] = indicator_macd.macd_diff().ffill()
else: df['MACD_line'], df['MACD_signal'], df['MACD_hist'] = np.nan, np.nan, np.nan
if (primary_driver == 'Markov State' or use_markov) and markov_setup is not None:
run_up_period = markov_setup.get('Run-Up Period', 10)
df['RunUp_Return'] = df['Close'].pct_change(periods=run_up_period)
df['RunUp_State'] = df['RunUp_Return'].apply(lambda x: 'Up' if x > 0 else 'Down')
bband_params_for_score = {
'long_entry_threshold_pct': params.get('long_entry_threshold_pct', 0.0),
'short_entry_threshold_pct': params.get('short_entry_threshold_pct', 0.0)
}
raw_long_score, raw_short_score = calculate_confidence_score(df,
primary_driver,
use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov,
use_mfi, use_supertrend, # <--- NEW
rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w,
mfi_w, supertrend_w, # <--- NEW
bband_params_for_score,
best_markov_setup=markov_setup
)
# 1. GLOBAL (STATIC) MODE - "The Crystal Ball"
# Used for "Elite Filtering" of open trades.
if not use_rolling_benchmark:
if long_score_95_percentile is None:
long_scores_gt_zero = raw_long_score[raw_long_score > 0]
val = long_scores_gt_zero.quantile(benchmark_rank) if not long_scores_gt_zero.empty else 1.0
long_95 = pd.Series(val, index=df.index)
else:
long_95 = pd.Series(long_score_95_percentile, index=df.index)
if short_score_95_percentile is None:
short_scores_gt_zero = raw_short_score[raw_short_score > 0]
val = short_scores_gt_zero.quantile(benchmark_rank) if not short_scores_gt_zero.empty else 1.0
short_95 = pd.Series(val, index=df.index)
else:
short_95 = pd.Series(short_score_95_percentile, index=df.index)
# 2. ADAPTIVE (ROLLING) MODE - "Real World"
# Used for realistic historical simulation.
else:
if long_score_95_percentile is None:
# Default to 1 year if lookback is missing
years = benchmark_lookback_years if (benchmark_lookback_years is not None and benchmark_lookback_years > 0) else 1
window_days = int(years * 252)
# Calculate Rolling Percentile
long_95 = raw_long_score.rolling(window=window_days, min_periods=50).quantile(benchmark_rank).fillna(method='bfill')
else:
long_95 = pd.Series(long_score_95_percentile, index=df.index)
if short_score_95_percentile is None:
years = benchmark_lookback_years if (benchmark_lookback_years is not None and benchmark_lookback_years > 0) else 1
window_days = int(years * 252)
short_95 = raw_short_score.rolling(window=window_days, min_periods=50).quantile(benchmark_rank).fillna(method='bfill')
else:
short_95 = pd.Series(short_score_95_percentile, index=df.index)
# Safety: Ensure we don't divide by zero
long_95 = long_95.replace(0, 1.0)
short_95 = short_95.replace(0, 1.0)
# Final Calculation (Removing 'if' check to avoid Series Ambiguity Error)
df['long_confidence_score'] = (raw_long_score / long_95 * 100).clip(0, 100).fillna(0.0)
df['short_confidence_score'] = (raw_short_score / short_95 * 100).clip(0, 100).fillna(0.0)
apply_veto = bool(veto_setups_list)
if apply_veto:
df['any_long_veto_trigger'] = False; df['any_short_veto_trigger'] = False
for veto_setup in veto_setups_list:
veto_threshold = veto_setup.get('Conf. Threshold', 0)
veto_rsi_on = veto_setup.get('RSI') == 'On'; veto_vol_on = veto_setup.get('Volatility') == 'On'
veto_trend_on = veto_setup.get('TREND') == 'On'; veto_volume_on = veto_setup.get('Volume') == 'On'
veto_macd_on = veto_setup.get('MACD', 'Off') == 'On'; veto_ma_slope_on = veto_setup.get('MA Slope', 'Off') == 'On'
veto_markov_on = False
veto_long_raw, veto_short_raw = calculate_confidence_score(df,
'Bollinger Bands',
veto_rsi_on, veto_vol_on, veto_trend_on, veto_volume_on, veto_macd_on, veto_ma_slope_on, veto_markov_on,
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
bband_params_for_score,
best_markov_setup=markov_setup
)
veto_long_norm = (veto_long_raw / 3.0) * 100
current_long_veto_trigger = (veto_long_norm >= veto_threshold); current_short_veto_trigger = (veto_long_norm >= veto_threshold)
df['any_long_veto_trigger'] |= current_long_veto_trigger; df['any_short_veto_trigger'] |= current_long_veto_trigger
# --- Entry Trigger Logic ---
bb_long_signal = df['Close'] < (df['bband_lower'] * (1 - params['long_entry_threshold_pct']))
bb_short_signal = df['Close'] > (df['bband_upper'] * (1 + params['short_entry_threshold_pct']))
if rsi_logic == "Crossover":
rsi_long_signal = (df['RSI'].shift(1) < 30) & (df['RSI'] >= 30) & df['RSI'].notna()
rsi_short_signal = (df['RSI'].shift(1) > 70) & (df['RSI'] <= 70) & df['RSI'].notna()
else: # Level
rsi_long_signal = (df['RSI'] <= 30) & df['RSI'].notna()
rsi_short_signal = (df['RSI'] >= 70) & df['RSI'].notna()
macd_long_signal = (df['MACD_line'].shift(1) < df['MACD_signal'].shift(1)) & (df['MACD_line'] >= df['MACD_signal'])
macd_short_signal = (df['MACD_line'].shift(1) > df['MACD_signal'].shift(1)) & (df['MACD_line'] <= df['MACD_signal'])
ma_slope_long_signal = (df['ma_slope'].shift(1) <= 0) & (df['ma_slope'] > 0)
ma_slope_short_signal = (df['ma_slope'].shift(1) >= 0) & (df['ma_slope'] < 0)
if primary_driver == 'RSI Crossover':
base_long_trigger = rsi_long_signal; base_short_trigger = rsi_short_signal
elif primary_driver == 'MACD Crossover':
base_long_trigger = macd_long_signal; base_short_trigger = macd_short_signal
elif primary_driver == 'MA Slope':
base_long_trigger = ma_slope_long_signal; base_short_trigger = ma_slope_short_signal
elif primary_driver == 'Markov State' and markov_setup is not None:
strategy = markov_setup.get('Strategy')
if strategy == 'Down -> Up':
base_long_trigger = (df['RunUp_State'] == 'Down'); base_short_trigger = pd.Series(False, index=df.index)
elif strategy == 'Up -> Up':
base_long_trigger = (df['RunUp_State'] == 'Up'); base_short_trigger = pd.Series(False, index=df.index)
elif strategy == 'Up -> Down':
base_long_trigger = pd.Series(False, index=df.index); base_short_trigger = (df['RunUp_State'] == 'Up')
elif strategy == 'Down -> Down':
base_long_trigger = pd.Series(False, index=df.index); base_short_trigger = (df['RunUp_State'] == 'Down')
else:
base_long_trigger = pd.Series(False, index=df.index); base_short_trigger = pd.Series(False, index=df.index)
else:
base_long_trigger = bb_long_signal; base_short_trigger = bb_short_signal
if use_adx_filter and 'ADX' in df.columns and df['ADX'].notna().any():
adx_allows_entry = (df['ADX'] < adx_threshold).fillna(False)
else: adx_allows_entry = True
ma_is_valid = pd.notna(df['large_ma'])
# [SURGICAL PATCH START] --------------------------------------------------
# Check if ALL confidence indicators are disabled
# CORRECTED: Added use_mfi and use_supertrend to this list so the score isn't ignored when they are On.
all_indicators_off = not any([
use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov,
use_mfi, use_supertrend
])
# If indicators are OFF, we bypass the confidence check (allow raw signal)
# Otherwise, we enforce the confidence threshold as usual
long_entry_trigger = base_long_trigger & adx_allows_entry & ((df['long_confidence_score'] >= params['confidence_threshold']) | all_indicators_off) & ma_is_valid
short_entry_trigger = base_short_trigger & adx_allows_entry & ((df['short_confidence_score'] >= params['confidence_threshold']) | all_indicators_off) & ma_is_valid
# [SURGICAL PATCH END] ----------------------------------------------------
# [FIX] Filter entries by BOTH Start and End date
if analysis_start_date is not None:
start_mask = df.index >= pd.Timestamp(analysis_start_date)
long_entry_trigger &= start_mask
short_entry_trigger &= start_mask
if analysis_end_date is not None:
end_mask = df.index <= pd.Timestamp(analysis_end_date)
long_entry_trigger &= end_mask
short_entry_trigger &= end_mask
if apply_veto:
long_entry_trigger &= ~df['any_long_veto_trigger']
short_entry_trigger &= ~df['any_short_veto_trigger']
potential_long_price_exit = (df['Close'] >= (df['large_ma'] * (1 + params['long_exit_ma_threshold_pct']))) | (df['Close'] >= df['bband_upper'])
potential_short_price_exit = (df['Close'] <= (df['large_ma'] * (1 - params['short_exit_ma_threshold_pct']))) | (df['Close'] <= df['bband_lower'])
long_entry_prices = df['Close'].where(long_entry_trigger).ffill()
short_entry_prices = df['Close'].where(short_entry_trigger).ffill()
if exit_logic_type == 'Intelligent (ADX/MACD/ATR)':
adx_rising = (df['ADX'] > df['ADX'].shift(1)).fillna(False)
adx_strong = (df['ADX'] > adx_threshold)
macd_bullish = (df['MACD_line'] > df['MACD_signal'])
stay_in_long_trade = (adx_rising | adx_strong) & macd_bullish
base_long_exit_trigger = potential_long_price_exit & (~stay_in_long_trade)
macd_bearish = (df['MACD_line'] < df['MACD_signal'])
stay_in_short_trade = (adx_rising | adx_strong) & macd_bearish
base_short_exit_trigger = potential_short_price_exit & (~stay_in_short_trade)
else:
long_is_in_profit = (df['Close'] > long_entry_prices).fillna(False)
short_is_in_profit = (df['Close'] < short_entry_prices).fillna(False)
base_long_exit_trigger = potential_long_price_exit & long_is_in_profit
base_short_exit_trigger = potential_short_price_exit & short_is_in_profit
df['long_signal'] = np.nan; df.loc[long_entry_trigger, 'long_signal'] = 1; df.loc[base_long_exit_trigger, 'long_signal'] = 0
df['short_signal'] = np.nan; df.loc[short_entry_trigger, 'short_signal'] = -1; df.loc[base_short_exit_trigger, 'short_signal'] = 0
if primary_driver == 'Markov State' and markov_setup is not None:
limit_long = markov_setup.get('Future Period', 5); limit_short = markov_setup.get('Future Period', 5)
else:
limit_long = params.get('max_long_duration', 120); limit_short = params.get('max_short_duration', 120)
temp_long_pos = df['long_signal'].ffill().fillna(0)
temp_short_pos = df['short_signal'].ffill().fillna(0)
long_groups = (~(temp_long_pos == 1)).cumsum(); df['days_in_long_trade'] = df.groupby(long_groups).cumcount(); df.loc[temp_long_pos == 0, 'days_in_long_trade'] = 0
short_groups = (~(temp_short_pos == -1)).cumsum(); df['days_in_short_trade'] = df.groupby(short_groups).cumcount(); df.loc[temp_short_pos == 0, 'days_in_short_trade'] = 0
long_time_exit_trigger = (df['days_in_long_trade'] > limit_long) & (temp_long_pos == 1)
short_time_exit_trigger = (df['days_in_short_trade'] > limit_short) & (temp_short_pos == -1)
df.loc[long_time_exit_trigger, 'long_signal'] = 0; df.loc[short_time_exit_trigger, 'short_signal'] = 0
df['long_entry_price_static'] = df['Close'].where(df['long_signal'].shift(1) == 0).ffill().bfill()
df['short_entry_price_static'] = df['Close'].where(df['short_signal'].shift(1) == 0).ffill().bfill()
# [SURGICAL UPDATE START] --- Catcher Offset Logic ---
# We retrieve the offset percentage from params (default to 0.0 if missing)
catcher_offset = params.get('catcher_stop_pct', 0.0)
# Calculate the Adjusted Floor/Ceiling
# Positive Offset = Higher Floor (Profit for Long, Profit for Short)
# Negative Offset = Lower Floor (Loss allowance for Long, Loss allowance for Short)
long_breakeven_floor = df['long_entry_price_static'] * (1 + catcher_offset)
short_breakeven_floor = df['short_entry_price_static'] * (1 - catcher_offset)
# [SURGICAL UPDATE END] -----------------------------
use_ma_floor_filter = params.get('use_ma_floor_filter', False)
standard_tsl_pct_long = params.get('long_trailing_stop_loss_pct', 0)
standard_tsl_pct_short = params.get('short_trailing_stop_loss_pct', 0)
intelligent_tsl_pct_long = intelligent_tsl_pct if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else 0.0
intelligent_tsl_pct_short = intelligent_tsl_pct if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else 0.0
long_tsl_exit = pd.Series(False, index=df.index); short_tsl_exit = pd.Series(False, index=df.index)
if primary_driver != 'Markov State':
# --- LONG TSL ---
has_hit_target = potential_long_price_exit
# Determine which TSL percentage to use (Standard vs Intelligent)
tsl_to_use_long = np.where(has_hit_target, intelligent_tsl_pct_long, standard_tsl_pct_long) if use_ma_floor_filter else (intelligent_tsl_pct_long if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else standard_tsl_pct_long)
if not isinstance(tsl_to_use_long, pd.Series): tsl_to_use_long = pd.Series(tsl_to_use_long, index=df.index)
# Only run calculation if TSL is active (> 0)
if (tsl_to_use_long > 0).any():
in_long_trade = (df['long_signal'].ffill().fillna(0) == 1)
long_high_water_mark = df['High'].where(in_long_trade).groupby((~in_long_trade).cumsum()).cummax()
tsl_from_hwm = long_high_water_mark * (1 - tsl_to_use_long)
# [FIX] Apply Catcher floor ONLY if target has been hit (Persistent during trade)
if use_ma_floor_filter:
trade_groups = (~in_long_trade).cumsum()
# [FIX] Cast to float before cummax to avoid Object dtype error
target_hit_persistent = has_hit_target.where(in_long_trade).astype(float).groupby(trade_groups).cummax().fillna(0).astype(bool)
long_tsl_price = np.where(target_hit_persistent, np.maximum(tsl_from_hwm, long_breakeven_floor), tsl_from_hwm)
else:
long_tsl_price = tsl_from_hwm
long_tsl_exit = in_long_trade & (df['Close'] < long_tsl_price)
df.loc[long_tsl_exit, 'long_signal'] = 0
# --- SHORT TSL ---
has_hit_target = potential_short_price_exit
# Determine which TSL percentage to use (Standard vs Intelligent)
tsl_to_use_short = np.where(has_hit_target, intelligent_tsl_pct_short, standard_tsl_pct_short) if use_ma_floor_filter else (intelligent_tsl_pct_short if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else standard_tsl_pct_short)
if not isinstance(tsl_to_use_short, pd.Series): tsl_to_use_short = pd.Series(tsl_to_use_short, index=df.index)
# Only run calculation if TSL is active (> 0)
if (tsl_to_use_short > 0).any():
in_short_trade = (df['short_signal'].ffill().fillna(0) == -1)
short_low_water_mark = df['Low'].where(in_short_trade).groupby((~in_short_trade).cumsum()).cummin()
tsl_from_lwm = short_low_water_mark * (1 + tsl_to_use_short)
# [FIX] Apply Catcher floor ONLY if target has been hit (Persistent during trade)
if use_ma_floor_filter:
trade_groups = (~in_short_trade).cumsum()
# [FIX] Cast to float before cummax to avoid Object dtype error
target_hit_persistent = has_hit_target.where(in_short_trade).astype(float).groupby(trade_groups).cummax().fillna(0).astype(bool)
short_tsl_price = np.where(target_hit_persistent, np.minimum(tsl_from_lwm, short_breakeven_floor), tsl_from_lwm)
else:
short_tsl_price = tsl_from_lwm
short_tsl_exit = in_short_trade & (df['Close'] > short_tsl_price)
df.loc[short_tsl_exit, 'short_signal'] = 0
df['long_position'] = df['long_signal'].ffill().fillna(0)
df['short_position'] = df['short_signal'].ffill().fillna(0)
if params['long_delay_days'] > 0: df['long_position'] = df['long_position'].shift(params['long_delay_days']).fillna(0)
if params['short_delay_days'] > 0: df['short_position'] = df['short_position'].shift(params['short_delay_days']).fillna(0)
df['daily_return'] = df['Close'].pct_change()
df['long_strategy_return'] = df['long_position'].shift(1) * df['daily_return']
df['short_strategy_return'] = df['short_position'].shift(1) * df['daily_return']
final_long_pnl = (1 + df['long_strategy_return'].fillna(0)).prod(skipna=True) - 1
final_short_pnl = (1 + df['short_strategy_return'].fillna(0)).prod(skipna=True) - 1
long_entries = df[(df['long_position'] == 1) & (df['long_position'].shift(1) == 0)]
long_exits = df[(df['long_position'] == 0) & (df['long_position'].shift(1) == 1)]
short_entries = df[(df['short_position'] == -1) & (df['short_position'].shift(1) == 0)]
short_exits = df[(df['short_position'] == 0) & (df['short_position'].shift(1) == -1)]
if not df.empty:
end_date = df.index.max();
else: end_date = pd.NaT;
# --- CHANGED: COLLECT ALL HISTORY (Removed "30 day" filter) ---
all_historical_trades = []
long_trade_profits, long_durations, first_long_entry_date, last_long_exit_date = [], [], None, None
short_trade_profits, short_durations, first_short_entry_date, last_short_exit_date = [], [], None, None
long_profit_take_count, long_tsl_count, long_time_exit_count = 0, 0, 0
short_profit_take_count, short_tsl_count, short_time_exit_count = 0, 0, 0
df_indices = pd.Series(range(len(df)), index=df.index)
for idx, row in long_entries.iterrows():
if first_long_entry_date is None: first_long_entry_date = idx
future_exits = long_exits[long_exits.index > idx]
if not future_exits.empty:
exit_row = future_exits.iloc[0]; last_long_exit_date = exit_row.name
exit_date = exit_row.name
is_tsl = long_tsl_exit.loc[exit_row.name]; is_time = long_time_exit_trigger.loc[exit_row.name]
if is_tsl: long_tsl_count += 1
elif is_time: long_time_exit_count += 1
else: long_profit_take_count += 1
profit = (exit_row['Close'] / row['Close']) - 1 if pd.notna(exit_row['Close']) and pd.notna(row['Close']) and row['Close'] != 0 else np.nan
long_trade_profits.append(profit)
# APPEND ALL TRADES (No Date Filter)
all_historical_trades.append({'Side': 'Long', 'Date Open': idx, 'Date Closed': exit_date, 'Start Confidence': row.get('long_confidence_score', np.nan), 'Final % P/L': profit, 'Status': 'Closed', 'Exit Reason': 'TSL' if is_tsl else ('Time' if is_time else 'Profit')})
try: long_durations.append(df_indices.loc[exit_row.name] - df_indices.loc[idx])
except KeyError: long_durations.append(np.nan)
avg_long_profit_per_trade = np.nanmean(long_trade_profits) if long_trade_profits else 0.0
for idx, row in short_entries.iterrows():
if first_short_entry_date is None: first_short_entry_date = idx
future_exits = short_exits[short_exits.index > idx]
if not future_exits.empty:
exit_row = future_exits.iloc[0]; last_short_exit_date = exit_row.name
exit_date = exit_row.name
is_tsl = short_tsl_exit.loc[exit_row.name]; is_time = short_time_exit_trigger.loc[exit_row.name]
if is_tsl: short_tsl_count += 1
elif is_time: short_time_exit_count += 1
else: short_profit_take_count += 1
profit = ((exit_row['Close'] / row['Close']) - 1) * -1 if pd.notna(exit_row['Close']) and pd.notna(row['Close']) and row['Close'] != 0 else np.nan
short_trade_profits.append(profit)
# APPEND ALL TRADES (No Date Filter)
all_historical_trades.append({'Side': 'Short', 'Date Open': idx, 'Date Closed': exit_date, 'Start Confidence': row.get('short_confidence_score', np.nan), 'Final % P/L': profit, 'Status': 'Closed', 'Exit Reason': 'TSL' if is_tsl else ('Time' if is_time else 'Profit')})
try: short_durations.append(df_indices.loc[exit_row.name] - df_indices.loc[idx])
except KeyError: short_durations.append(np.nan)
avg_short_profit_per_trade = np.nanmean(short_trade_profits) if short_trade_profits else 0.0
long_wins = sum(1 for p in long_trade_profits if pd.notna(p) and p > 0); long_losses = sum(1 for p in long_trade_profits if pd.notna(p) and p < 0)
short_wins = sum(1 for p in short_trade_profits if pd.notna(p) and p > 0); short_losses = sum(1 for p in short_trade_profits if pd.notna(p) and p < 0)
long_trades_log = [{'date': idx, 'price': row['Close'], 'confidence': row.get('long_confidence_score', np.nan)} for idx, row in long_entries.iterrows()]
short_trades_log = [{'date': idx, 'price': row['Close'], 'confidence': row.get('short_confidence_score', np.nan)} for idx, row in short_entries.iterrows()]
# --- ROBUST OPEN TRADE CAPTURE ---
open_trades = []
if not df.empty and 'Close' in df.columns:
last_close = df['Close'].iloc[-1]
# 1. Check Long
if df['long_position'].iloc[-1] == 1:
if not long_entries.empty:
last_entry_time = long_entries.index[-1]
last_entry = long_entries.loc[last_entry_time]
entry_price = last_entry['Close']
entry_conf = last_entry.get('long_confidence_score', np.nan)
else:
last_entry_time = pd.NaT
entry_price = df['long_entry_price_static'].iloc[-1]
entry_conf = np.nan
if pd.notna(last_close) and pd.notna(entry_price) and entry_price != 0:
pnl = (last_close / entry_price) - 1
open_trades.append({
'Side': 'Long',
'Date Open': last_entry_time,
'Date Closed': pd.NaT,
'Start Confidence': entry_conf,
'Final % P/L': pnl,
'Status': 'Open',
'Exit Reason': 'N/A (Open)'
})
# 2. Check Short
if df['short_position'].iloc[-1] == -1:
if not short_entries.empty:
last_entry_time = short_entries.index[-1]
last_entry = short_entries.loc[last_entry_time]
entry_price = last_entry['Close']
entry_conf = last_entry.get('short_confidence_score', np.nan)
else:
last_entry_time = pd.NaT
entry_price = df['short_entry_price_static'].iloc[-1]
entry_conf = np.nan
if pd.notna(last_close) and pd.notna(entry_price) and entry_price != 0:
pnl = ((last_close / entry_price) - 1) * -1
open_trades.append({
'Side': 'Short',
'Date Open': last_entry_time,
'Date Closed': pd.NaT,
'Start Confidence': entry_conf,
'Final % P/L': pnl,
'Status': 'Open',
'Exit Reason': 'N/A (Open)'
})
# ---------------------------------
# Combine: [All Historical Closed] + [Current Open]
open_trades.extend(all_historical_trades)
df.sort_index(inplace=True)
trade_dates = (first_long_entry_date, last_long_exit_date, first_short_entry_date, last_short_exit_date)
long_durations = [d for d in long_durations if pd.notna(d)]; short_durations = [d for d in short_durations if pd.notna(d)]
avg_long_profit = float(avg_long_profit_per_trade) if pd.notna(avg_long_profit_per_trade) else 0.0
avg_short_profit = float(avg_short_profit_per_trade) if pd.notna(avg_short_profit_per_trade) else 0.0
final_long_pnl_float = float(final_long_pnl) if pd.notna(final_long_pnl) else 0.0
final_short_pnl_float = float(final_short_pnl) if pd.notna(final_short_pnl) else 0.0
final_trade_logs = (long_trades_log, long_exits.index, short_trades_log, short_exits.index)
exit_breakdown = (long_profit_take_count, long_tsl_count, long_time_exit_count, short_profit_take_count, short_tsl_count, short_time_exit_count)
return final_long_pnl_float, final_short_pnl_float, avg_long_profit, avg_short_profit, df, final_trade_logs, open_trades, (long_wins, long_losses, short_wins, short_losses), (long_durations, short_durations), trade_dates, exit_breakdown
# --- 3. Charting and Display Functions ---
def generate_long_plot(df, trades, ticker):
    """Build a Plotly figure showing price, MA, Bollinger Bands and long trades.

    Parameters
    ----------
    df : pd.DataFrame
        Date-indexed frame containing 'Close' and optionally 'large_ma',
        'bband_upper' / 'bband_lower'.
    trades : tuple
        (long_entries_log, long_exits_index, short_entries_log, short_exits_index)
        as produced by run_backtest's final_trade_logs.
    ticker : str
        Used only for the chart title.

    Returns
    -------
    go.Figure
    """
    fig = go.Figure()
    # Add Price and MA Lines
    fig.add_trace(go.Scatter(x=df.index, y=df['Close'], mode='lines', name='Close Price', line=dict(color='blue')))
    if 'large_ma' in df.columns:
        fig.add_trace(go.Scatter(x=df.index, y=df['large_ma'], mode='lines', name='Large MA', line=dict(color='orange', dash='dash')))
    # Add Bollinger Bands (shaded channel between upper and lower)
    if 'bband_upper' in df.columns and 'bband_lower' in df.columns:
        fig.add_trace(go.Scatter(x=df.index, y=df['bband_upper'], mode='lines', name='Upper Band', line=dict(color='gray', width=0.5)))
        fig.add_trace(go.Scatter(x=df.index, y=df['bband_lower'], mode='lines', name='Lower Band', line=dict(color='gray', width=0.5), fill='tonexty', fillcolor='rgba(211,211,211,0.2)'))
    # Unpack Trades Tuple: (Long Entries Log, Long Exits Index, Short Entries Log, Short Exits Index)
    long_entries_log, long_exits, _, _ = trades
    # 1. Plot Long Entries with confidence hover text
    if long_entries_log:
        # Filter out entries that fall outside the dataframe's date range (just in case)
        valid_points = [
            (t['date'], t['price'], f"Confidence: {t['confidence']:.0f}%")
            for t in long_entries_log
            if t['date'] in df.index
        ]
        if valid_points:
            v_dates, v_prices, v_scores = zip(*valid_points)
            fig.add_trace(go.Scatter(x=v_dates, y=v_prices, mode='markers', name='Long Entry',
                                     marker=dict(color='green', symbol='triangle-up', size=12),
                                     text=v_scores, hoverinfo='text'))
    # 2. Plot Long Exits
    # [FIX] Mirror generate_short_plot: require a 'Close' column and dropna()
    # the exit prices so exits with missing data are not plotted at y=NaN.
    if not long_exits.empty and 'Close' in df.columns:
        # 'd' rather than 'date' to avoid shadowing datetime.date imported at file top
        valid_exit_dates = [d for d in long_exits if d in df.index]
        if valid_exit_dates:
            exit_prices = df.loc[valid_exit_dates, 'Close'].dropna()
            fig.add_trace(go.Scatter(x=exit_prices.index, y=exit_prices, mode='markers', name='Long Exit',
                                     marker=dict(color='darkgreen', symbol='x', size=8)))
    fig.update_layout(title=f'Long Trades for {ticker}', xaxis_title='Date', yaxis_title='Price', legend_title="Indicator")
    return fig
def generate_short_plot(df, trades, ticker):
    """Build a Plotly figure showing price, MA, Bollinger Bands and short trades.

    `trades` is the run_backtest log tuple
    (long_entries_log, long_exits, short_entries_log, short_exits);
    only the short-side elements are used here.
    """
    fig = go.Figure()
    # Base price line
    fig.add_trace(go.Scatter(x=df.index, y=df['Close'], mode='lines', name='Close Price', line=dict(color='blue')))
    # Optional moving-average overlay
    if 'large_ma' in df.columns:
        fig.add_trace(go.Scatter(x=df.index, y=df['large_ma'], mode='lines', name='Large MA', line=dict(color='orange', dash='dash')))
    # Optional Bollinger channel (lower band fills up to the upper band)
    has_bands = 'bband_upper' in df.columns and 'bband_lower' in df.columns
    if has_bands:
        fig.add_trace(go.Scatter(x=df.index, y=df['bband_upper'], mode='lines', name='Upper Band', line=dict(color='gray', width=0.5)))
        fig.add_trace(go.Scatter(x=df.index, y=df['bband_lower'], mode='lines', name='Lower Band', line=dict(color='gray', width=0.5), fill='tonexty', fillcolor='rgba(211,211,211,0.2)'))
    # Short trade markers
    _, _, short_entries_log, short_exits = trades
    if short_entries_log:
        entry_x = [rec['date'] for rec in short_entries_log]
        entry_y = [rec['price'] for rec in short_entries_log]
        hover = [f"Confidence: {rec['confidence']:.0f}%" for rec in short_entries_log]
        fig.add_trace(go.Scatter(x=entry_x, y=entry_y, mode='markers', name='Short Entry', marker=dict(color='red', symbol='triangle-down', size=12), text=hover, hoverinfo='text'))
    # Exits: skip if there is no Close column, and drop NaN prices
    if not short_exits.empty and 'Close' in df.columns:
        exit_series = df.loc[short_exits, 'Close'].dropna()
        fig.add_trace(go.Scatter(x=exit_series.index, y=exit_series, mode='markers', name='Short Exit', marker=dict(color='darkred', symbol='x', size=8)))
    fig.update_layout(title=f'Short Trades for {ticker}', xaxis_title='Date', yaxis_title='Price', legend_title="Indicator")
    return fig
def normalise_strategy_score(raw_score, benchmark_for_100_percent=0.25):
    """Map a raw score onto a 0-100 scale.

    `benchmark_for_100_percent` is the raw value that earns a full 100;
    anything above it is clamped. Non-finite (NaN/inf) or non-positive
    inputs collapse to 0.0.
    """
    if np.isfinite(raw_score) and raw_score > 0:
        scaled = (raw_score / benchmark_for_100_percent) * 100
        return scaled if scaled < 100.0 else 100.0
    return 0.0
def calculate_strategy_score(avg_profit, gb_ratio, total_trades):
    """
    Weighted Strategy Score (0-100+) with a 'Quality Gate'.

    Components and weights:
      * Profit vs a 3.0% per-trade target    -> 42.5%
      * Win/Loss ratio vs a 5.0 target       -> 42.5%
      * Trade count vs a 3000-trade target   -> 15%
    Quality gate: when the win/loss ratio is below 2.0 the profit component
    is capped at 1.0 (100%), so lucky/volatile profit cannot mask a
    structurally risky strategy.
    """
    TARGET_PROFIT = 0.03    # 3.0% per trade
    TARGET_GB = 5.0         # 5.0 Win/Loss Ratio
    TARGET_TRADES = 3000.0  # Perfect trade count
    W_PROFIT, W_GB, W_TRADES = 0.425, 0.425, 0.15  # weights sum to 1.0

    # Profit component: uncapped base, floored at zero.
    profit_component = max(avg_profit / TARGET_PROFIT, 0)
    # Win/loss component: uncapped base, floored at zero.
    gb_component = max(gb_ratio / TARGET_GB, 0)
    # --- QUALITY GATE --- poor win/loss structure limits profit credit.
    if gb_ratio < 2.0:
        profit_component = min(profit_component, 1.0)
    # Trade count: linear 'pyramid' penalty around the target, floored at zero.
    trades_component = max(1.0 - (abs(total_trades - TARGET_TRADES) / TARGET_TRADES), 0)

    weighted = (profit_component * W_PROFIT) + (gb_component * W_GB) + (trades_component * W_TRADES)
    return weighted * 100
def display_summary_analytics(summary_df):
    """Render aggregate Long/Short performance panels in Streamlit.

    Reads cached trade counts, durations and exit-breakdown totals from
    st.session_state, writes st.session_state.last_run_stats (Long side
    only), and draws metrics/text into a two-column layout (Long left,
    Short right).
    """
    st.subheader("Overall Strategy Performance")
    # Aggregates cached by the backtest run; empty dicts when nothing has run yet.
    trade_counts = st.session_state.get('trade_counts', {})
    trade_durations = st.session_state.get('trade_durations', {})
    exit_totals = st.session_state.get('exit_breakdown_totals', {})
    # --- Clear last run stats before calculating ---
    st.session_state.last_run_stats = {}
    col1, col2 = st.columns(2)
    for side in ["Long", "Short"]:
        # Ensure summary_df exists and has the required columns
        req_cols = [f'Num {side} Trades', f'Avg {side} Profit per Trade', f'Cumulative {side} P&L', f'Avg {side} Confidence']
        if summary_df is None or not all(col in summary_df.columns for col in req_cols):
            st.warning(f"Summary data for {side} trades is missing or incomplete.")
            continue
        # Only tickers that actually traded on this side contribute to the stats.
        active_trades_df = summary_df[summary_df[f'Num {side} Trades'] > 0]
        container = col1 if side == "Long" else col2
        with container:
            st.subheader(f"{side} Trades")
            if not active_trades_df.empty:
                total_trades = active_trades_df[f'Num {side} Trades'].sum()
                # Trade-count-weighted mean of per-ticker average profit.
                avg_trade_profit = (active_trades_df[f'Avg {side} Profit per Trade'].fillna(0) * active_trades_df[f'Num {side} Trades']).sum() / total_trades if total_trades > 0 else 0
                avg_cumulative_profit = active_trades_df[f'Cumulative {side} P&L'].mean()
                avg_confidence = active_trades_df[f'Avg {side} Confidence'].mean()
                if pd.isna(avg_confidence): avg_confidence = 0
                good_tickers = (active_trades_df[f'Cumulative {side} P&L'] > 0).sum()
                bad_tickers = (active_trades_df[f'Cumulative {side} P&L'] < 0).sum()
                # 99999.0 is a sentinel for "no losing tickers" (effectively infinite ratio).
                ticker_good_bad_ratio = good_tickers / bad_tickers if bad_tickers > 0 else 99999.0
                display_score = calculate_strategy_score(avg_trade_profit, ticker_good_bad_ratio, total_trades)
                st.metric("Strategy Score", f"{display_score:.2f}%")
                st.metric("Avg Profit per Trade (Active Tickers)", f"{avg_trade_profit:.2%}")
                net_return_pct = avg_trade_profit * total_trades * 100
                st.metric("Moneypile Score", f"{net_return_pct:.1f}", help="The total 'Pile of Money'. Sum of all trade percentages (Winners - Losers).")
                st.text(f"Profitable Tickers: {good_tickers}")
                st.text(f"Losing Tickers: {bad_tickers}")
                st.text(f"Ticker Good/Bad Ratio: {ticker_good_bad_ratio:.2f}")
                side_lower = side.lower()
                wins = trade_counts.get(f"{side_lower}_wins", 0)
                losses = trade_counts.get(f"{side_lower}_losses", 0)
                # --- NEW: Calculate Open Trades explicitly ---
                # Entries minus resolved (won/lost) trades = still-open trades.
                open_trades_count = int(total_trades) - (wins + losses)
                # --- TRADING STATS ---
                st.markdown("---")
                st.text(f"Total Individual Trades: {int(total_trades)}")
                st.text(f"Winning Trades: {wins}")
                st.text(f"Losing Trades: {losses}")
                st.text(f"Open Trades: {open_trades_count}")
                trade_win_loss_ratio = 0.0
                if wins > 0 or losses > 0:
                    # Same 99999.0 "infinite" sentinel when there are no losses.
                    trade_win_loss_ratio = wins / losses if losses > 0 else 99999.0
                st.text(f"Trade Win/Loss Ratio: {trade_win_loss_ratio:.2f}")
                if side == "Long":
                    # Stash headline Long-side stats for downstream comparison views.
                    st.session_state.last_run_stats = {
                        "Z_Avg_Profit": avg_trade_profit,
                        "Z_Num_Trades": int(total_trades),
                        "Z_WL_Ratio": trade_win_loss_ratio
                    }
                # --- EXIT BREAKDOWN ---
                if exit_totals:
                    st.markdown("---")
                    st.subheader("Exit Breakdown")
                    if side == "Long":
                        profit_count = exit_totals.get('long_profit_take_count', 0)
                        tsl_count = exit_totals.get('long_tsl_count', 0)
                        time_count = exit_totals.get('long_time_exit_count', 0)
                        # Get Long Timeout Setting
                        limit_days = st.session_state.get('max_long_duration', 60)
                    else:
                        profit_count = exit_totals.get('short_profit_take_count', 0)
                        tsl_count = exit_totals.get('short_tsl_count', 0)
                        time_count = exit_totals.get('short_time_exit_count', 0)
                        # Get Short Timeout Setting
                        limit_days = st.session_state.get('max_short_duration', 10)
                    exit_total = profit_count + tsl_count + time_count
                    if exit_total > 0:
                        st.markdown(f"**Profit Take:** {profit_count} ({profit_count/exit_total:.1%})")
                        st.markdown(f"**Stop Loss:** {tsl_count} ({tsl_count/exit_total:.1%})")
                        # Dynamic Label
                        st.markdown(f"**Time Out ({limit_days}d):** {time_count} ({time_count/exit_total:.1%})")
                # --- DURATION STATS ---
                avg_duration = trade_durations.get(f"avg_{side_lower}_duration", 0)
                max_duration = trade_durations.get(f"max_{side_lower}_duration", 0)
                if avg_duration > 0 or max_duration > 0:
                    st.text(f"Avg Trade Duration: {avg_duration:.1f} days")
                    st.text(f"Longest Trade: {max_duration:.0f} days")
            else: st.info("No trades found for this side with current settings.")
def generate_profit_distribution_chart(summary_df):
    """
    Histogram of per-ticker 'Avg Profit per Trade' for the Long and Short
    strategies, weighted by each ticker's trade count so the Y axis counts
    total trades rather than tickers. Returns None when there is no data.
    """
    if summary_df is None or summary_df.empty: return None
    fig = go.Figure()
    # Long side: only tickers that actually traded contribute.
    if 'Avg Long Profit per Trade' in summary_df.columns and 'Num Long Trades' in summary_df.columns:
        traded = summary_df['Num Long Trades'] > 0
        profits = summary_df.loc[traded, 'Avg Long Profit per Trade']
        weights = summary_df.loc[traded, 'Num Long Trades']  # trade count = weight
        if not profits.empty:
            fig.add_trace(go.Histogram(
                x=profits,
                y=weights,           # weight each ticker by its trade count
                histfunc='sum',      # summed weights => total trades per bin
                name='Long Strategy',
                marker_color='green',
                opacity=0.7,
                nbinsx=50,
                histnorm=''
            ))
    # Short side: same weighting, hidden by default (toggle via legend).
    if 'Avg Short Profit per Trade' in summary_df.columns and 'Num Short Trades' in summary_df.columns:
        traded = summary_df['Num Short Trades'] > 0
        profits = summary_df.loc[traded, 'Avg Short Profit per Trade']
        weights = summary_df.loc[traded, 'Num Short Trades']
        if not profits.empty:
            fig.add_trace(go.Histogram(
                x=profits,
                y=weights,
                histfunc='sum',
                name='Short Strategy',
                marker_color='red',
                opacity=0.7,
                nbinsx=50,
                visible='legendonly'
            ))
    # Break-even reference line.
    fig.add_vline(x=0, line_width=2, line_dash="dash", line_color="black", annotation_text="Break Even")
    # Shared layout: overlaid semi-transparent bars, horizontal legend on top.
    fig.update_layout(
        title="Distribution of Average Profit per Trade (Weighted by Trade Count)",
        xaxis_title="Average Profit per Trade (decimal, e.g. 0.01 = 1%)",
        yaxis_title="Number of Trades",
        barmode='overlay',
        bargap=0.1,
        template="plotly_white",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        height=400
    )
    # X axis rendered as percentages.
    fig.update_xaxes(tickformat=".1%")
    return fig
# --- 4. Optimisation Functions (Parallelised) ---
# --- FULL REPLACEMENT WORKER: ACCEPTS A SINGLE DICTIONARY ARGUMENT ---
def run_single_parameter_test(task_data):
    """Multiprocessing worker: backtest one parameter combination over tickers.

    `task_data` is a single dict (keeps the pool interface to one picklable
    argument) with keys: 'params', 'confidence_settings', 'master_df',
    'optimise_for' ('long' or 'short'), 'tickers', 'date_range' (start, end)
    and 'power'.

    Returns a dict of aggregated metrics (avg profit per trade, good/bad
    ratios, trade counts, exit breakdown, ...) plus the originating 'params'
    and 'confidence_settings' so the orchestrator can match results back to
    their configuration.
    """
    # Unpack all arguments from the single dictionary provided by the multiprocessing pool
    params = task_data['params']
    confidence_settings = task_data['confidence_settings']
    master_df = task_data['master_df']
    optimise_for = task_data['optimise_for']
    tickers = task_data['tickers']
    date_range = task_data['date_range']
    power = task_data['power']
    # --- FIX: Correctly unpack ALL 7 items from the list ---
    toggles_list = list(confidence_settings['toggles'])
    weights_list = list(confidence_settings['weights'])
    # Unpack all 7 factors (matches the order sent by generate_and_run_optimisation)
    use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov = toggles_list
    rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w = weights_list
    # --- END FIX ---
    # --- Rest of Unpack ---
    use_adx_filter, adx_threshold, adx_period = confidence_settings['adx_settings']
    rsi_logic = confidence_settings['rsi_logic']
    primary_driver = confidence_settings['primary_driver']
    markov_setup = confidence_settings['markov_setup']
    exit_logic = confidence_settings['exit_logic']
    exit_thresh = confidence_settings['exit_thresh']
    smart_trailing_stop = confidence_settings['smart_trailing_stop']
    smart_exit_atr_p = confidence_settings['smart_exit_atr_period']
    smart_exit_atr_m = confidence_settings['smart_exit_atr_multiplier']
    intelligent_tsl_pct = confidence_settings['intelligent_tsl_pct']
    long_95_percentile = confidence_settings['long_95_percentile']
    short_95_percentile = confidence_settings['short_95_percentile']
    veto_list = confidence_settings['veto_list']
    # --- End Unpack ---
    total_profit_weighted_avg, total_trades, winning_tickers, losing_tickers = 0, 0, 0, 0
    total_wins, total_losses = 0, 0
    all_confidences = []
    # Tickers whose backtest reports >100% PnL (total or per trade, either
    # direction) are treated as data anomalies and excluded from aggregation.
    PROFIT_THRESHOLD = 1.0
    excluded_tickers = []
    # Initialize exit breakdown counters (Long and Short)
    total_exit_breakdown = [0, 0, 0, 0, 0, 0] # LP, LT, LE, SP, ST, SE
    if not isinstance(tickers, list): tickers = [tickers]
    for ticker in tickers:
        # Collect the Close column plus any available High/Low/Volume companions.
        cols_to_use = [ticker]
        if f'{ticker}_High' in master_df.columns: cols_to_use.append(f'{ticker}_High')
        if f'{ticker}_Low' in master_df.columns: cols_to_use.append(f'{ticker}_Low')
        if f'{ticker}_Volume' in master_df.columns: cols_to_use.append(f'{ticker}_Volume')
        existing_cols = [col for col in cols_to_use if col in master_df.columns]
        if ticker not in existing_cols: continue
        ticker_data_full = master_df.loc[:, existing_cols]
        ticker_data = ticker_data_full.loc[date_range[0]:date_range[1]]
        # Rename to the canonical OHLCV column names run_backtest expects.
        rename_dict = {
            ticker: 'Close', f'{ticker}_High': 'High',
            f'{ticker}_Low': 'Low', f'{ticker}_Volume': 'Volume'
        }
        rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols}
        ticker_data = ticker_data.rename(columns=rename_dict_filtered)
        if not ticker_data.empty and 'Close' in ticker_data.columns and not ticker_data['Close'].isna().all():
            # --- CAPTURE FIX: run_backtest now returns exit_breakdown (11th return value) ---
            long_pnl, short_pnl, avg_long_trade, avg_short_trade, _, trades, _, trade_counts, _, _, exit_breakdown = run_backtest(
                ticker_data, params,
                use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov,
                rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w,
                use_adx_filter, adx_threshold, rsi_logic,
                adx_period,
                veto_setups_list=veto_list,
                primary_driver=primary_driver,
                markov_setup=markov_setup,
                exit_logic_type=exit_logic,
                exit_confidence_threshold=exit_thresh,
                smart_trailing_stop_pct=smart_trailing_stop,
                smart_exit_atr_period=smart_exit_atr_p,
                smart_exit_atr_multiplier=smart_exit_atr_m,
                intelligent_tsl_pct=intelligent_tsl_pct,
                long_score_95_percentile=long_95_percentile,
                short_score_95_percentile=short_95_percentile
            )
            # Anomaly guard: skip tickers with implausibly large PnL figures.
            if abs(long_pnl) > PROFIT_THRESHOLD or abs(short_pnl) > PROFIT_THRESHOLD or \
               (avg_long_trade is not None and pd.notna(avg_long_trade) and abs(avg_long_trade) > PROFIT_THRESHOLD) or \
               (avg_short_trade is not None and pd.notna(avg_short_trade) and abs(avg_short_trade) > PROFIT_THRESHOLD):
                excluded_tickers.append(ticker)
                continue
            # Aggregate exit breakdown counters
            total_exit_breakdown = [sum(x) for x in zip(total_exit_breakdown, exit_breakdown)]
            # Select the side being optimised. `trades` is
            # (long_log, long_exits, short_log, short_exits); `trade_counts`
            # is (long_wins, long_losses, short_wins, short_losses).
            if optimise_for == 'long':
                pnl, avg_trade_profit, trade_log = long_pnl, avg_long_trade, trades[0]
                total_wins += trade_counts[0]; total_losses += trade_counts[1]
            else:
                pnl, avg_trade_profit, trade_log = short_pnl, avg_short_trade, trades[2]
                total_wins += trade_counts[2]; total_losses += trade_counts[3]
            num_trades = len(trade_log)
            if num_trades > 0 and avg_trade_profit is not None and pd.notna(avg_trade_profit):
                total_trades += num_trades
                total_profit_weighted_avg += avg_trade_profit * num_trades
                if pnl > 0: winning_tickers += 1
                elif pnl < 0: losing_tickers += 1
            all_confidences.extend([trade['confidence'] for trade in trade_log if pd.notna(trade.get('confidence'))])
    overall_avg_profit = 0.0
    good_bad_ratio = 0.0
    trade_good_bad_ratio = 0.0
    if total_trades > 0:
        overall_avg_profit = total_profit_weighted_avg / total_trades
        # 99999.0 is a sentinel for "no losers" (effectively infinite ratio).
        if losing_tickers > 0:
            good_bad_ratio = winning_tickers / losing_tickers
        elif winning_tickers > 0:
            good_bad_ratio = 99999.0
        if total_losses > 0:
            trade_good_bad_ratio = total_wins / total_losses
        elif total_wins > 0:
            trade_good_bad_ratio = 99999.0
    avg_entry_confidence = np.mean(all_confidences) if all_confidences else 0.0
    # Return a single dictionary containing all calculated metrics and the configuration data
    return {
        # CORE METRICS
        "Avg Profit/Trade": overall_avg_profit,
        "Ticker G/B Ratio": good_bad_ratio,
        "Trade G/B Ratio": trade_good_bad_ratio,
        "Total Trades": total_trades,
        "Net % Return": total_profit_weighted_avg * 100,
        "Avg Entry Conf.": avg_entry_confidence,
        "Winning Tickers": winning_tickers,
        "Losing Tickers": losing_tickers,
        "Exit Breakdown": total_exit_breakdown, # NEW: Return the aggregated breakdown
        # CONFIG DATA (for Orchestrator unpacking)
        "params": params,
        "confidence_settings": confidence_settings
    }
# --- UPDATED: Calculation Only (Saves to Session State) ---
def generate_and_run_optimisation(master_df, main_content_placeholder, optimise_for, use_squared_weighting):
    """Run the parameter-grid optimisation and cache the results in session state.

    Pipeline:
      1. Calibrate raw confidence scores to a 0-100% scale, using the 95th
         percentile of positive raw scores pooled across all tickers.
      2. Build every parameter combination enabled in the sidebar checkboxes.
      3. Fan the combinations out over a multiprocessing pool
         (``run_single_parameter_test``) and collect one metrics dict each.
      4. Filter, score, sort and store results; ``main()`` renders the table.

    Args:
        master_df: Wide price DataFrame — one Close column per ticker plus
            optional ``<ticker>_High/_Low/_Volume`` helper columns.
        main_content_placeholder: Streamlit placeholder that hosts all output.
        optimise_for: ``'long'`` or ``'short'`` — trade side used by workers.
        use_squared_weighting: When True, workers weight with power 2
            (passed through as ``power``).

    Returns:
        None. Results are written to ``st.session_state.param_results_df``
        and ``st.session_state.best_params``.
    """
    # Clear previous results to avoid confusion
    st.session_state.param_results_df = None
    st.session_state.best_params = None
    # Clear other sections
    st.session_state.summary_df = None
    st.session_state.single_ticker_results = None
    st.session_state.confidence_results_df = None
    st.session_state.open_trades_df = None
    st.session_state.advisor_df = None
    with main_content_placeholder.container():
        veto_list_to_use = st.session_state.get('veto_setup_list', [])
        if veto_list_to_use:
            st.info(f"{len(veto_list_to_use)} Veto filter(s) are ACTIVE for this optimisation run.")
        st.info("Calibrating confidence scores (0-100%)...")
        # --- 1. Gather Settings & Run Calibration (Same as before) ---
        use_rsi = st.session_state.use_rsi; use_vol = st.session_state.use_vol; use_trend = st.session_state.use_trend
        use_volume = st.session_state.use_volume; use_macd = st.session_state.use_macd
        use_ma_slope = st.session_state.use_ma_slope; use_markov = st.session_state.use_markov
        rsi_w = st.session_state.rsi_w; vol_w = st.session_state.vol_w; trend_w = st.session_state.trend_w
        vol_w_val = st.session_state.volume_w; macd_w = st.session_state.macd_w
        ma_slope_w = st.session_state.ma_slope_w; markov_w = st.session_state.markov_w
        # Sidebar values are entered as percentages; convert to fractions here.
        calibration_params = {
            "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period,
            "bband_std_dev": st.session_state.bb_std, "confidence_threshold": st.session_state.confidence_slider,
            "long_entry_threshold_pct": st.session_state.long_entry / 100.0, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100.0,
            "long_trailing_stop_loss_pct": st.session_state.long_sl / 100.0, "long_delay_days": st.session_state.long_delay,
            "short_entry_threshold_pct": st.session_state.short_entry / 100.0, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100.0,
            "short_trailing_stop_loss_pct": st.session_state.short_sl / 100.0, "short_delay_days": st.session_state.short_delay,
            "max_trading_days": st.session_state.max_duration
        }
        markov_setup = st.session_state.get('best_markov_setup')
        primary_driver = st.session_state.primary_driver
        calib_adx_filter = st.session_state.use_adx_filter
        calib_adx_thresh = st.session_state.adx_threshold
        calib_adx_period = st.session_state.adx_period
        calib_rsi_logic = st.session_state.rsi_logic
        calib_exit_logic = st.session_state.exit_logic_type
        calib_exit_thresh = st.session_state.exit_confidence_threshold
        calib_smart_trailing_stop = st.session_state.smart_trailing_stop_pct / 100.0
        calib_smart_atr_p = st.session_state.smart_exit_atr_period
        calib_smart_atr_m = st.session_state.smart_exit_atr_multiplier
        calib_intelligent_tsl = st.session_state.intelligent_tsl_pct / 100.0
        all_long_scores = []; all_short_scores = []
        # Calibration runs over every price column (skip High/Low/Volume helpers).
        tickers_to_run_calib = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))]
        date_range_calib = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date))
        for ticker_symbol in tickers_to_run_calib:
            cols_to_use = [ticker_symbol, f'{ticker_symbol}_High', f'{ticker_symbol}_Low', f'{ticker_symbol}_Volume']
            existing_cols = [col for col in cols_to_use if col in master_df.columns]
            if ticker_symbol not in existing_cols: continue
            ticker_data_full = master_df.loc[:, existing_cols]
            ticker_data = ticker_data_full.loc[date_range_calib[0]:date_range_calib[1]]
            # Normalise per-ticker columns to the Close/High/Low/Volume names
            # that run_backtest / calculate_confidence_score expect.
            rename_dict = {ticker_symbol: 'Close', f'{ticker_symbol}_High': 'High', f'{ticker_symbol}_Low': 'Low', f'{ticker_symbol}_Volume': 'Volume'}
            rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols}
            ticker_data = ticker_data.rename(columns=rename_dict_filtered)
            # NOTE(review): the entire result of this backtest is discarded —
            # only the raw scores from calculate_confidence_score below feed
            # calibration. Confirm whether this call is still required.
            l_pnl, s_pnl, al, as_, _, _, _, _, _, _, _ = run_backtest(
                data=ticker_data, params=calibration_params,
                use_rsi=use_rsi, use_volatility=use_vol, use_trend=use_trend, use_volume=use_volume,
                use_macd=use_macd, use_ma_slope=use_ma_slope, use_markov=use_markov,
                rsi_w=rsi_w, vol_w=vol_w, trend_w=trend_w, vol_w_val=vol_w_val, macd_w=macd_w, ma_slope_w=ma_slope_w, markov_w=markov_w,
                use_adx_filter=calib_adx_filter, adx_threshold=calib_adx_thresh, rsi_logic=calib_rsi_logic, adx_period=calib_adx_period,
                veto_setups_list=veto_list_to_use, primary_driver=primary_driver, markov_setup=markov_setup,
                exit_logic_type=calib_exit_logic, exit_confidence_threshold=calib_exit_thresh,
                smart_trailing_stop_pct=calib_smart_trailing_stop, smart_exit_atr_period=calib_smart_atr_p,
                smart_exit_atr_multiplier=calib_smart_atr_m, intelligent_tsl_pct=calib_intelligent_tsl,
                long_score_95_percentile=1.0, short_score_95_percentile=1.0
            )
            raw_long, raw_short = calculate_confidence_score(
                ticker_data, primary_driver,
                use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov,
                rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w,
                calibration_params, markov_setup
            )
            # Only positive raw scores participate in percentile calibration.
            all_long_scores.append(raw_long[raw_long > 0])
            all_short_scores.append(raw_short[raw_short > 0])
        # The pooled 95th percentile defines "100%" confidence; fall back to a
        # neutral divisor of 1.0 when the percentile is NaN or zero.
        long_95 = pd.concat(all_long_scores).quantile(0.95) if all_long_scores else 1.0
        short_95 = pd.concat(all_short_scores).quantile(0.95) if all_short_scores else 1.0
        if pd.isna(long_95) or long_95 == 0: long_95 = 1.0
        if pd.isna(short_95) or short_95 == 0: short_95 = 1.0
        st.info(f"Confidence calibrated: Long 95th percentile = {long_95:.2f}, Short 95th percentile = {short_95:.2f}")
        # --- 2. Build Combinations (Same as before) ---
        # Each checkbox swaps a fixed sidebar value for a sweep range.
        ma_range = range(st.session_state.ma_start, st.session_state.ma_end + 1, st.session_state.ma_step) if st.session_state.opt_ma_cb else [st.session_state.ma_period]
        bb_range = range(st.session_state.bb_start, st.session_state.bb_end + 1, st.session_state.bb_step) if st.session_state.opt_bb_cb else [st.session_state.bb_period]
        std_range = np.arange(st.session_state.std_start, st.session_state.std_end + 0.001, st.session_state.std_step) if st.session_state.opt_std_cb else [st.session_state.bb_std]
        sl_range = np.arange(st.session_state.sl_start, st.session_state.sl_end + 0.001, st.session_state.sl_step) / 100 if st.session_state.opt_sl_cb else [st.session_state.long_sl / 100]
        delay_range = range(st.session_state.delay_start, st.session_state.delay_end + 1, st.session_state.delay_step) if st.session_state.opt_delay_cb else [st.session_state.long_delay]
        entry_range = np.arange(st.session_state.entry_start, st.session_state.entry_end + 0.001, st.session_state.entry_step) / 100 if st.session_state.opt_entry_cb else [st.session_state.long_entry / 100]
        exit_range = np.arange(st.session_state.exit_start, st.session_state.exit_end + 0.001, st.session_state.exit_step) / 100 if st.session_state.opt_exit_cb else [st.session_state.long_exit / 100]
        conf_range = range(st.session_state.conf_start, st.session_state.conf_end + 1, st.session_state.conf_step) if st.session_state.opt_conf_cb else [st.session_state.confidence_slider]
        dur_range = range(st.session_state.dur_start, st.session_state.dur_end + 1, st.session_state.dur_step) if st.session_state.opt_duration_cb else [st.session_state.max_duration]
        param_product = itertools.product(ma_range, bb_range, std_range, sl_range, delay_range, entry_range, exit_range, conf_range, dur_range)
        # Long and short sides deliberately share SL / delay / entry / exit values.
        param_combinations = [{
            "large_ma_period": p[0], "bband_period": p[1], "bband_std_dev": p[2],
            "long_trailing_stop_loss_pct": p[3], "short_trailing_stop_loss_pct": p[3],
            "long_delay_days": p[4], "short_delay_days": p[4],
            "long_entry_threshold_pct": p[5], "short_entry_threshold_pct": p[5],
            "long_exit_ma_threshold_pct": p[6], "short_exit_ma_threshold_pct": p[6],
            "confidence_threshold": p[7], "max_trading_days": p[8]
        } for p in param_product]
        total_combinations = len(param_combinations)
        if total_combinations <= 1:
            st.warning("No optimisation parameters selected."); return
        # Everything a worker needs beyond the swept params, kept constant per run.
        confidence_settings = {
            'toggles': (use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov),
            'weights': (rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w),
            'adx_settings': (st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.adx_period),
            'rsi_logic': st.session_state.rsi_logic, 'primary_driver': primary_driver, 'markov_setup': markov_setup,
            'exit_logic': st.session_state.exit_logic_type, 'exit_thresh': st.session_state.exit_confidence_threshold,
            'smart_trailing_stop': st.session_state.smart_trailing_stop_pct / 100.0,
            'smart_exit_atr_period': st.session_state.smart_exit_atr_period, 'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier,
            'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0,
            'long_95_percentile': long_95, 'short_95_percentile': short_95, 'veto_list': veto_list_to_use
        }
        # --- 3. Execution ---
        num_cores = cpu_count()
        st.info(f"Starting {optimise_for.upper()} optimisation on {num_cores} cores... Testing {total_combinations} combinations.")
        if st.session_state.run_mode.startswith("Analyse Full List"):
            tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))]
        else:
            tickers_to_run = [st.session_state.ticker_select]
        date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date))
        power = 2 if use_squared_weighting else 1
        status_text = st.empty(); status_text.text("Optimisation starting...")
        progress_bar = st.progress(0)
        # One task dict per parameter combination; note master_df is embedded in
        # every task and therefore pickled per task for the pool workers.
        tasks = []
        for p in param_combinations:
            tasks.append({
                'params': p, 'confidence_settings': confidence_settings, 'master_df': master_df,
                'optimise_for': optimise_for, 'tickers': tickers_to_run, 'date_range': date_range, 'power': power
            })
        results_list = []
        with Pool(processes=num_cores) as pool:
            try:
                # imap_unordered yields results as workers finish (order not preserved).
                iterator = pool.imap_unordered(run_single_parameter_test, tasks)
                for i, result_dict in enumerate(iterator, 1):
                    results_list.append(result_dict)
                    progress_bar.progress(i / total_combinations, text=f"Optimising... {i}/{total_combinations} combinations complete.")
            except Exception as e:
                st.error(f"An error occurred during multiprocessing: {e}")
                status_text.text("Optimisation failed due to an error."); return
        status_text.text("Optimisation complete. Processing results...")
        if not results_list:
            st.warning("Optimisation finished, but no valid results were found."); return
        # Flatten each worker result: lift the nested 'params' dict into
        # top-level columns and drop the unpicklable/bulky config entries.
        flattened_results = []
        for r in results_list:
            flat_row = r.copy()
            del flat_row['params']; del flat_row['confidence_settings']
            flat_row.update(r['params'])
            flattened_results.append(flat_row)
        results_df = pd.DataFrame(flattened_results)
        # Discard statistically weak setups (too few trades to trust the averages).
        min_trades_threshold = 10
        if 'Total Trades' in results_df.columns:
            results_df = results_df[results_df['Total Trades'] >= min_trades_threshold].copy()
        if results_df.empty:
            st.warning(f"No results found with at least {min_trades_threshold} trades. Try a smaller threshold or different settings."); return
        results_df['Strategy Score'] = results_df.apply(
            lambda row: calculate_strategy_score(row['Avg Profit/Trade'], row['Ticker G/B Ratio'], row['Total Trades']), axis=1
        )
        # --- 4. PRE-SORT (Descending Duration) ---
        # We sort by Score (Desc) AND Duration (Desc) so the "Biggest Number" comes first.
        if 'max_trading_days' in results_df.columns:
            results_df = results_df.sort_values(by=['Strategy Score', 'max_trading_days'], ascending=[False, False])
        else:
            results_df = results_df.sort_values(by='Strategy Score', ascending=False)
        # Save the full results to session state so `main()` can display them
        st.session_state.param_results_df = results_df
        # Save best params (first row is best score + highest duration)
        best_setup_series = results_df.iloc[0]
        st.session_state.best_params = {k: best_setup_series[k] for k in param_combinations[0].keys()}
        status_text.empty()
        st.success(f"Optimisation Complete! Best Strategy Score: {best_setup_series['Strategy Score']:.2f}%")
        st.subheader("Optimal Parameters Found (based on Strategy Score):")
        st.json(st.session_state.best_params)
        # Note: We do NOT display the table here anymore. main() handles it.
# --- 4.5. NEW COMBINED OPTIMISATION FUNCTION (Optimized: 32k -> 3k Tasks) ---
def generate_and_run_combined_optimisation(master_df, main_content_placeholder, optimise_for):
    """Jointly optimise factor on/off toggles and their weights, with resume support.

    Instead of sweeping toggles and weights independently, a weight of 0.0 is
    treated as "factor off", collapsing the search space to 5 states per
    factor for 5 factors (5**5 = 3,125 unique combinations). Results are
    appended in batches to a side-specific CSV so an interrupted run resumes
    from the last checkpoint.

    Args:
        master_df: Wide price DataFrame (Close per ticker plus optional
            ``<ticker>_High/_Low/_Volume`` columns).
        main_content_placeholder: Streamlit placeholder that hosts all output.
        optimise_for: ``'long'`` or ``'short'``; also selects the results file.

    Returns:
        None. Results accumulate in ``combined_optimisation_results_<side>.csv``.
    """
    st.session_state.summary_df = None
    st.session_state.single_ticker_results = None
    st.session_state.confidence_results_df = None
    st.session_state.open_trades_df = None
    st.session_state.advisor_df = None
    st.session_state.best_params = None
    st.session_state.best_weights = None
    # Define side-specific filename to separate Long/Short results
    results_file = f"combined_optimisation_results_{optimise_for}.csv"
    with main_content_placeholder.container():
        # --- Visual Feedback ---
        st.header(f"⚡ Running Combined Factor & Weight Optimisation ({optimise_for.title()})")
        st.caption(f"Results will be saved to: {results_file}")
        # --- FIXED PARAMETERS ---
        # Structural strategy parameters stay fixed; only factor config varies.
        fixed_params = {
            "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period,
            "bband_std_dev": st.session_state.bb_std, "confidence_threshold": 50,
            "long_entry_threshold_pct": st.session_state.long_entry / 100, "short_entry_threshold_pct": st.session_state.short_entry / 100,
            "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100,
            "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, "short_trailing_stop_loss_pct": st.session_state.short_sl / 100,
            "long_delay_days": st.session_state.long_delay, "short_delay_days": st.session_state.short_delay
        }
        # --- CHECKPOINT LOADING ---
        # NOTE(review): assumed to return a set of (toggles_str, weights_round)
        # keys matching task_key below — confirm against load_completed_setups.
        completed_configs = load_completed_setups(results_file)
        st.info(f"Loaded {len(completed_configs)} completed combinations from {results_file}. Resuming job...")
        # --- DYNAMIC FACTOR/WEIGHT GENERATION (OPTIMIZED) ---
        # Instead of looping Toggles and Weights separately (which creates duplicates),
        # we treat 0.0 as "Off" and any other value as "On + Weight".
        # 5 States per Factor: [Off, 0.5, 1.0, 1.5, 2.0]
        factors = ['RSI', 'Volatility', 'Volume', 'MACD', 'MA Slope']
        possible_states = [0.0, 0.5, 1.0, 1.5, 2.0] # 0.0 = Off
        # Generate all 3,125 unique combinations (5^5)
        all_combos = list(itertools.product(possible_states, repeat=len(factors)))
        all_tasks = []
        fixed_adx_settings = (st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.adx_period)
        fixed_markov_setup = st.session_state.get('best_markov_setup')
        fixed_exit_settings = {
            'rsi_logic': st.session_state.rsi_logic, 'primary_driver': st.session_state.primary_driver,
            'exit_logic': st.session_state.exit_logic_type, 'exit_thresh': st.session_state.exit_confidence_threshold,
            'smart_trailing_stop': st.session_state.smart_trailing_stop_pct / 100.0,
            'smart_exit_atr_period': st.session_state.smart_exit_atr_period,
            'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier,
            'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0
        }
        tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))]
        date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date))
        for combo in all_combos:
            weights_5 = list(combo)
            toggles_5 = [w > 0.0 for w in weights_5] # If weight > 0, it's On. If 0.0, it's Off.
            # --- EXPAND TO 7 ITEMS (Insert False/0.0 for Trend and Markov) ---
            # Standard Order: RSI, Vol, TREND, Volume, MACD, MA Slope, Markov
            toggles_7 = (
                toggles_5[0], toggles_5[1], False, toggles_5[2], toggles_5[3], toggles_5[4], False
            )
            weights_7 = (
                weights_5[0], weights_5[1], 0.0, weights_5[2], weights_5[3], weights_5[4], 0.0
            )
            # --- CHECKPOINT LOGIC ---
            # Canonical key: "On"/"Off" strings + rounded weights, so it matches
            # what was previously written to / reloaded from the CSV.
            toggles_str = tuple(["On" if t else "Off" for t in toggles_7])
            weights_round = tuple([round(w, 2) for w in weights_7])
            task_key = (toggles_str, weights_round)
            if task_key not in completed_configs:
                confidence_settings = {
                    'toggles': toggles_7, 'weights': weights_7,
                    'adx_settings': fixed_adx_settings,
                    'markov_setup': fixed_markov_setup, 'confidence_threshold': fixed_params['confidence_threshold'],
                    'long_95_percentile': 1.0, 'short_95_percentile': 1.0, 'veto_list': st.session_state.get('veto_setup_list', []),
                }
                confidence_settings.update(fixed_exit_settings)
                all_tasks.append({
                    'params': fixed_params,
                    'confidence_settings': confidence_settings,
                    'master_df': master_df,
                    'optimise_for': optimise_for,
                    'tickers': tickers_to_run,
                    'date_range': date_range,
                    'power': 1
                })
        # --- Multiprocessing Execution Setup ---
        total_combinations_theoretical = 3125 # Corrected to 5^5
        tasks_remaining = len(all_tasks)
        if tasks_remaining == 0:
            # Nothing left to compute; fall through to the final display below.
            st.success(f"Optimisation complete. All {total_combinations_theoretical} combinations for {optimise_for.title()} have been processed.");
            pass
        current_progress_base = total_combinations_theoretical - tasks_remaining
        BATCH_SIZE = 50 # Smaller batch for smoother progress
        if tasks_remaining > 0:
            num_cores = cpu_count()
            st.info(f"Starting COMBINED optimisation ({optimise_for.title()}) on {num_cores} cores... {tasks_remaining} unique tasks remaining.")
            results_list = []
            status_text = st.empty(); status_text.text("Optimisation starting...")
            progress_bar = st.progress(0)
            with Pool(processes=cpu_count()) as pool:
                try:
                    iterator = pool.imap_unordered(run_single_parameter_test, all_tasks)
                    for i, result_dict in enumerate(iterator, 1):
                        results_list.append(result_dict)
                        total_progress = (current_progress_base + i) / total_combinations_theoretical
                        progress_bar.progress(total_progress, text=f"Optimising {optimise_for.title()}... {i}/{tasks_remaining} processed.")
                        # Flush a checkpoint every BATCH_SIZE results (and at the end).
                        if i % BATCH_SIZE == 0 or i == tasks_remaining:
                            factors_csv_order = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov']
                            # Expand each result's toggle/weight config into flat
                            # CSV columns ("RSI", "RSI W", ...). Redefined on every
                            # flush; could be hoisted out of the loop.
                            def unpack_csv_config(row):
                                conf_settings = row['confidence_settings']
                                result = {}
                                togs = conf_settings['toggles']
                                wgts = conf_settings['weights']
                                for idx, name in enumerate(factors_csv_order):
                                    result[name] = "On" if togs[idx] else "Off"
                                    result[name + ' W'] = wgts[idx]
                                return pd.Series(result)
                            config_df = pd.DataFrame(results_list).apply(unpack_csv_config, axis=1)
                            results_df_batch = pd.concat([pd.DataFrame(results_list).drop(['params', 'confidence_settings'], axis=1), config_df], axis=1)
                            results_df_batch['Strategy Score'] = results_df_batch.apply(
                                lambda row: calculate_strategy_score(row['Avg Profit/Trade'], row['Trade G/B Ratio'], row['Total Trades']), axis=1
                            )
                            # Append-mode CSV; header only on first write.
                            write_header = not os.path.exists(results_file)
                            results_df_batch.to_csv(results_file, mode='a', header=write_header, index=False)
                            status_text.text(f"CHECKPOINT: Saved {i} new {optimise_for} combinations. Total: {current_progress_base + i} / {total_combinations_theoretical}")
                            results_list = []
                except Exception as e:
                    # Batches already flushed are safe; rerun resumes from checkpoint.
                    st.error(f"FATAL ERROR during multiprocessing. Checkpoint saved. Error: {e}")
                    status_text.text("Optimization stopped.")
                    return
            status_text.empty()
            st.success(f"Optimization finished. All {optimise_for.title()} results saved.")
        # --- FINAL DISPLAY ---
        if os.path.exists(results_file):
            final_df = pd.read_csv(results_file)
            final_df = final_df.sort_values(by='Strategy Score', ascending=False)
            st.subheader(f"Top 20 Complete {optimise_for.title()} Setups:")
            display_cols = ['Strategy Score', 'Avg Profit/Trade', 'Trade G/B Ratio', 'Total Trades']
            factors_display = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov']
            display_cols.extend(factors_display)
            display_cols.extend([f + ' W' for f in factors_display])
            display_cols = [c for c in display_cols if c in final_df.columns]
            display_df = final_df.head(20)
            st.dataframe(display_df[display_cols].style.format({
                "Strategy Score": "{:.2f}%",
                "Avg Profit/Trade": "{:.2%}",
                "Trade G/B Ratio": "{:.2f}",
            }))
        else:
            st.info(f"No results found yet for {optimise_for.title()}.")
# --- CORRECTED: Ensures ALL arguments are correctly passed to run_backtest ---
def run_single_confidence_test(task, base_params, master_df, date_range, tickers_to_run, optimise_for, factor_weights):
    """Worker: backtest one (factor-combination, confidence-threshold) pair.

    Runs the backtest for every ticker with the given 7-factor toggle combo
    and confidence threshold, aggregates per-side results, and returns a flat
    metrics dictionary suitable for a results DataFrame row.

    Args:
        task: 3-tuple ``(combo, threshold, _)``; ``combo`` is a 7-tuple of
            booleans (RSI, Volatility, Trend, Volume, MACD, MA Slope, Markov),
            the third element is ignored.
        base_params: Fixed strategy parameters; copied, then the confidence
            threshold from ``task`` is injected.
        master_df: Wide price DataFrame (Close per ticker plus optional
            ``<ticker>_High/_Low/_Volume`` columns).
        date_range: ``(start_ts, end_ts)`` slice applied to each ticker.
        tickers_to_run: Ticker column names to evaluate.
        optimise_for: ``'long'`` or ``'short'`` — which side's trades to score.
        factor_weights: Dict of static settings (weights, ADX/RSI config,
            exit logic) read with ``.get`` defaults.

    Returns:
        dict: Aggregated metrics keyed for display (factor flags, ratios,
        scores, trade counts).
    """
    # Unpack the factor combo tuple (now has 7 elements)
    combo, threshold, _ = task
    use_rsi_combo, use_volatility_combo, use_trend_combo, use_volume_combo, use_macd_combo, use_ma_slope_combo, use_markov_combo = combo
    test_params = base_params.copy()
    test_params["confidence_threshold"] = threshold
    total_profit_weighted_avg, total_trades, winning_tickers, losing_tickers = 0, 0, 0, 0
    all_confidences = []
    total_wins, total_losses = 0, 0
    # Outlier cutoff: |PnL| or |avg trade| above 1.0 (100%) flags bad data.
    PROFIT_THRESHOLD = 1.0
    # NOTE(review): populated below but never returned or read afterwards.
    excluded_tickers_conf = []
    # --- Get STATIC settings from factor_weights ---
    rsi_logic = factor_weights.get('rsi_logic', 'Crossover')
    use_adx_filter = factor_weights.get('use_adx', True)
    adx_threshold = factor_weights.get('adx_thresh', 25.0)
    adx_period = factor_weights.get('adx_period', 14)
    primary_driver = factor_weights.get('primary_driver', 'Bollinger Bands')
    markov_setup = factor_weights.get('markov_setup')
    exit_logic = factor_weights.get('exit_logic')
    exit_thresh = factor_weights.get('exit_thresh')
    smart_trailing_stop = factor_weights.get('smart_trailing_stop')
    smart_exit_atr_p = factor_weights.get('smart_exit_atr_period', 14)
    smart_exit_atr_m = factor_weights.get('smart_exit_atr_multiplier', 3.0)
    intelligent_tsl_pct = factor_weights.get('intelligent_tsl_pct', 0.60)
    # Get weights
    rsi_w = factor_weights.get('rsi', 1.0); vol_w = factor_weights.get('vol', 1.0)
    trend_w = factor_weights.get('trend', 1.0); volume_w = factor_weights.get('volume', 1.0)
    macd_w = factor_weights.get('macd', 1.0); ma_slope_w = factor_weights.get('ma_slope', 1.0)
    markov_w = factor_weights.get('markov', 1.0)
    for ticker in tickers_to_run:
        # Collect the ticker's Close column plus any available helper columns.
        cols_to_use = [ticker]
        if f'{ticker}_High' in master_df.columns: cols_to_use.append(f'{ticker}_High')
        if f'{ticker}_Low' in master_df.columns: cols_to_use.append(f'{ticker}_Low')
        if f'{ticker}_Volume' in master_df.columns: cols_to_use.append(f'{ticker}_Volume')
        existing_cols = [col for col in cols_to_use if col in master_df.columns]
        if ticker not in existing_cols: continue
        ticker_data_full = master_df.loc[:, existing_cols]
        ticker_data = ticker_data_full.loc[date_range[0]:date_range[1]]
        # Normalise to the generic Close/High/Low/Volume names run_backtest expects.
        rename_dict = {ticker: 'Close', f'{ticker}_High': 'High', f'{ticker}_Low': 'Low', f'{ticker}_Volume': 'Volume'}
        rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols}
        ticker_data = ticker_data.rename(columns=rename_dict_filtered)
        if not ticker_data.empty and 'Close' in ticker_data.columns and not ticker_data['Close'].isna().all():
            # --- CORRECTED run_backtest CALL ---
            long_pnl, short_pnl, avg_long_trade, avg_short_trade, _, trades, _, trade_counts, _, _ = run_backtest(
                ticker_data, test_params,
                use_rsi_combo, use_volatility_combo, use_trend_combo, use_volume_combo, use_macd_combo, use_ma_slope_combo, use_markov_combo,
                rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w,
                use_adx_filter, adx_threshold,
                rsi_logic,
                adx_period,
                veto_setups_list=None,
                primary_driver=primary_driver,
                markov_setup=markov_setup,
                exit_logic_type=exit_logic,
                exit_confidence_threshold=exit_thresh,
                smart_trailing_stop_pct=smart_trailing_stop,
                smart_exit_atr_period=smart_exit_atr_p,
                smart_exit_atr_multiplier=smart_exit_atr_m,
                intelligent_tsl_pct=intelligent_tsl_pct
            )
            # Skip tickers with implausibly large results (data glitches).
            if abs(long_pnl) > PROFIT_THRESHOLD or abs(short_pnl) > PROFIT_THRESHOLD or \
               (avg_long_trade is not None and pd.notna(avg_long_trade) and abs(avg_long_trade) > PROFIT_THRESHOLD) or \
               (avg_short_trade is not None and pd.notna(avg_short_trade) and abs(avg_short_trade) > PROFIT_THRESHOLD):
                excluded_tickers_conf.append(ticker); continue
            # Select the side under test; trades[0]=long log, trades[2]=short log.
            if optimise_for == 'long':
                pnl, avg_trade_profit, trade_log = long_pnl, avg_long_trade, trades[0]
                total_wins += trade_counts[0]; total_losses += trade_counts[1]
            else:
                pnl, avg_trade_profit, trade_log = short_pnl, avg_short_trade, trades[2]
                total_wins += trade_counts[2]; total_losses += trade_counts[3]
            num_trades = len(trade_log)
            if num_trades > 0 and avg_trade_profit is not None and pd.notna(avg_trade_profit):
                total_trades += num_trades
                total_profit_weighted_avg += avg_trade_profit * num_trades
                if pnl > 0: winning_tickers += 1
                elif pnl < 0: losing_tickers += 1
                all_confidences.extend([trade['confidence'] for trade in trade_log if pd.notna(trade.get('confidence'))])
    # --- [NEW SCORING LOGIC] ---
    overall_avg_profit = 0.0
    ticker_good_bad_ratio = 0.0
    badness_score = 0.0
    if total_trades > 0:
        overall_avg_profit = total_profit_weighted_avg / total_trades
        if losing_tickers > 0:
            ticker_good_bad_ratio = winning_tickers / losing_tickers
        elif winning_tickers > 0:
            ticker_good_bad_ratio = 99999.0
        # Penalty term: only meaningful when some tickers win yet the average is negative.
        if winning_tickers > 0 and overall_avg_profit < 0:
            badness_score = (losing_tickers / winning_tickers) * abs(overall_avg_profit)
    avg_entry_confidence = np.mean(all_confidences) if all_confidences else 0.0
    # NOTE(review): unlike run_single_weight_test, this ratio is computed even
    # when total_trades == 0 (yielding 99999.0 when total_wins > 0) — confirm
    # this asymmetry is intentional.
    trade_good_bad_ratio = total_wins / total_losses if total_losses > 0 else 99999.0
    if pd.isna(avg_entry_confidence): avg_entry_confidence = 0.0
    # Use the new 3-Factor Score
    # We pass TRADE G/B ratio as the second argument because it is more granular/reliable
    norm_score = calculate_strategy_score(overall_avg_profit, trade_good_bad_ratio, total_trades)
    # For sorting consistency, we set "Good Score" to the same value as Norm Score
    raw_score = norm_score
    return {
        "RSI": use_rsi_combo, "Volatility": use_volatility_combo, "TREND": use_trend_combo, "Volume": use_volume_combo,
        "MACD": use_macd_combo, "MA Slope": use_ma_slope_combo, "Markov": use_markov_combo,
        "Conf. Threshold": threshold, "Avg Profit/Trade": overall_avg_profit if pd.notna(overall_avg_profit) else 0.0,
        "Ticker G/B Ratio": ticker_good_bad_ratio if pd.notna(ticker_good_bad_ratio) else 0.0,
        "Trade G/B Ratio": trade_good_bad_ratio if pd.notna(trade_good_bad_ratio) else 0.0,
        "Winning Tickers": winning_tickers, "Losing Tickers": losing_tickers,
        "Avg Entry Conf.": avg_entry_confidence,
        "Good Score": raw_score if pd.notna(raw_score) else 0.0,
        "Bad Score": badness_score if pd.notna(badness_score) else 0.0,
        "Norm. Score %": norm_score,
        "Total Trades": total_trades
    }
def run_single_weight_test(confidence_settings, base_params, master_df, optimise_for, tickers, date_range, power):
    """
    Worker: evaluate one factor-weight configuration across a ticker list.

    The structural strategy parameters in ``base_params`` are held fixed;
    only the toggles/weights carried in ``confidence_settings`` vary between
    calls. ``power`` is accepted for worker-signature compatibility and is
    not used inside this function.

    Returns a dict with the tested weights plus aggregate trade metrics.
    """
    # Any ticker whose |PnL| or |avg trade| exceeds this is treated as a data glitch.
    OUTLIER_LIMIT = 1.0
    # --- Unpack the variable configuration dictionary ---
    flag_rsi, flag_vol, flag_trend, flag_volume, flag_macd, flag_slope, flag_markov = confidence_settings['toggles']
    w_rsi, w_vol, w_trend, w_volume, w_macd, w_slope, w_markov = confidence_settings['weights']
    adx_on, adx_level, adx_len = confidence_settings['adx_settings']
    logic_rsi = confidence_settings['rsi_logic']
    driver = confidence_settings['primary_driver']
    markov_cfg = confidence_settings['markov_setup']
    exit_mode = confidence_settings['exit_logic']
    exit_conf = confidence_settings['exit_thresh']
    smart_tsl = confidence_settings['smart_trailing_stop']
    atr_len = confidence_settings['smart_exit_atr_period']
    atr_mult = confidence_settings['smart_exit_atr_multiplier']
    intel_tsl = confidence_settings['intelligent_tsl_pct']
    # --- Accumulators ---
    weighted_profit_sum = 0
    trade_total = 0
    tickers_up, tickers_down = 0, 0
    wins_total, losses_total = 0, 0
    entry_confs = []
    if not isinstance(tickers, list):
        tickers = [tickers]
    for symbol in tickers:
        # Gather this ticker's Close column plus whichever helpers exist.
        wanted = [symbol]
        for suffix in ('_High', '_Low', '_Volume'):
            if f'{symbol}{suffix}' in master_df.columns:
                wanted.append(f'{symbol}{suffix}')
        present = [c for c in wanted if c in master_df.columns]
        if symbol not in present:
            continue
        frame = master_df.loc[:, present].loc[date_range[0]:date_range[1]]
        # Map ticker-specific names onto the generic OHLCV names the backtest expects.
        generic_names = {
            symbol: 'Close', f'{symbol}_High': 'High',
            f'{symbol}_Low': 'Low', f'{symbol}_Volume': 'Volume'
        }
        frame = frame.rename(columns={k: v for k, v in generic_names.items() if k in present})
        # Guard clauses: need a non-empty frame with at least one usable Close value.
        if frame.empty or 'Close' not in frame.columns:
            continue
        if frame['Close'].isna().all():
            continue
        long_pnl, short_pnl, avg_long, avg_short, _, trades, _, counts, _, _ = run_backtest(
            frame, base_params,
            flag_rsi, flag_vol, flag_trend, flag_volume, flag_macd, flag_slope, flag_markov,
            w_rsi, w_vol, w_trend, w_volume, w_macd, w_slope, w_markov,
            adx_on, adx_level, logic_rsi,
            adx_len,
            veto_setups_list=None,  # Veto is OFF during optimisation
            primary_driver=driver,
            markov_setup=markov_cfg,
            exit_logic_type=exit_mode,
            exit_confidence_threshold=exit_conf,
            smart_trailing_stop_pct=smart_tsl,
            smart_exit_atr_period=atr_len,
            smart_exit_atr_multiplier=atr_mult,
            intelligent_tsl_pct=intel_tsl
        )
        # Discard implausible outliers rather than letting them skew the averages.
        suspicious = (
            abs(long_pnl) > OUTLIER_LIMIT
            or abs(short_pnl) > OUTLIER_LIMIT
            or (avg_long is not None and pd.notna(avg_long) and abs(avg_long) > OUTLIER_LIMIT)
            or (avg_short is not None and pd.notna(avg_short) and abs(avg_short) > OUTLIER_LIMIT)
        )
        if suspicious:
            continue
        # Pick the side under test: trades[0] holds the long log, trades[2] the short log.
        if optimise_for == 'long':
            side_pnl, side_avg, side_log = long_pnl, avg_long, trades[0]
            wins_total += counts[0]
            losses_total += counts[1]
        else:
            side_pnl, side_avg, side_log = short_pnl, avg_short, trades[2]
            wins_total += counts[2]
            losses_total += counts[3]
        n = len(side_log)
        if n > 0 and side_avg is not None and pd.notna(side_avg):
            trade_total += n
            weighted_profit_sum += side_avg * n
            if side_pnl > 0:
                tickers_up += 1
            elif side_pnl < 0:
                tickers_down += 1
            entry_confs.extend(t['confidence'] for t in side_log if pd.notna(t.get('confidence')))
    # --- Aggregate metrics (all default to 0.0 when no trades were taken) ---
    mean_profit = 0.0
    ticker_ratio = 0.0
    trade_ratio = 0.0
    if trade_total > 0:
        mean_profit = weighted_profit_sum / trade_total
        if tickers_down > 0:
            ticker_ratio = tickers_up / tickers_down
        elif tickers_up > 0:
            ticker_ratio = 99999.0  # sentinel: winners but zero losers
        if losses_total > 0:
            trade_ratio = wins_total / losses_total
        elif wins_total > 0:
            trade_ratio = 99999.0  # sentinel: wins but zero losses
    mean_conf = np.mean(entry_confs) if entry_confs else 0.0
    tested = {
        "rsi_w": w_rsi, "vol_w": w_vol, "trend_w": w_trend,
        "volume_w": w_volume, "macd_w": w_macd, "ma_slope_w": w_slope,
        "markov_w": w_markov
    }
    return {
        "weights": tested,
        "Avg Profit/Trade": mean_profit,
        "Ticker G/B Ratio": ticker_ratio,
        "Trade G/B Ratio": trade_ratio,
        "Total Trades": trade_total,
        "Avg Entry Conf.": mean_conf,
        "Winning Tickers": tickers_up,
        "Losing Tickers": tickers_down
    }
def run_advisor_scan(main_df, setups_to_run, advisor_type="Advisor"):
    """
    Scans tickers for open trades using a list of setups.
    Updated to provide real-time granular feedback per ticker.

    main_df:        wide price DataFrame -- bare ticker columns hold closes,
                    optional '<ticker>_High'/'_Low'/'_Volume' columns ride along.
    setups_to_run:  list of setup dicts; the expected keys differ between the
                    "Top Setups" and "User-Defined" branches below.
    advisor_type:   "Top Setups" or "User-Defined"; any other value skips
                    every setup (the per-setup loop simply continues).

    Side effects: stores the raw and de-duplicated trade tables in
    st.session_state (raw_df / deduped_df / advisor_type), clears the three
    run_* trigger flags, and ends with st.rerun() -- control never returns
    normally to the caller within the same script run.
    """
    st.info(f"Scanning tickers for open trades based on {len(setups_to_run)} {advisor_type} setups...")
    # Baseline parameters from the sidebar; the "Top Setups" branch copies
    # these and only overrides the confidence threshold per setup.
    base_params = {
        "large_ma_period": st.session_state.ma_period,
        "bband_period": st.session_state.bb_period,
        "bband_std_dev": st.session_state.bb_std,
        "long_entry_threshold_pct": st.session_state.long_entry / 100,
        "long_exit_ma_threshold_pct": st.session_state.long_exit / 100,
        "long_trailing_stop_loss_pct": st.session_state.long_sl / 100,
        "long_delay_days": st.session_state.long_delay,
        "short_entry_threshold_pct": st.session_state.short_entry / 100,
        "short_exit_ma_threshold_pct": st.session_state.short_exit / 100,
        "short_trailing_stop_loss_pct": st.session_state.short_sl / 100,
        "short_delay_days": st.session_state.short_delay
    }
    # Collect every indicator lookback that could apply so we fetch just
    # enough history (200/50/26/14 cover the fixed MAs, MACD and RSI/ATR).
    lookbacks = [
        base_params.get('large_ma_period', 50),
        base_params.get('bband_period', 20),
        200, 50, 26, 14,
        st.session_state.adx_period
    ]
    markov_setup_to_use = None
    if st.session_state.primary_driver == 'Markov State' or st.session_state.use_markov:
        if 'best_markov_setup' in st.session_state and st.session_state.best_markov_setup:
            markov_setup_to_use = st.session_state.best_markov_setup
            if st.session_state.primary_driver == 'Markov State':
                st.info("Using saved Best Markov Setup as Primary Driver.")
            lookbacks.append(markov_setup_to_use.get('Run-Up Period', 10))
        else:
            # Markov requested but never optimised -- abort the whole run.
            st.error("Markov State is active, but no Best Markov Setup found. Run Section 7 first.")
            st.stop()
    if advisor_type == "User-Defined":
        # User rows may carry their own MA/BB periods; fold them into the
        # lookback budget (fall back to 50/20 on unparsable cells).
        for setup in setups_to_run:
            try:
                lookbacks.append(int(setup.get("Large MA Period", 50)))
                lookbacks.append(int(setup.get("Bollinger Band Period", 20)))
            except (ValueError, TypeError):
                lookbacks.append(50)
                lookbacks.append(20)
    max_lookback_period = max(lookbacks)
    active_scan_days = 120  # window inside which an "open" trade is interesting
    buffer_days = 10        # slack for weekends/holidays
    total_days_needed = max_lookback_period + active_scan_days + buffer_days
    scan_end_date = pd.Timestamp(st.session_state.end_date)
    scan_start_date = scan_end_date - timedelta(days=total_days_needed)
    earliest_data_date = main_df.index.min()
    if scan_start_date < earliest_data_date: scan_start_date = earliest_data_date
    st.caption(f"Analysing data from {scan_start_date.date()} to {scan_end_date.date()}.")
    # Sidebar-level factor/exit settings shared by every setup in this scan.
    factor_settings = {
        "use_adx": st.session_state.use_adx_filter, "adx_thresh": st.session_state.adx_threshold,
        "adx_period": st.session_state.adx_period,
        "rsi_logic": st.session_state.get('rsi_logic', 'Crossover'),
        "primary_driver": st.session_state.get('primary_driver', 'Bollinger Bands'),
        "markov_setup": markov_setup_to_use,
        "exit_logic": st.session_state.exit_logic_type,
        "exit_thresh": st.session_state.exit_confidence_threshold,
        "smart_trailing_stop": st.session_state.smart_trailing_stop_pct / 100.0,
        "rsi_w": st.session_state.rsi_w, "vol_w": st.session_state.vol_w,
        "trend_w": st.session_state.trend_w, "volume_w": st.session_state.volume_w,
        "macd_w": st.session_state.macd_w, "ma_slope_w": st.session_state.ma_slope_w,
        "markov_w": st.session_state.markov_w,
        'smart_exit_atr_period': st.session_state.smart_exit_atr_period,
        'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier,
        'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0
    }
    all_advisor_trades = []
    # Close-price columns only: anything without a _Volume/_High/_Low suffix.
    ticker_list = sorted([col for col in main_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))])
    # --- [NEW] Better Progress Tracking ---
    num_setups = len(setups_to_run)
    num_tickers = len(ticker_list)
    total_ops = num_setups * num_tickers
    current_op = 0
    progress_bar = st.progress(0, text="Initializing scan...")
    status_text = st.empty()  # Placeholder for rolling ticker names
    for i, setup in enumerate(setups_to_run):
        if advisor_type == "Top Setups":
            # Top-setup rows toggle factors via 'On'/'Off' strings and
            # override only the confidence threshold; weights stay global.
            params_for_run = base_params.copy()
            params_for_run['confidence_threshold'] = setup.get('Conf. Threshold', 50)
            setup_use_rsi = setup.get('RSI', 'Off') == 'On'
            setup_use_vol = setup.get('Volatility', 'Off') == 'On'
            setup_use_trend = setup.get('TREND', 'Off') == 'On'
            setup_use_volume = setup.get('Volume', 'Off') == 'On'
            setup_use_macd = setup.get('MACD', 'Off') == 'On'
            setup_use_ma_slope = setup.get('MA Slope', 'Off') == 'On'
            setup_use_markov = setup.get('Markov', 'Off') == 'On'
            scan_rsi_w = factor_settings['rsi_w']; scan_vol_w = factor_settings['vol_w']
            scan_trend_w = factor_settings['trend_w']; scan_volume_w = factor_settings['volume_w']
            scan_macd_w = factor_settings['macd_w']; scan_ma_slope_w = factor_settings['ma_slope_w']
            scan_markov_w = factor_settings['markov_w']
        elif advisor_type == "User-Defined":
            # User rows carry full per-setup parameters; coerce each cell
            # defensively since the table is free-form text.
            try: ma_p = int(setup.get("Large MA Period", 50))
            except: ma_p = 50
            try: bb_p = int(setup.get("Bollinger Band Period", 20))
            except: bb_p = 20
            try: long_d = int(setup.get("Long Delay (Days)", 0))
            except: long_d = 0
            try: short_d = int(setup.get("Short Delay (Days)", 0))
            except: short_d = 0
            catcher_pct = setup.get("Catcher Offset (%)", 3.0)
            try: catcher_decimal = float(catcher_pct) / 100.0
            except: catcher_decimal = 0.03
            params_for_run = {
                "large_ma_period": ma_p, "bband_period": bb_p, "long_delay_days": long_d, "short_delay_days": short_d,
                "bband_std_dev": setup.get("Bollinger Band Std Dev", 2.0),
                "confidence_threshold": setup.get("Conf. Threshold", 50),
                "catcher_stop_pct": catcher_decimal,
                "long_entry_threshold_pct": setup.get("Long Entry Threshold (%)", 0.0) / 100.0,
                "long_exit_ma_threshold_pct": setup.get("Long Exit Threshold (%)", 0.0) / 100.0,
                "long_trailing_stop_loss_pct": setup.get("Long Stop Loss (%)", 0.0) / 100.0,
                "short_entry_threshold_pct": setup.get("Short Entry Threshold (%)", 0.0) / 100.0,
                "short_exit_ma_threshold_pct": setup.get("Short Exit Threshold (%)", 0.0) / 100.0,
                "short_trailing_stop_loss_pct": setup.get("Short Stop Loss (%)", 0.0) / 100.0,
                "use_ma_floor_filter": st.session_state.use_ma_floor_filter  # Pass this through
            }
            def get_weight_toggle(val):
                # In user tables a factor cell IS its weight: any number > 0
                # means "on with this weight", anything else means "off".
                try:
                    w = float(val)
                    if w > 0: return w, True
                except: pass
                return 0.0, False
            scan_rsi_w, setup_use_rsi = get_weight_toggle(setup.get('RSI', 'Off'))
            scan_vol_w, setup_use_vol = get_weight_toggle(setup.get('Volatility', 'Off'))
            scan_trend_w, setup_use_trend = get_weight_toggle(setup.get('TREND', 'Off'))
            scan_volume_w, setup_use_volume = get_weight_toggle(setup.get('Volume', 'Off'))
            scan_macd_w, setup_use_macd = get_weight_toggle(setup.get('MACD', 'Off'))
            scan_ma_slope_w, setup_use_ma_slope = get_weight_toggle(setup.get('MA Slope', 'Off'))
            scan_markov_w, setup_use_markov = get_weight_toggle(setup.get('Markov', 'Off'))
        else:
            continue
        # --- INNER LOOP ---
        for ticker_symbol in ticker_list:
            # Update progress per ticker
            current_op += 1
            if current_op % 10 == 0:  # Update visual every 10 items to be faster
                progress_bar.progress(current_op / total_ops, text=f"Setup {i+1}/{num_setups}: Scanning {ticker_symbol}...")
            cols_to_use = [ticker_symbol]
            if f'{ticker_symbol}_High' in main_df.columns: cols_to_use.append(f'{ticker_symbol}_High')
            if f'{ticker_symbol}_Low' in main_df.columns: cols_to_use.append(f'{ticker_symbol}_Low')
            if f'{ticker_symbol}_Volume' in main_df.columns: cols_to_use.append(f'{ticker_symbol}_Volume')
            existing_cols = [col for col in cols_to_use if col in main_df.columns]
            if ticker_symbol not in existing_cols: continue
            data_for_backtest_full = main_df.loc[:, existing_cols]
            data_for_scan = data_for_backtest_full.loc[scan_start_date:scan_end_date]
            # run_backtest expects canonical Close/High/Low/Volume names.
            rename_dict = {ticker_symbol: 'Close', f'{ticker_symbol}_High': 'High', f'{ticker_symbol}_Low': 'Low', f'{ticker_symbol}_Volume': 'Volume'}
            rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols}
            data_for_scan = data_for_scan.rename(columns=rename_dict_filtered)
            if advisor_type == "User-Defined":
                # Per-setup lookback: user rows can demand longer MAs/BBs.
                current_lookback = max(params_for_run.get('large_ma_period', 50), params_for_run.get('bband_period', 20), 200, 50, 26, 14, factor_settings['adx_period'])
            else:
                current_lookback = max_lookback_period
            if (factor_settings['primary_driver'] == 'Markov State' or setup_use_markov) and factor_settings['markov_setup']:
                current_lookback = max(current_lookback, factor_settings['markov_setup'].get('Run-Up Period', 10))
            # Only run when there is enough clean history for the indicators.
            if not data_for_scan.empty and 'Close' in data_for_scan.columns and not data_for_scan['Close'].isna().all() and len(data_for_scan) >= current_lookback :
                try:
                    # Added missing MFI and SuperTrend arguments (set to False/1.0) ---
                    # Only the 7th return value (the open trades) is used here.
                    _, _, _, _, _, _, open_trades, _, _, _, _ = run_backtest(
                        data_for_scan, params_for_run,
                        setup_use_rsi, setup_use_vol, setup_use_trend, setup_use_volume, setup_use_macd, setup_use_ma_slope, setup_use_markov,
                        False, False,  # use_mfi, use_supertrend (Default Off for Advisor scans)
                        scan_rsi_w, scan_vol_w, scan_trend_w, scan_volume_w, scan_macd_w, scan_ma_slope_w, scan_markov_w,
                        1.0, 1.0,  # mfi_w, supertrend_w (Default 1.0)
                        factor_settings['use_adx'], factor_settings['adx_thresh'], factor_settings['rsi_logic'], factor_settings['adx_period'],
                        veto_setups_list=None,
                        primary_driver=factor_settings['primary_driver'], markov_setup=factor_settings['markov_setup'],
                        exit_logic_type=factor_settings['exit_logic'], exit_confidence_threshold=factor_settings['exit_thresh'],
                        smart_trailing_stop_pct=factor_settings['smart_trailing_stop'],
                        smart_exit_atr_period=factor_settings['smart_exit_atr_period'],
                        smart_exit_atr_multiplier=factor_settings['smart_exit_atr_multiplier'],
                        intelligent_tsl_pct=factor_settings['intelligent_tsl_pct']
                    )
                except Exception as e:
                    # One bad ticker/setup combination must not kill the scan.
                    print(f"Error during advisor backtest for {ticker_symbol} with setup {i+1}: {e}")
                    continue
                if open_trades:
                    # Tag each open trade with its origin for later de-duping.
                    for trade in open_trades:
                        trade['Ticker'] = ticker_symbol
                        trade['Setup Rank'] = i + 1
                        if advisor_type == "Top Setups":
                            trade['Setup G/B Ratio'] = setup.get('Ticker G/B Ratio', np.nan)
                        all_advisor_trades.append(trade)
    progress_bar.empty()
    status_text.empty()
    print(f"\nFound {len(all_advisor_trades)} total trades before de-duplication.")
    # De-duplicate: the same (ticker, entry date, type) may be flagged by
    # several setups -- keep the copy from the best-ranked (lowest) setup.
    best_trades = {}
    for trade in all_advisor_trades:
        try:
            trade_key = (trade.get('Ticker'), trade.get('Entry Date'), trade.get('Trade Type'))
        except Exception as e:
            print(f"Warning: Could not create trade key for {trade}. Error: {e}")
            continue
        try:
            current_rank = int(trade.get('Setup Rank', 999))
        except (ValueError, TypeError):
            current_rank = 999
        if trade_key not in best_trades:
            best_trades[trade_key] = trade
        else:
            existing_rank = int(best_trades[trade_key].get('Setup Rank', 999))
            if current_rank < existing_rank:
                best_trades[trade_key] = trade
    deduplicated_trades = list(best_trades.values())
    if all_advisor_trades:
        raw_advisor_df = pd.DataFrame(all_advisor_trades)
        deduped_advisor_df = pd.DataFrame(deduplicated_trades)
        cols_order = ['Ticker', 'Status', 'Setup Rank', 'Final % P/L', 'Side', 'Date Open', 'Date Closed', 'Start Confidence']
        if advisor_type == "Top Setups":
            cols_order.append('Setup G/B Ratio')
        if advisor_type == "User-Defined":
            # Show the setup's own parameters beside each trade by joining the
            # setups table on rank (factor toggle columns are excluded).
            param_keys = [k for k in setups_to_run[0].keys() if k not in ["RSI", "Volatility", "TREND", "Volume", "MACD", "MA Slope", "Conf. Threshold", "Markov"]]
            cols_order.extend(param_keys)
            setups_df = pd.DataFrame(setups_to_run)
            setups_df['Setup Rank'] = setups_df.index + 1
            raw_advisor_df = pd.merge(raw_advisor_df, setups_df, on='Setup Rank', how='left')
            deduped_advisor_df = pd.merge(deduped_advisor_df, setups_df, on='Setup Rank', how='left')
        existing_cols_final = [col for col in cols_order if col in raw_advisor_df.columns]
        st.session_state.raw_df = raw_advisor_df[existing_cols_final].sort_values(by=['Status', 'Date Open'], ascending=[True, False])
        st.session_state.deduped_df = deduped_advisor_df[existing_cols_final].sort_values(by=['Status', 'Date Open'], ascending=[True, False])
        st.session_state.advisor_type = advisor_type
    else:
        st.session_state.raw_df = None
        st.session_state.deduped_df = None
        st.session_state.advisor_type = advisor_type
    # Clear the trigger flags so the scan does not re-fire on the rerun.
    st.session_state.run_advanced_advisor = False
    st.session_state.run_user_advisor_setup = False
    st.session_state.run_scan_user_setups = False
    st.rerun()
def generate_and_run_weight_optimisation(master_df, main_content_placeholder, side, use_sq_weighting):
    """
    Runs weight optimisation using multiprocessing.

    Grid-searches weights (0.5..2.0 in 0.5 steps) for every active,
    non-primary confidence factor while all strategy parameters stay fixed
    at their sidebar values.  `side` is 'long' or 'short';
    `use_sq_weighting` squares the per-trade weighting inside the worker.
    The best weights are written to st.session_state.best_weights and the
    full results table is rendered into `main_content_placeholder`.
    """
    # Clear every stale result table before a fresh run.
    st.session_state.summary_df = None
    st.session_state.single_ticker_results = None
    st.session_state.confidence_results_df = None
    st.session_state.open_trades_df = None
    st.session_state.advisor_df = None
    st.session_state.best_params = None
    st.session_state.best_weights = None
    with main_content_placeholder.container():
        # Get FIXED strategy parameters
        base_params = {
            "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period,
            "bband_std_dev": st.session_state.bb_std, "long_entry_threshold_pct": st.session_state.long_entry / 100,
            "long_exit_ma_threshold_pct": st.session_state.long_exit / 100,
            "long_trailing_stop_loss_pct": st.session_state.long_sl / 100,
            "long_delay_days": st.session_state.long_delay, "short_entry_threshold_pct": st.session_state.short_entry / 100,
            "short_exit_ma_threshold_pct": st.session_state.short_exit / 100,
            "short_trailing_stop_loss_pct": st.session_state.short_sl / 100,
            "short_delay_days": st.session_state.short_delay, "confidence_threshold": st.session_state.confidence_slider
        }
        # Get FIXED confidence settings
        fixed_toggles = (st.session_state.use_rsi, st.session_state.use_vol, st.session_state.use_trend,
                         st.session_state.use_volume, st.session_state.use_macd, st.session_state.use_ma_slope, st.session_state.use_markov)
        fixed_adx_settings = (st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.adx_period)
        fixed_rsi_logic = st.session_state.rsi_logic
        fixed_primary_driver = st.session_state.primary_driver
        fixed_markov_setup = st.session_state.get('best_markov_setup')
        fixed_exit_logic = st.session_state.exit_logic_type
        fixed_exit_thresh = st.session_state.exit_confidence_threshold
        fixed_smart_trailing_stop = st.session_state.smart_trailing_stop_pct / 100.0
        smart_atr_p = st.session_state.smart_exit_atr_period
        smart_atr_m = st.session_state.smart_exit_atr_multiplier
        # Check if key exists, default to 0.6 if not (backward compatibility)
        fixed_intelligent_tsl = st.session_state.get('intelligent_tsl_pct', 60.0) / 100.0
        # Dynamic Weight Optimisation Logic
        weight_step = 0.5
        weight_range = np.arange(0.5, 2.0 + weight_step, weight_step)
        all_factor_info = {
            'RSI': {'toggle_key': 'use_rsi', 'weight_key': 'rsi_w'},
            'Volatility': {'toggle_key': 'use_vol', 'weight_key': 'vol_w'},
            'TREND': {'toggle_key': 'use_trend', 'weight_key': 'trend_w'},
            'Volume': {'toggle_key': 'use_volume', 'weight_key': 'volume_w'},
            'MACD': {'toggle_key': 'use_macd', 'weight_key': 'macd_w'},
            'MA Slope': {'toggle_key': 'use_ma_slope', 'weight_key': 'ma_slope_w'},
            'Markov': {'toggle_key': 'use_markov', 'weight_key': 'markov_w'}
        }
        # The factor acting as the primary driver keeps its current weight
        # and is excluded from the search.
        driver_map = {'RSI Crossover': 'RSI', 'MACD Crossover': 'MACD', 'MA Slope': 'MA Slope', 'Markov State': 'Markov', 'Bollinger Bands': None}
        primary_factor_key = driver_map.get(fixed_primary_driver)
        factors_to_optimise = []
        for factor_key, info in all_factor_info.items():
            is_active = st.session_state.get(info['toggle_key'], False)
            is_primary = (factor_key == primary_factor_key)
            if factor_key == 'Markov' and not fixed_markov_setup:
                st.warning("Skipping Markov weight optimisation: No 'Best Markov Setup' found. Please run Section 7 first.")
                is_active = False
            if is_active and not is_primary:
                factors_to_optimise.append(factor_key)
        if not factors_to_optimise:
            st.warning("No active, non-primary factors to optimise. Toggle some factors 'On' in Section 2.")
            st.session_state.run_weight_optimisation = False
            return
        st.info(f"Optimising weights for: {', '.join(factors_to_optimise)}")
        # Full cartesian grid over the searched factors; factors not being
        # searched keep their current sidebar weights.
        weight_product = itertools.product(weight_range, repeat=len(factors_to_optimise))
        base_weight_keys = ('rsi_w', 'vol_w', 'trend_w', 'volume_w', 'macd_w', 'ma_slope_w', 'markov_w')
        base_weights_tuple = (st.session_state.rsi_w, st.session_state.vol_w, st.session_state.trend_w, st.session_state.volume_w, st.session_state.macd_w, st.session_state.ma_slope_w, st.session_state.markov_w)
        confidence_combinations = []
        for weight_tuple in weight_product:
            current_weights_map = dict(zip(base_weight_keys, base_weights_tuple))
            for factor_key, new_weight_value in zip(factors_to_optimise, weight_tuple):
                weight_key = all_factor_info[factor_key]['weight_key']
                current_weights_map[weight_key] = new_weight_value
            final_weights_tuple = (current_weights_map['rsi_w'], current_weights_map['vol_w'], current_weights_map['trend_w'], current_weights_map['volume_w'], current_weights_map['macd_w'], current_weights_map['ma_slope_w'], current_weights_map['markov_w'])
            confidence_combinations.append({
                'toggles': fixed_toggles, 'weights': final_weights_tuple, 'adx_settings': fixed_adx_settings,
                'rsi_logic': fixed_rsi_logic, 'primary_driver': fixed_primary_driver, 'markov_setup': fixed_markov_setup,
                'exit_logic': fixed_exit_logic, 'exit_thresh': fixed_exit_thresh, 'smart_trailing_stop': fixed_smart_trailing_stop,
                'smart_exit_atr_period': smart_atr_p, 'smart_exit_atr_multiplier': smart_atr_m, 'intelligent_tsl_pct': fixed_intelligent_tsl
            })
        total_combinations = len(confidence_combinations)
        num_cores = cpu_count()
        st.info(f"Starting {side.upper()} weight optimisation on {num_cores} cores... Testing {total_combinations} weight combinations.")
        if st.session_state.run_mode.startswith("Analyse Full List"):
            tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))]
        else:
            tickers_to_run = [st.session_state.ticker_select]
        date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date))
        power = 2 if use_sq_weighting else 1
        status_text = st.empty(); status_text.text("Optimisation starting...")
        progress_bar = st.progress(0)
        # Everything except the combination itself is frozen into the worker.
        worker_func = partial(run_single_weight_test, base_params=base_params, master_df=master_df, optimise_for=side, tickers=tickers_to_run, date_range=date_range, power=power)
        results_list = []
        with Pool(processes=num_cores) as pool:
            try:
                # imap_unordered lets the progress bar advance as soon as any
                # worker finishes, regardless of submission order.
                iterator = pool.imap_unordered(worker_func, confidence_combinations)
                for i, result_dict in enumerate(iterator, 1):
                    results_list.append(result_dict)
                    progress_bar.progress(i / total_combinations, text=f"Optimising... {i}/{total_combinations} combinations complete.")
            except Exception as e:
                st.error(f"An error occurred during multiprocessing: {e}")
                status_text.text("Optimisation failed due to an error."); st.session_state.run_weight_optimisation = False; return
        status_text.text("Optimisation complete. Calculating scores...")
        if not results_list:
            st.warning("Optimisation finished, but no valid results were found."); st.session_state.run_weight_optimisation = False; return
        results_df = pd.DataFrame(results_list)
        # Expand the nested 'weights' dict into one column per weight key.
        weights_df = results_df['weights'].apply(pd.Series)
        results_df = pd.concat([results_df.drop('weights', axis=1), weights_df], axis=1)
        # Discard statistically weak combinations.
        min_trades_threshold = 10
        if 'Total Trades' in results_df.columns: results_df = results_df[results_df['Total Trades'] >= min_trades_threshold].copy()
        if results_df.empty:
            st.warning(f"No results found with at least {min_trades_threshold} trades."); st.session_state.run_weight_optimisation = False; return
        results_df['Strategy Score'] = results_df.apply(lambda row: calculate_strategy_score(row['Avg Profit/Trade'], row['Ticker G/B Ratio'], row['Total Trades']), axis=1)
        results_df = results_df.sort_values(by='Strategy Score', ascending=False)
        best_setup_series = results_df.iloc[0]
        best_metric = best_setup_series['Strategy Score']
        best_weights_dict = {key: float(best_setup_series[key]) for key in base_weight_keys}
        status_text.empty()
        st.success(f"Weight Optimisation Complete! Best Strategy Score: {best_metric:.2f}%")
        st.subheader("Optimal Weights Found (based on Strategy Score):"); st.json(best_weights_dict)
        st.session_state.best_weights = best_weights_dict
        st.subheader("Full Weight Optimisation Results:")
        display_cols = ["Strategy Score", "Avg Profit/Trade", "Ticker G/B Ratio", "Total Trades"] + list(base_weight_keys)
        display_cols = [col for col in display_cols if col in results_df.columns]
        formatters = {"Strategy Score": "{:.2f}%", "Avg Profit/Trade": "{:.2%}", "Ticker G/B Ratio": "{:.2f}"}
        for w_key in base_weight_keys: formatters[w_key] = "{:.1f}"
        st.dataframe(results_df[display_cols].style.format(formatters))
        st.session_state.run_weight_optimisation = False
def apply_best_weights_to_widgets():
    """Loads optimised weights from st.session_state.best_weights into the sidebar widgets.

    BUGFIX: 'markov_w' was previously omitted from the copy, so an
    optimised Markov weight was silently discarded even though the weight
    optimiser produces it (see base_weight_keys in
    generate_and_run_weight_optimisation).  The key list below mirrors
    base_weight_keys exactly.  Ends with st.rerun() on success.
    """
    if 'best_weights' in st.session_state and st.session_state.best_weights:
        weights = st.session_state.best_weights
        # Copy every optimiser weight onto its identically-named widget key;
        # missing keys are simply skipped (partial dicts stay safe).
        for key in ('rsi_w', 'vol_w', 'trend_w', 'volume_w', 'macd_w', 'ma_slope_w', 'markov_w'):
            if key in weights:
                st.session_state[key] = weights[key]
        st.sidebar.success("Optimal weights loaded into sidebar!")
        st.rerun()
    else:
        st.sidebar.error("No optimal weights found in session state.")
def apply_best_params_to_widgets():
    """Loads parameters from st.session_state.best_params into the sidebar widgets.

    Long-side widgets always read the long_* keys.  BUGFIX: the short-side
    widgets previously read the long_* keys unconditionally (copy/paste),
    ignoring any optimised short_* values.  They now prefer the dedicated
    short_* keys and fall back to mirroring the long_* values, which keeps
    the original behaviour for optimisers that only emit long-side
    parameters.  Ends with st.rerun() on success.
    """
    if 'best_params' in st.session_state and st.session_state.best_params:
        params = st.session_state.best_params
        if 'large_ma_period' in params: st.session_state.ma_period = params['large_ma_period']
        if 'bband_period' in params: st.session_state.bb_period = params['bband_period']
        if 'bband_std_dev' in params: st.session_state.bb_std = params['bband_std_dev']
        if 'confidence_threshold' in params: st.session_state.confidence_slider = params['confidence_threshold']
        if 'long_entry_threshold_pct' in params: st.session_state.long_entry = params['long_entry_threshold_pct'] * 100
        if 'long_exit_ma_threshold_pct' in params: st.session_state.long_exit = params['long_exit_ma_threshold_pct'] * 100
        if 'long_trailing_stop_loss_pct' in params: st.session_state.long_sl = params['long_trailing_stop_loss_pct'] * 100
        if 'long_delay_days' in params: st.session_state.long_delay = params['long_delay_days']

        def _short_value(short_key, long_key):
            # Prefer the optimised short-side value; mirror long otherwise.
            if short_key in params: return params[short_key]
            if long_key in params: return params[long_key]
            return None

        v = _short_value('short_entry_threshold_pct', 'long_entry_threshold_pct')
        if v is not None: st.session_state.short_entry = v * 100
        v = _short_value('short_exit_ma_threshold_pct', 'long_exit_ma_threshold_pct')
        if v is not None: st.session_state.short_exit = v * 100
        v = _short_value('short_trailing_stop_loss_pct', 'long_trailing_stop_loss_pct')
        if v is not None: st.session_state.short_sl = v * 100
        v = _short_value('short_delay_days', 'long_delay_days')
        if v is not None: st.session_state.short_delay = v
        # --- NEW: Load Max Duration ---
        if 'max_trading_days' in params: st.session_state.max_duration = params['max_trading_days']
        # --- NEW: Load Catcher Offset ---
        if 'catcher_stop_pct' in params: st.session_state.catcher_stop_pct = params['catcher_stop_pct'] * 100
        st.sidebar.success("Optimal parameters loaded into sidebar!")
        st.rerun()
    else:
        st.sidebar.error("No optimal parameters found in session state.")
# --- 5. Streamlit User Interface ---
def update_state():
    """
    Callback that mirrors every 'widget_*' session-state key back onto its
    bare counterpart (e.g. 'widget_ma_period' -> 'ma_period'), keeping all
    widgets synchronized.  Keys without an existing bare counterpart are
    left untouched.
    """
    prefix = 'widget_'
    state = st.session_state
    # Snapshot the key list first -- we mutate the mapping while copying.
    mirrored_keys = [k for k in list(state.keys()) if k.startswith(prefix)]
    for source_key in mirrored_keys:
        target_key = source_key[len(prefix):]
        if target_key in state:
            state[target_key] = state[source_key]
# --- UPDATED: Helper to check for blank rows (Checks Factors, Stats, Notes, AND Run status) ---
def is_row_blank(s):
    """Checks if a row from the user-defined table is blank.

    A row counts as blank only when ALL of the following hold: every factor
    cell is off (Off / 0 / 0.0 in any case), no trades are recorded, the
    note is empty, and the Run checkbox is explicitly False.
    """
    off_values = ["off", "0", "0.0"]
    factor_cols = ["RSI", "Volatility", "TREND", "Volume", "MACD",
                   "MA Slope", "Markov", "ADX Filter"]
    # Any switched-on factor keeps the row.
    if any(str(s.get(col, "Off")).lower() not in off_values for col in factor_cols):
        return False
    # Recorded trades keep the row.
    if s.get("Z_Num_Trades", 0) != 0:
        return False
    # A non-empty note keeps the row.
    if str(s.get("Notes", "")).strip() != "":
        return False
    # Finally, the Run box must be the literal boolean False.
    return s.get("Run", False) is False
# --- [NEW] 6. Markov Chain Optimisation Functions ---
def run_single_markov_test(params, master_df, tickers, date_range):
    """
    Worker function for the Markov optimisation.
    Tests a single combination of (run_up_period, future_period).

    params:     dict with integer 'run_up' and 'future' periods.
    master_df:  wide DataFrame of close prices, one column per ticker.
    tickers:    master_df column names to evaluate (missing ones skipped).
    date_range: (start, end) pair used to slice each ticker's history.

    Returns a list of four dicts -- one per state/bet combination
    ('Up -> Up', 'Up -> Down', 'Down -> Up', 'Down -> Down') -- with the
    tested periods, avg/total P&L, occurrence count, and a score of
    avg P&L * log10(count + 1) that rewards consistency.
    """
    run_up_period = params['run_up']
    future_period = params['future']
    total_results = {
        'Up -> Up': {'profit': 0.0, 'count': 0},
        'Up -> Down': {'profit': 0.0, 'count': 0},
        'Down -> Up': {'profit': 0.0, 'count': 0},
        'Down -> Down': {'profit': 0.0, 'count': 0}
    }
    for ticker in tickers:
        if ticker not in master_df.columns:
            continue
        # Get only the 'Close' price for this ticker
        ticker_data = master_df[ticker].loc[date_range[0]:date_range[1]].to_frame(name='Close')
        # Cleaning: coerce to numeric and drop zero prices so the return
        # divisions below cannot produce +/- infinity.
        ticker_data['Close'] = pd.to_numeric(ticker_data['Close'], errors='coerce').replace(0, np.nan)
        ticker_data.dropna(subset=['Close'], inplace=True)
        if ticker_data.empty or ticker_data['Close'].isna().all():
            continue
        # 1. Calculate Run-Up state (past).
        # pct_change(N) calculates (price[t] / price[t-N]) - 1
        ticker_data['RunUp_Return'] = ticker_data['Close'].pct_change(periods=run_up_period)
        ticker_data['RunUp_State'] = np.where(ticker_data['RunUp_Return'] > 0, 'Up', 'Down')
        # 2. Calculate Future state (what *actually* happened).
        # shift(-N) looks *forward* N periods
        ticker_data['Future_Return'] = (ticker_data['Close'].shift(-future_period) / ticker_data['Close']) - 1
        # 3. Drop rows with incomplete history on either side.
        # BUGFIX: 'RunUp_Return' must be in the subset -- the first
        # `run_up_period` rows have no run-up yet, but were previously
        # mapped to 'Down' by the state lambda and so survived the old
        # dropna on 'RunUp_State' (which is never NaN), polluting tallies.
        ticker_data.dropna(subset=['RunUp_Return', 'Future_Return'], inplace=True)
        if ticker_data.empty:
            continue
        # 4. Tally results
        # We use .values for speed
        runup_states = ticker_data['RunUp_State'].values
        future_returns = ticker_data['Future_Return'].values
        for state, ret in zip(runup_states, future_returns):
            # Defensive: skip any residual non-finite return.
            if not np.isfinite(ret):
                continue
            if state == 'Up':
                # From an 'Up' run-up: betting up earns ret, betting down -ret.
                total_results['Up -> Up']['profit'] += ret
                total_results['Up -> Up']['count'] += 1
                total_results['Up -> Down']['profit'] += (ret * -1)
                total_results['Up -> Down']['count'] += 1
            else:  # State is 'Down'
                total_results['Down -> Up']['profit'] += ret
                total_results['Down -> Up']['count'] += 1
                total_results['Down -> Down']['profit'] += (ret * -1)
                total_results['Down -> Down']['count'] += 1
    # 5. Compile final metrics
    final_report = []
    for strategy, results in total_results.items():
        if results['count'] > 0:
            avg_pnl = (results['profit'] / results['count'])
            # We use a simple score: Avg P&L * log(count) to value consistency
            score = avg_pnl * np.log10(results['count'] + 1)
        else:
            avg_pnl = 0
            score = 0
        final_report.append({
            'Run-Up Period': run_up_period,
            'Future Period': future_period,
            'Strategy': strategy,
            'Avg. P/L': avg_pnl,
            'Total Occurrences': results['count'],
            'Total P/L': results['profit'],
            'Score': score
        })
    return final_report
def generate_and_run_markov_optimisation(master_df, main_content_placeholder, side):
    """
    Main UI and orchestrator for Markov Chain optimisation.
    [NEW] This version calculates an 'Alpha Score per Day' to find the
    most efficient predictive edge.

    Grid-searches (run-up period, future period) pairs over every ticker in
    `master_df` via multiprocessing, ranks the `side`-biased strategies by
    alpha-per-day, persists the winner (session state + file) and renders a
    results table plus heatmap into `main_content_placeholder`.
    """
    # Clear every stale result table before a fresh run.
    st.session_state.summary_df = None
    st.session_state.single_ticker_results = None
    st.session_state.confidence_results_df = None
    st.session_state.open_trades_df = None
    st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None
    st.session_state.best_params = None
    st.session_state.best_weights = None
    st.session_state.markov_results_df = None  # Clear previous Markov results
    st.session_state.best_markov_setup = None
    with main_content_placeholder.container():
        st.header(f"🔮 Finding Best Markov Probabilities ({side.title()})")
        # --- Get UI inputs from session state ---
        run_up_start = st.session_state.markov_run_up_start
        run_up_end = st.session_state.markov_run_up_end
        run_up_step = st.session_state.markov_run_up_step
        future_start = st.session_state.markov_future_start
        future_end = st.session_state.markov_future_end
        future_step = st.session_state.markov_future_step
        # --- Create parameter combinations ---
        run_up_range = range(run_up_start, run_up_end + 1, run_up_step)
        future_range = range(future_start, future_end + 1, future_step)
        param_product = itertools.product(run_up_range, future_range)
        param_combinations = [{
            "run_up": p[0], "future": p[1]
        } for p in param_product]
        total_combinations = len(param_combinations)
        if total_combinations == 0:
            st.warning("No combinations to test. Check your ranges in the sidebar.")
            st.session_state.run_markov_optimisation = False  # Reset flag
            st.stop()
        num_cores = cpu_count()
        st.info(f"Starting Markov optimisation on {num_cores} cores... Testing {total_combinations} period combinations.")
        # Close-price columns only (no _Volume/_High/_Low suffix).
        tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))]
        date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date))
        status_text = st.empty(); status_text.text("Optimisation starting...")
        progress_bar = st.progress(0)  # <-- The progress bar
        worker_func = partial(run_single_markov_test, master_df=master_df, tickers=tickers_to_run, date_range=date_range)
        results_list = []
        with Pool(processes=num_cores) as pool:
            try:
                # Each worker returns 4 strategy rows per (run_up, future) pair.
                iterator = pool.imap_unordered(worker_func, param_combinations)
                for i, result_group in enumerate(iterator, 1):
                    results_list.extend(result_group)  # Add the 4 strategies
                    progress_bar.progress(i / total_combinations, text=f"Optimising... {i}/{total_combinations} combinations complete.")
            except Exception as e:
                st.error(f"An error occurred during multiprocessing: {e}")
                status_text.text("Optimisation failed due to an error.");
                st.session_state.run_markov_optimisation = False  # Reset flag
                return
        status_text.text("Optimisation complete. Compiling results...")
        if not results_list:
            st.warning("Optimisation finished, but no valid results were found.")
            st.session_state.run_markov_optimisation = False  # Reset flag
            return
        results_df = pd.DataFrame(results_list)
        # --- [NEW ALPHA SCORE PER DAY LOGIC] ---
        # 1. Pivot the data to get all 4 strategies in one row per time period
        pivot_df = results_df.pivot_table(
            index=['Run-Up Period', 'Future Period'],
            columns='Strategy',
            values='Avg. P/L'  # We use Avg. P/L for the alpha calculation
        ).reset_index()
        # 2. Calculate the "Alpha Scores": how much better the conditioned
        # bet does than the same-direction bet from the opposite state.
        pivot_df['Long Alpha Score'] = pivot_df.get('Down -> Up', 0) - pivot_df.get('Up -> Up', 0)
        pivot_df['Short Alpha Score'] = pivot_df.get('Up -> Down', 0) - pivot_df.get('Down -> Down', 0)
        # 3. Calculate the "Alpha Score per Day" (normalise by holding period)
        pivot_df['Long Alpha Score per Day'] = pivot_df['Long Alpha Score'] / pivot_df['Future Period']
        pivot_df['Short Alpha Score per Day'] = pivot_df['Short Alpha Score'] / pivot_df['Future Period']
        # 4. Join this back to the original results to get counts, etc.
        results_df = results_df.set_index(['Run-Up Period', 'Future Period'])
        pivot_df = pivot_df.set_index(['Run-Up Period', 'Future Period'])
        # Join the new Alpha Scores to the main results
        results_df = results_df.join(pivot_df[['Long Alpha Score', 'Short Alpha Score', 'Long Alpha Score per Day', 'Short Alpha Score per Day']])
        results_df = results_df.reset_index()
        # 5. Determine which Alpha Score and strategies to show
        if side == 'long':
            score_to_use = 'Long Alpha Score per Day'
            target_strategies = ['Down -> Up', 'Up -> Up']
        else:  # short
            score_to_use = 'Short Alpha Score per Day'
            target_strategies = ['Up -> Down', 'Down -> Down']
        # Filter for the strategies we care about for this 'side'
        final_df = results_df[results_df['Strategy'].isin(target_strategies)].copy()
        if final_df.empty:
            st.warning(f"No valid results found for {side}-biased strategies.")
            st.session_state.run_markov_optimisation = False  # Reset flag
            return
        # 6. Sort by the new "Alpha Score per Day"
        final_df = final_df.sort_values(by=score_to_use, ascending=False)
        # Get the best row (which will be the one with the highest Alpha per Day)
        best_setup_series = final_df.iloc[0]
        st.success(f"Markov Optimisation Complete! Best 'Alpha' strategy found:")
        st.subheader("Best Markov Setup (by Alpha Score per Day):")
        # Save the best setup to session state
        st.session_state.best_markov_setup = best_setup_series.to_dict()
        save_markov_setup(st.session_state.best_markov_setup)  # <-- SAVE TO FILE
        st.json(st.session_state.best_markov_setup)
        st.subheader(f"Top 10 Setups ({side.title()}-biased, sorted by {score_to_use}):")
        display_cols = [
            'Strategy', 'Run-Up Period', 'Future Period', score_to_use, 'Avg. P/L', 'Total Occurrences'
        ]
        st.dataframe(final_df.head(10)[display_cols].style.format({
            score_to_use: "{:.4%}",  # Format as percentage
            "Avg. P/L": "{:.4%}",
        }))
        # --- [NEW] HEATMAP INFOGRAPHIC ---
        st.subheader(f"Markov '{side.title()}' Alpha Score per Day Heatmap")
        st.caption("This heatmap shows the most *efficient* signals (highest predictive value per day held).")
        try:
            # We use the pivot_df we created earlier, which has the scores.
            # NOTE(review): 'Run-Up Period'/'Future Period' are index levels of
            # pivot_df at this point -- pivot_table resolves them via groupby,
            # but confirm against the installed pandas version.
            heatmap_data = pivot_df.pivot_table(
                index='Run-Up Period',
                columns='Future Period',
                values=score_to_use  # Plot the Alpha Score per Day
            )
            # Create the heatmap
            st.dataframe(
                heatmap_data.style
                .background_gradient(cmap='RdYlGn', axis=None)  # Red-Yellow-Green colormap
                .format("{:.4%}", na_rep='-')  # Format as percentage
            )
        except Exception as e:
            st.warning(f"Could not generate heatmap. Error: {e}")
        # --- [END NEW] ---
        st.session_state.markov_results_df = final_df  # Save the sorted results
        st.session_state.run_markov_optimisation = False  # Reset flag
# --- Corrected: Ensures rsi_logic AND primary_driver are passed ---
def run_confidence_optimisation(optimise_for, find_mode, master_df, main_content_placeholder, veto_factors):
    """Grid-search every factor on/off combination x confidence threshold in parallel.

    Parameters
    ----------
    optimise_for : str
        Trade side being optimised ('long' or 'short' — used for labels and
        passed through to the worker; exact values set by the caller).
    find_mode : str
        'best' to rank by "Good Score", anything else ('worst') to rank by
        "Bad Score" and build the veto list.
    master_df : pd.DataFrame
        Price/volume master frame; non-price columns end in _Volume/_High/_Low.
    main_content_placeholder : st.empty
        Placeholder container the whole report is rendered into.
    veto_factors : tuple[bool, ...] | None
        Only used in 'worst' mode: the exact on/off combination to test.
        NOTE(review): compared with `==` against itertools.product tuples, so
        this is presumably a tuple of 7 bools — confirm at the call site.

    Results are written to st.session_state (confidence_results_df,
    best_confidence_setup / worst_confidence_setups_list) rather than returned.
    """
    # Clear all previous run artifacts so the new report starts from scratch.
    st.session_state.summary_df = None
    st.session_state.single_ticker_results = None
    st.session_state.open_trades_df = None
    st.session_state.best_params = None
    st.session_state.advisor_df = None
    st.session_state.worst_confidence_setups_list = []
    with main_content_placeholder.container():
        num_cores = cpu_count()
        st.info(f"Starting to find **{find_mode.upper()}** {optimise_for.upper()} setups on {num_cores} CPU cores...")
        st.caption("Note: This process runs in parallel for maximum speed. The status below updates as each strategy combination completes.")
        # --- [NEW] Added Markov ---
        factors = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov']
        num_factors = len(factors)
        if find_mode == 'worst':
            # 'worst' mode tests exactly ONE caller-supplied combination.
            if veto_factors is None or len(veto_factors) != num_factors:
                st.error("Internal error: Veto factors not provided correctly.")
                return
            target_combo = veto_factors
            on_off_combos = [c for c in itertools.product([False, True], repeat=num_factors) if c == target_combo]
            if not on_off_combos or not any(on_off_combos[0]):
                st.warning("Please select at least one factor for the Veto search."); return
        else:
            # 'best' mode tests every non-empty combination (2^7 - 1 = 127).
            on_off_combos = [c for c in itertools.product([False, True], repeat=num_factors) if any(c)]
        thresholds_to_test = [20, 25, 30, 35, 40, 45, 50]
        # Each task = (factor on/off tuple, confidence threshold, fixed 1.0 scaler).
        tasks = list(itertools.product(on_off_combos, thresholds_to_test, [1.0]))
        total_tasks = len(tasks)
        # Strategy parameters come from the sidebar; percents are stored as fractions.
        base_params = { "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, "bband_std_dev": st.session_state.bb_std, "long_entry_threshold_pct": st.session_state.long_entry / 100, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, "long_delay_days": st.session_state.long_delay, "short_entry_threshold_pct": st.session_state.short_entry / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, "short_trailing_stop_loss_pct": st.session_state.short_sl / 100, "short_delay_days": st.session_state.short_delay, }
        # Price columns only — drop the _Volume/_High/_Low companion columns.
        tickers_to_run = sorted([col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))])
        date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date))
        # Weights/settings for every confidence factor, forwarded to each worker.
        factor_weights = {
            "rsi": st.session_state.rsi_w, "vol": st.session_state.vol_w,
            "trend": st.session_state.trend_w, "volume": st.session_state.volume_w,
            "macd": st.session_state.macd_w, "ma_slope": st.session_state.ma_slope_w,
            "markov": st.session_state.markov_w,
            "use_adx": st.session_state.use_adx_filter,
            "adx_thresh": st.session_state.adx_threshold,
            "adx_period": st.session_state.adx_period,
            "rsi_logic": st.session_state.rsi_logic,
            "primary_driver": st.session_state.primary_driver,
            'markov_setup': st.session_state.get('best_markov_setup'),
            'exit_logic': st.session_state.exit_logic_type,
            'exit_thresh': st.session_state.exit_confidence_threshold,
            'smart_trailing_stop': st.session_state.smart_trailing_stop_pct / 100.0,
            'smart_exit_atr_period': st.session_state.smart_exit_atr_period,
            'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier,
            'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0
        }
        # Freeze the shared arguments so the pool only pickles the per-task tuple.
        worker_func = partial(run_single_confidence_test, base_params=base_params, master_df=master_df, date_range=date_range, tickers_to_run=tickers_to_run, optimise_for=optimise_for, factor_weights=factor_weights)
        results_list = []
        # --- UI Feedback Elements ---
        progress_bar = st.progress(0, text="Initializing parallel engines...")
        status_text = st.empty()
        with Pool(processes=num_cores) as pool:
            try:
                # imap_unordered yields results as soon as any worker finishes.
                iterator = pool.imap_unordered(worker_func, tasks)
                for i, result in enumerate(iterator, 1):
                    if isinstance(result, dict) and "Trade G/B Ratio" in result:
                        results_list.append(result)
                        # --- DYNAMIC STATUS UPDATE ---
                        # This creates the "Screaming Past" effect for strategies
                        active = []
                        if result['RSI']: active.append("RSI")
                        if result['Volatility']: active.append("Vol")
                        if result['TREND']: active.append("Trend")
                        if result['Volume']: active.append("VolSpike")
                        if result['MACD']: active.append("MACD")
                        if result['MA Slope']: active.append("Slope")
                        if result['Markov']: active.append("Markov")
                        status_msg = f"Analyzed: [{' + '.join(active)}] @ Thresh {result['Conf. Threshold']} -> Profit: {result['Avg Profit/Trade']:.2%} ({result['Total Trades']} trades)"
                        status_text.text(status_msg)
                    else: print(f"Warning: Worker returned invalid result format: {result}")
                    # Update progress bar
                    progress_bar.progress(i / total_tasks, text=f"Optimising... {i}/{total_tasks} combinations complete.")
            except Exception as e:
                st.error(f"An error occurred during multiprocessing: {e}")
                progress_bar.empty(); return
        if results_list:
            results_df = pd.DataFrame(results_list)
            # --- [NEW] FILTER OUT "INFINITY" RESULTS ---
            # We exclude results where Trade G/B Ratio is > 50000 (the placeholder for 0 losses).
            # This prevents low-volume "perfect" trades from gaming the Uncapped Scoring system.
            if 'Trade G/B Ratio' in results_df.columns:
                results_df = results_df[results_df['Trade G/B Ratio'] < 50000].copy()
                if results_df.empty:
                    st.warning("No valid setups found after removing infinite ratio outliers.")
                    st.session_state.run_confidence_optimisation = False
                    return
            # --- Sort by the NEW 3-Factor Weighted Score ---
            sort_col = "Good Score" if find_mode == 'best' else "Bad Score"
            if sort_col in results_df.columns:
                # NaN scores sink to the bottom for 'best' (-inf) / neutral for 'worst' (0).
                fill_value = -np.inf if find_mode == 'best' else 0
                results_df[sort_col] = results_df[sort_col].fillna(fill_value)
                results_df = results_df.sort_values(by=sort_col, ascending=False).reset_index(drop=True)
            else: st.error(f"Sorting column '{sort_col}' not found."); return
            # Convert factor booleans to "On"/"Off" strings for display.
            for factor in factors:
                if factor in results_df.columns:
                    results_df[factor] = results_df[factor].apply(lambda x: "On" if x else "Off")
            # Save all results to state so they persist
            st.session_state.confidence_results_df = results_df
            if find_mode == 'best':
                st.subheader(f"📊 Top 60 Confidence Setups ({optimise_for.title()} Trades)")
                if not results_df.empty:
                    best_setup = results_df.iloc[0]
                    st.session_state.best_confidence_setup = best_setup.to_dict()
                    # Save TOP setups using the same DF (which is now sorted by Weighted Score)
                    save_top_setups(results_df, optimise_for)
                else:
                    st.warning("No valid 'best' setups found.")
                    st.session_state.best_confidence_setup = None
            else: # 'worst' mode
                # Drop the 99999.0 "no losing trades" sentinel rows before ranking.
                valid_worst_df = results_df[results_df['Trade G/B Ratio'] < 99999.0].copy()
                if not valid_worst_df.empty:
                    valid_worst_df['Bad Score'] = valid_worst_df['Bad Score'].fillna(0)
                    valid_worst_df = valid_worst_df.sort_values(by="Bad Score", ascending=False).reset_index(drop=True)
                    # Keep the 4 worst setups for the veto mechanism.
                    st.session_state.worst_confidence_setups_list = valid_worst_df.head(4).to_dict('records')
                    st.info(f"Top {len(st.session_state.worst_confidence_setups_list)} valid 'worst' setups ready.")
                    st.subheader(f"🏆 Top 60 Valid Worst Setups ({optimise_for.title()} Trades)")
                    results_df = valid_worst_df # Use this for display
                else:
                    st.warning("No valid 'worst' setups found.")
                    st.session_state.worst_confidence_setups_list = []
            # --- Display the table ---
            # [CHANGE] Rename the column for display purposes
            display_df_conf = results_df.head(60).rename(columns={'Net % Return': 'Moneypile Score'})
            # [CHANGE] Update formatter key to 'Moneypile Score' and remove '%' from format string
            conf_formatters = {
                "Avg Profit/Trade": "{:.2%}",
                "Ticker G/B Ratio": "{:.2f}",
                "Trade G/B Ratio": "{:.2f}",
                "Moneypile Score": "{:.1f}",
                "Avg Entry Conf.": "{:.1f}%",
                "Good Score": "{:.4f}",
                "Bad Score": "{:.4f}",
                "Norm. Score %": "{:.2f}%"
            }
            if 'MACD' in display_df_conf.columns: conf_formatters['MACD'] = '{}'
            if 'MA Slope' in display_df_conf.columns: conf_formatters['MA Slope'] = '{}'
            if 'Markov' in display_df_conf.columns: conf_formatters['Markov'] = '{}'
            # Wrap each format string so NaN cells render as '-' instead of raising.
            valid_conf_formatters = {k: (lambda val, fmt=v: fmt.format(val) if pd.notna(val) else '-') for k, v in conf_formatters.items() if k in display_df_conf.columns}
            st.dataframe(display_df_conf.style.format(valid_conf_formatters, na_rep='-'))
        else:
            st.warning("Optimisation completed but no results were generated."); st.session_state.confidence_results_df = None
        st.session_state.run_confidence_optimisation = False # Reset flag
def generate_user_advisor_ui_and_run(main_df):
    """Render the editable grid of user-defined advisor setups (max 20 rows).

    Lets the user edit, save, load and clear custom strategy setups in a
    st.data_editor, then scan the market with up to 8 rows whose 'Run'
    checkbox is ticked. The scan itself is deferred: flags are written to
    session state and st.rerun() hands control back to the main loop.
    """
    st.header("⚙️ User-Defined Advisor Setups")
    # Two distinct session-state keys: data_key backs the editor's contents,
    # widget_key is the editor widget's own state — it is deleted in the
    # callbacks below to force the widget to pick up the reloaded data.
    data_key = "user_setups_data"
    widget_key = "user_setups_editor_state"
    def load_saved_setups_callback():
        # Replace the editor contents with the setups saved on disk.
        st.session_state[data_key] = load_user_setups()
        if widget_key in st.session_state: del st.session_state[widget_key]
    def clear_all_setups_callback():
        # "Blank" rows are 20 copies of the first saved setup (used as a template).
        blank_setups = [load_user_setups()[0].copy() for _ in range(20)]
        st.session_state[data_key] = blank_setups
        if widget_key in st.session_state: del st.session_state[widget_key]
    st.caption("Add advice/notes in the 'Notes' column. Check 'Run' to test.")
    # Column layout/format for the data editor. Z_* columns are read-only
    # result fields (disabled=True) populated after a scan.
    col_config = {
        "Run": st.column_config.CheckboxColumn("Run", width="small"),
        # Added Notes Column
        "Notes": st.column_config.TextColumn("Notes", help="Advice/Description for this setup", width="medium"),
        "RSI": st.column_config.TextColumn("RSI", width="small"),
        "Volatility": st.column_config.TextColumn("Volatility", width="small"),
        "TREND": st.column_config.TextColumn("TREND", width="small"),
        "Volume": st.column_config.TextColumn("Volume", width="small"),
        "MACD": st.column_config.TextColumn("MACD", width="small"),
        "MA Slope": st.column_config.TextColumn("MA Slope", width="small"),
        "Markov": st.column_config.TextColumn("Markov", width="small"),
        "ADX Filter": st.column_config.TextColumn("ADX Filter", width="small"),
        "Conf. Threshold": st.column_config.NumberColumn("Conf.", width="small"),
        "Large MA Period": st.column_config.NumberColumn("MA", width="small"),
        "Bollinger Band Period": st.column_config.NumberColumn("BB", width="small"),
        "Bollinger Band Std Dev": st.column_config.NumberColumn("Std", format="%.1f", width="small"),
        "Catcher Offset (%)": st.column_config.NumberColumn("Catcher %", format="%.1f", width="small"),
        "Long Entry Threshold (%)": st.column_config.NumberColumn("L Entry", format="%.1f", width="small"),
        "Long Exit Threshold (%)": st.column_config.NumberColumn("L Exit", format="%.1f", width="small"),
        "Long Stop Loss (%)": st.column_config.NumberColumn("L TSL", format="%.1f", width="small"),
        "Long Delay (Days)": st.column_config.NumberColumn("L Delay", width="small"),
        "Short Entry Threshold (%)": st.column_config.NumberColumn("S Entry", format="%.1f", width="small"),
        "Short Exit Threshold (%)": st.column_config.NumberColumn("S Exit", format="%.1f", width="small"),
        "Short Stop Loss (%)": st.column_config.NumberColumn("S TSL", format="%.1f", width="small"),
        "Short Delay (Days)": st.column_config.NumberColumn("S Delay", width="small"),
        "Z_Avg_Profit": st.column_config.NumberColumn("Avg %", format="%.2f%%", disabled=True, width="small"),
        "Z_Num_Trades": st.column_config.NumberColumn("Trades", disabled=True, width="small"),
        "Z_WL_Ratio": st.column_config.NumberColumn("W/L", format="%.2f", disabled=True, width="small"),
    }
    edited_setups_list = st.data_editor(
        st.session_state[data_key],
        column_config=col_config,
        num_rows="dynamic",
        key=widget_key,
        column_order=[
            "Run", "Notes", # <--- Notes column appears here (2nd)
            "RSI", "Volatility", "TREND", "Volume", "MACD", "MA Slope", "Markov", "ADX Filter",
            "Conf. Threshold",
            "Large MA Period", "Bollinger Band Period", "Bollinger Band Std Dev","Catcher Offset (%)",
            "Long Entry Threshold (%)", "Long Exit Threshold (%)", "Long Stop Loss (%)", "Long Delay (Days)",
            "Short Entry Threshold (%)", "Short Exit Threshold (%)", "Short Stop Loss (%)", "Short Delay (Days)",
            "Z_Avg_Profit", "Z_Num_Trades", "Z_WL_Ratio"
        ]
    )
    col1, col2, col3, col4, col5 = st.columns([1, 1, 1, 2, 2])
    if col1.button("💾 Save Setups"):
        # Persist at most 20 rows to disk and mirror them back into state.
        save_user_setups(edited_setups_list[:20])
        st.session_state[data_key] = edited_setups_list[:20]
    col2.button("🔄 Load Saved", on_click=load_saved_setups_callback)
    col3.button("🗑️ Clear All Setups", on_click=clear_all_setups_callback)
    if col4.button("Scan with User Setups", type="primary"):
        current_setups_list = edited_setups_list[:20]
        setups_to_run = [s for s in current_setups_list if s.get("Run") == True]
        if len(setups_to_run) == 0: st.error("No setups selected.")
        elif len(setups_to_run) > 8: st.error(f"Selected {len(setups_to_run)} setups. Max is 8.")
        else:
            # Flag the scan and hand control back to the main loop via rerun.
            st.session_state.run_scan_user_setups = True
            st.session_state.run_user_advisor_setup = False
            st.session_state.advisor_df = None
            st.session_state.setups_to_scan = setups_to_run
            st.rerun()
    if col5.button("Back to Main Analysis"):
        st.session_state.run_user_advisor_setup = False; st.session_state.advisor_df = None; st.rerun()
def generate_advisor_report(main_df):
    """Handles the UI for running the 'Top Setups' advisor.

    Renders the Top-8 advisor page: loads the saved setups, lets the user
    pick a side (Long/Short), and kicks off the scan. Early-returns with a
    'Back' button when no saved setups exist (or none for the chosen side).
    """
    def exit_advisor(clear_results):
        # Leave the advisor page; optionally drop cached scan results first.
        if clear_results:
            st.session_state.advisor_df = None
            st.session_state.raw_df = None
            st.session_state.deduped_df = None
        st.session_state.run_advanced_advisor = False
        st.rerun()

    st.header("📈 Advanced Advisor Report (Top 8 Setups)")
    saved_setups = load_top_setups()
    if not saved_setups:
        st.warning("No saved top setups found. Run 'Find Best Confidence' first.")
        if st.button("Back"):
            exit_advisor(clear_results=False)
        return
    side = st.radio("Generate report for which saved setups?", ("Long", "Short"), horizontal=True, key="advisor_side_select_top")
    side_setups = saved_setups.get(side.lower())
    if not side_setups:
        st.warning(f"No saved top {side.lower()} setups found.")
        if st.button("Back"):
            exit_advisor(clear_results=False)
        return
    if st.button(f"Scan using Top {side} Setups", type="primary"):
        # Clear any stale results, then run the scan for the chosen side.
        st.session_state.advisor_df = None
        st.session_state.raw_df = None
        st.session_state.deduped_df = None
        run_advisor_scan(main_df, side_setups, "Top Setups")
    if st.button("Back"):
        exit_advisor(clear_results=True)
def apply_best_weights_to_widgets():
    """Loads optimised weights from st.session_state.best_weights into the sidebar widgets."""
    weights = st.session_state.get('best_weights')
    if not weights:
        st.sidebar.error("No optimal weights found in session state.")
        return
    # Copy each optimised weight into its matching sidebar widget key,
    # skipping any weight the optimiser did not produce.
    for key in ('rsi_w', 'vol_w', 'trend_w', 'volume_w', 'macd_w', 'ma_slope_w'):
        if key in weights:
            st.session_state[key] = weights[key]
    st.sidebar.success("Optimal weights loaded into sidebar!")
    st.rerun()
def apply_best_params_to_widgets():
    """Loads parameters from st.session_state.best_params into the sidebar widgets.

    Fractional values (entry/exit thresholds, stop-loss) are stored as
    decimals in best_params but the sidebar widgets work in percent, hence
    the * 100 conversions.

    [FIX] The short-side widgets were previously populated from the long_*
    keys, and 'short_delay' was never set because long_delay was assigned
    twice (copy-paste bug). The short side now reads the short_* keys,
    falling back to the long_* values when only long-side parameters exist
    (which preserves the old behaviour for legacy best_params dicts).
    """
    if 'best_params' in st.session_state and st.session_state.best_params:
        params = st.session_state.best_params
        if 'large_ma_period' in params: st.session_state.ma_period = params['large_ma_period']
        if 'bband_period' in params: st.session_state.bb_period = params['bband_period']
        if 'bband_std_dev' in params: st.session_state.bb_std = params['bband_std_dev']
        if 'confidence_threshold' in params: st.session_state.confidence_slider = params['confidence_threshold']
        # --- Long side ---
        if 'long_entry_threshold_pct' in params: st.session_state.long_entry = params['long_entry_threshold_pct'] * 100
        if 'long_exit_ma_threshold_pct' in params: st.session_state.long_exit = params['long_exit_ma_threshold_pct'] * 100
        if 'long_trailing_stop_loss_pct' in params: st.session_state.long_sl = params['long_trailing_stop_loss_pct'] * 100
        if 'long_delay_days' in params: st.session_state.long_delay = params['long_delay_days']
        # --- Short side (prefer dedicated short_* keys, fall back to long_*) ---
        def _pick(short_key, long_key):
            # Return the short-side value when present, else mirror the long side.
            if short_key in params: return params[short_key]
            if long_key in params: return params[long_key]
            return None
        val = _pick('short_entry_threshold_pct', 'long_entry_threshold_pct')
        if val is not None: st.session_state.short_entry = val * 100
        val = _pick('short_exit_ma_threshold_pct', 'long_exit_ma_threshold_pct')
        if val is not None: st.session_state.short_exit = val * 100
        val = _pick('short_trailing_stop_loss_pct', 'long_trailing_stop_loss_pct')
        if val is not None: st.session_state.short_sl = val * 100
        val = _pick('short_delay_days', 'long_delay_days')
        if val is not None: st.session_state.short_delay = val
        st.sidebar.success("Optimal parameters loaded into sidebar!")
        st.rerun()
    else:
        st.sidebar.error("No optimal parameters found in session state.")
# --- 5. Streamlit User Interface ---
def update_state():
    """
    Callback to update the main session state from the 'widget_' keys.
    This synchronizes all widgets.
    """
    prefix = 'widget_'
    # Snapshot the keys first: we may not mutate session state while iterating it.
    for state_key in list(st.session_state.keys()):
        if not state_key.startswith(prefix):
            continue
        # 'widget_ma_period' -> 'ma_period'
        target_key = state_key[len(prefix):]
        # Only sync into keys that already exist as main settings.
        if target_key in st.session_state:
            st.session_state[target_key] = st.session_state[state_key]
def convert_results_to_csv(summary_df, params):
    """
    Combines the Performance Summary and the Configuration Settings into a single CSV.

    Parameters
    ----------
    summary_df : pd.DataFrame or None
        Performance summary; only the FIRST row is exported.
    params : dict
        Snapshot of the run settings; missing keys export as empty cells.
        Factor weights are replaced with "Off" when their use_* flag is falsy.

    Returns
    -------
    bytes
        UTF-8 encoded one-row CSV (performance columns followed by settings
        columns). [FIX] The empty-input path previously returned a str ""
        while the normal path returned bytes; it now consistently returns
        b"" so callers (e.g. st.download_button) always receive bytes.
    """
    if summary_df is None or summary_df.empty:
        return b""
    # 1. Get Performance Data (First Row)
    perf_data = summary_df.iloc[0].to_dict()
    # 2. Get Settings Data (From Session State/Params)
    settings_data = {
        "--- SETTINGS ---": "", # Separator column between performance and settings
        "Start Date": params.get('start_date'),
        "End Date": params.get('end_date'),
        "Primary Driver": params.get('primary_driver'),
        "Confidence Threshold": params.get('confidence_threshold'),
        "RSI Weight": params.get('rsi_w') if params.get('use_rsi') else "Off",
        "Volatility Weight": params.get('vol_w') if params.get('use_volatility') else "Off",
        "Trend Weight": params.get('trend_w') if params.get('use_trend') else "Off",
        "Volume Weight": params.get('vol_w_val') if params.get('use_volume') else "Off",
        "MACD Weight": params.get('macd_w') if params.get('use_macd') else "Off",
        "MA Slope Weight": params.get('ma_slope_w') if params.get('use_ma_slope') else "Off",
        "Markov Weight": params.get('markov_w') if params.get('use_markov') else "Off",
        "MFI Weight": params.get('mfi_w') if params.get('use_mfi') else "Off",
        "SuperTrend Weight": params.get('supertrend_w') if params.get('use_supertrend') else "Off",
        "ADX Filter": f"On (< {params.get('adx_threshold')})" if params.get('use_adx_filter') else "Off",
        "BB Period": params.get('bband_period'),
        "BB Std Dev": params.get('bband_std_dev'),
        "MA Period": params.get('large_ma_period'),
        "Long Entry Thresh %": params.get('long_entry_threshold_pct'),
        "Short Entry Thresh %": params.get('short_entry_threshold_pct'),
        "Exit Logic": params.get('exit_logic_type'),
        "Smart TSL %": params.get('smart_trailing_stop_pct')
    }
    # 3. Combine into one dictionary (settings override perf on key collision,
    #    which cannot happen with the distinct column names above).
    combined_data = {**perf_data, **settings_data}
    # 4. Create a one-row DataFrame and serialize it as UTF-8 CSV bytes.
    export_df = pd.DataFrame([combined_data])
    return export_df.to_csv(index=False).encode('utf-8')
def main():
if 'run_advanced_advisor' not in st.session_state: st.session_state.run_advanced_advisor = False
if 'run_user_advisor_setup' not in st.session_state: st.session_state.run_user_advisor_setup = False
# 2. Define ALL Defaults (Prevents "AttributeError" crashes)
defaults = {
'start_date': date(2024, 1, 1), 'end_date': date.today(),
'bband_period': 20, 'bband_std_dev': 2.0,
'long_entry_threshold_pct': 0.0, 'short_entry_threshold_pct': 0.0,
'long_exit_ma_threshold_pct': 0.0, 'short_exit_ma_threshold_pct': 0.0,
'confidence_threshold': 50, 'max_long_duration': 60, 'max_short_duration': 60,
'long_trailing_stop_loss_pct': 0.0, 'short_trailing_stop_loss_pct': 0.0,
'primary_driver': 'Bollinger Bands',
'use_rsi': True, 'rsi_w': 0.5,
'use_volatility': True, 'vol_w': 0.5,
'use_trend': True, 'trend_w': 0.5,
'use_volume': True, 'vol_w_val': 0.5,
'use_macd': False, 'macd_w': 0.5,
'use_ma_slope': False, 'ma_slope_w': 0.5,
'use_markov': False, 'markov_w': 0.5,
# New Indicators
'use_mfi': False, 'mfi_w': 0.5,
'use_supertrend': False, 'supertrend_w': 0.5,
# Filters & Exits
'use_adx_filter': False, 'adx_threshold': 25, 'rsi_logic': 'Level',
'exit_logic_type': 'Standard (Price-Based)',
'smart_trailing_stop_pct': 0.05,
'smart_exit_atr_period': 14, 'smart_exit_atr_multiplier': 3.0,
'intelligent_tsl_pct': 1.0,
'use_ma_floor_filter': False,
'catcher_stop_pct': 0.03,
'long_delay_days': 0, 'short_delay_days': 0,
'long_score_95_percentile': None, 'short_score_95_percentile': None,
'veto_setup_list': None,
'advisor_df': None, 'confidence_results_df': None,
'single_ticker_results': None, 'summary_df': None,
'open_trades_df': None, 'load_message': None,
'outlier_report': None,
'best_markov_setup': None,
# Optimization
'ma_period': 50, 'bb_period': 20, 'bb_std': 2.0,
'confidence_slider': 50, 'long_entry': 0.0, 'long_exit': 0.0,
'long_sl': 0.0, 'long_delay': 0, 'short_entry': 0.0,
'short_exit': 0.0, 'short_sl': 0.0, 'short_delay': 0, 'max_duration': 60,
'volume_w': 0.5 # fallback for vol_w_val naming differences
}
# 3. Apply Defaults if Missing
for key, val in defaults.items():
if key not in st.session_state: st.session_state[key] = val
# 4. Helper to update state from widgets (Fixed)
def update_state():
keys = list(st.session_state.keys())
for widget_key in keys:
if widget_key.startswith('widget_'):
main_key = widget_key[len('widget_'):]
if main_key in st.session_state:
st.session_state[main_key] = st.session_state[widget_key]
st.set_page_config(page_title="Quant Trader", page_icon="🔴", layout="wide")
# --- [FIX 1: Robust Initialization] ---
# We check for 'run_mode' explicitly to fix the crash if session state is stale
if 'first_run' not in st.session_state or 'run_mode' not in st.session_state:
st.session_state.first_run = True
defaults = load_settings()
st.session_state.widget_defaults = defaults
st.session_state.veto_setup_list = load_veto_setup()
st.session_state.use_ma_floor_filter = defaults.get("use_ma_floor_filter", True)
# Unpack all widget defaults into session state
st.session_state.ticker_select = None
st.session_state.run_mode = "Analyse Full List" # <--- This was missing in your stale state
st.session_state.start_date = None
st.session_state.end_date = None
# Section 2 Defaults
st.session_state.use_rsi = defaults.get('use_rsi', True)
st.session_state.rsi_w = defaults.get('rsi_w', 1.0)
st.session_state.rsi_logic = defaults.get('rsi_logic', 'Crossover')
st.session_state.primary_driver = defaults.get('primary_driver', 'Bollinger Bands')
st.session_state.exit_logic_type = defaults.get('exit_logic_type', 'Standard (Price-Based)')
st.session_state.exit_confidence_threshold = defaults.get('exit_confidence_threshold', 50)
st.session_state.smart_trailing_stop_pct = defaults.get('smart_trailing_stop_pct', 5.0)
st.session_state.use_vol = defaults.get('use_vol', True)
st.session_state.vol_w = defaults.get('vol_w', 1.0)
st.session_state.use_trend = defaults.get('use_trend', True)
st.session_state.trend_w = defaults.get('trend_w', 1.5)
st.session_state.use_volume = defaults.get('use_volume', True)
st.session_state.volume_w = defaults.get('volume_w', 1.0)
st.session_state.use_adx_filter = defaults.get('use_adx_filter', True)
st.session_state.adx_threshold = defaults.get('adx_threshold', 25.0)
st.session_state.adx_period = defaults.get('adx_period', 14)
st.session_state.use_macd = defaults.get('use_macd', True)
st.session_state.macd_w = defaults.get('macd_w', 1.0)
st.session_state.use_ma_slope = defaults.get('use_ma_slope', True)
st.session_state.ma_slope_w = defaults.get('ma_slope_w', 0.5)
st.session_state.use_markov = defaults.get('use_markov', False)
st.session_state.markov_w = defaults.get('markov_w', 1.0)
st.session_state.confidence_slider = defaults.get("confidence_threshold", 50)
st.session_state.smart_exit_atr_period = defaults.get("smart_exit_atr_period", 14)
st.session_state.smart_exit_atr_multiplier = defaults.get("smart_exit_atr_multiplier", 3.0)
st.session_state.intelligent_tsl_pct = defaults.get("intelligent_tsl_pct", 0.60) * 100
st.session_state.catcher_stop_pct = defaults.get("catcher_stop_pct", 0.0) * 100
# Section 3 Defaults
st.session_state.ma_period = defaults.get("large_ma_period", 50)
st.session_state.bb_period = defaults.get("bband_period", 20)
st.session_state.bb_std = defaults.get("bband_std_dev", 2.0)
st.session_state.long_entry = defaults.get("long_entry_threshold_pct", 0.0) * 100
st.session_state.long_exit = defaults.get("long_exit_ma_threshold_pct", 0.0) * 100
st.session_state.long_sl = defaults.get("long_trailing_stop_loss_pct", 8.0) * 100
st.session_state.long_delay = defaults.get("long_delay_days", 0)
st.session_state.short_entry = defaults.get("short_entry_threshold_pct", 0.0) * 100
st.session_state.short_exit = defaults.get("short_exit_ma_threshold_pct", 0.0) * 100
st.session_state.short_sl = defaults.get("short_trailing_stop_loss_pct", 8.0) * 100
st.session_state.short_delay = defaults.get("short_delay_days", 0)
st.session_state.max_duration = defaults.get("max_trading_days", 60)
st.session_state.max_long_duration = defaults.get("max_long_duration", 60)
st.session_state.max_short_duration = defaults.get("max_short_duration", 10)
# Section 4/5/6/7 Init (Optimisation variables)
st.session_state.sq_params_toggle = False
st.session_state.opt_ma_cb = False; st.session_state.opt_bb_cb = False; st.session_state.opt_std_cb = False
st.session_state.opt_conf_cb = False; st.session_state.opt_sl_cb = False; st.session_state.opt_delay_cb = False
st.session_state.opt_entry_cb = False; st.session_state.opt_exit_cb = False; st.session_state.opt_duration_cb = False
# Init Opt Ranges (Defaults)
st.session_state.ma_start = 50; st.session_state.ma_end = 55; st.session_state.ma_step = 5
st.session_state.bb_start = 20; st.session_state.bb_end = 25; st.session_state.bb_step = 5
st.session_state.std_start = 2.0; st.session_state.std_end = 2.1; st.session_state.std_step = 0.1
st.session_state.conf_start = 50; st.session_state.conf_end = 60; st.session_state.conf_step = 5
st.session_state.sl_start = 0.0; st.session_state.sl_end = 2.0; st.session_state.sl_step = 0.5
st.session_state.delay_start = 0; st.session_state.delay_end = 1; st.session_state.delay_step = 1
st.session_state.entry_start = 0.0; st.session_state.entry_end = 0.5; st.session_state.entry_step = 0.1
st.session_state.exit_start = 0.0; st.session_state.exit_end = 0.5; st.session_state.exit_step = 0.1
st.session_state.dur_start = 30; st.session_state.dur_end = 90; st.session_state.dur_step = 10
st.session_state.veto_rsi_cb = True; st.session_state.veto_vol_cb = True; st.session_state.veto_trend_cb = True
st.session_state.veto_volume_cb = True; st.session_state.veto_macd_cb = False; st.session_state.veto_ma_slope_cb = False
st.session_state.sq_weights_toggle = False
# Markov Init
st.session_state.markov_run_up_start = 5; st.session_state.markov_run_up_end = 20; st.session_state.markov_run_up_step = 2
st.session_state.markov_future_start = 3; st.session_state.markov_future_end = 10; st.session_state.markov_future_step = 1
st.session_state.best_markov_setup = load_markov_setup()
# Data Editor Init
st.session_state["user_setups_data"] = []
loaded_setups = load_user_setups()
processed = []
factor_cols = ["RSI", "Volatility", "TREND", "Volume", "MACD", "MA Slope", "Markov"]
for s in loaded_setups:
new_s = s.copy()
for k, v in s.items():
if k in factor_cols: new_s[k] = str(v)
elif k == "Run": new_s[k] = bool(v)
processed.append(new_s)
st.session_state["user_setups_data"] = processed
# --- Password Auth Init ---
st.session_state.dev_authenticated = False
# --- [Load Data Logic] ---
if 'master_df' not in st.session_state or st.session_state.master_df is None:
with st.spinner("Loading and cleaning data..."):
master_df, load_message = load_all_data('csv_data')
if master_df is None: st.error(load_message); st.stop()
master_df, outlier_report = clean_data_and_report_outliers(master_df)
st.session_state.master_df = master_df
st.session_state.ticker_list = sorted([col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))])
min_date = master_df.index.min().date()
max_date = master_df.index.max().date()
# --- NEW: Set Default Start to 1 Year Ago from TODAY ---
target_start_date = date.today() - relativedelta(years=1)
# Safety Check: Ensure the target date is valid within your CSV data range
if target_start_date < min_date:
st.session_state.start_date = min_date
elif target_start_date > max_date:
st.session_state.start_date = min_date # Fallback if data is older than 1 year ago
else:
st.session_state.start_date = target_start_date
st.session_state.end_date = max_date
if st.session_state.ticker_select is None and st.session_state.ticker_list:
st.session_state.ticker_select = st.session_state.ticker_list[0]
st.session_state.load_message = load_message
st.session_state.outlier_report = outlier_report
master_df = st.session_state.master_df
ticker_list = st.session_state.ticker_list
# --- HEADER / GREETING ---
st.title("🔴 🚀 Quant Trader")
current_hour = datetime.now().hour
if 5 <= current_hour < 12: greeting = "Good morning!"
elif 12 <= current_hour < 19: greeting = "Good afternoon!"
else: greeting = "Good evening!"
today = date.today()
day_str = today.strftime('%A, %d %B %Y')
proverbs = load_proverbs()
day_of_year = today.timetuple().tm_yday
proverb_index = (day_of_year - 1) % len(proverbs) if proverbs else 0
daily_proverb = proverbs[proverb_index] if proverbs else "Have a profitable day."
st.success(f"{greeting} Today is {day_str}. | *{daily_proverb}*")
main_content_placeholder = st.empty()
# --- SIDEBAR TOGGLE & PASSWORD LOGIC ---
# Default is TRUE (Production)
production_mode_toggle = st.sidebar.toggle("Production Mode", value=True, key="production_mode_toggle")
if not production_mode_toggle:
if not st.session_state.dev_authenticated:
st.sidebar.warning("Restricted Access")
pwd = st.sidebar.text_input("Enter Developer Password:", type="password", key="dev_password_input")
if pwd == "492211":
st.session_state.dev_authenticated = True
st.rerun()
else:
production_mode = True
else:
production_mode = False
else:
production_mode = True
if production_mode:
st.markdown("""""", unsafe_allow_html=True)
st.markdown("""""", unsafe_allow_html=True)
# --- [MOVED] RUN ANALYSIS BUTTON (PRODUCTION MODE) ---
if production_mode:
# Note: We do NOT use st.rerun() here. We let the script flow down.
if st.sidebar.button("Run Analysis", type="primary", key="run_analysis_prod_top",
help="Runs the test using the current settings below."):
# 1. Force a sync of all widgets to session state
update_state()
# 2. Set the flag to TRUE so the engine at the bottom runs
st.session_state.run_analysis_button = True
# 3. Clear previous results to ensure a fresh report
st.session_state.run_advanced_advisor = False
st.session_state.run_user_advisor_setup = False
st.session_state.run_scan_user_setups = False
st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None
st.session_state.param_results_df = None; st.session_state.confidence_results_df = None
st.session_state.summary_df = None; st.session_state.single_ticker_results = None
st.session_state.open_trades_df = None; st.session_state.last_run_stats = None
st.session_state.markov_results_df = None
st.sidebar.markdown("---")
# --- SIDEBAR SECTION 1: Select Test Mode & Dates (Common to both) ---
st.sidebar.header("1. Select Test Mode & Dates")
full_list_label = f"Analyse Full List ({len(ticker_list)} Tickers)"
if st.session_state.run_mode == "Analyse Full List":
st.session_state.run_mode = full_list_label
st.sidebar.radio("Mode:", ("Analyse Single Ticker", full_list_label), key='run_mode')
if st.session_state.get('run_mode') == "Analyse Single Ticker":
if not ticker_list:
st.sidebar.warning("No tickers loaded.")
else:
# Ensure valid selection logic (only reset if invalid/None)
if st.session_state.ticker_select not in ticker_list:
st.session_state.ticker_select = ticker_list[0]
try:
ticker_index = ticker_list.index(st.session_state.ticker_select)
except ValueError:
ticker_index = 0
# [FIXED] Removed the unconditional reset line that was here
st.sidebar.selectbox("Select a Ticker:", ticker_list, index=ticker_index, key='widget_ticker_select', on_change=update_state )
# --- UPDATED: Date Inputs (Fully Unrestricted 2000-2030) ---
st.sidebar.date_input(
"Start Date",
value=st.session_state.start_date,
min_value=date(2005, 1, 1),
max_value=date(2030, 12, 31), # Allow future dates to prevent locking
key='widget_start_date',
on_change=update_state
)
st.sidebar.date_input(
"End Date",
value=st.session_state.end_date,
min_value=date(2005, 1, 1),
max_value=date(2030, 12, 31),
key='widget_end_date',
on_change=update_state
)
# [NEW] Benchmark Settings Sliders (With Tooltips)
st.sidebar.markdown("---")
st.sidebar.write("**Benchmark Settings**")
# 1. Lookback Slider - VISIBLE IN ALL MODES
norm_lookback_years = st.sidebar.slider(
"Lookback (Yrs)",
min_value=1,
max_value=30,
value=st.session_state.get('norm_lookback_years', 1), # Load from Config
help=(
"How many years of history to use when calculating the 'Confidence Score' baseline.\n\n"
"• **30 Years (Recommended):** Stable, long-term benchmark. Ensures trades are judged against all-time history.\n"
"• **1-2 Years:** Adaptive. Judges trades only against recent market volatility (good for changing regimes)."
"• **Default is set to 1 year as we are usually most interested in the recent year.")
)
use_rolling_benchmark = st.sidebar.checkbox(
"Use Rolling Benchmark (Adaptive)",
value=st.session_state.get('use_rolling_benchmark', False),
key='widget_use_rolling_benchmark',
on_change=update_state,
help="CHECKED: Adaptive Mode. Benchmarks adapt to recent history. Realistic backtest.\nUNCHECKED: Global Mode (Crystal Ball). Uses ALL history to set one strict benchmark. Good for filtering Open Trades."
)
# 2. Grade Benchmark - HIDDEN IN PRODUCTION MODE
if not production_mode:
benchmark_percentile_setting = st.sidebar.slider(
"Grade Benchmark (%)",
min_value=50,
max_value=99,
value=st.session_state.get('benchmark_rank', 99), # Load from Config
help=(
"Sets the 'Bar' for the Strategy Score.\n\n"
"• **99% (Default):** The top 1% of historical trades set the standard for 'Perfection'.\n"
"• **80%:** Lowers the bar. Trades only need to be in the top 20% to get a high score (Useful for low volatility).")
)
else:
# In Production, keep the value but hide the slider
benchmark_percentile_setting = st.session_state.get('benchmark_rank', 99)
st.sidebar.markdown("---")
# --- PRIMARY TRIGGER (Hidden in Production, Forced to BBands) ---
if production_mode:
st.session_state.primary_driver = "Bollinger Bands" # Force logic
# No UI element shown
else:
st.sidebar.selectbox(
"Select Primary Trigger:",
("Bollinger Bands", "RSI Crossover", "MACD Crossover", "MA Slope", "Markov State"),
key='widget_primary_driver',
on_change=update_state,
help="Select the main indicator that will trigger a trade."
)
st.sidebar.markdown("---")
# --- RUN ACTIONS (Dual Mode Logic) ---
st.sidebar.subheader("Run Actions")
if production_mode:
# --- PRODUCTION BUTTONS ---
# [REMOVED] Old "Run Analysis" button was here.
# 2. Dropdown for User Setups (With "Default" option)
user_setups_raw = st.session_state.get("user_setups_data", [])
valid_user_setups = [s for s in user_setups_raw if not is_row_blank(s)]
setup_options = {"Default (Reset to Original)": -1}
for i, s in enumerate(valid_user_setups):
# Build Label with Notes
note = s.get('Notes', '')
if note:
if len(note) > 50: note = note[:47] + "..."
note_display = f" - {note}"
else:
note_display = ""
label = f"Setup {i+1}{note_display}"
setup_options[label] = i
# --- CALLBACK: AUTO-POPULATE SETTINGS ON SELECTION ---
def on_user_setup_change():
    """Populate the sidebar settings immediately when the setup dropdown changes.

    Streamlit ``on_change`` callback for the ``widget_user_setup_select_key``
    selectbox.  Maps the selected label back to an index through the enclosing
    ``setup_options`` dict, then either resets every factor/strategy parameter
    to the saved defaults (index ``-1``) or copies the chosen user setup's
    values into ``st.session_state`` — always writing both the canonical key
    and its ``widget_``-prefixed twin so widgets re-render with the new value.
    """
    selected_label = st.session_state.widget_user_setup_select_key
    idx = setup_options[selected_label]

    def sync_param(main_key, value):
        # Keep the canonical state key and its widget mirror in lock-step.
        st.session_state[main_key] = value
        st.session_state[f"widget_{main_key}"] = value

    # --- CASE A: DEFAULT (RESET) ---
    if idx == -1:
        defaults = st.session_state.get('widget_defaults', {}) or load_settings()
        # Reset Factors
        sync_param('use_rsi', defaults.get('use_rsi', True))
        sync_param('rsi_w', defaults.get('rsi_w', 1.0))
        sync_param('use_vol', defaults.get('use_vol', True))
        sync_param('vol_w', defaults.get('vol_w', 1.0))
        sync_param('use_trend', defaults.get('use_trend', True))
        sync_param('trend_w', defaults.get('trend_w', 1.5))
        sync_param('use_volume', defaults.get('use_volume', True))
        sync_param('volume_w', defaults.get('volume_w', 1.0))
        sync_param('use_macd', defaults.get('use_macd', True))
        sync_param('macd_w', defaults.get('macd_w', 1.0))
        sync_param('use_ma_slope', defaults.get('use_ma_slope', True))
        sync_param('ma_slope_w', defaults.get('ma_slope_w', 0.5))
        sync_param('use_markov', defaults.get('use_markov', False))
        sync_param('markov_w', defaults.get('markov_w', 1.0))
        # Reset ADX (With Clamping for Production Mode: 20.0 <= ADX <= 30.0)
        sync_param('use_adx_filter', defaults.get('use_adx_filter', True))
        raw_adx = defaults.get('adx_threshold', 25.0)
        clamped_adx = max(20.0, min(30.0, raw_adx))
        sync_param('adx_threshold', clamped_adx)
        # Reset Strategies (config stores pct thresholds as fractions; the
        # sidebar widgets work in whole percent, hence the * 100)
        sync_param('ma_period', defaults.get("large_ma_period", 50))
        sync_param('bb_period', defaults.get("bband_period", 20))
        sync_param('bb_std', defaults.get("bband_std_dev", 2.0))
        sync_param('confidence_slider', defaults.get("confidence_threshold", 50))
        sync_param('long_entry', defaults.get("long_entry_threshold_pct", 0.0) * 100)
        sync_param('long_exit', defaults.get("long_exit_ma_threshold_pct", 0.0) * 100)
        sync_param('long_sl', defaults.get("long_trailing_stop_loss_pct", 8.0) * 100)
        sync_param('long_delay', defaults.get("long_delay_days", 0))
        sync_param('short_entry', defaults.get("short_entry_threshold_pct", 0.0) * 100)
        sync_param('short_exit', defaults.get("short_exit_ma_threshold_pct", 0.0) * 100)
        sync_param('short_sl', defaults.get("short_trailing_stop_loss_pct", 8.0) * 100)
        sync_param('short_delay', defaults.get("short_delay_days", 0))
        st.toast("Settings reset to Default.", icon="🔄")
    # --- CASE B: USER SETUP ---
    else:
        selected_setup_data = valid_user_setups[idx]

        def apply_factor_loading(key, main_key, weight_key):
            # 'Off'/0/false disables the factor; any other value enables it
            # and is interpreted as the factor weight (fallback weight 1.0).
            val = selected_setup_data.get(key, 'Off')
            if str(val).lower() in ['off', '0', '0.0', 'false']:
                sync_param(main_key, False)
            else:
                sync_param(main_key, True)
                # [FIX] Guard only the conversion — the previous bare
                # `except:` also wrapped sync_param and swallowed everything.
                try:
                    weight = float(val)
                except (TypeError, ValueError):
                    weight = 1.0
                sync_param(weight_key, weight)

        apply_factor_loading('RSI', 'use_rsi', 'rsi_w')
        apply_factor_loading('Volatility', 'use_vol', 'vol_w')
        apply_factor_loading('TREND', 'use_trend', 'trend_w')
        apply_factor_loading('Volume', 'use_volume', 'volume_w')
        apply_factor_loading('MACD', 'use_macd', 'macd_w')
        apply_factor_loading('MA Slope', 'use_ma_slope', 'ma_slope_w')
        apply_factor_loading('Markov', 'use_markov', 'markov_w')
        # --- ADX FILTER LOGIC (With Clamping) ---
        adx_val = selected_setup_data.get('ADX Filter', 'Off')
        if str(adx_val).lower() in ['off', '0', '0.0', 'false']:
            sync_param('use_adx_filter', False)
        else:
            sync_param('use_adx_filter', True)
            # [FIX] Narrowed bare `except` to conversion errors only; the
            # 25.0 fallback already lies inside the [20, 30] clamp range.
            try:
                thresh = float(adx_val)
            except (TypeError, ValueError):
                thresh = 25.0
            sync_param('adx_threshold', max(20.0, min(30.0, thresh)))

        def get_num(key, default, type_func):
            # Coerce the stored (possibly string) value with type_func;
            # fall back to the current session value when missing/unparseable.
            try:
                return type_func(selected_setup_data.get(key, default))
            except (TypeError, ValueError):
                return default

        sync_param('ma_period', get_num('Large MA Period', st.session_state.ma_period, int))
        sync_param('bb_period', get_num('Bollinger Band Period', st.session_state.bb_period, int))
        sync_param('bb_std', get_num('Bollinger Band Std Dev', st.session_state.bb_std, float))
        sync_param('confidence_slider', get_num('Conf. Threshold', st.session_state.confidence_slider, int))
        sync_param('long_entry', get_num('Long Entry Threshold (%)', st.session_state.long_entry, float))
        sync_param('long_exit', get_num('Long Exit Threshold (%)', st.session_state.long_exit, float))
        sync_param('long_sl', get_num('Long Stop Loss (%)', st.session_state.long_sl, float))
        sync_param('long_delay', get_num('Long Delay (Days)', st.session_state.long_delay, int))
        sync_param('short_entry', get_num('Short Entry Threshold (%)', st.session_state.short_entry, float))
        sync_param('short_exit', get_num('Short Exit Threshold (%)', st.session_state.short_exit, float))
        sync_param('short_sl', get_num('Short Stop Loss (%)', st.session_state.short_sl, float))
        sync_param('short_delay', get_num('Short Delay (Days)', st.session_state.short_delay, int))
        st.toast(f"Populated settings for {selected_label}", icon="✅")
# Selectbox with on_change callback
st.sidebar.selectbox(
"Select User Setup:",
list(setup_options.keys()),
key="widget_user_setup_select_key",
on_change=on_user_setup_change,
help="Selecting a setup will immediately populate the sidebar settings below."
)
else:
# --- DEVELOPER BUTTONS (Original) ---
# Each button sets exactly one run_* flag, clears the others, wipes the
# cached result DataFrames and reruns so the main pane picks up the flag.
if st.sidebar.button("🚀 Run Analysis (Developer)", type="primary", key="run_analysis_dev"):
st.session_state.run_analysis_button = True
st.session_state.run_advanced_advisor = False
st.session_state.run_user_advisor_setup = False
st.session_state.run_scan_user_setups = False
st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None
st.session_state.param_results_df = None; st.session_state.confidence_results_df = None
st.session_state.summary_df = None; st.session_state.single_ticker_results = None
st.session_state.open_trades_df = None; st.session_state.last_run_stats = None
st.session_state.markov_results_df = None
st.rerun()
if st.sidebar.button("👨‍💼 Run Advanced Advisor (Find Trades)", key="run_adv_find_trades"):
st.session_state.run_advanced_advisor = True
st.session_state.run_analysis_button = False
st.session_state.run_user_advisor_setup = False
st.session_state.run_scan_user_setups = False
st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None
st.session_state.param_results_df = None; st.session_state.confidence_results_df = None
st.session_state.summary_df = None
st.rerun()
if st.sidebar.button("⚙️ Run User-Defined Advisor (Find Trades)", key="run_user_find_trades"):
st.session_state.run_user_advisor_setup = True
st.session_state.run_analysis_button = False
st.session_state.run_advanced_advisor = False
st.session_state.run_scan_user_setups = False
st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None
st.session_state.param_results_df = None; st.session_state.confidence_results_df = None
st.session_state.summary_df = None
st.rerun()
st.sidebar.markdown("---")
st.sidebar.markdown("---")
# --- SECTION 2: Confidence Score Factors ---
st.sidebar.header("2. Confidence Score Factors")
if production_mode:
# --- PRODUCTION VIEW (Simplified) ---
# Force hidden factors OFF
st.session_state.use_vol = False
st.session_state.use_trend = False
st.session_state.use_ma_slope = False
st.session_state.use_markov = False
st.session_state.rsi_logic = "Level"
# Pattern used throughout: seed the widget_* key before first render so
# the widget shows the persisted value, then let update_state sync back.
# RSI (Momentum)
if 'widget_use_rsi' not in st.session_state: st.session_state.widget_use_rsi = st.session_state.use_rsi
st.sidebar.toggle("Use Momentum (RSI)", key='widget_use_rsi', on_change=update_state, help="Enable Relative Strength Index (Momentum) factor.")
# NOTE(review): 1.0 is remapped to 0.5 here — presumably the production
# default weight differs from the developer default; confirm intent.
rsi_val = st.session_state.rsi_w if st.session_state.rsi_w != 1.0 else 0.5
if 'widget_rsi_w' not in st.session_state: st.session_state.widget_rsi_w = rsi_val
st.sidebar.number_input("RSI Weight", 0.1, 5.0, step=0.1, key='widget_rsi_w', on_change=update_state, disabled=not st.session_state.get('use_rsi', True))
# Volume Spike
if 'widget_use_volume' not in st.session_state: st.session_state.widget_use_volume = st.session_state.use_volume
st.sidebar.toggle("Use Volume Spike", key='widget_use_volume', on_change=update_state, help="Enable Volume Spike detection.")
vol_val = st.session_state.volume_w if st.session_state.volume_w != 1.0 else 0.5
if 'widget_volume_w' not in st.session_state: st.session_state.widget_volume_w = vol_val
st.sidebar.number_input("Volume Weight", 0.1, 5.0, step=0.1, key='widget_volume_w', on_change=update_state, disabled=not st.session_state.get('use_volume', True))
# ADX Filter (production clamps the threshold to the 20-30 range)
if 'widget_use_adx_filter' not in st.session_state: st.session_state.widget_use_adx_filter = st.session_state.use_adx_filter
st.sidebar.toggle("Use ADX Filter (<)", key='widget_use_adx_filter', on_change=update_state, help="Enable Trend Strength filter (ADX). Good for hunting Shorts, bad for Longs")
curr_adx = max(20.0, min(30.0, st.session_state.adx_threshold))
if 'widget_adx_threshold' not in st.session_state: st.session_state.widget_adx_threshold = curr_adx
st.sidebar.number_input("ADX Threshold", 20.0, 30.0, step=1.0, key='widget_adx_threshold', on_change=update_state, disabled=not st.session_state.get('use_adx_filter', True))
# MACD
if 'widget_use_macd' not in st.session_state: st.session_state.widget_use_macd = st.session_state.use_macd
st.sidebar.toggle("Use MACD Signals", key='widget_use_macd', on_change=update_state, help="Enable MACD crossover/histogram signals.")
macd_val = st.session_state.macd_w if st.session_state.macd_w != 1.0 else 2.0
if 'widget_macd_w' not in st.session_state: st.session_state.widget_macd_w = macd_val
st.sidebar.number_input("MACD Weight", 0.1, 5.0, step=0.1, key='widget_macd_w', on_change=update_state, disabled=not st.session_state.get('use_macd', True))
# Confidence Slider
# [FIX] Force the widget key to match the stored variable BEFORE creating the widget
st.session_state['widget_confidence_slider'] = st.session_state.confidence_slider
# [FIX] Remove 'value=' argument. Streamlit will use the key we just populated.
st.sidebar.slider(
"Minimum Confidence Threshold (%)",
0, 100,
step=5,
key='widget_confidence_slider',
on_change=update_state,
help="Acts as a quality filter. Higher values increase trade quality but reduce the number of trades found."
)
else:
# --- DEVELOPER VIEW (Original) ---
# Same seed-widget-key-then-render pattern as the production branch, but
# with the full factor set exposed (volatility, trend, slope, Markov, ...).
if 'widget_use_rsi' not in st.session_state: st.session_state.widget_use_rsi = st.session_state.use_rsi
st.sidebar.toggle("Use Momentum (RSI)", key='widget_use_rsi', on_change=update_state)
if 'widget_rsi_w' not in st.session_state: st.session_state.widget_rsi_w = st.session_state.rsi_w
st.sidebar.number_input("RSI Weight", 0.1, 5.0, step=0.1, key='widget_rsi_w', on_change=update_state, disabled=not st.session_state.get('use_rsi', True))
rsi_logic_options = ["Crossover", "Level"]
rsi_logic_index = rsi_logic_options.index(st.session_state.rsi_logic) if st.session_state.rsi_logic in rsi_logic_options else 0
st.sidebar.radio("RSI Entry Logic:", rsi_logic_options, index=rsi_logic_index, key='widget_rsi_logic', on_change=update_state,
help="Level: Enter <= 30/>= 70. Crossover: Enter on cross back.",
disabled=not st.session_state.get('use_rsi', True))
if 'widget_use_vol' not in st.session_state: st.session_state.widget_use_vol = st.session_state.use_vol
st.sidebar.toggle("Use Volatility", key='widget_use_vol', on_change=update_state)
if 'widget_vol_w' not in st.session_state: st.session_state.widget_vol_w = st.session_state.vol_w
st.sidebar.number_input("Volatility Weight", 0.1, 5.0, step=0.1, key='widget_vol_w', on_change=update_state, disabled=not st.session_state.get('use_vol', True))
if 'widget_use_trend' not in st.session_state: st.session_state.widget_use_trend = st.session_state.use_trend
st.sidebar.toggle("Use Trend (200d MA)", key='widget_use_trend', on_change=update_state)
if 'widget_trend_w' not in st.session_state: st.session_state.widget_trend_w = st.session_state.trend_w
st.sidebar.number_input("Trend Weight", 0.1, 5.0, step=0.1, key='widget_trend_w', on_change=update_state, disabled=not st.session_state.get('use_trend', True))
if 'widget_use_volume' not in st.session_state: st.session_state.widget_use_volume = st.session_state.use_volume
st.sidebar.toggle("Use Volume Spike", key='widget_use_volume', on_change=update_state)
if 'widget_volume_w' not in st.session_state: st.session_state.widget_volume_w = st.session_state.volume_w
st.sidebar.number_input("Volume Weight", 0.1, 5.0, step=0.1, key='widget_volume_w', on_change=update_state, disabled=not st.session_state.get('use_volume', True))
if 'widget_use_adx_filter' not in st.session_state: st.session_state.widget_use_adx_filter = st.session_state.use_adx_filter
st.sidebar.toggle("Use ADX Filter (<)", key='widget_use_adx_filter', on_change=update_state)
if 'widget_adx_threshold' not in st.session_state: st.session_state.widget_adx_threshold = st.session_state.adx_threshold
st.sidebar.number_input("ADX Threshold", 10.0, 50.0, step=1.0, key='widget_adx_threshold', on_change=update_state, disabled=not st.session_state.get('use_adx_filter', True), help="Allow entries only when ADX is BELOW this value.")
if 'widget_adx_period' not in st.session_state: st.session_state.widget_adx_period = st.session_state.adx_period
st.sidebar.number_input("ADX Period", 5, 50, step=1, key='widget_adx_period', on_change=update_state, help="The lookback period for the ADX calculation.")
if 'widget_use_macd' not in st.session_state: st.session_state.widget_use_macd = st.session_state.use_macd
st.sidebar.toggle("Use MACD Signals", key='widget_use_macd', on_change=update_state)
if 'widget_macd_w' not in st.session_state: st.session_state.widget_macd_w = st.session_state.macd_w
st.sidebar.number_input("MACD Weight", 0.1, 5.0, step=0.1, key='widget_macd_w', on_change=update_state, disabled=not st.session_state.get('use_macd', True))
if 'widget_use_ma_slope' not in st.session_state: st.session_state.widget_use_ma_slope = st.session_state.use_ma_slope
st.sidebar.toggle("Use MA Slope", key='widget_use_ma_slope', on_change=update_state)
if 'widget_ma_slope_w' not in st.session_state: st.session_state.widget_ma_slope_w = st.session_state.ma_slope_w
st.sidebar.number_input("MA Slope Weight", 0.1, 5.0, step=0.1, key='widget_ma_slope_w', on_change=update_state, disabled=not st.session_state.get('use_ma_slope', True))
if 'widget_use_markov' not in st.session_state: st.session_state.widget_use_markov = st.session_state.use_markov
st.sidebar.toggle("Use Markov State", key='widget_use_markov', on_change=update_state, help="Use the best-found Markov state as a confidence factor. You must run 'Find Best Markov Setup' (Section 7) first.")
if 'widget_markov_w' not in st.session_state: st.session_state.widget_markov_w = st.session_state.markov_w
st.sidebar.number_input("Markov Weight", 0.1, 5.0, step=0.1, key='widget_markov_w', on_change=update_state, disabled=not st.session_state.get('use_markov', True))
# --- NEW: MFI Controls ---
if 'widget_use_mfi' not in st.session_state: st.session_state.widget_use_mfi = st.session_state.use_mfi
st.sidebar.toggle("Use Money Flow (MFI)", key='widget_use_mfi', on_change=update_state)
if 'widget_mfi_w' not in st.session_state: st.session_state.widget_mfi_w = st.session_state.mfi_w
st.sidebar.number_input("MFI Weight", 0.1, 5.0, step=0.1, key='widget_mfi_w', on_change=update_state, disabled=not st.session_state.get('use_mfi', True))
# --- NEW: SuperTrend Controls ---
if 'widget_use_supertrend' not in st.session_state: st.session_state.widget_use_supertrend = st.session_state.use_supertrend
st.sidebar.toggle("Use SuperTrend", key='widget_use_supertrend', on_change=update_state)
if 'widget_supertrend_w' not in st.session_state: st.session_state.widget_supertrend_w = st.session_state.supertrend_w
st.sidebar.number_input("SuperTrend Weight", 0.1, 5.0, step=0.1, key='widget_supertrend_w', on_change=update_state, disabled=not st.session_state.get('use_supertrend', True))
# [FIX] Force the widget key to match the stored variable
st.session_state['widget_confidence_slider'] = st.session_state.confidence_slider
# [FIX] Remove 'value=' argument.
st.sidebar.slider(
"Minimum Confidence Threshold (%)",
0, 100,
step=5,
key='widget_confidence_slider',
on_change=update_state
)
st.sidebar.markdown("---")
# --- SECTION 3: Strategy Parameters ---
st.sidebar.header("3. Strategy Parameters")
if production_mode:
# --- PRODUCTION VIEW (Simplified) ---
# Large MA and BB Period are HIDDEN here (values persist in session_state)
# Dropdown for Std Dev (1.4, 1.6, 2.2)
std_options = [1.4, 1.6, 2.2]
current_std = st.session_state.bb_std
# Fall back to the strictest option (2.2) if the stored value is custom
std_index = std_options.index(current_std) if current_std in std_options else 2
st.sidebar.selectbox(
"Bollinger Band Std Dev",
std_options,
index=std_index,
key='widget_bb_std',
on_change=update_state,
help="Controls volatility sensitivity. Lower (1.4/1.6) = more trades, Higher (2.2) = fewer, stricter trades."
)
else:
# --- DEVELOPER VIEW (Original) ---
st.sidebar.number_input("Large MA Period", 10, 200, value=st.session_state.ma_period, step=1, key='widget_ma_period', on_change=update_state)
st.sidebar.number_input("Bollinger Band Period", 10, 100, value=st.session_state.bb_period, step=1, key='widget_bb_period', on_change=update_state)
st.sidebar.number_input("Bollinger Band Std Dev", 1.0, 5.0, value=st.session_state.bb_std, step=0.1, key='widget_bb_std', on_change=update_state)
# Long-side entry/exit thresholds are expressed in whole percent here
st.sidebar.subheader("Long Trade Logic")
st.sidebar.slider("Entry Threshold (%)", 0.0, 10.0, value=st.session_state.long_entry, step=0.1, key='widget_long_entry', on_change=update_state)
st.sidebar.slider("Exit MA Threshold (%)", 0.0, 10.0, value=st.session_state.long_exit, step=0.1, key='widget_long_exit', on_change=update_state)
st.sidebar.slider("Trailing Stop Loss (%)", 0.0, 30.0, value=st.session_state.long_sl, step=0.5, key='widget_long_sl', on_change=update_state, help="A trailing stop-loss set from the trade's high-water mark. Set to 0 to disable.")
st.sidebar.number_input("Delay Entry (days)", 0, 10, value=st.session_state.long_delay, step=1, key='widget_long_delay', on_change=update_state)
st.sidebar.markdown("---")
st.sidebar.subheader("Exit Logic")
exit_options = ["Standard (Price-Based)", "Intelligent (ADX/MACD/ATR)"]
# Repair unknown persisted values before computing the selectbox index
if st.session_state.exit_logic_type not in exit_options: st.session_state.exit_logic_type = "Standard (Price-Based)"
exit_index = exit_options.index(st.session_state.exit_logic_type)
st.sidebar.selectbox(
"Exit Logic Type:", exit_options, index=exit_index, key='widget_exit_logic_type', on_change=update_state,
help="Standard: Exit on MA/BB cross. Intelligent: ADX/MACD profit-take with an optional TSL."
)
st.sidebar.slider(
"Intelligent: Trailing Stop Loss (%)", 0.0, 60.0, value=st.session_state.intelligent_tsl_pct, step=1.0, key='widget_intelligent_tsl_pct', on_change=update_state,
format="%.1f%%", disabled=(st.session_state.exit_logic_type != 'Intelligent (ADX/MACD/ATR)'),
help="The TSL for the 'Intelligent' exit. Set to 60.0% to replicate the 'Profit-Take or Time-Out' strategy."
)
st.sidebar.toggle("Apply MA/BB Price Lock Floor (Catcher)", value=st.session_state.use_ma_floor_filter, key='widget_use_ma_floor_filter', on_change=update_state,
help="If ON, the Trailing Stop Loss is prevented from dropping below the Mean Reversion price.")
# [NEW SLIDER] Catcher Offset
# Only visible in Developer Mode. Disabled if Catcher is OFF.
st.sidebar.slider(
"Catcher Offset (%)",
min_value=-10.0, max_value=10.0, value=st.session_state.catcher_stop_pct, step=0.1,
key='widget_catcher_stop_pct', on_change=update_state,
format="%.1f%%",
disabled=not st.session_state.use_ma_floor_filter,
help="Adjusts the hard floor level relative to Entry Price.\nPositive = Secure Profit (e.g. +1% Floor).\nNegative = Allow wiggle room (e.g. -1% Floor)."
)
st.sidebar.markdown("---")
st.sidebar.subheader("Short Trade Logic")
st.sidebar.slider("Entry Threshold (%)", 0.0, 10.0, value=st.session_state.short_entry, step=0.1, key='widget_short_entry', on_change=update_state)
st.sidebar.slider("Exit MA Threshold (%)", 0.0, 10.0, value=st.session_state.short_exit, step=0.1, key='widget_short_exit', on_change=update_state)
st.sidebar.slider("Trailing Stop Loss (%)", 0.0, 30.0, value=st.session_state.short_sl, step=0.5, key='widget_short_sl', on_change=update_state, help="A trailing stop-loss set from the trade's low-water mark. Set to 0 to disable.")
st.sidebar.number_input("Delay Entry (days)", 0, 10, value=st.session_state.short_delay, step=1, key='widget_short_delay', on_change=update_state)
st.sidebar.markdown("---")
st.sidebar.subheader("Time Limits (Days)")
c_time1, c_time2 = st.sidebar.columns(2)
st.session_state.max_long_duration = c_time1.number_input("Max Long Duration", min_value=1, max_value=365, value=st.session_state.get('max_long_duration', 60), step=1, key='widget_max_long_duration', on_change=update_state)
st.session_state.max_short_duration = c_time2.number_input("Max Short Duration", min_value=1, max_value=365, value=st.session_state.get('max_short_duration', 10), step=1, key='widget_max_short_duration', on_change=update_state)
if 'best_params' not in st.session_state: st.session_state.best_params = None
# --- SECTIONS 4, 5, 6, 7 (HIDDEN IN PRODUCTION) ---
if not production_mode:
st.sidebar.markdown("---")
st.sidebar.header("4. Find Best Parameters")
# Each optimisable parameter follows the same row layout:
# checkbox to enable + three columns for Start / End / Step of the sweep.
with st.sidebar.expander("Set Optimisation Ranges"):
use_squared_weighting = st.toggle("Prioritise Profit per Trade (Params)", value=st.session_state.sq_params_toggle, key="widget_sq_params_toggle", on_change=update_state)
st.markdown("---")
optimise_ma = st.checkbox("Optimise MA Period", value=st.session_state.opt_ma_cb, key="widget_opt_ma_cb", on_change=update_state); c1,c2,c3 = st.columns(3)
c1.number_input("MA Start", 10, 200, value=st.session_state.ma_start, step=5, disabled=not optimise_ma, key='widget_ma_start', on_change=update_state)
c2.number_input("MA End", 10, 200, value=st.session_state.ma_end, step=5, disabled=not optimise_ma, key='widget_ma_end', on_change=update_state)
c3.number_input("MA Step", 1, 20, value=st.session_state.ma_step, step=1, disabled=not optimise_ma, key='widget_ma_step', on_change=update_state)
optimise_bb = st.checkbox("Optimise BB Period", value=st.session_state.opt_bb_cb, key="widget_opt_bb_cb", on_change=update_state); c1,c2,c3 = st.columns(3)
c1.number_input("BB Start", 10, 100, value=st.session_state.bb_start, step=5, disabled=not optimise_bb, key='widget_bb_start', on_change=update_state)
c2.number_input("BB End", 10, 100, value=st.session_state.bb_end, step=5, disabled=not optimise_bb, key='widget_bb_end', on_change=update_state)
c3.number_input("BB Step", 1, 10, value=st.session_state.bb_step, step=1, disabled=not optimise_bb, key='widget_bb_step', on_change=update_state)
optimise_std = st.checkbox("Optimise BB Std Dev", value=st.session_state.opt_std_cb, key="widget_opt_std_cb", on_change=update_state); c1,c2,c3 = st.columns(3)
c1.number_input("Std Start", 1.0, 5.0, value=st.session_state.std_start, step=0.1, format="%.1f", disabled=not optimise_std, key='widget_std_start', on_change=update_state)
c2.number_input("Std End", 1.0, 5.0, value=st.session_state.std_end, step=0.1, format="%.1f", disabled=not optimise_std, key='widget_std_end', on_change=update_state)
c3.number_input("Std Step", 0.1, 1.0, value=st.session_state.std_step, step=0.1, format="%.1f", disabled=not optimise_std, key='widget_std_step', on_change=update_state)
st.markdown("---")
optimise_conf = st.checkbox("Optimise Confidence Threshold", value=st.session_state.opt_conf_cb, key="widget_opt_conf_cb", on_change=update_state); c1,c2,c3 = st.columns(3)
c1.number_input("Conf Start", 0, 100, value=st.session_state.conf_start, step=5, disabled=not optimise_conf, key='widget_conf_start', on_change=update_state)
c2.number_input("Conf End", 0, 100, value=st.session_state.conf_end, step=5, disabled=not optimise_conf, key='widget_conf_end', on_change=update_state)
c3.number_input("Conf Step", 5, 25, value=st.session_state.conf_step, step=5, disabled=not optimise_conf, key='widget_conf_step', on_change=update_state)
optimise_sl = st.checkbox("Optimise Stop Loss %", value=st.session_state.opt_sl_cb, key="widget_opt_sl_cb", on_change=update_state); c1,c2,c3 = st.columns(3)
c1.number_input("SL Start", 0.0, 30.0, value=st.session_state.sl_start, step=0.5, disabled=not optimise_sl, key='widget_sl_start', on_change=update_state)
c2.number_input("SL End", 0.0, 30.0, value=st.session_state.sl_end, step=0.5, disabled=not optimise_sl, key='widget_sl_end', on_change=update_state)
c3.number_input("SL Step", 0.1, 5.0, value=st.session_state.sl_step, step=0.5, disabled=not optimise_sl, key='widget_sl_step', on_change=update_state)
optimise_delay = st.checkbox("Optimise Delay Days", value=st.session_state.opt_delay_cb, key="widget_opt_delay_cb", on_change=update_state); c1,c2,c3 = st.columns(3)
c1.number_input("Delay Start", 0, 10, value=st.session_state.delay_start, step=1, disabled=not optimise_delay, key='widget_delay_start', on_change=update_state)
c2.number_input("Delay End", 0, 10, value=st.session_state.delay_end, step=1, disabled=not optimise_delay, key='widget_delay_end', on_change=update_state)
c3.number_input("Delay Step", 1, 5, value=st.session_state.delay_step, step=1, disabled=not optimise_delay, key='widget_delay_step', on_change=update_state)
optimise_entry = st.checkbox("Optimise Entry %", value=st.session_state.opt_entry_cb, key="widget_opt_entry_cb", on_change=update_state); c1,c2,c3 = st.columns(3)
c1.number_input("Entry Start", 0.0, 10.0, value=st.session_state.entry_start, step=0.1, disabled=not optimise_entry, key='widget_entry_start', on_change=update_state)
c2.number_input("Entry End", 0.0, 10.0, value=st.session_state.entry_end, step=0.1, disabled=not optimise_entry, key='widget_entry_end', on_change=update_state)
c3.number_input("Entry Step", 0.1, 1.0, value=st.session_state.entry_step, step=0.1, disabled=not optimise_entry, key='widget_entry_step', on_change=update_state)
optimise_exit = st.checkbox("Optimise Exit MA %", value=st.session_state.opt_exit_cb, key="widget_opt_exit_cb", on_change=update_state); c1,c2,c3 = st.columns(3)
c1.number_input("Exit Start", 0.0, 10.0, value=st.session_state.exit_start, step=0.1, disabled=not optimise_exit, key='widget_exit_start', on_change=update_state)
c2.number_input("Exit End", 0.0, 10.0, value=st.session_state.exit_end, step=0.1, disabled=not optimise_exit, key='widget_exit_end', on_change=update_state)
c3.number_input("Exit Step", 0.1, 1.0, value=st.session_state.exit_step, step=0.1, disabled=not optimise_exit, key='widget_exit_step', on_change=update_state)
st.markdown("---")
optimise_dur = st.checkbox("Optimise Max Duration", value=st.session_state.opt_duration_cb, key="widget_opt_duration_cb", on_change=update_state); c1,c2,c3 = st.columns(3)
c1.number_input("Dur Start", 1, 365, value=st.session_state.dur_start, step=1, disabled=not optimise_dur, key='widget_dur_start', on_change=update_state)
c2.number_input("Dur End", 1, 365, value=st.session_state.dur_end, step=1, disabled=not optimise_dur, key='widget_dur_end', on_change=update_state)
c3.number_input("Dur Step", 1, 30, value=st.session_state.dur_step, step=1, disabled=not optimise_dur, key='widget_dur_step', on_change=update_state)
st.markdown("---")
col1, col2 = st.columns(2)
# Single-side optimisation runs synchronously inside this render pass
if col1.button("💡 Find Best Long"): generate_and_run_optimisation(master_df, main_content_placeholder, 'long', use_squared_weighting)
if col2.button("💡 Find Best Short"): generate_and_run_optimisation(master_df, main_content_placeholder, 'short', use_squared_weighting)
st.markdown("---")
col_comb_1, col_comb_2 = st.columns(2)
# Combined optimisation is deferred: set flags, rerun, handled elsewhere
if col_comb_1.button("🔥 Find Combined Best Long", key="find_combined_long", use_container_width=True):
st.session_state.run_combined_optimisation = True; st.session_state.optimise_side = 'long'; st.rerun()
if col_comb_2.button("🔥 Find Combined Best Short", key="find_combined_short", use_container_width=True):
st.session_state.run_combined_optimisation = True; st.session_state.optimise_side = 'short'; st.rerun()
st.sidebar.markdown("---")
st.sidebar.header("5. Find Best/Worst Confidence Setup")
with st.sidebar.expander("Optimise Confidence Factors"):
st.info("Finds good setups (using Section 2 factors) or bad setups (using the factors below).")
st.write("**Find Best Setups (High Profit)**"); c1, c2 = st.columns(2)
# 'best' mode uses the Section 2 factors (veto_factors=None)
if c1.button("💡 Find Best Long Confidence", key="find_conf_long"):
st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'long'; st.session_state.confidence_optimise_mode = 'best'; st.session_state.confidence_optimise_veto_factors = None; st.rerun()
if c2.button("💡 Find Best Short Confidence", key="find_conf_short"):
st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'short'; st.session_state.confidence_optimise_mode = 'best'; st.session_state.confidence_optimise_veto_factors = None; st.rerun()
st.markdown("---")
st.write("**Find Worst Setups (for Veto Filter)**")
st.caption("Select the factors to test for the Veto signal:")
c1_veto, c2_veto = st.columns(2)
veto_rsi = c1_veto.toggle("Veto RSI", value=st.session_state.veto_rsi_cb, key="widget_veto_rsi_cb", on_change=update_state)
veto_vol = c2_veto.toggle("Veto Volatility", value=st.session_state.veto_vol_cb, key="widget_veto_vol_cb", on_change=update_state)
veto_trend = c1_veto.toggle("Veto Trend", value=st.session_state.veto_trend_cb, key="widget_veto_trend_cb", on_change=update_state)
veto_volume = c2_veto.toggle("Veto Volume", value=st.session_state.veto_volume_cb, key="widget_veto_volume_cb", on_change=update_state)
veto_macd = c1_veto.toggle("Veto MACD", value=st.session_state.veto_macd_cb, key="widget_veto_macd_cb", on_change=update_state)
veto_ma_slope = c2_veto.toggle("Veto MA Slope", value=st.session_state.veto_ma_slope_cb, key="widget_veto_ma_slope_cb", on_change=update_state)
veto_markov = c1_veto.toggle("Veto Markov", value=st.session_state.get('veto_markov_cb', False), key="widget_veto_markov_cb", on_change=update_state)
# Order matters: consumers of this tuple unpack positionally
veto_factors = (veto_rsi, veto_vol, veto_trend, veto_volume, veto_macd, veto_ma_slope, veto_markov)
c1_veto_btn, c2_veto_btn = st.columns(2)
if c1_veto_btn.button("❌ Find Worst Long", key="find_worst_long"):
st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'long'; st.session_state.confidence_optimise_mode = 'worst'; st.session_state.confidence_optimise_veto_factors = veto_factors; st.rerun()
if c2_veto_btn.button("❌ Find Worst Short", key="find_worst_short"):
st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'short'; st.session_state.confidence_optimise_mode = 'worst'; st.session_state.confidence_optimise_veto_factors = veto_factors; st.rerun()
st.sidebar.markdown("---")
st.sidebar.header("6. Find Best Weights")
with st.sidebar.expander("Optimise Specific Factor Weights"):
st.info("Dynamically optimises weights (0.5 to 2.0) for all active, non-primary factors. Uses other sidebar settings as fixed values.")
use_sq_weighting_for_weights = st.toggle("Prioritise Profit per Trade (Weights)", value=st.session_state.sq_weights_toggle, key="widget_sq_weights_toggle", on_change=update_state)
col1_w, col2_w = st.columns(2)
# Deferred run: set flag + side, rerun; the main pane executes the sweep
if col1_w.button("⚖️ Find Best Long Weights", key="find_weights_long"):
st.session_state.run_weight_optimisation = True; st.session_state.weight_optimise_side = 'long'; st.rerun()
if col2_w.button("⚖️ Find Best Short Weights", key="find_weights_short"):
st.session_state.run_weight_optimisation = True; st.session_state.weight_optimise_side = 'short'; st.rerun()
st.sidebar.markdown("---")
st.sidebar.header("7. Find Best Markov Setup")
with st.sidebar.expander("Optimise Markov Probabilities"):
st.info("Brute-forces Run-Up/Future periods to find the highest-probability trading states.")
# Sweep grid: past window (run-up) x prediction window (future), in days
st.write("Run-Up Period (Past)")
c1, c2, c3 = st.columns(3)
c1.number_input("Start", 1, 100, value=st.session_state.markov_run_up_start, key='widget_markov_run_up_start', on_change=update_state)
c2.number_input("End", 1, 100, value=st.session_state.markov_run_up_end, key='widget_markov_run_up_end', on_change=update_state)
c3.number_input("Step", 1, 10, value=st.session_state.markov_run_up_step, key='widget_markov_run_up_step', on_change=update_state)
st.write("Future Period (Prediction)")
c4, c5, c6 = st.columns(3)
c4.number_input("Start", 1, 100, value=st.session_state.markov_future_start, key='widget_markov_future_start', on_change=update_state)
c5.number_input("End", 1, 100, value=st.session_state.markov_future_end, key='widget_markov_future_end', on_change=update_state)
c6.number_input("Step", 1, 10, value=st.session_state.markov_future_step, key='widget_markov_future_step', on_change=update_state)
c1_markov, c2_markov = st.columns(2)
if c1_markov.button("🔮 Find Best Long Markov", key="markov_long_button"):
st.session_state.run_markov_optimisation = True; st.session_state.markov_side = 'long'; st.rerun()
if c2_markov.button("🔮 Find Best Short Markov", key="markov_short_button"):
st.session_state.run_markov_optimisation = True; st.session_state.markov_side = 'short'; st.rerun()
# --- Veto / Save Settings (Only in Developer Mode) ---
# Shows the currently active veto filters (stored in session state as
# veto_setup_list) with options to persist them to VETO_CONFIG_FILE or clear
# both the in-memory list and the on-disk file.
st.sidebar.markdown("---")
current_veto_list = st.session_state.get('veto_setup_list', [])
if current_veto_list:
st.sidebar.header("Veto Filter(s)")
st.sidebar.success(f"{len(current_veto_list)} Veto filter(s) ACTIVE.")
with st.sidebar.expander("Show Veto Filters"): st.json(current_veto_list)
if st.sidebar.button("💾 Save Veto Filters as Default"): save_veto_setup(current_veto_list)
if st.sidebar.button("Clear Veto Filters"):
st.session_state.veto_setup_list = [];
# Best-effort removal of the persisted veto config; a failure only logs
# to stdout so the UI clear still succeeds.
if os.path.exists(VETO_CONFIG_FILE):
try: os.remove(VETO_CONFIG_FILE)
except OSError as e: print(f"Error removing veto file: {e}")
st.rerun()
st.sidebar.markdown("---")
# --- SAVE SETTINGS (Hidden in Production Mode) ---
# Serialises the current sidebar state into the same key layout that
# load_settings() reads back (see the default_structure at the top of the
# file). Percent-style widget values are divided by 100 here so the saved
# config stores fractions, matching load_settings defaults.
if not production_mode:
if st.sidebar.button("💾 Save Settings as Default"):
settings_to_save = {
"large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period,
"bband_std_dev": st.session_state.bb_std, "confidence_threshold": st.session_state.confidence_slider,
"long_entry_threshold_pct": st.session_state.long_entry / 100, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100,
"long_trailing_stop_loss_pct": st.session_state.long_sl / 100, "long_delay_days": st.session_state.long_delay,
"short_entry_threshold_pct": st.session_state.short_entry / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100,
"short_trailing_stop_loss_pct": st.session_state.short_sl / 100, "short_delay_days": st.session_state.short_delay,
"use_rsi": st.session_state.use_rsi, "rsi_w": st.session_state.rsi_w,
# NOTE(review): some keys use .get() with a fallback while most use direct
# attribute access (which raises KeyError if the widget never rendered) —
# confirm the direct-access keys are always initialised before this runs.
"rsi_logic": st.session_state.get('rsi_logic', 'Crossover'),
"primary_driver": st.session_state.get('primary_driver', 'Bollinger Bands'),
"exit_logic_type": st.session_state.get('exit_logic_type', 'Standard (Price-Based)'),
"use_ma_floor_filter": st.session_state.use_ma_floor_filter,
"catcher_stop_pct": st.session_state.catcher_stop_pct / 100.0,
"exit_confidence_threshold": st.session_state.get('exit_confidence_threshold', 50),
"smart_trailing_stop_pct": st.session_state.get('smart_trailing_stop_pct', 5.0),
"smart_exit_atr_period": st.session_state.get('smart_exit_atr_period', 14),
"smart_exit_atr_multiplier": st.session_state.get('smart_exit_atr_multiplier', 3.0),
"intelligent_tsl_pct": st.session_state.intelligent_tsl_pct / 100.0,
# These three come from local sidebar variables defined earlier in the
# script (outside this chunk), not from session state.
"norm_lookback_years": norm_lookback_years,
"use_rolling_benchmark": use_rolling_benchmark,
"benchmark_rank": benchmark_percentile_setting,
"use_vol": st.session_state.use_vol, "vol_w": st.session_state.vol_w,
"use_trend": st.session_state.use_trend, "trend_w": st.session_state.trend_w,
"use_volume": st.session_state.use_volume, "volume_w": st.session_state.volume_w,
"use_adx_filter": st.session_state.use_adx_filter, "adx_threshold": st.session_state.adx_threshold,
"adx_period": st.session_state.adx_period,
"use_macd": st.session_state.use_macd, "macd_w": st.session_state.macd_w,
"use_ma_slope": st.session_state.use_ma_slope, "ma_slope_w": st.session_state.ma_slope_w,
"use_markov": st.session_state.use_markov, "markov_w": st.session_state.markov_w,
"max_trading_days": st.session_state.max_duration,
"max_long_duration": st.session_state.max_long_duration,
"max_short_duration": st.session_state.max_short_duration
}
save_settings(settings_to_save)
# --- End of Sidebar Definitions ---
# --- MAIN CONTENT AREA LOGIC ---
# One long if/elif dispatch: exactly one mode runs per rerun, selected by the
# run_* flags set by the sidebar buttons above. Each branch first clears the
# result caches of the *other* modes so stale outputs are not displayed.
with main_content_placeholder.container():
# 1. User Defined Advisor Setup UI (Developer Only)
if st.session_state.get('run_user_advisor_setup'):
st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None; st.session_state.single_ticker_results = None
st.session_state.open_trades_df = None; st.session_state.confidence_results_df = None
st.session_state.best_params = None; st.session_state.last_run_stats = None
generate_user_advisor_ui_and_run(master_df)
# 2. Run User Defined Advisor Scan (Used by BOTH Dev "Scan with User Setups" AND Prod "Run User-Selected Option")
elif st.session_state.get('run_scan_user_setups'):
# One-shot flag: reset immediately so a rerun does not repeat the scan.
st.session_state.run_scan_user_setups = False
st.session_state.last_run_stats = None
setups_for_scan = st.session_state.get('setups_to_scan', [])
if setups_for_scan:
with st.spinner("Running advisor scan..."):
# Determine label based on count
label = "User-Defined" if len(setups_for_scan) > 1 else "Selected-Setup"
run_advisor_scan(master_df, setups_for_scan, label)
else:
st.warning("No setup selected for scanning.")
# 3. Confidence Optimisation (Developer Only)
elif st.session_state.get('run_confidence_optimisation'):
# Clear competing run flags and cached results before launching.
st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False
st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False
st.session_state.run_markov_optimisation = False; st.session_state.run_weight_optimisation = False
st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None
st.session_state.markov_results_df = None; st.session_state.summary_df = None; st.session_state.last_run_stats = None
# side/mode/veto were stashed in session state by the sidebar buttons.
side_to_run = st.session_state.get('confidence_optimise_side', 'long')
mode_to_run = st.session_state.get('confidence_optimise_mode', 'best')
veto_factors_to_run = st.session_state.get('confidence_optimise_veto_factors', None)
run_confidence_optimisation(side_to_run, mode_to_run, master_df, main_content_placeholder, veto_factors_to_run)
# 4. Weight Optimisation (Developer Only)
elif st.session_state.get('run_weight_optimisation'):
# Clear competing run flags and cached results before launching.
st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False
st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False
st.session_state.run_markov_optimisation = False; st.session_state.advisor_df = None
st.session_state.raw_df = None; st.session_state.deduped_df = None; st.session_state.markov_results_df = None
st.session_state.summary_df = None; st.session_state.confidence_results_df = None; st.session_state.last_run_stats = None
side_to_run = st.session_state.get('weight_optimise_side', 'long')
use_sq = st.session_state.get('sq_weights_toggle', False)
generate_and_run_weight_optimisation(master_df, main_content_placeholder, side_to_run, use_sq)
# 5. Parameter Optimisation Results Display (Developer Only)
# Renders a previously computed param_results_df (pre-sorted by score) with
# optional de-duplication and a dynamic column set.
elif st.session_state.get('param_results_df') is not None:
st.subheader("🏆 Parameter Optimisation Results")
st.caption("Results are pre-sorted by Score (High to Low).")
if st.session_state.get('best_params'):
st.subheader("Optimal Parameters Found:")
st.json(st.session_state.best_params)
c_view1, c_view2 = st.columns([1, 2])
dedupe_params = c_view1.checkbox("De-duplicate (Show unique scores only)", value=True)
results_df = st.session_state.param_results_df.copy()
if dedupe_params:
results_df = results_df.drop_duplicates(subset=['Strategy Score', 'Avg Profit/Trade'], keep='first')
display_cols = ["Strategy Score", "Avg Profit/Trade", "Ticker G/B Ratio", "Total Trades", "Avg Entry Conf."]
if 'max_trading_days' in results_df.columns: display_cols.append('max_trading_days')
# Optional parameter columns are shown only if they vary across rows, or if
# the corresponding opt_*_cb checkbox (keyed by the column's first 3 chars)
# is on. NOTE(review): the f"opt_{col[:3]}_cb" key derivation assumes those
# checkbox keys exist elsewhere — confirm against the optimiser UI.
optional_cols = ["confidence_threshold", "bband_period", "large_ma_period", "long_entry_threshold_pct", "long_exit_ma_threshold_pct", "long_delay_days", "long_trailing_stop_loss_pct"]
for col in optional_cols:
if col in results_df.columns:
if results_df[col].nunique() > 1 or st.session_state.get(f"opt_{col[:3]}_cb", True):
display_cols.append(col)
st.dataframe(results_df.head(100)[display_cols].style.format({
"Strategy Score": "{:.2f}%", "Avg Profit/Trade": "{:.2%}", "Ticker G/B Ratio": "{:.2f}",
"Avg Entry Conf.": "{:.1f}%", "max_trading_days": "{:.0f}"
}, na_rep='-'))
# Download exports the full (possibly de-duplicated) result set, not just
# the 100 rows shown above.
csv_data = results_df.to_csv(index=False).encode('utf-8')
st.download_button(label="⬇️ Download Full Results (CSV)", data=csv_data, file_name="parameter_optimisation_results.csv", mime="text/csv")
if st.button("Close Results"):
st.session_state.param_results_df = None
st.rerun()
# 6. Combined Optimisation (Developer Only)
elif st.session_state.get('run_combined_optimisation'):
# Clear competing run flags and cached results before launching.
st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False
st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False
st.session_state.run_markov_optimisation = False; st.session_state.run_weight_optimisation = False
st.session_state.run_confidence_optimisation = False; st.session_state.advisor_df = None
st.session_state.raw_df = None; st.session_state.deduped_df = None; st.session_state.markov_results_df = None
st.session_state.summary_df = None; st.session_state.confidence_results_df = None; st.session_state.last_run_stats = None
side_to_run = st.session_state.get('optimise_side', 'long')
generate_and_run_combined_optimisation(master_df, main_content_placeholder, side_to_run)
# Reset after the run so this branch is one-shot.
st.session_state.run_combined_optimisation = False
# 7. Markov Optimisation (Developer Only)
elif st.session_state.get('run_markov_optimisation'):
st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False
st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False
st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None
st.session_state.last_run_stats = None
side_to_run = st.session_state.get('markov_side', 'long')
generate_and_run_markov_optimisation(master_df, main_content_placeholder, side_to_run)
# 8. Top Setups Advisor UI (Developer Only)
elif st.session_state.get('run_advanced_advisor'):
st.session_state.summary_df = None; st.session_state.single_ticker_results = None
st.session_state.open_trades_df = None; st.session_state.confidence_results_df = None
st.session_state.best_params = None; st.session_state.last_run_stats = None
generate_advisor_report(master_df)
# 9. RUN ANALYSIS (Used by BOTH Dev "Rocket" button AND Prod "Run Analysis" button)
# Gathers all backtest parameters from session state, resolves the veto list
# and Markov setup, then branches into single-ticker or full-list mode below.
elif st.session_state.get("run_analysis_button"):
st.session_state.run_analysis_button = False # Reset flag
# Clear all results
st.session_state.summary_df = None; st.session_state.single_ticker_results = None
st.session_state.open_trades_df = None; st.session_state.confidence_results_df = None
st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None
st.session_state.best_params = None; st.session_state.worst_confidence_setup = None
st.session_state.worst_confidence_setups_list = []; st.session_state.markov_results_df = None
st.session_state.last_run_stats = None
with st.spinner("Running analysis..."):
# --- VETO LOGIC FIX ---
if production_mode:
veto_list_to_use = [] # Disable veto in simplified mode
else:
veto_list_to_use = st.session_state.get('veto_setup_list', [])
if veto_list_to_use: st.info(f"{len(veto_list_to_use)} Veto filter(s) active.")
# Gather parameters
# Percent-style widgets are converted to fractions here (/100), matching
# the key layout of load_settings().
manual_params = {
"large_ma_period": st.session_state.get('ma_period', 50),
"bband_period": st.session_state.get('bb_period', 20),
"bband_std_dev": st.session_state.get('bb_std', 2.0),
"confidence_threshold": st.session_state.get('confidence_slider', 50),
"long_entry_threshold_pct": st.session_state.get('long_entry', 0.0) / 100,
"long_exit_ma_threshold_pct": st.session_state.get('long_exit', 0.0) / 100,
"long_trailing_stop_loss_pct": st.session_state.get('long_sl', 8.0) / 100,
"long_delay_days": st.session_state.get('long_delay', 0),
"short_entry_threshold_pct": st.session_state.get('short_entry', 0.0) / 100,
"short_exit_ma_threshold_pct": st.session_state.get('short_exit', 0.0) / 100,
"short_trailing_stop_loss_pct": st.session_state.get('short_sl', 8.0) / 100,
"short_delay_days": st.session_state.get('short_delay', 0),
"use_ma_floor_filter": st.session_state.get('use_ma_floor_filter', True),
"catcher_stop_pct": st.session_state.get('catcher_stop_pct', 0.0) / 100.0,
"max_long_duration": st.session_state.get('max_long_duration', 60),
"max_short_duration": st.session_state.get('max_short_duration', 10)
}
# Markov setup is required when Markov drives or contributes to confidence.
# In developer mode a missing setup aborts the run; in production it is
# silently skipped (markov_setup_to_use stays None).
markov_setup_to_use = None
if st.session_state.primary_driver == 'Markov State' or st.session_state.use_markov:
if 'best_markov_setup' in st.session_state and st.session_state.best_markov_setup:
markov_setup_to_use = st.session_state.best_markov_setup
elif not production_mode:
st.error("Markov State selected but no setup found. Run Section 7."); st.stop()
# Exit-logic parameters forwarded to run_backtest().
# NOTE(review): these use direct session-state attribute access (KeyError
# if unset), unlike the .get() pattern used for manual_params — confirm
# these keys are always initialised before Run Analysis can fire.
exit_logic = st.session_state.exit_logic_type
exit_thresh = st.session_state.exit_confidence_threshold
smart_trailing_stop = st.session_state.smart_trailing_stop_pct / 100.0
smart_atr_p = st.session_state.smart_exit_atr_period
smart_atr_m = st.session_state.smart_exit_atr_multiplier
intelligent_tsl = st.session_state.intelligent_tsl_pct / 100.0
# --- NEW: Date Buffer Logic ---
# We load data starting 365 days BEFORE the user selected start date
# to ensure indicators (like 200MA) are fully calculated by the time the analysis starts.
user_start_date = pd.Timestamp(st.session_state.start_date)
end_date = pd.Timestamp(st.session_state.end_date)
warmup_delta = timedelta(days=365) # Safe buffer
data_load_start = user_start_date - warmup_delta
# A) Single Ticker Analysis
# Slices Close/High/Low/Volume columns for the chosen ticker out of the wide
# master_df, renames them to the generic OHLCV names run_backtest expects,
# and stores the results in session state for the display sections below.
if st.session_state.run_mode == "Analyse Single Ticker":
# --- [FIX START] Logic integrated from old_but_working_app.py ---
selected_ticker = st.session_state.get('ticker_select', ticker_list[0] if ticker_list else None)
if not selected_ticker:
st.error("No ticker selected.")
st.stop()
# Define Start Date for Single Ticker
# NOTE(review): re-derives user_start_date already set in the prelude above
# (same expression) — harmless but redundant.
user_start_date = pd.Timestamp(st.session_state.start_date)
# Optional High/Low/Volume columns are included only if present in master_df.
cols_to_use = [selected_ticker]
if f'{selected_ticker}_High' in master_df.columns: cols_to_use.append(f'{selected_ticker}_High')
if f'{selected_ticker}_Low' in master_df.columns: cols_to_use.append(f'{selected_ticker}_Low')
if f'{selected_ticker}_Volume' in master_df.columns: cols_to_use.append(f'{selected_ticker}_Volume')
existing_cols = [col for col in cols_to_use if col in master_df.columns]
if selected_ticker not in existing_cols:
st.error(f"Ticker '{selected_ticker}' not found.")
st.stop()
# [FIX] Smart Lookback Logic: Uses the wider of (User Range) or (Slider Lookback)
user_start = pd.Timestamp(st.session_state.start_date)
user_end = pd.Timestamp(st.session_state.end_date)
# Calculate the slider-based start date
slider_start = user_end - pd.DateOffset(years=norm_lookback_years)
# 1. Logic: Start loading from the EARLIER of the two dates (Wider Window)
effective_start = min(user_start, slider_start)
# 2. Safety: Add 365 days EXTRA buffer so MA-200 is ready on Day 1
buffer_start = effective_start - pd.DateOffset(days=365)
# Ensure we don't go before available data
if not master_df.empty:
buffer_start = max(buffer_start, master_df.index[0])
data_for_backtest = master_df.loc[buffer_start:user_end, existing_cols].copy()
# Map ticker-prefixed columns to the generic names run_backtest expects.
rename_dict = {selected_ticker: 'Close', f'{selected_ticker}_High': 'High', f'{selected_ticker}_Low': 'Low', f'{selected_ticker}_Volume': 'Volume'}
rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols}
data_for_backtest = data_for_backtest.rename(columns=rename_dict_filtered)
if not data_for_backtest.empty and 'Close' in data_for_backtest.columns and not data_for_backtest['Close'].isna().all():
# --- [FIX END] ---
# Long positional prefix (factor toggles then weights) followed by
# keyword args; order must match run_backtest's signature exactly.
long_pnl, short_pnl, avg_long_trade, avg_short_trade, results_df, trades, open_trades, trade_counts, durations, trade_dates, exit_breakdown = run_backtest(
data_for_backtest, manual_params,
st.session_state.use_rsi, st.session_state.use_vol, st.session_state.use_trend, st.session_state.use_volume,
st.session_state.use_macd, st.session_state.use_ma_slope, st.session_state.use_markov,
st.session_state.use_mfi, st.session_state.use_supertrend,
st.session_state.rsi_w, st.session_state.vol_w, st.session_state.trend_w, st.session_state.volume_w,
st.session_state.macd_w, st.session_state.ma_slope_w, st.session_state.markov_w,
st.session_state.mfi_w, st.session_state.supertrend_w,
st.session_state.use_adx_filter, st.session_state.adx_threshold,
st.session_state.get('rsi_logic', 'Crossover'),
st.session_state.adx_period,
veto_setups_list=veto_list_to_use,
primary_driver=st.session_state.primary_driver,
markov_setup=markov_setup_to_use,
exit_logic_type=exit_logic,
exit_confidence_threshold=exit_thresh,
smart_trailing_stop_pct=smart_trailing_stop,
smart_exit_atr_period=smart_atr_p,
smart_exit_atr_multiplier=smart_atr_m,
intelligent_tsl_pct=intelligent_tsl,
benchmark_rank=benchmark_percentile_setting / 100.0,
analysis_start_date=user_start_date,
analysis_end_date=end_date,
benchmark_lookback_years=norm_lookback_years,
use_rolling_benchmark=use_rolling_benchmark
)
# Persist results for display sections 12/13 below.
st.session_state.single_ticker_results = {"long_pnl": long_pnl, "short_pnl": short_pnl, "avg_long_trade": avg_long_trade, "avg_short_trade": avg_short_trade, "results_df": results_df, "trades": trades}
st.session_state.open_trades_df = pd.DataFrame(open_trades) if open_trades else pd.DataFrame()
# exit_breakdown is a 6-tuple: long (profit-take, TSL, time) then short.
st.session_state.exit_breakdown_totals = {'long_profit_take_count': exit_breakdown[0], 'long_tsl_count': exit_breakdown[1], 'long_time_exit_count': exit_breakdown[2], 'short_profit_take_count': exit_breakdown[3], 'short_tsl_count': exit_breakdown[4], 'short_time_exit_count': exit_breakdown[5]}
else: st.warning("No data for ticker.")
# B) Full List Analysis
# Loops run_backtest over every ticker, aggregating per-ticker summary rows,
# win/loss counts, durations, exit breakdowns and open trades. Tickers with
# implausible P&L (>100%, see PROFIT_THRESHOLD) are excluded as data errors.
elif st.session_state.run_mode.startswith("Analyse Full List"):
# [FIX] Define Start Date for Full List (Critical Fix)
user_start_date = pd.Timestamp(st.session_state.start_date)
summary_results, all_open_trades = [], []
total_long_wins, total_long_losses, total_short_wins, total_short_losses = 0, 0, 0, 0
all_long_durations = []; all_short_durations = []
# Running 6-element sum of per-ticker exit_breakdown tuples.
total_exit_breakdown = [0, 0, 0, 0, 0, 0]
# 1.0 == 100%: any cumulative or per-trade P&L beyond this is treated as
# unrealistic (likely bad data) and the ticker is dropped.
PROFIT_THRESHOLD = 1.0; excluded_analysis_tickers = []
progress_bar = st.progress(0, text="Starting analysis...")
num_tickers = len(ticker_list)
for i, ticker_symbol in enumerate(ticker_list):
progress_bar.progress((i + 1) / num_tickers, text=f"Analysing {ticker_symbol}...")
# Same OHLCV column selection/renaming as the single-ticker branch.
cols_to_use = [ticker_symbol]
if f'{ticker_symbol}_High' in master_df.columns: cols_to_use.append(f'{ticker_symbol}_High')
if f'{ticker_symbol}_Low' in master_df.columns: cols_to_use.append(f'{ticker_symbol}_Low')
if f'{ticker_symbol}_Volume' in master_df.columns: cols_to_use.append(f'{ticker_symbol}_Volume')
existing_cols = [col for col in cols_to_use if col in master_df.columns]
if ticker_symbol not in existing_cols: continue
# [FIX] Smart Lookback Logic: Uses the wider of (User Range) or (Slider Lookback)
user_start = pd.Timestamp(st.session_state.start_date)
user_end = pd.Timestamp(st.session_state.end_date)
slider_start = user_end - pd.DateOffset(years=norm_lookback_years)
# 1. Logic: Start loading from the EARLIER of the two dates
effective_start = min(user_start, slider_start)
# 2. Safety: Add 365 days EXTRA buffer
buffer_start = effective_start - pd.DateOffset(days=365)
if not master_df.empty:
buffer_start = max(buffer_start, master_df.index[0])
ticker_data_series = master_df.loc[buffer_start:user_end, existing_cols]
rename_dict = {ticker_symbol: 'Close', f'{ticker_symbol}_High': 'High', f'{ticker_symbol}_Low': 'Low', f'{ticker_symbol}_Volume': 'Volume'}
ticker_data_series = ticker_data_series.rename(columns={k:v for k,v in rename_dict.items() if k in existing_cols})
if not ticker_data_series.empty and 'Close' in ticker_data_series.columns and not ticker_data_series['Close'].isna().all():
# Same call shape as the single-ticker branch above.
long_pnl, short_pnl, avg_long_trade, avg_short_trade, results_df, trades, open_trades, trade_counts, durations, trade_dates, exit_breakdown = run_backtest(
ticker_data_series, manual_params,
st.session_state.use_rsi, st.session_state.use_vol, st.session_state.use_trend, st.session_state.use_volume,
st.session_state.use_macd, st.session_state.use_ma_slope, st.session_state.use_markov,
st.session_state.use_mfi, st.session_state.use_supertrend,
st.session_state.rsi_w, st.session_state.vol_w, st.session_state.trend_w, st.session_state.volume_w,
st.session_state.macd_w, st.session_state.ma_slope_w, st.session_state.markov_w,
st.session_state.mfi_w, st.session_state.supertrend_w,
st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.get('rsi_logic', 'Crossover'), st.session_state.adx_period,
veto_setups_list=veto_list_to_use, primary_driver=st.session_state.primary_driver,
markov_setup=markov_setup_to_use, exit_logic_type=exit_logic, exit_confidence_threshold=exit_thresh,
smart_trailing_stop_pct=smart_trailing_stop, smart_exit_atr_period=smart_atr_p,
smart_exit_atr_multiplier=smart_atr_m, intelligent_tsl_pct=intelligent_tsl,
benchmark_rank=benchmark_percentile_setting / 100.0,
analysis_start_date=user_start_date,
analysis_end_date=end_date,
benchmark_lookback_years=norm_lookback_years,
use_rolling_benchmark=use_rolling_benchmark
)
# Sanity filter: drop tickers whose cumulative OR per-trade P&L
# magnitude exceeds 100%.
if abs(long_pnl) > PROFIT_THRESHOLD or abs(short_pnl) > PROFIT_THRESHOLD or \
(avg_long_trade is not None and pd.notna(avg_long_trade) and abs(avg_long_trade) > PROFIT_THRESHOLD) or \
(avg_short_trade is not None and pd.notna(avg_short_trade) and abs(avg_short_trade) > PROFIT_THRESHOLD):
excluded_analysis_tickers.append(ticker_symbol); continue
# Accumulate aggregates across tickers.
total_exit_breakdown = [sum(x) for x in zip(total_exit_breakdown, exit_breakdown)]
long_wins, long_losses, short_wins, short_losses = trade_counts
long_durations, short_durations = durations
first_long_entry, last_long_exit, first_short_entry, last_short_exit = trade_dates
total_long_wins += long_wins; total_long_losses += long_losses
total_short_wins += short_wins; total_short_losses += short_losses
all_long_durations.extend(long_durations); all_short_durations.extend(short_durations)
# trades[0]/trades[2] hold long/short trade dicts respectively.
# NOTE(review): if a side has trades but every confidence is NaN, the
# filtered list is empty and np.mean([]) yields NaN with a warning.
long_conf = np.mean([t['confidence'] for t in trades[0] if pd.notna(t.get('confidence'))]) if trades[0] else 0
short_conf = np.mean([t['confidence'] for t in trades[2] if pd.notna(t.get('confidence'))]) if trades[2] else 0
avg_long_dur_ticker = np.mean(long_durations) if long_durations else 0
avg_short_dur_ticker = np.mean(short_durations) if short_durations else 0
summary_results.append({
"Ticker": ticker_symbol, "Cumulative Long P&L": long_pnl, "Avg Long Profit per Trade": avg_long_trade,
"Num Long Trades": len(trades[0]), "Avg Long Confidence": long_conf, "Avg Long Duration (Days)": avg_long_dur_ticker,
"First Long Entry": first_long_entry, "Last Long Exit": last_long_exit,
"Cumulative Short P&L": short_pnl, "Avg Short Profit per Trade": avg_short_trade,
"Num Short Trades": len(trades[2]), "Avg Short Confidence": short_conf, "Avg Short Duration (Days)": avg_short_dur_ticker,
"First Short Entry": first_short_entry, "Last Short Exit": last_short_exit
})
if open_trades:
# Tag each open-trade dict with its ticker before pooling.
for trade in open_trades: trade['Ticker'] = ticker_symbol; all_open_trades.append(trade)
progress_bar.empty()
if excluded_analysis_tickers and not production_mode: st.warning(f"Excluded {len(excluded_analysis_tickers)} tickers due to unrealistic profit.")
# Persist aggregates for the display sections below.
if summary_results:
st.session_state.summary_df = pd.DataFrame(summary_results).set_index('Ticker')
st.session_state.trade_counts = {"long_wins": total_long_wins, "long_losses": total_long_losses, "short_wins": total_short_wins, "short_losses": total_short_losses}
st.session_state.trade_durations = { "avg_long_duration": np.mean(all_long_durations) if all_long_durations else 0, "max_long_duration": np.max(all_long_durations) if all_long_durations else 0, "avg_short_duration": np.mean(all_short_durations) if all_short_durations else 0, "max_short_duration": np.max(all_short_durations) if all_short_durations else 0 }
st.session_state.exit_breakdown_totals = {
'long_profit_take_count': total_exit_breakdown[0], 'long_tsl_count': total_exit_breakdown[1], 'long_time_exit_count': total_exit_breakdown[2],
'short_profit_take_count': total_exit_breakdown[3], 'short_tsl_count': total_exit_breakdown[4], 'short_time_exit_count': total_exit_breakdown[5]
}
else:
st.warning("No trades found.")
st.session_state.trade_durations = {}
st.session_state.exit_breakdown_totals = {}
st.session_state.open_trades_df = pd.DataFrame(all_open_trades) if all_open_trades else pd.DataFrame()
# 10. Display Advisor Scan Results (raw_df)
# Shows either the de-duplicated or raw advisor signals table, with optional
# hiding of tickers that produced no trade (Status is NaN).
elif 'raw_df' in st.session_state and st.session_state.raw_df is not None:
advisor_type = st.session_state.get('advisor_type', 'Advisor')
st.subheader(f"👨💼 {advisor_type} Advisor: Trade Signals")
c_view1, c_view2 = st.columns(2)
dedupe_view = c_view1.checkbox("De-duplicate trades", value=True, key="advisor_dedupe_check")
filter_trades = c_view2.checkbox("Hide Empty Tickers", value=True, key="advisor_filter_check")
display_df = st.session_state.deduped_df if dedupe_view else st.session_state.raw_df
if display_df is not None and not display_df.empty:
if filter_trades and 'Status' in display_df.columns: display_df = display_df[display_df['Status'].notna()]
# Lambda formatters tolerate NaN explicitly (return '-') since the
# values are mixed with dates/strings.
format_dict = {
"Final % P/L": lambda x: f"{x:.2%}" if pd.notna(x) else '-',
"Date Open": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-',
"Date Closed": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-',
"Start Confidence": lambda x: f"{x:.0f}%" if pd.notna(x) else '-'
}
if "Setup G/B Ratio" in display_df.columns: format_dict["Setup G/B Ratio"] = lambda x: f"{x:.2f}" if pd.notna(x) else '-'
# Generic 2-dp formatting for any remaining threshold/ratio-like columns.
for col in display_df.columns:
if any(x in col for x in ["Threshold", "Std Dev", "Ratio"]) and col not in format_dict:
format_dict[col] = lambda x: f"{x:.2f}" if pd.notna(x) and isinstance(x, (int, float)) else x
st.dataframe(display_df.style.format(format_dict, na_rep='-'))
else: st.info("No trades found.")
# Leaving this view clears all advisor state and reruns the app.
if st.button("Back to Main Analysis"):
st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None
st.session_state.run_advanced_advisor = False; st.session_state.run_user_advisor_setup = False
st.session_state.run_scan_user_setups = False; st.rerun()
# 11. Display Confidence Optimisation Results
# Renders the top 60 confidence setups; 'Net % Return' is rebranded as
# 'Moneypile Score' for display only.
elif st.session_state.get('confidence_results_df') is not None and not st.session_state.confidence_results_df.empty:
st.subheader("📊 Confidence Setup Optimisation Results")
# [CHANGE] Rename the column for display
display_df_conf = st.session_state.confidence_results_df.head(60).rename(columns={'Net % Return': 'Moneypile Score'})
# [CHANGE] Update formatters to match
conf_formatters = {
"Avg Profit/Trade": "{:.2%}",
"Ticker G/B Ratio": "{:.2f}",
"Trade G/B Ratio": "{:.2f}",
"Moneypile Score": "{:.1f}",
"Avg Entry Conf.": "{:.1f}%",
"Good Score": "{:.4f}",
"Bad Score": "{:.4f}",
"Norm. Score %": "{:.2f}%"
}
# Pass-through '{}' formatting for the boolean/flag factor columns.
if 'MACD' in display_df_conf.columns: conf_formatters['MACD'] = '{}'
if 'MA Slope' in display_df_conf.columns: conf_formatters['MA Slope'] = '{}'
if 'Markov' in display_df_conf.columns: conf_formatters['Markov'] = '{}'
# Wrap each format string in a NaN-safe lambda; fmt=v binds the format
# string at definition time (avoids the late-binding closure pitfall).
valid_conf_formatters = {k: (lambda val, fmt=v: fmt.format(val) if pd.notna(val) else '-') for k, v in conf_formatters.items() if k in display_df_conf.columns}
st.dataframe(display_df_conf.style.format(valid_conf_formatters, na_rep='-'))
# 12. Display Single Ticker Results
# [FIX] Changed 'elif' to 'if' so this block runs immediately after analysis
# (i.e. in the same rerun that the Run Analysis branch above populated
# single_ticker_results).
if st.session_state.get('single_ticker_results') is not None:
res = st.session_state.single_ticker_results
st.subheader(f"Results for {st.session_state.get('ticker_select')}")
c1, c2, c3, c4 = st.columns(4)
c1.metric("Cumulative Long P&L", f"{res.get('long_pnl', 0):.2%}")
c2.metric("Avg Long Trade P&L", f"{res.get('avg_long_trade', 0):.2%}")
c3.metric("Cumulative Short P&L", f"{res.get('short_pnl', 0):.2%}")
c4.metric("Avg Short Trade P&L", f"{res.get('avg_short_trade', 0):.2%}")
if res.get('results_df') is not None and not res['results_df'].empty:
# Plot generation is best-effort; a plotting error is surfaced but does
# not abort the page.
try:
st.plotly_chart(generate_long_plot(res['results_df'], res['trades'], st.session_state.get('ticker_select')), use_container_width=True)
st.plotly_chart(generate_short_plot(res['results_df'], res['trades'], st.session_state.get('ticker_select')), use_container_width=True)
except Exception as e: st.error(f"Error generating plot: {e}")
else: st.info("No chart data available.")
# 13. Display Full Analysis Summary
# Top-line analytics cards, two optional histograms, the per-ticker results
# table, and the open/recently-closed positions table.
if st.session_state.get('summary_df') is not None and not st.session_state.summary_df.empty:
# 1. Top Cards
display_summary_analytics(st.session_state.summary_df)
# 2. Histogram 1: Profit Distribution
st.markdown("---")
# Guarded by a globals() lookup so the page still renders if the chart
# helper is absent; failures are silently ignored.
try:
if 'generate_profit_distribution_chart' in globals():
dist_fig = generate_profit_distribution_chart(st.session_state.summary_df)
if dist_fig: st.plotly_chart(dist_fig, use_container_width=True)
except Exception: pass
# 3. Histogram 2: Trades Over Time (NEW)
try:
if 'generate_trades_timeline_histogram' in globals() and st.session_state.get('open_trades_df') is not None:
timeline_fig = generate_trades_timeline_histogram(
st.session_state.open_trades_df,
st.session_state.start_date,
st.session_state.end_date
)
if timeline_fig: st.plotly_chart(timeline_fig, use_container_width=True)
except Exception as e: st.error(f"Chart Error: {e}")
st.markdown("---")
# 4. Results Per Ticker Table
st.subheader("Results per Ticker")
if st.checkbox("Only show tickers with trades", value=True):
df_to_display = st.session_state.summary_df[(st.session_state.summary_df['Num Long Trades'] > 0) | (st.session_state.summary_df['Num Short Trades'] > 0)].copy()
else:
df_to_display = st.session_state.summary_df.copy()
date_cols = ["First Long Entry", "Last Long Exit", "First Short Entry", "Last Short Exit"]
for col in date_cols:
if col in df_to_display.columns: df_to_display[col] = pd.to_datetime(df_to_display[col], errors='coerce').dt.strftime('%Y-%m-%d')
# NOTE(review): fillna('-') turns NaN into strings; the numeric format
# specs below (e.g. "{:.2%}") would raise on a '-' value — confirm these
# columns can never contain NaN at this point.
df_to_display.fillna('-', inplace=True)
st.dataframe(df_to_display.style.format({ "Cumulative Long P&L": "{:.2%}", "Avg Long Profit per Trade": "{:.2%}", "Avg Long Duration (Days)": "{:.1f}", "Cumulative Short P&L": "{:.2%}", "Avg Short Profit per Trade": "{:.2%}", "Avg Short Duration (Days)": "{:.1f}", "Avg Long Confidence": "{:.0f}%", "Avg Short Confidence": "{:.0f}%" }, na_rep='-'))
if not production_mode:
if st.button("💾 Add these settings to User-Defined List", key="save_setup_from_analysis", on_click=add_setup_to_user_list): pass
# 5. Open Positions Table (With Filter)
st.subheader("👨🏻💼 Open Positions & Recently Closed",
help="This table displays all currently ACTIVE trades, plus any trades that closed within the last 30 days.")
if st.session_state.get('open_trades_df') is not None and not st.session_state.open_trades_df.empty:
full_df = st.session_state.open_trades_df.copy()
# --- FILTER: Show 'Open' OR 'Closed in last 30 days' ---
cutoff = pd.Timestamp.now() - pd.Timedelta(days=30)
full_df['Date Closed'] = pd.to_datetime(full_df['Date Closed'], errors='coerce')
mask_open = (full_df['Status'] == 'Open')
mask_recent = (full_df['Status'] == 'Closed') & (full_df['Date Closed'] >= cutoff)
display_open_df = full_df[mask_open | mask_recent].copy()
# -------------------------------------------------------
# [FIX] Sort strictly by Date Open (Newest First)
display_open_df.sort_values(by='Date Open', ascending=False, inplace=True)
cols_order_manual = ['Ticker', 'Status', 'Final % P/L', 'Side', 'Date Open', 'Date Closed', 'Start Confidence']
existing_cols_open = [col for col in cols_order_manual if col in display_open_df.columns]
if existing_cols_open and not display_open_df.empty:
st.dataframe(display_open_df[existing_cols_open].style.format({
"Final % P/L": lambda x: f"{x:.2%}" if pd.notna(x) else '-',
"Date Open": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-',
"Date Closed": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-',
"Start Confidence": lambda x: f"{x:.0f}%" if pd.notna(x) else '-'
}, na_rep='-'))
else:
st.info("No Open or Recent trades found (older trades are hidden).")
else:
st.info("No trades found.")
# --- NEW: SAVE RESULTS BUTTON ---
st.markdown("---")
st.subheader("💾 Save Analysis")
current_config = {
'start_date': st.session_state.start_date, 'end_date': st.session_state.end_date,
'primary_driver': st.session_state.primary_driver, 'confidence_threshold': st.session_state.confidence_threshold,
'use_rsi': st.session_state.use_rsi, 'rsi_w': st.session_state.rsi_w,
'use_volatility': st.session_state.use_volatility, 'vol_w': st.session_state.vol_w,
'use_trend': st.session_state.use_trend, 'trend_w': st.session_state.trend_w,
'use_volume': st.session_state.use_volume, 'vol_w_val': st.session_state.vol_w_val,
'use_macd': st.session_state.use_macd, 'macd_w': st.session_state.macd_w,
'use_ma_slope': st.session_state.use_ma_slope, 'ma_slope_w': st.session_state.ma_slope_w,
'use_markov': st.session_state.use_markov, 'markov_w': st.session_state.markov_w,
'use_mfi': st.session_state.use_mfi, 'mfi_w': st.session_state.mfi_w,
'use_supertrend': st.session_state.use_supertrend, 'supertrend_w': st.session_state.supertrend_w,
'use_adx_filter': st.session_state.use_adx_filter, 'adx_threshold': st.session_state.adx_threshold,
'bband_period': st.session_state.bband_period, 'bband_std_dev': st.session_state.bband_std_dev,
'large_ma_period': st.session_state.get('large_ma_period', 50),
'long_entry_threshold_pct': st.session_state.long_entry_threshold_pct,
'short_entry_threshold_pct': st.session_state.short_entry_threshold_pct,
'exit_logic_type': st.session_state.exit_logic_type,
'smart_trailing_stop_pct': st.session_state.smart_trailing_stop_pct
}
# Generate CSV
if 'convert_results_to_csv' in globals():
csv_data = convert_results_to_csv(st.session_state.summary_df, current_config)
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
file_name = f"Backtest_Results_{timestamp}.csv"
st.download_button(
label="⬇️ Download Results & Settings as CSV",
data=csv_data,
file_name=file_name,
mime='text/csv',
use_container_width=True
)
# 14. Default Message
else:
if not any([st.session_state.get(k) for k in ['run_advanced_advisor','run_user_advisor_setup','advisor_df','confidence_results_df','single_ticker_results','summary_df','load_message']]):
st.info("Click a 'Run' button in the sidebar to start.")
# 15. Footer Buttons (Hidden in Production)
if not production_mode:
st.markdown("---")
col_load1, col_load2, col_load3 = st.columns(3)
with col_load1:
if 'apply_best_params_to_widgets' in locals() and st.session_state.get('best_params'):
st.button("⬇️ Load Optimal Parameters", on_click=apply_best_params_to_widgets, use_container_width=True)
with col_load2:
if 'apply_best_weights_to_widgets' in locals() and st.session_state.get('best_weights'):
st.button("⬇️ Load Optimal Weights", on_click=apply_best_weights_to_widgets, use_container_width=True)
with col_load3:
if st.session_state.get('worst_confidence_setups_list'):
if st.button("Apply Top Worst Setups as Veto Filter", use_container_width=True):
st.session_state.veto_setup_list = st.session_state.worst_confidence_setups_list
st.session_state.worst_confidence_setups_list = None
st.sidebar.info(f"Applying {len(st.session_state.veto_setup_list)} Veto filters.")
st.rerun()
col_load4 = st.columns(1)[0]
with col_load4:
is_markov_relevant = (st.session_state.primary_driver == 'Markov State') or st.session_state.use_markov
if st.session_state.get('best_markov_setup') and is_markov_relevant:
if st.button("⬇️ Load Best Markov Setup", on_click=None, use_container_width=True):
st.session_state.primary_driver = "Markov State"
st.session_state.use_markov = True
st.sidebar.success("Best Markov setup loaded!")
st.rerun()
# --- LOGIC HANDLERS (Hidden execution) ---
if st.session_state.run_advanced_advisor:
with st.spinner("Running Optimization..."):
run_optimization()
st.session_state.run_advanced_advisor = False
# --- CHART FUNCTION (Must be outside main) ---
def generate_trades_timeline_histogram(trades_df, start_date, end_date):
    """Build a stacked histogram of closed-trade outcomes over time.

    Trades are bucketed into four categories — Long Win, Long Loss,
    Short Win, Short Loss — and drawn as stacked histogram traces keyed
    on the close date. The two Short traces start hidden (legend-only).

    Returns a plotly ``go.Figure``, or ``None`` when there is no input
    data or no trades close inside the requested window.
    """
    if trades_df is None or trades_df.empty:
        return None

    # Keep only trades whose close date falls inside the selected window
    # (inclusive on both ends, matching the user's date settings).
    window_start = pd.to_datetime(start_date)
    window_end = pd.to_datetime(end_date)
    in_window = trades_df['Date Closed'].between(window_start, window_end)
    closed = trades_df[in_window].copy()
    if closed.empty:
        return None

    # Only fully closed trades belong in a win/loss breakdown.
    closed = closed[closed['Status'] == 'Closed']

    side = closed['Side']
    pnl = closed['Final % P/L']

    # (row mask, trace label, bar color, initial visibility)
    trace_specs = [
        ((side == 'Long') & (pnl > 0), 'Long Winners', 'green', True),
        ((side == 'Long') & (pnl <= 0), 'Long Losers', 'red', True),
        ((side == 'Short') & (pnl > 0), 'Short Winners', 'blue', 'legendonly'),
        ((side == 'Short') & (pnl <= 0), 'Short Losers', 'orange', 'legendonly'),
    ]

    fig = go.Figure()
    for row_mask, label, color, visibility in trace_specs:
        fig.add_trace(go.Histogram(
            x=closed.loc[row_mask, 'Date Closed'],
            name=label,
            marker_color=color,
            visible=visibility,
        ))

    fig.update_layout(
        barmode='stack',
        title="Trades Over Time (Win/Loss Stacked)",
        xaxis_title="Date",
        yaxis_title="Number of Trades",
        height=400,
        template="plotly_white",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        # Pin the X-axis to the user's chosen window so sparse data does not
        # shrink the visible range.
        xaxis=dict(range=[start_date, end_date]),
    )
    return fig
# Script entry point: run the Streamlit app's main() only when executed
# directly (not when imported as a module).
if __name__ == "__main__":
    main()