# New version --- import streamlit as st import pandas as pd import os import numpy as np from datetime import date, datetime import plotly.graph_objects as go import itertools import json # --- CORRECTED IMPORTS: Moved MACD --- from ta.volatility import BollingerBands from ta.momentum import RSIIndicator # Removed MACD from here from ta.trend import ADXIndicator, MACD from ta.volume import MFIIndicator # Added MFI here # --- [NEW] ADDED ATR FOR INTELLIGENT EXIT --- from ta.volatility import AverageTrueRange # --- [END NEW] --- from multiprocessing import Pool, cpu_count from functools import partial from dateutil.relativedelta import relativedelta from datetime import timedelta # --- 0. Settings Management Functions --- CONFIG_FILE = "config.json" VETO_CONFIG_FILE = "veto_config.json" TOP_SETUPS_FILE = "top_setups.json" USER_SETUPS_FILE = "user_advisor_setups.json" MARKOV_SETUP_FILE = "best_markov.json" def save_settings(params_to_save): with open(CONFIG_FILE, 'w', encoding='utf-8') as f: json.dump(params_to_save, f, indent=4) st.sidebar.success("Settings saved as default!") # --- Replacement for load_settings function --- def load_settings(): default_structure = { "large_ma_period": 70, "bband_period": 32, "bband_std_dev": 1.4, "confidence_threshold": 95, "long_entry_threshold_pct": 0.01, "long_exit_ma_threshold_pct": 0.0, "long_trailing_stop_loss_pct": 0.2, "long_delay_days": 0, "short_entry_threshold_pct": 0.0, "short_exit_ma_threshold_pct": 0.0, "short_trailing_stop_loss_pct": 0.2, "short_delay_days": 0, "use_rsi": True, "rsi_w": 1.5, "rsi_logic": "Level", "primary_driver": "Bollinger Bands", "exit_logic_type": "Intelligent (ADX/MACD/ATR)", "exit_confidence_threshold": 40, "smart_exit_atr_period": 14, "smart_exit_atr_multiplier": 3.0, "intelligent_tsl_pct": 0.2, "norm_lookback_years": 1, 'use_rolling_benchmark': False, "benchmark_rank": 99, "use_ma_floor_filter": True, "catcher_stop_pct": 0.03, "use_vol": False, "vol_w": 0.5, "use_trend": False, "trend_w": 2.0, "use_volume": False, "volume_w": 0.5, "use_adx_filter": False, "adx_threshold": 10.0, "adx_period": 14, "use_macd": False, "macd_w": 2.0, "use_ma_slope": False, "ma_slope_w": 0.5, "use_markov": False, "markov_w": 1.0, "max_trading_days": 60, "max_long_duration": 60, "max_short_duration": 3 } if os.path.exists(CONFIG_FILE): try: with open(CONFIG_FILE, 'r', encoding='utf-8') as f: loaded = json.load(f) for key in default_structure: if key in loaded: default_structure[key] = loaded[key] except (json.JSONDecodeError, Exception) as e: print(f"Error loading config.json: {e}. Using default settings.") return default_structure # --- UPDATED: Save a list of veto setups --- def save_veto_setup(veto_setups_list): # Takes a list now with open(VETO_CONFIG_FILE, 'w', encoding='utf-8') as f: # Save the list directly json.dump(veto_setups_list, f, indent=4) st.sidebar.success(f"Saved {len(veto_setups_list)} Veto filter(s) as default!") # --- UPDATED: Load a list of veto setups, robustly --- def load_veto_setup(): veto_list = [] # Default to empty list if os.path.exists(VETO_CONFIG_FILE): try: with open(VETO_CONFIG_FILE, 'r', encoding='utf-8') as f: loaded_data = json.load(f) # Ensure it's loaded as a list if isinstance(loaded_data, list): veto_list = loaded_data elif isinstance(loaded_data, dict): # Handle old single dict format veto_list = [loaded_data] except (json.JSONDecodeError, Exception) as e: print(f"Error loading veto config: {e}. Using empty list.") # Keep veto_list as empty pass return veto_list # --- UPDATED: Includes new factors, num_setups=8, removed infinity filter --- def save_top_setups(results_df, side, num_setups=8): # Default is now 8 df = results_df.copy() if df.empty: st.sidebar.warning(f"No valid setups found for {side.title()} to save.") return deduplication_cols = [ 'Conf. Threshold', 'Avg Profit/Trade', 'Ticker G/B Ratio', 'Trade G/B Ratio', 'Winning Tickers', 'Losing Tickers', 'Avg Entry Conf.', 'Good Score', 'Bad Score', 'Norm. Score %', 'Total Trades' ] factor_cols = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope'] existing_factor_cols = [col for col in factor_cols if col in df.columns] if existing_factor_cols: df['FactorsOn'] = df[existing_factor_cols].apply(lambda row: (row == 'On').sum(), axis=1) else: df['FactorsOn'] = 0 # --- CONSISTENCY FIX: Sort by the Weighted Score ("Norm. Score %") --- # If Norm. Score % exists, use it. Otherwise fall back to Strategy Score or Trade G/B Ratio. sort_col = 'Trade G/B Ratio' # Default fallback if 'Norm. Score %' in df.columns: sort_col = 'Norm. Score %' elif 'Strategy Score' in df.columns: sort_col = 'Strategy Score' if sort_col in df.columns: df[sort_col] = df[sort_col].fillna(-np.inf) # Sort by Score (Descending), then by Complexity (Ascending - simpler is better) df = df.sort_values( by=[sort_col, 'FactorsOn'], ascending=[False, True] ) else: st.sidebar.error(f"Could not sort top setups: '{sort_col}' column missing.") return existing_cols_for_dedup = [col for col in deduplication_cols if col in df.columns] deduplicated_df = df.drop_duplicates(subset=existing_cols_for_dedup, keep='first') top_setups = deduplicated_df.head(num_setups).to_dict('records') if os.path.exists(TOP_SETUPS_FILE): try: with open(TOP_SETUPS_FILE, 'r', encoding='utf-8') as f: all_top_setups = json.load(f) except json.JSONDecodeError: all_top_setups = {} else: all_top_setups = {} all_top_setups[side] = top_setups with open(TOP_SETUPS_FILE, 'w', encoding='utf-8') as f: json.dump(all_top_setups, f, indent=4) st.sidebar.success(f"Top {len(top_setups)} unique {side.title()} setups saved! (Sorted by {sort_col})") def load_top_setups(): if os.path.exists(TOP_SETUPS_FILE): try: with open(TOP_SETUPS_FILE, 'r', encoding='utf-8') as f: return json.load(f) except json.JSONDecodeError: return None # Return None if file is corrupt return None @st.cache_data # Use caching for efficiency def load_proverbs(): # Default to an empty list proverbs_list = [] if os.path.exists("proverbs.json"): try: with open("proverbs.json", 'r', encoding='utf-8') as f: loaded_data = json.load(f) # Ensure it's a list if isinstance(loaded_data, list): proverbs_list = loaded_data except (json.JSONDecodeError, Exception) as e: print(f"Error loading proverbs.json: {e}. Using fallback.") # Keep proverbs_list empty or provide a default # proverbs_list = ["Error loading proverbs."] # Add a fallback if the list is empty after trying to load if not proverbs_list: proverbs_list = ["Have a great trading day!"] # Fallback message return proverbs_list def load_user_setups(): """Loads user-defined advisor setups from a JSON file.""" # 1. Update Template to include Notes default_row_template = { "Run": False, "Notes": "", # <--- NEW FIELD FOR ADVICE "RSI": "Off", "Volatility": "Off", "TREND": "Off", "Volume": "Off", "MACD": "Off", "MA Slope": "Off", "Markov": "Off", "ADX Filter": "Off", "Conf. Threshold": 50, "Large MA Period": 50, "Bollinger Band Period": 20, "Bollinger Band Std Dev": 2.0, "Catcher Offset (%)": 3.0, "Long Entry Threshold (%)": 0.0, "Long Exit Threshold (%)": 0.0, "Long Stop Loss (%)": 8.0, "Long Delay (Days)": 0, "Short Entry Threshold (%)": 0.0, "Short Exit Threshold (%)": 0.0, "Short Stop Loss (%)": 8.0, "Short Delay (Days)": 0, "Z_Avg_Profit": 0.0, "Z_Num_Trades": 0, "Z_WL_Ratio": 0.0 } # Default Examples default_setup = [default_row_template.copy()] # Pad with defaults while len(default_setup) < 20: default_setup.append(default_row_template.copy()) if os.path.exists(USER_SETUPS_FILE): try: with open(USER_SETUPS_FILE, 'r', encoding='utf-8') as f: user_setups = json.load(f) if isinstance(user_setups, list): processed_setups = [] for setup in user_setups: full_setup = default_row_template.copy() full_setup.update(setup) # Merge loaded data processed_setups.append(full_setup) while len(processed_setups) < 20: processed_setups.append(default_row_template.copy()) return processed_setups[:20] else: return default_setup except Exception as e: print(f"Error loading {USER_SETUPS_FILE}: {e}. Using defaults.") return default_setup return default_setup # --- UPDATED: Callback to Save Setup (Added Notes) --- def add_setup_to_user_list(): try: stats = st.session_state.get('last_run_stats', {}) if not stats: st.toast("⚠️ No stats found. Run analysis first.", icon="⚠️") return def get_weight_or_off(toggle_key, weight_key): if st.session_state.get(toggle_key, False): return round(st.session_state.get(weight_key, 1.0), 2) return "Off" adx_val = "Off" if st.session_state.get("use_adx_filter", False): val = st.session_state.get("adx_threshold", 25.0) adx_val = max(20.0, min(30.0, val)) new_setup = { "Run": True, "Notes": "Auto-Saved Setup", # <--- Default note for auto-saves "RSI": get_weight_or_off('use_rsi', 'rsi_w'), "Volatility": get_weight_or_off('use_vol', 'vol_w'), "TREND": get_weight_or_off('use_trend', 'trend_w'), "Volume": get_weight_or_off('use_volume', 'volume_w'), "MACD": get_weight_or_off('use_macd', 'macd_w'), "MA Slope": get_weight_or_off('use_ma_slope', 'ma_slope_w'), "Markov": get_weight_or_off('use_markov', 'markov_w'), "ADX Filter": adx_val, "Conf. Threshold": st.session_state.confidence_slider, "Large MA Period": st.session_state.ma_period, "Bollinger Band Period": st.session_state.bb_period, "Bollinger Band Std Dev": st.session_state.bb_std, "Long Entry Threshold (%)": st.session_state.long_entry, "Long Exit Threshold (%)": st.session_state.long_exit, "Long Stop Loss (%)": st.session_state.long_sl, "Long Delay (Days)": st.session_state.long_delay, "Short Entry Threshold (%)": st.session_state.short_entry, "Short Exit Threshold (%)": st.session_state.short_exit, "Short Stop Loss (%)": st.session_state.short_sl, "Short Delay (Days)": st.session_state.short_delay, "Z_Avg_Profit": stats.get("Z_Avg_Profit", 0.0) * 100.0, "Z_Num_Trades": stats.get("Z_Num_Trades", 0), "Z_WL_Ratio": stats.get("Z_WL_Ratio", 0.0) } current_setups = st.session_state.get("user_setups_data", []) non_empty_setups = [s for s in current_setups if not is_row_blank(s)] default_row_template = {k:v for k,v in load_user_setups()[0].items()} non_empty_setups.append(new_setup) while len(non_empty_setups) < 20: non_empty_setups.append(default_row_template.copy()) final_setups = non_empty_setups[:20] save_user_setups(final_setups) # Refresh State processed_setups = [] for s in final_setups: processed_setups.append(s.copy()) # Simple copy is enough now st.session_state["user_setups_data"] = processed_setups st.toast("✅ Setup saved!", icon="✅") st.session_state.run_user_advisor_setup = True st.rerun() except Exception as e: st.error(f"Could not save setup: {e}") def save_user_setups(setups_list): """Saves user-defined advisor setups to a JSON file.""" try: # Ensure we only save 20 with open(USER_SETUPS_FILE, 'w', encoding='utf-8') as f: json.dump(setups_list[:20], f, indent=4) # <-- CHANGED TO 20 st.success("User-defined setups saved!") except Exception as e: st.error(f"Error saving user setups: {e}") def save_markov_setup(setup_dict): """Saves the best Markov setup to a JSON file.""" try: with open(MARKOV_SETUP_FILE, 'w', encoding='utf-8') as f: json.dump(setup_dict, f, indent=4) st.sidebar.success("Best Markov setup saved as default!") except Exception as e: st.sidebar.error(f"Error saving Markov setup: {e}") def load_markov_setup(): """Loads the best Markov setup from a JSON file.""" if os.path.exists(MARKOV_SETUP_FILE): try: with open(MARKOV_SETUP_FILE, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"Error loading {MARKOV_SETUP_FILE}: {e}") return None return None # --- UPDATED: Helper to accept filename (Replaces the old load_completed_setups) --- def load_completed_setups(filename): """Reads the existing CSV and returns a set of completed configurations.""" completed_configs = set() if os.path.exists(filename): try: # We only need to read the config columns: Factors and Weights df_completed = pd.read_csv(filename, usecols=['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov', 'RSI W', 'Volatility W', 'TREND W', 'Volume W', 'MACD W', 'MA Slope W', 'Markov W'], dtype={'RSI': str, 'Volatility': str, 'TREND': str, 'Volume': str, 'MACD': str, 'MA Slope': str, 'Markov': str}) for index, row in df_completed.iterrows(): toggles = tuple(row[['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov']].values) weights = tuple(row[['RSI W', 'Volatility W', 'TREND W', 'Volume W', 'MACD W', 'MA Slope W', 'Markov W']].values) completed_configs.add((toggles, weights)) except Exception as e: print(f"Warning: Error loading CSV for checkpointing: {e}. Starting from scratch.") return completed_configs # --- 1. Data Loading and Cleaning Functions --- @st.cache_data(ttl=300) # Added TTL to auto-refresh cache every 5 mins def load_all_data(folder_path): if not os.path.exists(folder_path): st.error(f"Folder '{folder_path}' not found.") return None, None all_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')] all_files.sort() # Ensure we load in chronological order (e.g. 2023, then 2024) if not all_files: st.error("No CSV files found in the 'csv_data' folder.") return None, None df_list = [] error_messages = [] for file_name in all_files: file_path = os.path.join(folder_path, file_name) try: # 1. Read CSV without parsing dates yet (load as strings to be safe) df = pd.read_csv(file_path, header=0, index_col=0, encoding='utf-8') # 2. FORCE ISO PARSING (YYYY-MM-DD) # We disable 'dayfirst' because ISO puts Year first. df.index = pd.to_datetime(df.index, format='%Y-%m-%d', errors='coerce') # 3. Drop rows with invalid dates df = df[df.index.notna()] if df.empty: error_messages.append(f"Warning: Skipped {file_name}, no valid dates found.") continue df_list.append(df) except Exception as e: error_messages.append(f"Could not read {file_name}. Error: {e}") if not df_list: return None, "No data could be loaded successfully." # 4. CONSOLIDATE try: master_df = pd.concat(df_list) # 5. DEDUPLICATE (Keep the newest version of any overlapping date) # This prevents "Double Dots" if 2024-2025.csv and 2025-NOW.csv overlap if master_df.index.has_duplicates: master_df = master_df[~master_df.index.duplicated(keep='last')] # 6. FORCE SORT (The Final "Zig-Zag" Killer) # Ensures Jan 1st always comes before Jan 2nd master_df.sort_index(inplace=True) # 7. NUMERIC CONVERSION (From your original code) # Ensures all price columns are numbers, not strings for col in master_df.columns: master_df[col] = pd.to_numeric(master_df[col], errors='coerce') for msg in error_messages: st.warning(msg) return master_df, f"Successfully combined data from {len(df_list)} files." except Exception as e: return None, f"Critical Error merging data: {e}" def clean_data_and_report_outliers(df): """ Cleans data using a 'Rolling Median' filter (User Defined: 1 Month Window). Adapts to long-term trends while catching sudden 'Pence vs Pound' glitches. """ outlier_report = [] # Identify price columns (exclude _Volume, _High, _Low) price_columns = [col for col in df.columns if not any(x in str(col) for x in ['_Volume', '_High', '_Low'])] for ticker in price_columns: if ticker in df.columns: # Ensure numeric series = pd.to_numeric(df[ticker], errors='coerce') # --- ROLLING MEDIAN FILTER (1 Month / 20 Days) --- # center=True looks at 10 days before and 10 days after (if available) to find the 'true' level. # min_periods=1 ensures it works even at the start of the file. rolling_median = series.rolling(window=20, center=True, min_periods=1).median() # 1. Catch the 'Pence vs Pounds' Crash (e.g., 154 -> 1.56) # Logic: If price is < 20% of the recent median (an 80% drop). low_threshold = rolling_median * 0.20 # 2. Catch massive data spikes (e.g., 1.56 -> 154 if logic reversed) # Logic: If price is > 5x the recent median (500% spike). high_threshold = rolling_median * 5.0 bad_data_mask = (series < low_threshold) | (series > high_threshold) bad_days = series[bad_data_mask].index if not bad_days.empty: df.loc[bad_days, ticker] = np.nan outlier_report.append({'Ticker': ticker, 'Type': 'Rolling Filter', 'Count': len(bad_days)}) return df, outlier_report # --- 2. Custom Backtesting Engine --- # --- [UPDATED] 9-Factor Version (with MFI & SuperTrend) --- def calculate_confidence_score(df, primary_driver, use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov, use_mfi, use_supertrend, rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w, mfi_w, supertrend_w, bband_params, best_markov_setup=None): long_score = pd.Series(0.0, index=df.index) short_score = pd.Series(0.0, index=df.index) # --- Pre-calculate Markov State if needed --- if use_markov and best_markov_setup and 'RunUp_State' not in df.columns: run_up_period = best_markov_setup.get('Run-Up Period', 10) df['RunUp_Return'] = df['Close'].pct_change(periods=run_up_period) df['RunUp_State'] = df['RunUp_Return'].apply(lambda x: 'Up' if x > 0 else 'Down') # --- BBand Factor --- if primary_driver != 'Bollinger Bands' and 'bband_lower' in df.columns: bb_weight = 1.0 long_entry_pct = bband_params.get('long_entry_threshold_pct', 0.0) short_entry_pct = bband_params.get('short_entry_threshold_pct', 0.0) long_bb_trigger_price = df['bband_lower'] * (1 - long_entry_pct) short_bb_trigger_price = df['bband_upper'] * (1 + short_entry_pct) long_score_range = (df['large_ma'] - long_bb_trigger_price).replace(0, np.nan) short_score_range = (short_bb_trigger_price - df['large_ma']).replace(0, np.nan) long_score += ((df['large_ma'] - df['Close']) / long_score_range).clip(0, 1).fillna(0) * bb_weight short_score += ((df['Close'] - df['large_ma']) / short_score_range).clip(0, 1).fillna(0) * bb_weight # Factor 1: RSI if primary_driver != 'RSI Crossover' and use_rsi and 'RSI' in df.columns: long_score += ((30 - df['RSI']) / 30).clip(0, 1).fillna(0) * rsi_w short_score += ((df['RSI'] - 70) / 30).clip(0, 1).fillna(0) * rsi_w # Factor 2: Volatility if use_volatility and 'Volatility_p' in df.columns: vol_signal = (df['Volatility_p'] > 0.025).astype(float) * vol_w long_score += vol_signal; short_score += vol_signal # Factor 3: Trend if use_trend and 'SMA_200' in df.columns and 'Close' in df.columns: valid_sma = (df['SMA_200'] != 0) & df['SMA_200'].notna() pct_dist = pd.Series(0.0, index=df.index) pct_dist.loc[valid_sma] = df.loc[valid_sma].apply(lambda row: (row['Close'] - row['SMA_200']) / row['SMA_200'] if row['SMA_200'] != 0 else 0, axis=1) long_score += (pct_dist / 0.10).clip(0, 1).fillna(0) * trend_w short_score += (-pct_dist / 0.10).clip(0, 1).fillna(0) * trend_w # Factor 4: Volume if use_volume and 'Volume_Ratio' in df.columns: vol_spike_signal = ((df['Volume_Ratio'] - 1.75) / 2.25).clip(0, 1).fillna(0) * vol_w_val long_score += vol_spike_signal; short_score += vol_spike_signal # Factor 5: MACD if primary_driver != 'MACD Crossover' and use_macd and 'MACD_line' in df.columns: macd_cross_long = (df['MACD_line'].shift(1) < df['MACD_signal'].shift(1)) & (df['MACD_line'] >= df['MACD_signal']) macd_cross_short = (df['MACD_line'].shift(1) > df['MACD_signal'].shift(1)) & (df['MACD_line'] <= df['MACD_signal']) long_score += macd_cross_long.astype(float) * macd_w * 0.6 short_score += macd_cross_short.astype(float) * macd_w * 0.6 long_score += (df['MACD_hist'] > 0).astype(float) * macd_w * 0.4 short_score += (df['MACD_hist'] < 0).astype(float) * macd_w * 0.4 # Factor 6: MA Slope if primary_driver != 'MA Slope' and use_ma_slope and 'ma_slope' in df.columns: long_score += (df['ma_slope'] > 0).astype(float) * ma_slope_w short_score += (df['ma_slope'] < 0).astype(float) * ma_slope_w # Factor 7: Markov State if primary_driver != 'Markov State' and use_markov and best_markov_setup and 'RunUp_State' in df.columns: strategy = best_markov_setup.get('Strategy') if strategy == 'Down -> Up': long_score += (df['RunUp_State'] == 'Down').astype(float) * markov_w elif strategy == 'Up -> Up': long_score += (df['RunUp_State'] == 'Up').astype(float) * markov_w elif strategy == 'Up -> Down': short_score += (df['RunUp_State'] == 'Up').astype(float) * markov_w elif strategy == 'Down -> Down': short_score += (df['RunUp_State'] == 'Down').astype(float) * markov_w # Factor 8: MFI (Money Flow Index) [IMPROVED] if use_mfi and 'MFI' in df.columns: # Binary Logic: If MFI < 20 (Oversold), give full weight. # This fixes the "tiny score" issue. long_score += (df['MFI'] < 25).astype(float) * mfi_w short_score += (df['MFI'] > 75).astype(float) * mfi_w # Factor 9: SuperTrend [IMPROVED] if use_supertrend and 'SuperTrend' in df.columns: # Long Score: Price > SuperTrend (Trend is Bullish) long_score += (df['Close'] > df['SuperTrend']).astype(float) * supertrend_w # Short Score: Price < SuperTrend (Trend is Bearish) short_score += (df['Close'] < df['SuperTrend']).astype(float) * supertrend_w return long_score.fillna(0), short_score.fillna(0) # --- FULL REPLACEMENT FOR run_backtest FUNCTION --- def run_backtest(data, params, use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov, use_mfi, use_supertrend, # Added MFI/ST rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w, mfi_w, supertrend_w, # Added MFI/ST weights use_adx_filter, adx_threshold, rsi_logic, adx_period=14, veto_setups_list=None, primary_driver='Bollinger Bands', markov_setup=None, exit_logic_type='Standard (Price-Based)', exit_confidence_threshold=50, smart_trailing_stop_pct=0.05, long_score_95_percentile=None, short_score_95_percentile=None, benchmark_rank=0.95, smart_exit_atr_period=14, smart_exit_atr_multiplier=3.0, intelligent_tsl_pct=1.0, analysis_start_date=None, analysis_end_date=None, benchmark_lookback_years=None, use_rolling_benchmark=False): df = data.copy() required_cols = ['Close'] if 'High' not in df.columns: df['High'] = df['Close'] if 'Low' not in df.columns: df['Low'] = df['Close'] required_cols.extend(['High', 'Low']) if 'Volume' in df.columns: required_cols.append('Volume') df['Close'] = pd.to_numeric(df['Close'], errors='coerce').replace(0, np.nan) df.dropna(subset=['Close'], inplace=True) min_ma_period = params.get('large_ma_period', 50) min_lookback_needed = min_ma_period if (primary_driver == 'Markov State' or use_markov) and markov_setup is not None: min_lookback_needed = max(min_ma_period, markov_setup.get('Run-Up Period', 10)) if len(df) < min_lookback_needed or len(df) < params.get('bband_period', 20) or len(df) < 30: return 0, 0, 0.0, 0.0, None, ([], [], [], []), [], (0, 0, 0, 0), ([], []), (None, None, None, None), (0, 0, 0, 0, 0, 0) # --- Indicator Calculations --- df['large_ma'] = df['Close'].rolling(window=min_ma_period).mean().ffill() df['ma_slope'] = df['large_ma'].diff(periods=3).ffill() bband_period = params.get('bband_period', 20) if len(df) >= bband_period: try: indicator_bb = BollingerBands(close=df['Close'], window=bband_period, window_dev=params['bband_std_dev']) df['bband_lower'] = indicator_bb.bollinger_lband() df['bband_upper'] = indicator_bb.bollinger_hband() except Exception: df['bband_lower'], df['bband_upper'] = np.nan, np.nan else: df['bband_lower'], df['bband_upper'] = np.nan, np.nan if len(df) >= 14: try: indicator_rsi = RSIIndicator(close=df['Close'], window=14) df['RSI'] = indicator_rsi.rsi() except Exception: df['RSI'] = np.nan df['Volatility_p'] = df['Close'].pct_change().rolling(window=14).std() try: indicator_adx = ADXIndicator(high=df['High'], low=df['Low'], close=df['Close'], window=adx_period, fillna=True) df['ADX'] = indicator_adx.adx().ffill() except Exception: df['ADX'] = np.nan try: atr_window = int(smart_exit_atr_period) indicator_atr = AverageTrueRange(high=df['High'], low=df['Low'], close=df['Close'], window=atr_window, fillna=True) df['ATR'] = indicator_atr.average_true_range() except Exception: df['ATR'] = np.nan else: df['RSI'], df['Volatility_p'], df['ADX'], df['ATR'] = np.nan, np.nan, np.nan, np.nan # --- [NEW] MFI & SUPERTREND CALCULATIONS (Fixed) --- if len(df) >= 14: # 1. MFI Calculation try: mfi_ind = MFIIndicator(high=df['High'], low=df['Low'], close=df['Close'], volume=df['Volume'], window=14, fillna=True) df['MFI'] = mfi_ind.money_flow_index() except Exception: df['MFI'] = 50.0 # 2. Proper Recursive SuperTrend Calculation try: st_period = 10 st_multiplier = 3.0 # Calculate ATR high_low = df['High'] - df['Low'] high_close = np.abs(df['High'] - df['Close'].shift()) low_close = np.abs(df['Low'] - df['Close'].shift()) ranges = pd.concat([high_low, high_close, low_close], axis=1) true_range = np.max(ranges, axis=1) atr = true_range.rolling(st_period).mean().fillna(0) # Basic Bands hl2 = (df['High'] + df['Low']) / 2 basic_upper = hl2 + (st_multiplier * atr) basic_lower = hl2 - (st_multiplier * atr) # Initialize Final Bands final_upper = basic_upper.copy() final_lower = basic_lower.copy() trend = np.zeros(len(df), dtype=int) supertrend = np.zeros(len(df)) # Recursive Loop (Accurate Logic) close = df['Close'].values bu = basic_upper.values bl = basic_lower.values fu = final_upper.values fl = final_lower.values # 1 = Uptrend, -1 = Downtrend curr_trend = 1 for i in range(1, len(df)): # Calculate Final Upper Band if bu[i] < fu[i-1] or close[i-1] > fu[i-1]: fu[i] = bu[i] else: fu[i] = fu[i-1] # Calculate Final Lower Band if bl[i] > fl[i-1] or close[i-1] < fl[i-1]: fl[i] = bl[i] else: fl[i] = fl[i-1] # Determine Trend if curr_trend == 1 and close[i] < fl[i]: curr_trend = -1 elif curr_trend == -1 and close[i] > fu[i]: curr_trend = 1 trend[i] = curr_trend supertrend[i] = fl[i] if curr_trend == 1 else fu[i] df['SuperTrend'] = supertrend except Exception as e: # print(f"ST Error: {e}") # Debug if needed df['SuperTrend'] = np.nan else: df['MFI'], df['SuperTrend'] = 50.0, np.nan df['SMA_200'] = df['Close'].rolling(window=200, min_periods=1).mean() if 'Volume' in df.columns: df['Volume'] = pd.to_numeric(df['Volume'], errors='coerce').fillna(0) df['Volume_MA50'] = df['Volume'].rolling(window=50, min_periods=1).mean() df['Volume_Ratio'] = df.apply(lambda row: row['Volume'] / row['Volume_MA50'] if row['Volume_MA50'] > 0 else 0, axis=1) df['Volume_Ratio'] = df['Volume_Ratio'].replace([np.inf, -np.inf], 0).fillna(0) else: df['Volume_Ratio'] = 0.0 if len(df) >= 26: indicator_macd = MACD(close=df['Close'], window_slow=26, window_fast=12, window_sign=9, fillna=True) df['MACD_line'] = indicator_macd.macd().ffill() df['MACD_signal'] = indicator_macd.macd_signal().ffill() df['MACD_hist'] = indicator_macd.macd_diff().ffill() else: df['MACD_line'], df['MACD_signal'], df['MACD_hist'] = np.nan, np.nan, np.nan if (primary_driver == 'Markov State' or use_markov) and markov_setup is not None: run_up_period = markov_setup.get('Run-Up Period', 10) df['RunUp_Return'] = df['Close'].pct_change(periods=run_up_period) df['RunUp_State'] = df['RunUp_Return'].apply(lambda x: 'Up' if x > 0 else 'Down') bband_params_for_score = { 'long_entry_threshold_pct': params.get('long_entry_threshold_pct', 0.0), 'short_entry_threshold_pct': params.get('short_entry_threshold_pct', 0.0) } raw_long_score, raw_short_score = calculate_confidence_score(df, primary_driver, use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov, use_mfi, use_supertrend, # <--- NEW rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w, mfi_w, supertrend_w, # <--- NEW bband_params_for_score, best_markov_setup=markov_setup ) # 1. GLOBAL (STATIC) MODE - "The Crystal Ball" # Used for "Elite Filtering" of open trades. if not use_rolling_benchmark: if long_score_95_percentile is None: long_scores_gt_zero = raw_long_score[raw_long_score > 0] val = long_scores_gt_zero.quantile(benchmark_rank) if not long_scores_gt_zero.empty else 1.0 long_95 = pd.Series(val, index=df.index) else: long_95 = pd.Series(long_score_95_percentile, index=df.index) if short_score_95_percentile is None: short_scores_gt_zero = raw_short_score[raw_short_score > 0] val = short_scores_gt_zero.quantile(benchmark_rank) if not short_scores_gt_zero.empty else 1.0 short_95 = pd.Series(val, index=df.index) else: short_95 = pd.Series(short_score_95_percentile, index=df.index) # 2. ADAPTIVE (ROLLING) MODE - "Real World" # Used for realistic historical simulation. else: if long_score_95_percentile is None: # Default to 1 year if lookback is missing years = benchmark_lookback_years if (benchmark_lookback_years is not None and benchmark_lookback_years > 0) else 1 window_days = int(years * 252) # Calculate Rolling Percentile long_95 = raw_long_score.rolling(window=window_days, min_periods=50).quantile(benchmark_rank).fillna(method='bfill') else: long_95 = pd.Series(long_score_95_percentile, index=df.index) if short_score_95_percentile is None: years = benchmark_lookback_years if (benchmark_lookback_years is not None and benchmark_lookback_years > 0) else 1 window_days = int(years * 252) short_95 = raw_short_score.rolling(window=window_days, min_periods=50).quantile(benchmark_rank).fillna(method='bfill') else: short_95 = pd.Series(short_score_95_percentile, index=df.index) # Safety: Ensure we don't divide by zero long_95 = long_95.replace(0, 1.0) short_95 = short_95.replace(0, 1.0) # Final Calculation (Removing 'if' check to avoid Series Ambiguity Error) df['long_confidence_score'] = (raw_long_score / long_95 * 100).clip(0, 100).fillna(0.0) df['short_confidence_score'] = (raw_short_score / short_95 * 100).clip(0, 100).fillna(0.0) apply_veto = bool(veto_setups_list) if apply_veto: df['any_long_veto_trigger'] = False; df['any_short_veto_trigger'] = False for veto_setup in veto_setups_list: veto_threshold = veto_setup.get('Conf. Threshold', 0) veto_rsi_on = veto_setup.get('RSI') == 'On'; veto_vol_on = veto_setup.get('Volatility') == 'On' veto_trend_on = veto_setup.get('TREND') == 'On'; veto_volume_on = veto_setup.get('Volume') == 'On' veto_macd_on = veto_setup.get('MACD', 'Off') == 'On'; veto_ma_slope_on = veto_setup.get('MA Slope', 'Off') == 'On' veto_markov_on = False veto_long_raw, veto_short_raw = calculate_confidence_score(df, 'Bollinger Bands', veto_rsi_on, veto_vol_on, veto_trend_on, veto_volume_on, veto_macd_on, veto_ma_slope_on, veto_markov_on, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, bband_params_for_score, best_markov_setup=markov_setup ) veto_long_norm = (veto_long_raw / 3.0) * 100 current_long_veto_trigger = (veto_long_norm >= veto_threshold); current_short_veto_trigger = (veto_long_norm >= veto_threshold) df['any_long_veto_trigger'] |= current_long_veto_trigger; df['any_short_veto_trigger'] |= current_long_veto_trigger # --- Entry Trigger Logic --- bb_long_signal = df['Close'] < (df['bband_lower'] * (1 - params['long_entry_threshold_pct'])) bb_short_signal = df['Close'] > (df['bband_upper'] * (1 + params['short_entry_threshold_pct'])) if rsi_logic == "Crossover": rsi_long_signal = (df['RSI'].shift(1) < 30) & (df['RSI'] >= 30) & df['RSI'].notna() rsi_short_signal = (df['RSI'].shift(1) > 70) & (df['RSI'] <= 70) & df['RSI'].notna() else: # Level rsi_long_signal = (df['RSI'] <= 30) & df['RSI'].notna() rsi_short_signal = (df['RSI'] >= 70) & df['RSI'].notna() macd_long_signal = (df['MACD_line'].shift(1) < df['MACD_signal'].shift(1)) & (df['MACD_line'] >= df['MACD_signal']) macd_short_signal = (df['MACD_line'].shift(1) > df['MACD_signal'].shift(1)) & (df['MACD_line'] <= df['MACD_signal']) ma_slope_long_signal = (df['ma_slope'].shift(1) <= 0) & (df['ma_slope'] > 0) ma_slope_short_signal = (df['ma_slope'].shift(1) >= 0) & (df['ma_slope'] < 0) if primary_driver == 'RSI Crossover': base_long_trigger = rsi_long_signal; base_short_trigger = rsi_short_signal elif primary_driver == 'MACD Crossover': base_long_trigger = macd_long_signal; base_short_trigger = macd_short_signal elif primary_driver == 'MA Slope': base_long_trigger = ma_slope_long_signal; base_short_trigger = ma_slope_short_signal elif primary_driver == 'Markov State' and markov_setup is not None: strategy = markov_setup.get('Strategy') if strategy == 'Down -> Up': base_long_trigger = (df['RunUp_State'] == 'Down'); base_short_trigger = pd.Series(False, index=df.index) elif strategy == 'Up -> Up': base_long_trigger = (df['RunUp_State'] == 'Up'); base_short_trigger = pd.Series(False, index=df.index) elif strategy == 'Up -> Down': base_long_trigger = pd.Series(False, index=df.index); base_short_trigger = (df['RunUp_State'] == 'Up') elif strategy == 'Down -> Down': base_long_trigger = pd.Series(False, index=df.index); base_short_trigger = (df['RunUp_State'] == 'Down') else: base_long_trigger = pd.Series(False, index=df.index); base_short_trigger = pd.Series(False, index=df.index) else: base_long_trigger = bb_long_signal; base_short_trigger = bb_short_signal if use_adx_filter and 'ADX' in df.columns and df['ADX'].notna().any(): adx_allows_entry = (df['ADX'] < adx_threshold).fillna(False) else: adx_allows_entry = True ma_is_valid = pd.notna(df['large_ma']) # [SURGICAL PATCH START] -------------------------------------------------- # Check if ALL confidence indicators are disabled # CORRECTED: Added use_mfi and use_supertrend to this list so the score isn't ignored when they are On. all_indicators_off = not any([ use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov, use_mfi, use_supertrend ]) # If indicators are OFF, we bypass the confidence check (allow raw signal) # Otherwise, we enforce the confidence threshold as usual long_entry_trigger = base_long_trigger & adx_allows_entry & ((df['long_confidence_score'] >= params['confidence_threshold']) | all_indicators_off) & ma_is_valid short_entry_trigger = base_short_trigger & adx_allows_entry & ((df['short_confidence_score'] >= params['confidence_threshold']) | all_indicators_off) & ma_is_valid # [SURGICAL PATCH END] ---------------------------------------------------- # [FIX] Filter entries by BOTH Start and End date if analysis_start_date is not None: start_mask = df.index >= pd.Timestamp(analysis_start_date) long_entry_trigger &= start_mask short_entry_trigger &= start_mask if analysis_end_date is not None: end_mask = df.index <= pd.Timestamp(analysis_end_date) long_entry_trigger &= end_mask short_entry_trigger &= end_mask if apply_veto: long_entry_trigger &= ~df['any_long_veto_trigger'] short_entry_trigger &= ~df['any_short_veto_trigger'] potential_long_price_exit = (df['Close'] >= (df['large_ma'] * (1 + params['long_exit_ma_threshold_pct']))) | (df['Close'] >= df['bband_upper']) potential_short_price_exit = (df['Close'] <= (df['large_ma'] * (1 - params['short_exit_ma_threshold_pct']))) | (df['Close'] <= df['bband_lower']) long_entry_prices = df['Close'].where(long_entry_trigger).ffill() short_entry_prices = df['Close'].where(short_entry_trigger).ffill() if exit_logic_type == 'Intelligent (ADX/MACD/ATR)': adx_rising = (df['ADX'] > df['ADX'].shift(1)).fillna(False) adx_strong = (df['ADX'] > adx_threshold) macd_bullish = (df['MACD_line'] > df['MACD_signal']) stay_in_long_trade = (adx_rising | adx_strong) & macd_bullish base_long_exit_trigger = potential_long_price_exit & (~stay_in_long_trade) macd_bearish = (df['MACD_line'] < df['MACD_signal']) stay_in_short_trade = (adx_rising | adx_strong) & macd_bearish base_short_exit_trigger = potential_short_price_exit & (~stay_in_short_trade) else: long_is_in_profit = (df['Close'] > long_entry_prices).fillna(False) short_is_in_profit = (df['Close'] < short_entry_prices).fillna(False) base_long_exit_trigger = potential_long_price_exit & long_is_in_profit base_short_exit_trigger = potential_short_price_exit & short_is_in_profit df['long_signal'] = np.nan; df.loc[long_entry_trigger, 'long_signal'] = 1; df.loc[base_long_exit_trigger, 'long_signal'] = 0 df['short_signal'] = np.nan; df.loc[short_entry_trigger, 'short_signal'] = -1; df.loc[base_short_exit_trigger, 'short_signal'] = 0 if primary_driver == 'Markov State' and markov_setup is not None: limit_long = markov_setup.get('Future Period', 5); limit_short = markov_setup.get('Future Period', 5) else: limit_long = params.get('max_long_duration', 120); limit_short = params.get('max_short_duration', 120) temp_long_pos = df['long_signal'].ffill().fillna(0) temp_short_pos = df['short_signal'].ffill().fillna(0) long_groups = (~(temp_long_pos == 1)).cumsum(); df['days_in_long_trade'] = df.groupby(long_groups).cumcount(); df.loc[temp_long_pos == 0, 'days_in_long_trade'] = 0 short_groups = (~(temp_short_pos == -1)).cumsum(); df['days_in_short_trade'] = df.groupby(short_groups).cumcount(); df.loc[temp_short_pos == 0, 'days_in_short_trade'] = 0 long_time_exit_trigger = (df['days_in_long_trade'] > limit_long) & (temp_long_pos == 1) short_time_exit_trigger = (df['days_in_short_trade'] > limit_short) & (temp_short_pos == -1) df.loc[long_time_exit_trigger, 'long_signal'] = 0; df.loc[short_time_exit_trigger, 'short_signal'] = 0 df['long_entry_price_static'] = df['Close'].where(df['long_signal'].shift(1) == 0).ffill().bfill() df['short_entry_price_static'] = df['Close'].where(df['short_signal'].shift(1) == 0).ffill().bfill() # [SURGICAL UPDATE START] --- Catcher Offset Logic --- # We retrieve the offset percentage from params (default to 0.0 if missing) catcher_offset = params.get('catcher_stop_pct', 0.0) # Calculate the Adjusted Floor/Ceiling # Positive Offset = Higher Floor (Profit for Long, Profit for Short) # Negative Offset = Lower Floor (Loss allowance for Long, Loss allowance for Short) long_breakeven_floor = df['long_entry_price_static'] * (1 + catcher_offset) short_breakeven_floor = df['short_entry_price_static'] * (1 - catcher_offset) # [SURGICAL UPDATE END] ----------------------------- use_ma_floor_filter = params.get('use_ma_floor_filter', False) standard_tsl_pct_long = params.get('long_trailing_stop_loss_pct', 0) standard_tsl_pct_short = params.get('short_trailing_stop_loss_pct', 0) intelligent_tsl_pct_long = intelligent_tsl_pct if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else 0.0 intelligent_tsl_pct_short = intelligent_tsl_pct if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else 0.0 long_tsl_exit = pd.Series(False, index=df.index); short_tsl_exit = pd.Series(False, index=df.index) if primary_driver != 'Markov State': # --- LONG TSL --- has_hit_target = potential_long_price_exit # Determine which TSL percentage to use (Standard vs Intelligent) tsl_to_use_long = np.where(has_hit_target, intelligent_tsl_pct_long, standard_tsl_pct_long) if use_ma_floor_filter else (intelligent_tsl_pct_long if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else standard_tsl_pct_long) if not isinstance(tsl_to_use_long, pd.Series): tsl_to_use_long = pd.Series(tsl_to_use_long, index=df.index) # Only run calculation if TSL is active (> 0) if (tsl_to_use_long > 0).any(): in_long_trade = (df['long_signal'].ffill().fillna(0) == 1) long_high_water_mark = df['High'].where(in_long_trade).groupby((~in_long_trade).cumsum()).cummax() tsl_from_hwm = long_high_water_mark * (1 - tsl_to_use_long) # [FIX] Apply Catcher floor ONLY if target has been hit (Persistent during trade) if use_ma_floor_filter: trade_groups = (~in_long_trade).cumsum() # [FIX] Cast to float before cummax to avoid Object dtype error target_hit_persistent = has_hit_target.where(in_long_trade).astype(float).groupby(trade_groups).cummax().fillna(0).astype(bool) long_tsl_price = np.where(target_hit_persistent, np.maximum(tsl_from_hwm, long_breakeven_floor), tsl_from_hwm) else: long_tsl_price = tsl_from_hwm long_tsl_exit = in_long_trade & (df['Close'] < long_tsl_price) df.loc[long_tsl_exit, 'long_signal'] = 0 # --- SHORT TSL --- has_hit_target = potential_short_price_exit # Determine which TSL percentage to use (Standard vs Intelligent) tsl_to_use_short = np.where(has_hit_target, intelligent_tsl_pct_short, standard_tsl_pct_short) if use_ma_floor_filter else (intelligent_tsl_pct_short if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else standard_tsl_pct_short) if not isinstance(tsl_to_use_short, pd.Series): tsl_to_use_short = pd.Series(tsl_to_use_short, index=df.index) # Only run calculation if TSL is active (> 0) if (tsl_to_use_short > 0).any(): in_short_trade = (df['short_signal'].ffill().fillna(0) == -1) short_low_water_mark = df['Low'].where(in_short_trade).groupby((~in_short_trade).cumsum()).cummin() tsl_from_lwm = short_low_water_mark * (1 + tsl_to_use_short) # [FIX] Apply Catcher floor ONLY if target has been hit (Persistent during trade) if use_ma_floor_filter: trade_groups = (~in_short_trade).cumsum() # [FIX] Cast to float before cummax to avoid Object dtype error target_hit_persistent = has_hit_target.where(in_short_trade).astype(float).groupby(trade_groups).cummax().fillna(0).astype(bool) short_tsl_price = np.where(target_hit_persistent, np.minimum(tsl_from_lwm, short_breakeven_floor), tsl_from_lwm) else: short_tsl_price = tsl_from_lwm short_tsl_exit = in_short_trade & (df['Close'] > short_tsl_price) df.loc[short_tsl_exit, 'short_signal'] = 0 df['long_position'] = df['long_signal'].ffill().fillna(0) df['short_position'] = df['short_signal'].ffill().fillna(0) if params['long_delay_days'] > 0: df['long_position'] = df['long_position'].shift(params['long_delay_days']).fillna(0) if params['short_delay_days'] > 0: df['short_position'] = df['short_position'].shift(params['short_delay_days']).fillna(0) df['daily_return'] = df['Close'].pct_change() df['long_strategy_return'] = df['long_position'].shift(1) * df['daily_return'] df['short_strategy_return'] = df['short_position'].shift(1) * df['daily_return'] final_long_pnl = (1 + df['long_strategy_return'].fillna(0)).prod(skipna=True) - 1 final_short_pnl = (1 + df['short_strategy_return'].fillna(0)).prod(skipna=True) - 1 long_entries = df[(df['long_position'] == 1) & (df['long_position'].shift(1) == 0)] long_exits = df[(df['long_position'] == 0) & (df['long_position'].shift(1) == 1)] short_entries = df[(df['short_position'] == -1) & (df['short_position'].shift(1) == 0)] short_exits = df[(df['short_position'] == 0) & (df['short_position'].shift(1) == -1)] if not df.empty: end_date = df.index.max(); else: end_date = pd.NaT; # --- CHANGED: COLLECT ALL HISTORY (Removed "30 day" filter) --- all_historical_trades = [] long_trade_profits, long_durations, first_long_entry_date, last_long_exit_date = [], [], None, None short_trade_profits, short_durations, first_short_entry_date, last_short_exit_date = [], [], None, None long_profit_take_count, long_tsl_count, long_time_exit_count = 0, 0, 0 short_profit_take_count, short_tsl_count, short_time_exit_count = 0, 0, 0 df_indices = pd.Series(range(len(df)), index=df.index) for idx, row in long_entries.iterrows(): if first_long_entry_date is None: first_long_entry_date = idx future_exits = long_exits[long_exits.index > idx] if not future_exits.empty: exit_row = future_exits.iloc[0]; last_long_exit_date = exit_row.name exit_date = exit_row.name is_tsl = long_tsl_exit.loc[exit_row.name]; is_time = long_time_exit_trigger.loc[exit_row.name] if is_tsl: long_tsl_count += 1 elif is_time: long_time_exit_count += 1 else: long_profit_take_count += 1 profit = (exit_row['Close'] / row['Close']) - 1 if pd.notna(exit_row['Close']) and pd.notna(row['Close']) and row['Close'] != 0 else np.nan long_trade_profits.append(profit) # APPEND ALL TRADES (No Date Filter) all_historical_trades.append({'Side': 'Long', 'Date Open': idx, 'Date Closed': exit_date, 'Start Confidence': row.get('long_confidence_score', np.nan), 'Final % P/L': profit, 'Status': 'Closed', 'Exit Reason': 'TSL' if is_tsl else ('Time' if is_time else 'Profit')}) try: long_durations.append(df_indices.loc[exit_row.name] - df_indices.loc[idx]) except KeyError: long_durations.append(np.nan) avg_long_profit_per_trade = np.nanmean(long_trade_profits) if long_trade_profits else 0.0 for idx, row in short_entries.iterrows(): if first_short_entry_date is None: first_short_entry_date = idx future_exits = short_exits[short_exits.index > idx] if not future_exits.empty: exit_row = future_exits.iloc[0]; last_short_exit_date = exit_row.name exit_date = exit_row.name is_tsl = short_tsl_exit.loc[exit_row.name]; is_time = short_time_exit_trigger.loc[exit_row.name] if is_tsl: short_tsl_count += 1 elif is_time: short_time_exit_count += 1 else: short_profit_take_count += 1 profit = ((exit_row['Close'] / row['Close']) - 1) * -1 if pd.notna(exit_row['Close']) and pd.notna(row['Close']) and row['Close'] != 0 else np.nan short_trade_profits.append(profit) # APPEND ALL TRADES (No Date Filter) all_historical_trades.append({'Side': 'Short', 'Date Open': idx, 'Date Closed': exit_date, 'Start Confidence': row.get('short_confidence_score', np.nan), 'Final % P/L': profit, 'Status': 'Closed', 'Exit Reason': 'TSL' if is_tsl else ('Time' if is_time else 'Profit')}) try: short_durations.append(df_indices.loc[exit_row.name] - df_indices.loc[idx]) except KeyError: short_durations.append(np.nan) avg_short_profit_per_trade = np.nanmean(short_trade_profits) if short_trade_profits else 0.0 long_wins = sum(1 for p in long_trade_profits if pd.notna(p) and p > 0); long_losses = sum(1 for p in long_trade_profits if pd.notna(p) and p < 0) short_wins = sum(1 for p in short_trade_profits if pd.notna(p) and p > 0); short_losses = sum(1 for p in short_trade_profits if pd.notna(p) and p < 0) long_trades_log = [{'date': idx, 'price': row['Close'], 'confidence': row.get('long_confidence_score', np.nan)} for idx, row in long_entries.iterrows()] short_trades_log = [{'date': idx, 'price': row['Close'], 'confidence': row.get('short_confidence_score', np.nan)} for idx, row in short_entries.iterrows()] # --- ROBUST OPEN TRADE CAPTURE --- open_trades = [] if not df.empty and 'Close' in df.columns: last_close = df['Close'].iloc[-1] # 1. Check Long if df['long_position'].iloc[-1] == 1: if not long_entries.empty: last_entry_time = long_entries.index[-1] last_entry = long_entries.loc[last_entry_time] entry_price = last_entry['Close'] entry_conf = last_entry.get('long_confidence_score', np.nan) else: last_entry_time = pd.NaT entry_price = df['long_entry_price_static'].iloc[-1] entry_conf = np.nan if pd.notna(last_close) and pd.notna(entry_price) and entry_price != 0: pnl = (last_close / entry_price) - 1 open_trades.append({ 'Side': 'Long', 'Date Open': last_entry_time, 'Date Closed': pd.NaT, 'Start Confidence': entry_conf, 'Final % P/L': pnl, 'Status': 'Open', 'Exit Reason': 'N/A (Open)' }) # 2. Check Short if df['short_position'].iloc[-1] == -1: if not short_entries.empty: last_entry_time = short_entries.index[-1] last_entry = short_entries.loc[last_entry_time] entry_price = last_entry['Close'] entry_conf = last_entry.get('short_confidence_score', np.nan) else: last_entry_time = pd.NaT entry_price = df['short_entry_price_static'].iloc[-1] entry_conf = np.nan if pd.notna(last_close) and pd.notna(entry_price) and entry_price != 0: pnl = ((last_close / entry_price) - 1) * -1 open_trades.append({ 'Side': 'Short', 'Date Open': last_entry_time, 'Date Closed': pd.NaT, 'Start Confidence': entry_conf, 'Final % P/L': pnl, 'Status': 'Open', 'Exit Reason': 'N/A (Open)' }) # --------------------------------- # Combine: [All Historical Closed] + [Current Open] open_trades.extend(all_historical_trades) df.sort_index(inplace=True) trade_dates = (first_long_entry_date, last_long_exit_date, first_short_entry_date, last_short_exit_date) long_durations = [d for d in long_durations if pd.notna(d)]; short_durations = [d for d in short_durations if pd.notna(d)] avg_long_profit = float(avg_long_profit_per_trade) if pd.notna(avg_long_profit_per_trade) else 0.0 avg_short_profit = float(avg_short_profit_per_trade) if pd.notna(avg_short_profit_per_trade) else 0.0 final_long_pnl_float = float(final_long_pnl) if pd.notna(final_long_pnl) else 0.0 final_short_pnl_float = float(final_short_pnl) if pd.notna(final_short_pnl) else 0.0 final_trade_logs = (long_trades_log, long_exits.index, short_trades_log, short_exits.index) exit_breakdown = (long_profit_take_count, long_tsl_count, long_time_exit_count, short_profit_take_count, short_tsl_count, short_time_exit_count) return final_long_pnl_float, final_short_pnl_float, avg_long_profit, avg_short_profit, df, final_trade_logs, open_trades, (long_wins, long_losses, short_wins, short_losses), (long_durations, short_durations), trade_dates, exit_breakdown # --- 3. Charting and Display Functions --- def generate_long_plot(df, trades, ticker): fig = go.Figure() # Add Price and MA Lines fig.add_trace(go.Scatter(x=df.index, y=df['Close'], mode='lines', name='Close Price', line=dict(color='blue'))) if 'large_ma' in df.columns: fig.add_trace(go.Scatter(x=df.index, y=df['large_ma'], mode='lines', name='Large MA', line=dict(color='orange', dash='dash'))) # Add Bollinger Bands if 'bband_upper' in df.columns and 'bband_lower' in df.columns: fig.add_trace(go.Scatter(x=df.index, y=df['bband_upper'], mode='lines', name='Upper Band', line=dict(color='gray', width=0.5))) fig.add_trace(go.Scatter(x=df.index, y=df['bband_lower'], mode='lines', name='Lower Band', line=dict(color='gray', width=0.5), fill='tonexty', fillcolor='rgba(211,211,211,0.2)')) # Unpack Trades Tuple: (Long Entries Log, Long Exits Index, Short Entries Log, Short Exits Index) long_entries_log, long_exits, _, _ = trades # 1. Plot Long Entries if long_entries_log: dates = [t['date'] for t in long_entries_log] prices = [t['price'] for t in long_entries_log] scores = [f"Confidence: {t['confidence']:.0f}%" for t in long_entries_log] # Filter out entries that fall outside the dataframe's date range (just in case) valid_points = [(d, p, s) for d, p, s in zip(dates, prices, scores) if d in df.index] if valid_points: v_dates, v_prices, v_scores = zip(*valid_points) fig.add_trace(go.Scatter(x=v_dates, y=v_prices, mode='markers', name='Long Entry', marker=dict(color='green', symbol='triangle-up', size=12), text=v_scores, hoverinfo='text')) # 2. Plot Long Exits # Ensure long_exits is not empty and contains valid dates found in df if not long_exits.empty: valid_exits = [date for date in long_exits if date in df.index] if valid_exits: exit_prices = df.loc[valid_exits, 'Close'] fig.add_trace(go.Scatter(x=exit_prices.index, y=exit_prices, mode='markers', name='Long Exit', marker=dict(color='darkgreen', symbol='x', size=8))) fig.update_layout(title=f'Long Trades for {ticker}', xaxis_title='Date', yaxis_title='Price', legend_title="Indicator") return fig def generate_short_plot(df, trades, ticker): fig = go.Figure() # Add Price and MA Lines fig.add_trace(go.Scatter(x=df.index, y=df['Close'], mode='lines', name='Close Price', line=dict(color='blue'))) if 'large_ma' in df.columns: fig.add_trace(go.Scatter(x=df.index, y=df['large_ma'], mode='lines', name='Large MA', line=dict(color='orange', dash='dash'))) # Add Bollinger Bands if 'bband_upper' in df.columns and 'bband_lower' in df.columns: fig.add_trace(go.Scatter(x=df.index, y=df['bband_upper'], mode='lines', name='Upper Band', line=dict(color='gray', width=0.5))) fig.add_trace(go.Scatter(x=df.index, y=df['bband_lower'], mode='lines', name='Lower Band', line=dict(color='gray', width=0.5), fill='tonexty', fillcolor='rgba(211,211,211,0.2)')) # Add Trades _, _, short_entries_log, short_exits = trades if short_entries_log: dates = [t['date'] for t in short_entries_log] prices = [t['price'] for t in short_entries_log] scores = [f"Confidence: {t['confidence']:.0f}%" for t in short_entries_log] fig.add_trace(go.Scatter(x=dates, y=prices, mode='markers', name='Short Entry', marker=dict(color='red', symbol='triangle-down', size=12), text=scores, hoverinfo='text')) if not short_exits.empty and 'Close' in df.columns: # Check if Close exists exit_prices = df.loc[short_exits,'Close'].dropna() # Drop exits where price might be NaN fig.add_trace(go.Scatter(x=exit_prices.index, y=exit_prices, mode='markers', name='Short Exit', marker=dict(color='darkred', symbol='x', size=8))) fig.update_layout(title=f'Short Trades for {ticker}', xaxis_title='Date', yaxis_title='Price', legend_title="Indicator"); return fig def normalise_strategy_score(raw_score, benchmark_for_100_percent=0.25): if not np.isfinite(raw_score) or raw_score <= 0: return 0.0 # Handle NaN/inf return min((raw_score / benchmark_for_100_percent) * 100, 100.0) def calculate_strategy_score(avg_profit, gb_ratio, total_trades): """ Calculates a Weighted Strategy Score with a 'Quality Gate'. 1. Weights: Profit (42.5%), Win/Loss (42.5%), Trades (15%). 2. Targets: Profit 3.0%, Win/Loss 5.0, Trades 3000. 3. QUALITY GATE: If Win/Loss Ratio < 2.0, Profit Score is CAPPED at 1.0. (You cannot get 'extra credit' for high profit if the structure is risky). """ # 1. Define Targets target_profit = 0.03 # 3.0% per trade target_gb = 5.0 # 5.0 Win/Loss Ratio target_trades = 3000.0 # Perfect trade count # 2. Define Weights (Sum = 1.0) w_profit = 0.425 w_gb = 0.425 w_trades = 0.15 # 3. Calculate Scores # Profit: Uncapped Base s_profit = avg_profit / target_profit if s_profit < 0: s_profit = 0 # G/B Ratio: Uncapped Base s_gb = gb_ratio / target_gb if s_gb < 0: s_gb = 0 # --- QUALITY GATE --- # If the strategy has a poor Win/Loss ratio (< 2.0), # we cap the Profit score at 1.0 (100%). # This prevents high volatility/lucky profit from masking a bad ratio. if gb_ratio < 2.0: s_profit = min(s_profit, 1.0) # Trade Count: Pyramid Penalty dist = abs(total_trades - target_trades) s_trades = 1.0 - (dist / target_trades) if s_trades < 0: s_trades = 0 # 4. Final Weighted Score final_score = (s_profit * w_profit) + (s_gb * w_gb) + (s_trades * w_trades) return final_score * 100 def display_summary_analytics(summary_df): st.subheader("Overall Strategy Performance") trade_counts = st.session_state.get('trade_counts', {}) trade_durations = st.session_state.get('trade_durations', {}) exit_totals = st.session_state.get('exit_breakdown_totals', {}) # --- Clear last run stats before calculating --- st.session_state.last_run_stats = {} col1, col2 = st.columns(2) for side in ["Long", "Short"]: # Ensure summary_df exists and has the required columns req_cols = [f'Num {side} Trades', f'Avg {side} Profit per Trade', f'Cumulative {side} P&L', f'Avg {side} Confidence'] if summary_df is None or not all(col in summary_df.columns for col in req_cols): st.warning(f"Summary data for {side} trades is missing or incomplete.") continue active_trades_df = summary_df[summary_df[f'Num {side} Trades'] > 0] container = col1 if side == "Long" else col2 with container: st.subheader(f"{side} Trades") if not active_trades_df.empty: total_trades = active_trades_df[f'Num {side} Trades'].sum() avg_trade_profit = (active_trades_df[f'Avg {side} Profit per Trade'].fillna(0) * active_trades_df[f'Num {side} Trades']).sum() / total_trades if total_trades > 0 else 0 avg_cumulative_profit = active_trades_df[f'Cumulative {side} P&L'].mean() avg_confidence = active_trades_df[f'Avg {side} Confidence'].mean() if pd.isna(avg_confidence): avg_confidence = 0 good_tickers = (active_trades_df[f'Cumulative {side} P&L'] > 0).sum() bad_tickers = (active_trades_df[f'Cumulative {side} P&L'] < 0).sum() ticker_good_bad_ratio = good_tickers / bad_tickers if bad_tickers > 0 else 99999.0 display_score = calculate_strategy_score(avg_trade_profit, ticker_good_bad_ratio, total_trades) st.metric("Strategy Score", f"{display_score:.2f}%") st.metric("Avg Profit per Trade (Active Tickers)", f"{avg_trade_profit:.2%}") net_return_pct = avg_trade_profit * total_trades * 100 st.metric("Moneypile Score", f"{net_return_pct:.1f}", help="The total 'Pile of Money'. Sum of all trade percentages (Winners - Losers).") st.text(f"Profitable Tickers: {good_tickers}") st.text(f"Losing Tickers: {bad_tickers}") st.text(f"Ticker Good/Bad Ratio: {ticker_good_bad_ratio:.2f}") side_lower = side.lower() wins = trade_counts.get(f"{side_lower}_wins", 0) losses = trade_counts.get(f"{side_lower}_losses", 0) # --- NEW: Calculate Open Trades explicitly --- open_trades_count = int(total_trades) - (wins + losses) # --- TRADING STATS --- st.markdown("---") st.text(f"Total Individual Trades: {int(total_trades)}") st.text(f"Winning Trades: {wins}") st.text(f"Losing Trades: {losses}") st.text(f"Open Trades: {open_trades_count}") trade_win_loss_ratio = 0.0 if wins > 0 or losses > 0: trade_win_loss_ratio = wins / losses if losses > 0 else 99999.0 st.text(f"Trade Win/Loss Ratio: {trade_win_loss_ratio:.2f}") if side == "Long": st.session_state.last_run_stats = { "Z_Avg_Profit": avg_trade_profit, "Z_Num_Trades": int(total_trades), "Z_WL_Ratio": trade_win_loss_ratio } # --- EXIT BREAKDOWN --- if exit_totals: st.markdown("---") st.subheader("Exit Breakdown") if side == "Long": profit_count = exit_totals.get('long_profit_take_count', 0) tsl_count = exit_totals.get('long_tsl_count', 0) time_count = exit_totals.get('long_time_exit_count', 0) # Get Long Timeout Setting limit_days = st.session_state.get('max_long_duration', 60) else: profit_count = exit_totals.get('short_profit_take_count', 0) tsl_count = exit_totals.get('short_tsl_count', 0) time_count = exit_totals.get('short_time_exit_count', 0) # Get Short Timeout Setting limit_days = st.session_state.get('max_short_duration', 10) exit_total = profit_count + tsl_count + time_count if exit_total > 0: st.markdown(f"**Profit Take:** {profit_count} ({profit_count/exit_total:.1%})") st.markdown(f"**Stop Loss:** {tsl_count} ({tsl_count/exit_total:.1%})") # Dynamic Label st.markdown(f"**Time Out ({limit_days}d):** {time_count} ({time_count/exit_total:.1%})") # --- DURATION STATS --- avg_duration = trade_durations.get(f"avg_{side_lower}_duration", 0) max_duration = trade_durations.get(f"max_{side_lower}_duration", 0) if avg_duration > 0 or max_duration > 0: st.text(f"Avg Trade Duration: {avg_duration:.1f} days") st.text(f"Longest Trade: {max_duration:.0f} days") else: st.info("No trades found for this side with current settings.") def generate_profit_distribution_chart(summary_df): """ Creates two histograms showing the distribution of Average Profit per Trade for Long and Short strategies. [UPDATED] Now weighted by 'Num Trades' so the Y-axis shows Total Trades, not just Total Tickers. """ if summary_df is None or summary_df.empty: return None fig = go.Figure() # 1. Long Distribution if 'Avg Long Profit per Trade' in summary_df.columns and 'Num Long Trades' in summary_df.columns: # Filter out tickers that didn't trade mask = summary_df['Num Long Trades'] > 0 long_data = summary_df.loc[mask, 'Avg Long Profit per Trade'] long_weights = summary_df.loc[mask, 'Num Long Trades'] # Use trade count as weight if not long_data.empty: fig.add_trace(go.Histogram( x=long_data, y=long_weights, # [FIX] Weight by number of trades histfunc='sum', # [FIX] Sum the weights to get Total Trades per bin name='Long Strategy', marker_color='green', opacity=0.7, nbinsx=50, histnorm='' )) # 2. Short Distribution if 'Avg Short Profit per Trade' in summary_df.columns and 'Num Short Trades' in summary_df.columns: mask = summary_df['Num Short Trades'] > 0 short_data = summary_df.loc[mask, 'Avg Short Profit per Trade'] short_weights = summary_df.loc[mask, 'Num Short Trades'] # Use trade count as weight if not short_data.empty: fig.add_trace(go.Histogram( x=short_data, y=short_weights, # [FIX] Weight by number of trades histfunc='sum', # [FIX] Sum the weights name='Short Strategy', marker_color='red', opacity=0.7, nbinsx=50, visible='legendonly' )) # 3. Add Zero Line fig.add_vline(x=0, line_width=2, line_dash="dash", line_color="black", annotation_text="Break Even") # 4. Layout fig.update_layout( title="Distribution of Average Profit per Trade (Weighted by Trade Count)", xaxis_title="Average Profit per Trade (decimal, e.g. 0.01 = 1%)", yaxis_title="Number of Trades", # [FIX] Updated Label barmode='overlay', bargap=0.1, template="plotly_white", legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), height=400 ) # Format X axis as percentage fig.update_xaxes(tickformat=".1%") return fig # --- 4. Optimisation Functions (Parallelised) --- # --- FULL REPLACEMENT WORKER: ACCEPTS A SINGLE DICTIONARY ARGUMENT --- def run_single_parameter_test(task_data): # Unpack all arguments from the single dictionary provided by the multiprocessing pool params = task_data['params'] confidence_settings = task_data['confidence_settings'] master_df = task_data['master_df'] optimise_for = task_data['optimise_for'] tickers = task_data['tickers'] date_range = task_data['date_range'] power = task_data['power'] # --- FIX: Correctly unpack ALL 7 items from the list --- toggles_list = list(confidence_settings['toggles']) weights_list = list(confidence_settings['weights']) # Unpack all 7 factors (matches the order sent by generate_and_run_optimisation) use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov = toggles_list rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w = weights_list # --- END FIX --- # --- Rest of Unpack --- use_adx_filter, adx_threshold, adx_period = confidence_settings['adx_settings'] rsi_logic = confidence_settings['rsi_logic'] primary_driver = confidence_settings['primary_driver'] markov_setup = confidence_settings['markov_setup'] exit_logic = confidence_settings['exit_logic'] exit_thresh = confidence_settings['exit_thresh'] smart_trailing_stop = confidence_settings['smart_trailing_stop'] smart_exit_atr_p = confidence_settings['smart_exit_atr_period'] smart_exit_atr_m = confidence_settings['smart_exit_atr_multiplier'] intelligent_tsl_pct = confidence_settings['intelligent_tsl_pct'] long_95_percentile = confidence_settings['long_95_percentile'] short_95_percentile = confidence_settings['short_95_percentile'] veto_list = confidence_settings['veto_list'] # --- End Unpack --- total_profit_weighted_avg, total_trades, winning_tickers, losing_tickers = 0, 0, 0, 0 total_wins, total_losses = 0, 0 all_confidences = [] PROFIT_THRESHOLD = 1.0 excluded_tickers = [] # Initialize exit breakdown counters (Long and Short) total_exit_breakdown = [0, 0, 0, 0, 0, 0] # LP, LT, LE, SP, ST, SE if not isinstance(tickers, list): tickers = [tickers] for ticker in tickers: cols_to_use = [ticker] if f'{ticker}_High' in master_df.columns: cols_to_use.append(f'{ticker}_High') if f'{ticker}_Low' in master_df.columns: cols_to_use.append(f'{ticker}_Low') if f'{ticker}_Volume' in master_df.columns: cols_to_use.append(f'{ticker}_Volume') existing_cols = [col for col in cols_to_use if col in master_df.columns] if ticker not in existing_cols: continue ticker_data_full = master_df.loc[:, existing_cols] ticker_data = ticker_data_full.loc[date_range[0]:date_range[1]] rename_dict = { ticker: 'Close', f'{ticker}_High': 'High', f'{ticker}_Low': 'Low', f'{ticker}_Volume': 'Volume' } rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} ticker_data = ticker_data.rename(columns=rename_dict_filtered) if not ticker_data.empty and 'Close' in ticker_data.columns and not ticker_data['Close'].isna().all(): # --- CAPTURE FIX: run_backtest now returns exit_breakdown (11th return value) --- long_pnl, short_pnl, avg_long_trade, avg_short_trade, _, trades, _, trade_counts, _, _, exit_breakdown = run_backtest( ticker_data, params, use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov, rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w, use_adx_filter, adx_threshold, rsi_logic, adx_period, veto_setups_list=veto_list, primary_driver=primary_driver, markov_setup=markov_setup, exit_logic_type=exit_logic, exit_confidence_threshold=exit_thresh, smart_trailing_stop_pct=smart_trailing_stop, smart_exit_atr_period=smart_exit_atr_p, smart_exit_atr_multiplier=smart_exit_atr_m, intelligent_tsl_pct=intelligent_tsl_pct, long_score_95_percentile=long_95_percentile, short_score_95_percentile=short_95_percentile ) if abs(long_pnl) > PROFIT_THRESHOLD or abs(short_pnl) > PROFIT_THRESHOLD or \ (avg_long_trade is not None and pd.notna(avg_long_trade) and abs(avg_long_trade) > PROFIT_THRESHOLD) or \ (avg_short_trade is not None and pd.notna(avg_short_trade) and abs(avg_short_trade) > PROFIT_THRESHOLD): excluded_tickers.append(ticker) continue # Aggregate exit breakdown counters total_exit_breakdown = [sum(x) for x in zip(total_exit_breakdown, exit_breakdown)] if optimise_for == 'long': pnl, avg_trade_profit, trade_log = long_pnl, avg_long_trade, trades[0] total_wins += trade_counts[0]; total_losses += trade_counts[1] else: pnl, avg_trade_profit, trade_log = short_pnl, avg_short_trade, trades[2] total_wins += trade_counts[2]; total_losses += trade_counts[3] num_trades = len(trade_log) if num_trades > 0 and avg_trade_profit is not None and pd.notna(avg_trade_profit): total_trades += num_trades total_profit_weighted_avg += avg_trade_profit * num_trades if pnl > 0: winning_tickers += 1 elif pnl < 0: losing_tickers += 1 all_confidences.extend([trade['confidence'] for trade in trade_log if pd.notna(trade.get('confidence'))]) overall_avg_profit = 0.0 good_bad_ratio = 0.0 trade_good_bad_ratio = 0.0 if total_trades > 0: overall_avg_profit = total_profit_weighted_avg / total_trades if losing_tickers > 0: good_bad_ratio = winning_tickers / losing_tickers elif winning_tickers > 0: good_bad_ratio = 99999.0 if total_losses > 0: trade_good_bad_ratio = total_wins / total_losses elif total_wins > 0: trade_good_bad_ratio = 99999.0 avg_entry_confidence = np.mean(all_confidences) if all_confidences else 0.0 # Return a single dictionary containing all calculated metrics and the configuration data return { # CORE METRICS "Avg Profit/Trade": overall_avg_profit, "Ticker G/B Ratio": good_bad_ratio, "Trade G/B Ratio": trade_good_bad_ratio, "Total Trades": total_trades, "Net % Return": total_profit_weighted_avg * 100, "Avg Entry Conf.": avg_entry_confidence, "Winning Tickers": winning_tickers, "Losing Tickers": losing_tickers, "Exit Breakdown": total_exit_breakdown, # NEW: Return the aggregated breakdown # CONFIG DATA (for Orchestrator unpacking) "params": params, "confidence_settings": confidence_settings } # --- UPDATED: Calculation Only (Saves to Session State) --- def generate_and_run_optimisation(master_df, main_content_placeholder, optimise_for, use_squared_weighting): # Clear previous results to avoid confusion st.session_state.param_results_df = None st.session_state.best_params = None # Clear other sections st.session_state.summary_df = None st.session_state.single_ticker_results = None st.session_state.confidence_results_df = None st.session_state.open_trades_df = None st.session_state.advisor_df = None with main_content_placeholder.container(): veto_list_to_use = st.session_state.get('veto_setup_list', []) if veto_list_to_use: st.info(f"{len(veto_list_to_use)} Veto filter(s) are ACTIVE for this optimisation run.") st.info("Calibrating confidence scores (0-100%)...") # --- 1. Gather Settings & Run Calibration (Same as before) --- use_rsi = st.session_state.use_rsi; use_vol = st.session_state.use_vol; use_trend = st.session_state.use_trend use_volume = st.session_state.use_volume; use_macd = st.session_state.use_macd use_ma_slope = st.session_state.use_ma_slope; use_markov = st.session_state.use_markov rsi_w = st.session_state.rsi_w; vol_w = st.session_state.vol_w; trend_w = st.session_state.trend_w vol_w_val = st.session_state.volume_w; macd_w = st.session_state.macd_w ma_slope_w = st.session_state.ma_slope_w; markov_w = st.session_state.markov_w calibration_params = { "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, "bband_std_dev": st.session_state.bb_std, "confidence_threshold": st.session_state.confidence_slider, "long_entry_threshold_pct": st.session_state.long_entry / 100.0, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100.0, "long_trailing_stop_loss_pct": st.session_state.long_sl / 100.0, "long_delay_days": st.session_state.long_delay, "short_entry_threshold_pct": st.session_state.short_entry / 100.0, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100.0, "short_trailing_stop_loss_pct": st.session_state.short_sl / 100.0, "short_delay_days": st.session_state.short_delay, "max_trading_days": st.session_state.max_duration } markov_setup = st.session_state.get('best_markov_setup') primary_driver = st.session_state.primary_driver calib_adx_filter = st.session_state.use_adx_filter calib_adx_thresh = st.session_state.adx_threshold calib_adx_period = st.session_state.adx_period calib_rsi_logic = st.session_state.rsi_logic calib_exit_logic = st.session_state.exit_logic_type calib_exit_thresh = st.session_state.exit_confidence_threshold calib_smart_trailing_stop = st.session_state.smart_trailing_stop_pct / 100.0 calib_smart_atr_p = st.session_state.smart_exit_atr_period calib_smart_atr_m = st.session_state.smart_exit_atr_multiplier calib_intelligent_tsl = st.session_state.intelligent_tsl_pct / 100.0 all_long_scores = []; all_short_scores = [] tickers_to_run_calib = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))] date_range_calib = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) for ticker_symbol in tickers_to_run_calib: cols_to_use = [ticker_symbol, f'{ticker_symbol}_High', f'{ticker_symbol}_Low', f'{ticker_symbol}_Volume'] existing_cols = [col for col in cols_to_use if col in master_df.columns] if ticker_symbol not in existing_cols: continue ticker_data_full = master_df.loc[:, existing_cols] ticker_data = ticker_data_full.loc[date_range_calib[0]:date_range_calib[1]] rename_dict = {ticker_symbol: 'Close', f'{ticker_symbol}_High': 'High', f'{ticker_symbol}_Low': 'Low', f'{ticker_symbol}_Volume': 'Volume'} rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} ticker_data = ticker_data.rename(columns=rename_dict_filtered) l_pnl, s_pnl, al, as_, _, _, _, _, _, _, _ = run_backtest( data=ticker_data, params=calibration_params, use_rsi=use_rsi, use_volatility=use_vol, use_trend=use_trend, use_volume=use_volume, use_macd=use_macd, use_ma_slope=use_ma_slope, use_markov=use_markov, rsi_w=rsi_w, vol_w=vol_w, trend_w=trend_w, vol_w_val=vol_w_val, macd_w=macd_w, ma_slope_w=ma_slope_w, markov_w=markov_w, use_adx_filter=calib_adx_filter, adx_threshold=calib_adx_thresh, rsi_logic=calib_rsi_logic, adx_period=calib_adx_period, veto_setups_list=veto_list_to_use, primary_driver=primary_driver, markov_setup=markov_setup, exit_logic_type=calib_exit_logic, exit_confidence_threshold=calib_exit_thresh, smart_trailing_stop_pct=calib_smart_trailing_stop, smart_exit_atr_period=calib_smart_atr_p, smart_exit_atr_multiplier=calib_smart_atr_m, intelligent_tsl_pct=calib_intelligent_tsl, long_score_95_percentile=1.0, short_score_95_percentile=1.0 ) raw_long, raw_short = calculate_confidence_score( ticker_data, primary_driver, use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov, rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w, calibration_params, markov_setup ) all_long_scores.append(raw_long[raw_long > 0]) all_short_scores.append(raw_short[raw_short > 0]) long_95 = pd.concat(all_long_scores).quantile(0.95) if all_long_scores else 1.0 short_95 = pd.concat(all_short_scores).quantile(0.95) if all_short_scores else 1.0 if pd.isna(long_95) or long_95 == 0: long_95 = 1.0 if pd.isna(short_95) or short_95 == 0: short_95 = 1.0 st.info(f"Confidence calibrated: Long 95th percentile = {long_95:.2f}, Short 95th percentile = {short_95:.2f}") # --- 2. Build Combinations (Same as before) --- ma_range = range(st.session_state.ma_start, st.session_state.ma_end + 1, st.session_state.ma_step) if st.session_state.opt_ma_cb else [st.session_state.ma_period] bb_range = range(st.session_state.bb_start, st.session_state.bb_end + 1, st.session_state.bb_step) if st.session_state.opt_bb_cb else [st.session_state.bb_period] std_range = np.arange(st.session_state.std_start, st.session_state.std_end + 0.001, st.session_state.std_step) if st.session_state.opt_std_cb else [st.session_state.bb_std] sl_range = np.arange(st.session_state.sl_start, st.session_state.sl_end + 0.001, st.session_state.sl_step) / 100 if st.session_state.opt_sl_cb else [st.session_state.long_sl / 100] delay_range = range(st.session_state.delay_start, st.session_state.delay_end + 1, st.session_state.delay_step) if st.session_state.opt_delay_cb else [st.session_state.long_delay] entry_range = np.arange(st.session_state.entry_start, st.session_state.entry_end + 0.001, st.session_state.entry_step) / 100 if st.session_state.opt_entry_cb else [st.session_state.long_entry / 100] exit_range = np.arange(st.session_state.exit_start, st.session_state.exit_end + 0.001, st.session_state.exit_step) / 100 if st.session_state.opt_exit_cb else [st.session_state.long_exit / 100] conf_range = range(st.session_state.conf_start, st.session_state.conf_end + 1, st.session_state.conf_step) if st.session_state.opt_conf_cb else [st.session_state.confidence_slider] dur_range = range(st.session_state.dur_start, st.session_state.dur_end + 1, st.session_state.dur_step) if st.session_state.opt_duration_cb else [st.session_state.max_duration] param_product = itertools.product(ma_range, bb_range, std_range, sl_range, delay_range, entry_range, exit_range, conf_range, dur_range) param_combinations = [{ "large_ma_period": p[0], "bband_period": p[1], "bband_std_dev": p[2], "long_trailing_stop_loss_pct": p[3], "short_trailing_stop_loss_pct": p[3], "long_delay_days": p[4], "short_delay_days": p[4], "long_entry_threshold_pct": p[5], "short_entry_threshold_pct": p[5], "long_exit_ma_threshold_pct": p[6], "short_exit_ma_threshold_pct": p[6], "confidence_threshold": p[7], "max_trading_days": p[8] } for p in param_product] total_combinations = len(param_combinations) if total_combinations <= 1: st.warning("No optimisation parameters selected."); return confidence_settings = { 'toggles': (use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov), 'weights': (rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w), 'adx_settings': (st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.adx_period), 'rsi_logic': st.session_state.rsi_logic, 'primary_driver': primary_driver, 'markov_setup': markov_setup, 'exit_logic': st.session_state.exit_logic_type, 'exit_thresh': st.session_state.exit_confidence_threshold, 'smart_trailing_stop': st.session_state.smart_trailing_stop_pct / 100.0, 'smart_exit_atr_period': st.session_state.smart_exit_atr_period, 'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier, 'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0, 'long_95_percentile': long_95, 'short_95_percentile': short_95, 'veto_list': veto_list_to_use } # --- 3. Execution --- num_cores = cpu_count() st.info(f"Starting {optimise_for.upper()} optimisation on {num_cores} cores... Testing {total_combinations} combinations.") if st.session_state.run_mode.startswith("Analyse Full List"): tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))] else: tickers_to_run = [st.session_state.ticker_select] date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) power = 2 if use_squared_weighting else 1 status_text = st.empty(); status_text.text("Optimisation starting...") progress_bar = st.progress(0) tasks = [] for p in param_combinations: tasks.append({ 'params': p, 'confidence_settings': confidence_settings, 'master_df': master_df, 'optimise_for': optimise_for, 'tickers': tickers_to_run, 'date_range': date_range, 'power': power }) results_list = [] with Pool(processes=num_cores) as pool: try: iterator = pool.imap_unordered(run_single_parameter_test, tasks) for i, result_dict in enumerate(iterator, 1): results_list.append(result_dict) progress_bar.progress(i / total_combinations, text=f"Optimising... {i}/{total_combinations} combinations complete.") except Exception as e: st.error(f"An error occurred during multiprocessing: {e}") status_text.text("Optimisation failed due to an error."); return status_text.text("Optimisation complete. Processing results...") if not results_list: st.warning("Optimisation finished, but no valid results were found."); return flattened_results = [] for r in results_list: flat_row = r.copy() del flat_row['params']; del flat_row['confidence_settings'] flat_row.update(r['params']) flattened_results.append(flat_row) results_df = pd.DataFrame(flattened_results) min_trades_threshold = 10 if 'Total Trades' in results_df.columns: results_df = results_df[results_df['Total Trades'] >= min_trades_threshold].copy() if results_df.empty: st.warning(f"No results found with at least {min_trades_threshold} trades. Try a smaller threshold or different settings."); return results_df['Strategy Score'] = results_df.apply( lambda row: calculate_strategy_score(row['Avg Profit/Trade'], row['Ticker G/B Ratio'], row['Total Trades']), axis=1 ) # --- 4. PRE-SORT (Descending Duration) --- # We sort by Score (Desc) AND Duration (Desc) so the "Biggest Number" comes first. if 'max_trading_days' in results_df.columns: results_df = results_df.sort_values(by=['Strategy Score', 'max_trading_days'], ascending=[False, False]) else: results_df = results_df.sort_values(by='Strategy Score', ascending=False) # Save the full results to session state so `main()` can display them st.session_state.param_results_df = results_df # Save best params (first row is best score + highest duration) best_setup_series = results_df.iloc[0] st.session_state.best_params = {k: best_setup_series[k] for k in param_combinations[0].keys()} status_text.empty() st.success(f"Optimisation Complete! Best Strategy Score: {best_setup_series['Strategy Score']:.2f}%") st.subheader("Optimal Parameters Found (based on Strategy Score):") st.json(st.session_state.best_params) # Note: We do NOT display the table here anymore. main() handles it. # --- 4.5. NEW COMBINED OPTIMISATION FUNCTION (Optimized: 32k -> 3k Tasks) --- def generate_and_run_combined_optimisation(master_df, main_content_placeholder, optimise_for): st.session_state.summary_df = None st.session_state.single_ticker_results = None st.session_state.confidence_results_df = None st.session_state.open_trades_df = None st.session_state.advisor_df = None st.session_state.best_params = None st.session_state.best_weights = None # Define side-specific filename to separate Long/Short results results_file = f"combined_optimisation_results_{optimise_for}.csv" with main_content_placeholder.container(): # --- Visual Feedback --- st.header(f"⚡ Running Combined Factor & Weight Optimisation ({optimise_for.title()})") st.caption(f"Results will be saved to: {results_file}") # --- FIXED PARAMETERS --- fixed_params = { "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, "bband_std_dev": st.session_state.bb_std, "confidence_threshold": 50, "long_entry_threshold_pct": st.session_state.long_entry / 100, "short_entry_threshold_pct": st.session_state.short_entry / 100, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, "short_trailing_stop_loss_pct": st.session_state.short_sl / 100, "long_delay_days": st.session_state.long_delay, "short_delay_days": st.session_state.short_delay } # --- CHECKPOINT LOADING --- completed_configs = load_completed_setups(results_file) st.info(f"Loaded {len(completed_configs)} completed combinations from {results_file}. Resuming job...") # --- DYNAMIC FACTOR/WEIGHT GENERATION (OPTIMIZED) --- # Instead of looping Toggles and Weights separately (which creates duplicates), # we treat 0.0 as "Off" and any other value as "On + Weight". # 5 States per Factor: [Off, 0.5, 1.0, 1.5, 2.0] factors = ['RSI', 'Volatility', 'Volume', 'MACD', 'MA Slope'] possible_states = [0.0, 0.5, 1.0, 1.5, 2.0] # 0.0 = Off # Generate all 3,125 unique combinations (5^5) all_combos = list(itertools.product(possible_states, repeat=len(factors))) all_tasks = [] fixed_adx_settings = (st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.adx_period) fixed_markov_setup = st.session_state.get('best_markov_setup') fixed_exit_settings = { 'rsi_logic': st.session_state.rsi_logic, 'primary_driver': st.session_state.primary_driver, 'exit_logic': st.session_state.exit_logic_type, 'exit_thresh': st.session_state.exit_confidence_threshold, 'smart_trailing_stop': st.session_state.smart_trailing_stop_pct / 100.0, 'smart_exit_atr_period': st.session_state.smart_exit_atr_period, 'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier, 'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0 } tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))] date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) for combo in all_combos: weights_5 = list(combo) toggles_5 = [w > 0.0 for w in weights_5] # If weight > 0, it's On. If 0.0, it's Off. # --- EXPAND TO 7 ITEMS (Insert False/0.0 for Trend and Markov) --- # Standard Order: RSI, Vol, TREND, Volume, MACD, MA Slope, Markov toggles_7 = ( toggles_5[0], toggles_5[1], False, toggles_5[2], toggles_5[3], toggles_5[4], False ) weights_7 = ( weights_5[0], weights_5[1], 0.0, weights_5[2], weights_5[3], weights_5[4], 0.0 ) # --- CHECKPOINT LOGIC --- toggles_str = tuple(["On" if t else "Off" for t in toggles_7]) weights_round = tuple([round(w, 2) for w in weights_7]) task_key = (toggles_str, weights_round) if task_key not in completed_configs: confidence_settings = { 'toggles': toggles_7, 'weights': weights_7, 'adx_settings': fixed_adx_settings, 'markov_setup': fixed_markov_setup, 'confidence_threshold': fixed_params['confidence_threshold'], 'long_95_percentile': 1.0, 'short_95_percentile': 1.0, 'veto_list': st.session_state.get('veto_setup_list', []), } confidence_settings.update(fixed_exit_settings) all_tasks.append({ 'params': fixed_params, 'confidence_settings': confidence_settings, 'master_df': master_df, 'optimise_for': optimise_for, 'tickers': tickers_to_run, 'date_range': date_range, 'power': 1 }) # --- Multiprocessing Execution Setup --- total_combinations_theoretical = 3125 # Corrected to 5^5 tasks_remaining = len(all_tasks) if tasks_remaining == 0: st.success(f"Optimisation complete. All {total_combinations_theoretical} combinations for {optimise_for.title()} have been processed."); pass current_progress_base = total_combinations_theoretical - tasks_remaining BATCH_SIZE = 50 # Smaller batch for smoother progress if tasks_remaining > 0: num_cores = cpu_count() st.info(f"Starting COMBINED optimisation ({optimise_for.title()}) on {num_cores} cores... {tasks_remaining} unique tasks remaining.") results_list = [] status_text = st.empty(); status_text.text("Optimisation starting...") progress_bar = st.progress(0) with Pool(processes=cpu_count()) as pool: try: iterator = pool.imap_unordered(run_single_parameter_test, all_tasks) for i, result_dict in enumerate(iterator, 1): results_list.append(result_dict) total_progress = (current_progress_base + i) / total_combinations_theoretical progress_bar.progress(total_progress, text=f"Optimising {optimise_for.title()}... {i}/{tasks_remaining} processed.") if i % BATCH_SIZE == 0 or i == tasks_remaining: factors_csv_order = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov'] def unpack_csv_config(row): conf_settings = row['confidence_settings'] result = {} togs = conf_settings['toggles'] wgts = conf_settings['weights'] for idx, name in enumerate(factors_csv_order): result[name] = "On" if togs[idx] else "Off" result[name + ' W'] = wgts[idx] return pd.Series(result) config_df = pd.DataFrame(results_list).apply(unpack_csv_config, axis=1) results_df_batch = pd.concat([pd.DataFrame(results_list).drop(['params', 'confidence_settings'], axis=1), config_df], axis=1) results_df_batch['Strategy Score'] = results_df_batch.apply( lambda row: calculate_strategy_score(row['Avg Profit/Trade'], row['Trade G/B Ratio'], row['Total Trades']), axis=1 ) write_header = not os.path.exists(results_file) results_df_batch.to_csv(results_file, mode='a', header=write_header, index=False) status_text.text(f"CHECKPOINT: Saved {i} new {optimise_for} combinations. Total: {current_progress_base + i} / {total_combinations_theoretical}") results_list = [] except Exception as e: st.error(f"FATAL ERROR during multiprocessing. Checkpoint saved. Error: {e}") status_text.text("Optimization stopped.") return status_text.empty() st.success(f"Optimization finished. All {optimise_for.title()} results saved.") # --- FINAL DISPLAY --- if os.path.exists(results_file): final_df = pd.read_csv(results_file) final_df = final_df.sort_values(by='Strategy Score', ascending=False) st.subheader(f"Top 20 Complete {optimise_for.title()} Setups:") display_cols = ['Strategy Score', 'Avg Profit/Trade', 'Trade G/B Ratio', 'Total Trades'] factors_display = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov'] display_cols.extend(factors_display) display_cols.extend([f + ' W' for f in factors_display]) display_cols = [c for c in display_cols if c in final_df.columns] display_df = final_df.head(20) st.dataframe(display_df[display_cols].style.format({ "Strategy Score": "{:.2f}%", "Avg Profit/Trade": "{:.2%}", "Trade G/B Ratio": "{:.2f}", })) else: st.info(f"No results found yet for {optimise_for.title()}.") # --- CORRECTED: Ensures ALL arguments are correctly passed to run_backtest --- def run_single_confidence_test(task, base_params, master_df, date_range, tickers_to_run, optimise_for, factor_weights): # Unpack the factor combo tuple (now has 7 elements) combo, threshold, _ = task use_rsi_combo, use_volatility_combo, use_trend_combo, use_volume_combo, use_macd_combo, use_ma_slope_combo, use_markov_combo = combo test_params = base_params.copy() test_params["confidence_threshold"] = threshold total_profit_weighted_avg, total_trades, winning_tickers, losing_tickers = 0, 0, 0, 0 all_confidences = [] total_wins, total_losses = 0, 0 PROFIT_THRESHOLD = 1.0 excluded_tickers_conf = [] # --- Get STATIC settings from factor_weights --- rsi_logic = factor_weights.get('rsi_logic', 'Crossover') use_adx_filter = factor_weights.get('use_adx', True) adx_threshold = factor_weights.get('adx_thresh', 25.0) adx_period = factor_weights.get('adx_period', 14) primary_driver = factor_weights.get('primary_driver', 'Bollinger Bands') markov_setup = factor_weights.get('markov_setup') exit_logic = factor_weights.get('exit_logic') exit_thresh = factor_weights.get('exit_thresh') smart_trailing_stop = factor_weights.get('smart_trailing_stop') smart_exit_atr_p = factor_weights.get('smart_exit_atr_period', 14) smart_exit_atr_m = factor_weights.get('smart_exit_atr_multiplier', 3.0) intelligent_tsl_pct = factor_weights.get('intelligent_tsl_pct', 0.60) # Get weights rsi_w = factor_weights.get('rsi', 1.0); vol_w = factor_weights.get('vol', 1.0) trend_w = factor_weights.get('trend', 1.0); volume_w = factor_weights.get('volume', 1.0) macd_w = factor_weights.get('macd', 1.0); ma_slope_w = factor_weights.get('ma_slope', 1.0) markov_w = factor_weights.get('markov', 1.0) for ticker in tickers_to_run: cols_to_use = [ticker] if f'{ticker}_High' in master_df.columns: cols_to_use.append(f'{ticker}_High') if f'{ticker}_Low' in master_df.columns: cols_to_use.append(f'{ticker}_Low') if f'{ticker}_Volume' in master_df.columns: cols_to_use.append(f'{ticker}_Volume') existing_cols = [col for col in cols_to_use if col in master_df.columns] if ticker not in existing_cols: continue ticker_data_full = master_df.loc[:, existing_cols] ticker_data = ticker_data_full.loc[date_range[0]:date_range[1]] rename_dict = {ticker: 'Close', f'{ticker}_High': 'High', f'{ticker}_Low': 'Low', f'{ticker}_Volume': 'Volume'} rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} ticker_data = ticker_data.rename(columns=rename_dict_filtered) if not ticker_data.empty and 'Close' in ticker_data.columns and not ticker_data['Close'].isna().all(): # --- CORRECTED run_backtest CALL --- long_pnl, short_pnl, avg_long_trade, avg_short_trade, _, trades, _, trade_counts, _, _ = run_backtest( ticker_data, test_params, use_rsi_combo, use_volatility_combo, use_trend_combo, use_volume_combo, use_macd_combo, use_ma_slope_combo, use_markov_combo, rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w, use_adx_filter, adx_threshold, rsi_logic, adx_period, veto_setups_list=None, primary_driver=primary_driver, markov_setup=markov_setup, exit_logic_type=exit_logic, exit_confidence_threshold=exit_thresh, smart_trailing_stop_pct=smart_trailing_stop, smart_exit_atr_period=smart_exit_atr_p, smart_exit_atr_multiplier=smart_exit_atr_m, intelligent_tsl_pct=intelligent_tsl_pct ) if abs(long_pnl) > PROFIT_THRESHOLD or abs(short_pnl) > PROFIT_THRESHOLD or \ (avg_long_trade is not None and pd.notna(avg_long_trade) and abs(avg_long_trade) > PROFIT_THRESHOLD) or \ (avg_short_trade is not None and pd.notna(avg_short_trade) and abs(avg_short_trade) > PROFIT_THRESHOLD): excluded_tickers_conf.append(ticker); continue if optimise_for == 'long': pnl, avg_trade_profit, trade_log = long_pnl, avg_long_trade, trades[0] total_wins += trade_counts[0]; total_losses += trade_counts[1] else: pnl, avg_trade_profit, trade_log = short_pnl, avg_short_trade, trades[2] total_wins += trade_counts[2]; total_losses += trade_counts[3] num_trades = len(trade_log) if num_trades > 0 and avg_trade_profit is not None and pd.notna(avg_trade_profit): total_trades += num_trades total_profit_weighted_avg += avg_trade_profit * num_trades if pnl > 0: winning_tickers += 1 elif pnl < 0: losing_tickers += 1 all_confidences.extend([trade['confidence'] for trade in trade_log if pd.notna(trade.get('confidence'))]) # --- [NEW SCORING LOGIC] --- overall_avg_profit = 0.0 ticker_good_bad_ratio = 0.0 badness_score = 0.0 if total_trades > 0: overall_avg_profit = total_profit_weighted_avg / total_trades if losing_tickers > 0: ticker_good_bad_ratio = winning_tickers / losing_tickers elif winning_tickers > 0: ticker_good_bad_ratio = 99999.0 if winning_tickers > 0 and overall_avg_profit < 0: badness_score = (losing_tickers / winning_tickers) * abs(overall_avg_profit) avg_entry_confidence = np.mean(all_confidences) if all_confidences else 0.0 trade_good_bad_ratio = total_wins / total_losses if total_losses > 0 else 99999.0 if pd.isna(avg_entry_confidence): avg_entry_confidence = 0.0 # Use the new 3-Factor Score # We pass TRADE G/B ratio as the second argument because it is more granular/reliable norm_score = calculate_strategy_score(overall_avg_profit, trade_good_bad_ratio, total_trades) # For sorting consistency, we set "Good Score" to the same value as Norm Score raw_score = norm_score return { "RSI": use_rsi_combo, "Volatility": use_volatility_combo, "TREND": use_trend_combo, "Volume": use_volume_combo, "MACD": use_macd_combo, "MA Slope": use_ma_slope_combo, "Markov": use_markov_combo, "Conf. Threshold": threshold, "Avg Profit/Trade": overall_avg_profit if pd.notna(overall_avg_profit) else 0.0, "Ticker G/B Ratio": ticker_good_bad_ratio if pd.notna(ticker_good_bad_ratio) else 0.0, "Trade G/B Ratio": trade_good_bad_ratio if pd.notna(trade_good_bad_ratio) else 0.0, "Winning Tickers": winning_tickers, "Losing Tickers": losing_tickers, "Avg Entry Conf.": avg_entry_confidence, "Good Score": raw_score if pd.notna(raw_score) else 0.0, "Bad Score": badness_score if pd.notna(badness_score) else 0.0, "Norm. Score %": norm_score, "Total Trades": total_trades } def run_single_weight_test(confidence_settings, base_params, master_df, optimise_for, tickers, date_range, power): """ Worker function for weight optimisation. 'base_params' (MA, BB, etc.) are fixed. 'confidence_settings' (weights) are variable. """ total_profit_weighted_avg, total_trades, winning_tickers, losing_tickers = 0, 0, 0, 0 total_wins, total_losses = 0, 0 all_confidences = [] # --- UNPACK all settings from the variable dictionary --- use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov = confidence_settings['toggles'] rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w = confidence_settings['weights'] use_adx_filter, adx_threshold, adx_period = confidence_settings['adx_settings'] # <-- MODIFIED rsi_logic = confidence_settings['rsi_logic'] primary_driver = confidence_settings['primary_driver'] markov_setup = confidence_settings['markov_setup'] exit_logic = confidence_settings['exit_logic'] exit_thresh = confidence_settings['exit_thresh'] smart_trailing_stop = confidence_settings['smart_trailing_stop'] smart_exit_atr_p = confidence_settings['smart_exit_atr_period'] smart_exit_atr_m = confidence_settings['smart_exit_atr_multiplier'] intelligent_tsl_pct = confidence_settings['intelligent_tsl_pct'] # <-- ADDED # --- End Unpack --- PROFIT_THRESHOLD = 1.0 if not isinstance(tickers, list): tickers = [tickers] for ticker in tickers: cols_to_use = [ticker] if f'{ticker}_High' in master_df.columns: cols_to_use.append(f'{ticker}_High') if f'{ticker}_Low' in master_df.columns: cols_to_use.append(f'{ticker}_Low') if f'{ticker}_Volume' in master_df.columns: cols_to_use.append(f'{ticker}_Volume') existing_cols = [col for col in cols_to_use if col in master_df.columns] if ticker not in existing_cols: continue ticker_data_full = master_df.loc[:, existing_cols] ticker_data = ticker_data_full.loc[date_range[0]:date_range[1]] rename_dict = { ticker: 'Close', f'{ticker}_High': 'High', f'{ticker}_Low': 'Low', f'{ticker}_Volume': 'Volume' } rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} ticker_data = ticker_data.rename(columns=rename_dict_filtered) if not ticker_data.empty and 'Close' in ticker_data.columns and not ticker_data['Close'].isna().all(): long_pnl, short_pnl, avg_long_trade, avg_short_trade, _, trades, _, trade_counts, _, _ = run_backtest( ticker_data, base_params, use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov, rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w, use_adx_filter, adx_threshold, rsi_logic, adx_period, # <-- ADDED veto_setups_list=None, # Veto is OFF during optimisation primary_driver=primary_driver, markov_setup=markov_setup, exit_logic_type=exit_logic, exit_confidence_threshold=exit_thresh, smart_trailing_stop_pct=smart_trailing_stop, smart_exit_atr_period=smart_exit_atr_p, smart_exit_atr_multiplier=smart_exit_atr_m, intelligent_tsl_pct=intelligent_tsl_pct # <-- ADDED ) if abs(long_pnl) > PROFIT_THRESHOLD or abs(short_pnl) > PROFIT_THRESHOLD or \ (avg_long_trade is not None and pd.notna(avg_long_trade) and abs(avg_long_trade) > PROFIT_THRESHOLD) or \ (avg_short_trade is not None and pd.notna(avg_short_trade) and abs(avg_short_trade) > PROFIT_THRESHOLD): continue # Skip outlier tickers if optimise_for == 'long': pnl, avg_trade_profit, trade_log = long_pnl, avg_long_trade, trades[0] total_wins += trade_counts[0]; total_losses += trade_counts[1] else: pnl, avg_trade_profit, trade_log = short_pnl, avg_short_trade, trades[2] total_wins += trade_counts[2]; total_losses += trade_counts[3] num_trades = len(trade_log) if num_trades > 0 and avg_trade_profit is not None and pd.notna(avg_trade_profit): total_trades += num_trades total_profit_weighted_avg += avg_trade_profit * num_trades if pnl > 0: winning_tickers += 1 elif pnl < 0: losing_tickers += 1 all_confidences.extend([trade['confidence'] for trade in trade_log if pd.notna(trade.get('confidence'))]) overall_avg_profit = 0.0 good_bad_ratio = 0.0 trade_good_bad_ratio = 0.0 if total_trades > 0: overall_avg_profit = total_profit_weighted_avg / total_trades if losing_tickers > 0: good_bad_ratio = winning_tickers / losing_tickers elif winning_tickers > 0: good_bad_ratio = 99999.0 if total_losses > 0: trade_good_bad_ratio = total_wins / total_losses elif total_wins > 0: trade_good_bad_ratio = 99999.0 avg_entry_confidence = np.mean(all_confidences) if all_confidences else 0.0 weights_tested = { "rsi_w": rsi_w, "vol_w": vol_w, "trend_w": trend_w, "volume_w": volume_w, "macd_w": macd_w, "ma_slope_w": ma_slope_w, "markov_w": markov_w } return { "weights": weights_tested, "Avg Profit/Trade": overall_avg_profit, "Ticker G/B Ratio": good_bad_ratio, "Trade G/B Ratio": trade_good_bad_ratio, "Total Trades": total_trades, "Avg Entry Conf.": avg_entry_confidence, "Winning Tickers": winning_tickers, "Losing Tickers": losing_tickers } def run_advisor_scan(main_df, setups_to_run, advisor_type="Advisor"): """ Scans tickers for open trades using a list of setups. Updated to provide real-time granular feedback per ticker. """ st.info(f"Scanning tickers for open trades based on {len(setups_to_run)} {advisor_type} setups...") base_params = { "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, "bband_std_dev": st.session_state.bb_std, "long_entry_threshold_pct": st.session_state.long_entry / 100, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, "long_delay_days": st.session_state.long_delay, "short_entry_threshold_pct": st.session_state.short_entry / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, "short_trailing_stop_loss_pct": st.session_state.short_sl / 100, "short_delay_days": st.session_state.short_delay } lookbacks = [ base_params.get('large_ma_period', 50), base_params.get('bband_period', 20), 200, 50, 26, 14, st.session_state.adx_period ] markov_setup_to_use = None if st.session_state.primary_driver == 'Markov State' or st.session_state.use_markov: if 'best_markov_setup' in st.session_state and st.session_state.best_markov_setup: markov_setup_to_use = st.session_state.best_markov_setup if st.session_state.primary_driver == 'Markov State': st.info("Using saved Best Markov Setup as Primary Driver.") lookbacks.append(markov_setup_to_use.get('Run-Up Period', 10)) else: st.error("Markov State is active, but no Best Markov Setup found. Run Section 7 first.") st.stop() if advisor_type == "User-Defined": for setup in setups_to_run: try: lookbacks.append(int(setup.get("Large MA Period", 50))) lookbacks.append(int(setup.get("Bollinger Band Period", 20))) except (ValueError, TypeError): lookbacks.append(50) lookbacks.append(20) max_lookback_period = max(lookbacks) active_scan_days = 120 buffer_days = 10 total_days_needed = max_lookback_period + active_scan_days + buffer_days scan_end_date = pd.Timestamp(st.session_state.end_date) scan_start_date = scan_end_date - timedelta(days=total_days_needed) earliest_data_date = main_df.index.min() if scan_start_date < earliest_data_date: scan_start_date = earliest_data_date st.caption(f"Analysing data from {scan_start_date.date()} to {scan_end_date.date()}.") factor_settings = { "use_adx": st.session_state.use_adx_filter, "adx_thresh": st.session_state.adx_threshold, "adx_period": st.session_state.adx_period, "rsi_logic": st.session_state.get('rsi_logic', 'Crossover'), "primary_driver": st.session_state.get('primary_driver', 'Bollinger Bands'), "markov_setup": markov_setup_to_use, "exit_logic": st.session_state.exit_logic_type, "exit_thresh": st.session_state.exit_confidence_threshold, "smart_trailing_stop": st.session_state.smart_trailing_stop_pct / 100.0, "rsi_w": st.session_state.rsi_w, "vol_w": st.session_state.vol_w, "trend_w": st.session_state.trend_w, "volume_w": st.session_state.volume_w, "macd_w": st.session_state.macd_w, "ma_slope_w": st.session_state.ma_slope_w, "markov_w": st.session_state.markov_w, 'smart_exit_atr_period': st.session_state.smart_exit_atr_period, 'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier, 'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0 } all_advisor_trades = [] ticker_list = sorted([col for col in main_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))]) # --- [NEW] Better Progress Tracking --- num_setups = len(setups_to_run) num_tickers = len(ticker_list) total_ops = num_setups * num_tickers current_op = 0 progress_bar = st.progress(0, text="Initializing scan...") status_text = st.empty() # Placeholder for rolling ticker names for i, setup in enumerate(setups_to_run): if advisor_type == "Top Setups": params_for_run = base_params.copy() params_for_run['confidence_threshold'] = setup.get('Conf. Threshold', 50) setup_use_rsi = setup.get('RSI', 'Off') == 'On' setup_use_vol = setup.get('Volatility', 'Off') == 'On' setup_use_trend = setup.get('TREND', 'Off') == 'On' setup_use_volume = setup.get('Volume', 'Off') == 'On' setup_use_macd = setup.get('MACD', 'Off') == 'On' setup_use_ma_slope = setup.get('MA Slope', 'Off') == 'On' setup_use_markov = setup.get('Markov', 'Off') == 'On' scan_rsi_w = factor_settings['rsi_w']; scan_vol_w = factor_settings['vol_w'] scan_trend_w = factor_settings['trend_w']; scan_volume_w = factor_settings['volume_w'] scan_macd_w = factor_settings['macd_w']; scan_ma_slope_w = factor_settings['ma_slope_w'] scan_markov_w = factor_settings['markov_w'] elif advisor_type == "User-Defined": try: ma_p = int(setup.get("Large MA Period", 50)) except: ma_p = 50 try: bb_p = int(setup.get("Bollinger Band Period", 20)) except: bb_p = 20 try: long_d = int(setup.get("Long Delay (Days)", 0)) except: long_d = 0 try: short_d = int(setup.get("Short Delay (Days)", 0)) except: short_d = 0 catcher_pct = setup.get("Catcher Offset (%)", 3.0) try: catcher_decimal = float(catcher_pct) / 100.0 except: catcher_decimal = 0.03 params_for_run = { "large_ma_period": ma_p, "bband_period": bb_p, "long_delay_days": long_d, "short_delay_days": short_d, "bband_std_dev": setup.get("Bollinger Band Std Dev", 2.0), "confidence_threshold": setup.get("Conf. Threshold", 50), "catcher_stop_pct": catcher_decimal, "long_entry_threshold_pct": setup.get("Long Entry Threshold (%)", 0.0) / 100.0, "long_exit_ma_threshold_pct": setup.get("Long Exit Threshold (%)", 0.0) / 100.0, "long_trailing_stop_loss_pct": setup.get("Long Stop Loss (%)", 0.0) / 100.0, "short_entry_threshold_pct": setup.get("Short Entry Threshold (%)", 0.0) / 100.0, "short_exit_ma_threshold_pct": setup.get("Short Exit Threshold (%)", 0.0) / 100.0, "short_trailing_stop_loss_pct": setup.get("Short Stop Loss (%)", 0.0) / 100.0, "use_ma_floor_filter": st.session_state.use_ma_floor_filter # Pass this through } def get_weight_toggle(val): try: w = float(val) if w > 0: return w, True except: pass return 0.0, False scan_rsi_w, setup_use_rsi = get_weight_toggle(setup.get('RSI', 'Off')) scan_vol_w, setup_use_vol = get_weight_toggle(setup.get('Volatility', 'Off')) scan_trend_w, setup_use_trend = get_weight_toggle(setup.get('TREND', 'Off')) scan_volume_w, setup_use_volume = get_weight_toggle(setup.get('Volume', 'Off')) scan_macd_w, setup_use_macd = get_weight_toggle(setup.get('MACD', 'Off')) scan_ma_slope_w, setup_use_ma_slope = get_weight_toggle(setup.get('MA Slope', 'Off')) scan_markov_w, setup_use_markov = get_weight_toggle(setup.get('Markov', 'Off')) else: continue # --- INNER LOOP --- for ticker_symbol in ticker_list: # Update progress per ticker current_op += 1 if current_op % 10 == 0: # Update visual every 10 items to be faster progress_bar.progress(current_op / total_ops, text=f"Setup {i+1}/{num_setups}: Scanning {ticker_symbol}...") cols_to_use = [ticker_symbol] if f'{ticker_symbol}_High' in main_df.columns: cols_to_use.append(f'{ticker_symbol}_High') if f'{ticker_symbol}_Low' in main_df.columns: cols_to_use.append(f'{ticker_symbol}_Low') if f'{ticker_symbol}_Volume' in main_df.columns: cols_to_use.append(f'{ticker_symbol}_Volume') existing_cols = [col for col in cols_to_use if col in main_df.columns] if ticker_symbol not in existing_cols: continue data_for_backtest_full = main_df.loc[:, existing_cols] data_for_scan = data_for_backtest_full.loc[scan_start_date:scan_end_date] rename_dict = {ticker_symbol: 'Close', f'{ticker_symbol}_High': 'High', f'{ticker_symbol}_Low': 'Low', f'{ticker_symbol}_Volume': 'Volume'} rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} data_for_scan = data_for_scan.rename(columns=rename_dict_filtered) if advisor_type == "User-Defined": current_lookback = max(params_for_run.get('large_ma_period', 50), params_for_run.get('bband_period', 20), 200, 50, 26, 14, factor_settings['adx_period']) else: current_lookback = max_lookback_period if (factor_settings['primary_driver'] == 'Markov State' or setup_use_markov) and factor_settings['markov_setup']: current_lookback = max(current_lookback, factor_settings['markov_setup'].get('Run-Up Period', 10)) if not data_for_scan.empty and 'Close' in data_for_scan.columns and not data_for_scan['Close'].isna().all() and len(data_for_scan) >= current_lookback : try: # Added missing MFI and SuperTrend arguments (set to False/1.0) --- _, _, _, _, _, _, open_trades, _, _, _, _ = run_backtest( data_for_scan, params_for_run, setup_use_rsi, setup_use_vol, setup_use_trend, setup_use_volume, setup_use_macd, setup_use_ma_slope, setup_use_markov, False, False, # use_mfi, use_supertrend (Default Off for Advisor scans) scan_rsi_w, scan_vol_w, scan_trend_w, scan_volume_w, scan_macd_w, scan_ma_slope_w, scan_markov_w, 1.0, 1.0, # mfi_w, supertrend_w (Default 1.0) factor_settings['use_adx'], factor_settings['adx_thresh'], factor_settings['rsi_logic'], factor_settings['adx_period'], veto_setups_list=None, primary_driver=factor_settings['primary_driver'], markov_setup=factor_settings['markov_setup'], exit_logic_type=factor_settings['exit_logic'], exit_confidence_threshold=factor_settings['exit_thresh'], smart_trailing_stop_pct=factor_settings['smart_trailing_stop'], smart_exit_atr_period=factor_settings['smart_exit_atr_period'], smart_exit_atr_multiplier=factor_settings['smart_exit_atr_multiplier'], intelligent_tsl_pct=factor_settings['intelligent_tsl_pct'] ) except Exception as e: print(f"Error during advisor backtest for {ticker_symbol} with setup {i+1}: {e}") continue if open_trades: for trade in open_trades: trade['Ticker'] = ticker_symbol trade['Setup Rank'] = i + 1 if advisor_type == "Top Setups": trade['Setup G/B Ratio'] = setup.get('Ticker G/B Ratio', np.nan) all_advisor_trades.append(trade) progress_bar.empty() status_text.empty() print(f"\nFound {len(all_advisor_trades)} total trades before de-duplication.") best_trades = {} for trade in all_advisor_trades: try: trade_key = (trade.get('Ticker'), trade.get('Entry Date'), trade.get('Trade Type')) except Exception as e: print(f"Warning: Could not create trade key for {trade}. Error: {e}") continue try: current_rank = int(trade.get('Setup Rank', 999)) except (ValueError, TypeError): current_rank = 999 if trade_key not in best_trades: best_trades[trade_key] = trade else: existing_rank = int(best_trades[trade_key].get('Setup Rank', 999)) if current_rank < existing_rank: best_trades[trade_key] = trade deduplicated_trades = list(best_trades.values()) if all_advisor_trades: raw_advisor_df = pd.DataFrame(all_advisor_trades) deduped_advisor_df = pd.DataFrame(deduplicated_trades) cols_order = ['Ticker', 'Status', 'Setup Rank', 'Final % P/L', 'Side', 'Date Open', 'Date Closed', 'Start Confidence'] if advisor_type == "Top Setups": cols_order.append('Setup G/B Ratio') if advisor_type == "User-Defined": param_keys = [k for k in setups_to_run[0].keys() if k not in ["RSI", "Volatility", "TREND", "Volume", "MACD", "MA Slope", "Conf. Threshold", "Markov"]] cols_order.extend(param_keys) setups_df = pd.DataFrame(setups_to_run) setups_df['Setup Rank'] = setups_df.index + 1 raw_advisor_df = pd.merge(raw_advisor_df, setups_df, on='Setup Rank', how='left') deduped_advisor_df = pd.merge(deduped_advisor_df, setups_df, on='Setup Rank', how='left') existing_cols_final = [col for col in cols_order if col in raw_advisor_df.columns] st.session_state.raw_df = raw_advisor_df[existing_cols_final].sort_values(by=['Status', 'Date Open'], ascending=[True, False]) st.session_state.deduped_df = deduped_advisor_df[existing_cols_final].sort_values(by=['Status', 'Date Open'], ascending=[True, False]) st.session_state.advisor_type = advisor_type else: st.session_state.raw_df = None st.session_state.deduped_df = None st.session_state.advisor_type = advisor_type st.session_state.run_advanced_advisor = False st.session_state.run_user_advisor_setup = False st.session_state.run_scan_user_setups = False st.rerun() def generate_and_run_weight_optimisation(master_df, main_content_placeholder, side, use_sq_weighting): """ Runs weight optimisation using multiprocessing. """ st.session_state.summary_df = None st.session_state.single_ticker_results = None st.session_state.confidence_results_df = None st.session_state.open_trades_df = None st.session_state.advisor_df = None st.session_state.best_params = None st.session_state.best_weights = None with main_content_placeholder.container(): # Get FIXED strategy parameters base_params = { "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, "bband_std_dev": st.session_state.bb_std, "long_entry_threshold_pct": st.session_state.long_entry / 100, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, "long_delay_days": st.session_state.long_delay, "short_entry_threshold_pct": st.session_state.short_entry / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, "short_trailing_stop_loss_pct": st.session_state.short_sl / 100, "short_delay_days": st.session_state.short_delay, "confidence_threshold": st.session_state.confidence_slider } # Get FIXED confidence settings fixed_toggles = (st.session_state.use_rsi, st.session_state.use_vol, st.session_state.use_trend, st.session_state.use_volume, st.session_state.use_macd, st.session_state.use_ma_slope, st.session_state.use_markov) fixed_adx_settings = (st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.adx_period) fixed_rsi_logic = st.session_state.rsi_logic fixed_primary_driver = st.session_state.primary_driver fixed_markov_setup = st.session_state.get('best_markov_setup') fixed_exit_logic = st.session_state.exit_logic_type fixed_exit_thresh = st.session_state.exit_confidence_threshold fixed_smart_trailing_stop = st.session_state.smart_trailing_stop_pct / 100.0 smart_atr_p = st.session_state.smart_exit_atr_period smart_atr_m = st.session_state.smart_exit_atr_multiplier # Check if key exists, default to 0.6 if not (backward compatibility) fixed_intelligent_tsl = st.session_state.get('intelligent_tsl_pct', 60.0) / 100.0 # Dynamic Weight Optimisation Logic weight_step = 0.5 weight_range = np.arange(0.5, 2.0 + weight_step, weight_step) all_factor_info = { 'RSI': {'toggle_key': 'use_rsi', 'weight_key': 'rsi_w'}, 'Volatility': {'toggle_key': 'use_vol', 'weight_key': 'vol_w'}, 'TREND': {'toggle_key': 'use_trend', 'weight_key': 'trend_w'}, 'Volume': {'toggle_key': 'use_volume', 'weight_key': 'volume_w'}, 'MACD': {'toggle_key': 'use_macd', 'weight_key': 'macd_w'}, 'MA Slope': {'toggle_key': 'use_ma_slope', 'weight_key': 'ma_slope_w'}, 'Markov': {'toggle_key': 'use_markov', 'weight_key': 'markov_w'} } driver_map = {'RSI Crossover': 'RSI', 'MACD Crossover': 'MACD', 'MA Slope': 'MA Slope', 'Markov State': 'Markov', 'Bollinger Bands': None} primary_factor_key = driver_map.get(fixed_primary_driver) factors_to_optimise = [] for factor_key, info in all_factor_info.items(): is_active = st.session_state.get(info['toggle_key'], False) is_primary = (factor_key == primary_factor_key) if factor_key == 'Markov' and not fixed_markov_setup: st.warning("Skipping Markov weight optimisation: No 'Best Markov Setup' found. Please run Section 7 first.") is_active = False if is_active and not is_primary: factors_to_optimise.append(factor_key) if not factors_to_optimise: st.warning("No active, non-primary factors to optimise. Toggle some factors 'On' in Section 2.") st.session_state.run_weight_optimisation = False return st.info(f"Optimising weights for: {', '.join(factors_to_optimise)}") weight_product = itertools.product(weight_range, repeat=len(factors_to_optimise)) base_weight_keys = ('rsi_w', 'vol_w', 'trend_w', 'volume_w', 'macd_w', 'ma_slope_w', 'markov_w') base_weights_tuple = (st.session_state.rsi_w, st.session_state.vol_w, st.session_state.trend_w, st.session_state.volume_w, st.session_state.macd_w, st.session_state.ma_slope_w, st.session_state.markov_w) confidence_combinations = [] for weight_tuple in weight_product: current_weights_map = dict(zip(base_weight_keys, base_weights_tuple)) for factor_key, new_weight_value in zip(factors_to_optimise, weight_tuple): weight_key = all_factor_info[factor_key]['weight_key'] current_weights_map[weight_key] = new_weight_value final_weights_tuple = (current_weights_map['rsi_w'], current_weights_map['vol_w'], current_weights_map['trend_w'], current_weights_map['volume_w'], current_weights_map['macd_w'], current_weights_map['ma_slope_w'], current_weights_map['markov_w']) confidence_combinations.append({ 'toggles': fixed_toggles, 'weights': final_weights_tuple, 'adx_settings': fixed_adx_settings, 'rsi_logic': fixed_rsi_logic, 'primary_driver': fixed_primary_driver, 'markov_setup': fixed_markov_setup, 'exit_logic': fixed_exit_logic, 'exit_thresh': fixed_exit_thresh, 'smart_trailing_stop': fixed_smart_trailing_stop, 'smart_exit_atr_period': smart_atr_p, 'smart_exit_atr_multiplier': smart_atr_m, 'intelligent_tsl_pct': fixed_intelligent_tsl }) total_combinations = len(confidence_combinations) num_cores = cpu_count() st.info(f"Starting {side.upper()} weight optimisation on {num_cores} cores... Testing {total_combinations} weight combinations.") if st.session_state.run_mode.startswith("Analyse Full List"): tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))] else: tickers_to_run = [st.session_state.ticker_select] date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) power = 2 if use_sq_weighting else 1 status_text = st.empty(); status_text.text("Optimisation starting...") progress_bar = st.progress(0) worker_func = partial(run_single_weight_test, base_params=base_params, master_df=master_df, optimise_for=side, tickers=tickers_to_run, date_range=date_range, power=power) results_list = [] with Pool(processes=num_cores) as pool: try: iterator = pool.imap_unordered(worker_func, confidence_combinations) for i, result_dict in enumerate(iterator, 1): results_list.append(result_dict) progress_bar.progress(i / total_combinations, text=f"Optimising... {i}/{total_combinations} combinations complete.") except Exception as e: st.error(f"An error occurred during multiprocessing: {e}") status_text.text("Optimisation failed due to an error."); st.session_state.run_weight_optimisation = False; return status_text.text("Optimisation complete. Calculating scores...") if not results_list: st.warning("Optimisation finished, but no valid results were found."); st.session_state.run_weight_optimisation = False; return results_df = pd.DataFrame(results_list) weights_df = results_df['weights'].apply(pd.Series) results_df = pd.concat([results_df.drop('weights', axis=1), weights_df], axis=1) min_trades_threshold = 10 if 'Total Trades' in results_df.columns: results_df = results_df[results_df['Total Trades'] >= min_trades_threshold].copy() if results_df.empty: st.warning(f"No results found with at least {min_trades_threshold} trades."); st.session_state.run_weight_optimisation = False; return results_df['Strategy Score'] = results_df.apply(lambda row: calculate_strategy_score(row['Avg Profit/Trade'], row['Ticker G/B Ratio'], row['Total Trades']), axis=1) results_df = results_df.sort_values(by='Strategy Score', ascending=False) best_setup_series = results_df.iloc[0] best_metric = best_setup_series['Strategy Score'] best_weights_dict = {key: float(best_setup_series[key]) for key in base_weight_keys} status_text.empty() st.success(f"Weight Optimisation Complete! Best Strategy Score: {best_metric:.2f}%") st.subheader("Optimal Weights Found (based on Strategy Score):"); st.json(best_weights_dict) st.session_state.best_weights = best_weights_dict st.subheader("Full Weight Optimisation Results:") display_cols = ["Strategy Score", "Avg Profit/Trade", "Ticker G/B Ratio", "Total Trades"] + list(base_weight_keys) display_cols = [col for col in display_cols if col in results_df.columns] formatters = {"Strategy Score": "{:.2f}%", "Avg Profit/Trade": "{:.2%}", "Ticker G/B Ratio": "{:.2f}"} for w_key in base_weight_keys: formatters[w_key] = "{:.1f}" st.dataframe(results_df[display_cols].style.format(formatters)) st.session_state.run_weight_optimisation = False def apply_best_weights_to_widgets(): """Loads optimised weights from st.session_state.best_weights into the sidebar widgets.""" if 'best_weights' in st.session_state and st.session_state.best_weights: weights = st.session_state.best_weights if 'rsi_w' in weights: st.session_state.rsi_w = weights['rsi_w'] if 'vol_w' in weights: st.session_state.vol_w = weights['vol_w'] if 'trend_w' in weights: st.session_state.trend_w = weights['trend_w'] if 'volume_w' in weights: st.session_state.volume_w = weights['volume_w'] if 'macd_w' in weights: st.session_state.macd_w = weights['macd_w'] if 'ma_slope_w' in weights: st.session_state.ma_slope_w = weights['ma_slope_w'] st.sidebar.success("Optimal weights loaded into sidebar!") st.rerun() else: st.sidebar.error("No optimal weights found in session state.") def apply_best_params_to_widgets(): """Loads parameters from st.session_state.best_params into the sidebar widgets.""" if 'best_params' in st.session_state and st.session_state.best_params: params = st.session_state.best_params if 'large_ma_period' in params: st.session_state.ma_period = params['large_ma_period'] if 'bband_period' in params: st.session_state.bb_period = params['bband_period'] if 'bband_std_dev' in params: st.session_state.bb_std = params['bband_std_dev'] if 'confidence_threshold' in params: st.session_state.confidence_slider = params['confidence_threshold'] if 'long_entry_threshold_pct' in params: st.session_state.long_entry = params['long_entry_threshold_pct'] * 100 if 'long_exit_ma_threshold_pct' in params: st.session_state.long_exit = params['long_exit_ma_threshold_pct'] * 100 if 'long_trailing_stop_loss_pct' in params: st.session_state.long_sl = params['long_trailing_stop_loss_pct'] * 100 if 'long_delay_days' in params: st.session_state.long_delay = params['long_delay_days'] if 'long_entry_threshold_pct' in params: st.session_state.short_entry = params['long_entry_threshold_pct'] * 100 if 'long_exit_ma_threshold_pct' in params: st.session_state.short_exit = params['long_exit_ma_threshold_pct'] * 100 if 'long_trailing_stop_loss_pct' in params: st.session_state.short_sl = params['long_trailing_stop_loss_pct'] * 100 if 'long_delay_days' in params: st.session_state.short_delay = params['long_delay_days'] # --- NEW: Load Max Duration --- if 'max_trading_days' in params: st.session_state.max_duration = params['max_trading_days'] # --- NEW: Load Catcher Offset --- if 'catcher_stop_pct' in params: st.session_state.catcher_stop_pct = params['catcher_stop_pct'] * 100 st.sidebar.success("Optimal parameters loaded into sidebar!") st.rerun() else: st.sidebar.error("No optimal parameters found in session state.") # --- 5. Streamlit User Interface --- def update_state(): """ Callback to update the main session state from the 'widget_' keys. This synchronizes all widgets. """ # Get all keys from session state keys = list(st.session_state.keys()) # Loop through all keys that start with 'widget_' for widget_key in keys: if widget_key.startswith('widget_'): # Find the main key (e.g., 'widget_ma_period' -> 'ma_period') main_key = widget_key[len('widget_'):] # Update the main key with the widget's value if main_key in st.session_state: st.session_state[main_key] = st.session_state[widget_key] # --- UPDATED: Helper to check for blank rows (Checks Factors, Stats, Notes, AND Run status) --- def is_row_blank(s): """Checks if a row from the user-defined table is blank.""" factors = ["RSI", "Volatility", "TREND", "Volume", "MACD", "MA Slope", "Markov", "ADX Filter"] # Check 1: Are all factors Off? all_factors_off = all(str(s.get(f, "Off")).lower() in ["off", "0", "0.0"] for f in factors) # Check 2: Are stats zero? stats_are_zero = s.get("Z_Num_Trades", 0) == 0 # Check 3: Is the note empty? note_is_empty = str(s.get("Notes", "")).strip() == "" # Check 4: Is the Run box unchecked? run_is_unchecked = s.get("Run", False) is False # The row is considered blank ONLY if ALL conditions are met return all_factors_off and stats_are_zero and note_is_empty and run_is_unchecked # --- [NEW] 6. Markov Chain Optimisation Functions --- def run_single_markov_test(params, master_df, tickers, date_range): """ Worker function for the Markov optimisation. Tests a single combination of (run_up_period, future_period). """ run_up_period = params['run_up'] future_period = params['future'] total_results = { 'Up -> Up': {'profit': 0.0, 'count': 0}, 'Up -> Down': {'profit': 0.0, 'count': 0}, 'Down -> Up': {'profit': 0.0, 'count': 0}, 'Down -> Down': {'profit': 0.0, 'count': 0} } for ticker in tickers: if ticker not in master_df.columns: continue # Get only the 'Close' price for this ticker ticker_data = master_df[ticker].loc[date_range[0]:date_range[1]].to_frame(name='Close') # --- [NEW] CLEANING LOGIC TO PREVENT INFINITY --- ticker_data['Close'] = pd.to_numeric(ticker_data['Close'], errors='coerce').replace(0, np.nan) ticker_data.dropna(subset=['Close'], inplace=True) # --- [END NEW] --- if ticker_data.empty or ticker_data['Close'].isna().all(): continue # 1. Calculate Run-Up state (past) # pct_change(N) calculates (price[t] / price[t-N]) - 1 ticker_data['RunUp_Return'] = ticker_data['Close'].pct_change(periods=run_up_period) ticker_data['RunUp_State'] = ticker_data['RunUp_Return'].apply(lambda x: 'Up' if x > 0 else 'Down') # 2. Calculate Future state (what *actually* happened) # shift(-N) looks *forward* N periods ticker_data['Future_Return'] = (ticker_data['Close'].shift(-future_period) / ticker_data['Close']) - 1 # 3. Drop NaNs created by the shifts/pct_change ticker_data.dropna(subset=['RunUp_State', 'Future_Return'], inplace=True) if ticker_data.empty: continue # 4. Tally results # We use .values for speed runup_states = ticker_data['RunUp_State'].values future_returns = ticker_data['Future_Return'].values for i in range(len(runup_states)): state = runup_states[i] ret = future_returns[i] # --- [NEW] Check for infinity returns --- if not np.isfinite(ret): continue # --- [END NEW] --- if state == 'Up': # Strategy: Bet on Up total_results['Up -> Up']['profit'] += ret total_results['Up -> Up']['count'] += 1 # Strategy: Bet on Down total_results['Up -> Down']['profit'] += (ret * -1) total_results['Up -> Down']['count'] += 1 else: # State is 'Down' # Strategy: Bet on Up total_results['Down -> Up']['profit'] += ret total_results['Down -> Up']['count'] += 1 # Strategy: Bet on Down total_results['Down -> Down']['profit'] += (ret * -1) total_results['Down -> Down']['count'] += 1 # 5. Compile final metrics final_report = [] for strategy, results in total_results.items(): if results['count'] > 0: avg_pnl = (results['profit'] / results['count']) # We use a simple score: Avg P&L * log(count) to value consistency score = avg_pnl * np.log10(results['count'] + 1) else: avg_pnl = 0 score = 0 final_report.append({ 'Run-Up Period': run_up_period, 'Future Period': future_period, 'Strategy': strategy, 'Avg. P/L': avg_pnl, 'Total Occurrences': results['count'], 'Total P/L': results['profit'], 'Score': score }) return final_report def generate_and_run_markov_optimisation(master_df, main_content_placeholder, side): """ Main UI and orchestrator for Markov Chain optimisation. [NEW] This version calculates an 'Alpha Score per Day' to find the most efficient predictive edge. """ st.session_state.summary_df = None st.session_state.single_ticker_results = None st.session_state.confidence_results_df = None st.session_state.open_trades_df = None st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None st.session_state.best_params = None st.session_state.best_weights = None st.session_state.markov_results_df = None # Clear previous Markov results st.session_state.best_markov_setup = None with main_content_placeholder.container(): st.header(f"🔮 Finding Best Markov Probabilities ({side.title()})") # --- Get UI inputs from session state --- run_up_start = st.session_state.markov_run_up_start run_up_end = st.session_state.markov_run_up_end run_up_step = st.session_state.markov_run_up_step future_start = st.session_state.markov_future_start future_end = st.session_state.markov_future_end future_step = st.session_state.markov_future_step # --- Create parameter combinations --- run_up_range = range(run_up_start, run_up_end + 1, run_up_step) future_range = range(future_start, future_end + 1, future_step) param_product = itertools.product(run_up_range, future_range) param_combinations = [{ "run_up": p[0], "future": p[1] } for p in param_product] total_combinations = len(param_combinations) if total_combinations == 0: st.warning("No combinations to test. Check your ranges in the sidebar.") st.session_state.run_markov_optimisation = False # Reset flag st.stop() num_cores = cpu_count() st.info(f"Starting Markov optimisation on {num_cores} cores... Testing {total_combinations} period combinations.") tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))] date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) status_text = st.empty(); status_text.text("Optimisation starting...") progress_bar = st.progress(0) # <-- The progress bar worker_func = partial(run_single_markov_test, master_df=master_df, tickers=tickers_to_run, date_range=date_range) results_list = [] with Pool(processes=num_cores) as pool: try: iterator = pool.imap_unordered(worker_func, param_combinations) for i, result_group in enumerate(iterator, 1): results_list.extend(result_group) # Add the 4 strategies progress_bar.progress(i / total_combinations, text=f"Optimising... {i}/{total_combinations} combinations complete.") except Exception as e: st.error(f"An error occurred during multiprocessing: {e}") status_text.text("Optimisation failed due to an error."); st.session_state.run_markov_optimisation = False # Reset flag return status_text.text("Optimisation complete. Compiling results...") if not results_list: st.warning("Optimisation finished, but no valid results were found.") st.session_state.run_markov_optimisation = False # Reset flag return results_df = pd.DataFrame(results_list) # --- [NEW ALPHA SCORE PER DAY LOGIC] --- # 1. Pivot the data to get all 4 strategies in one row per time period pivot_df = results_df.pivot_table( index=['Run-Up Period', 'Future Period'], columns='Strategy', values='Avg. P/L' # We use Avg. P/L for the alpha calculation ).reset_index() # 2. Calculate the "Alpha Scores" pivot_df['Long Alpha Score'] = pivot_df.get('Down -> Up', 0) - pivot_df.get('Up -> Up', 0) pivot_df['Short Alpha Score'] = pivot_df.get('Up -> Down', 0) - pivot_df.get('Down -> Down', 0) # 3. Calculate the "Alpha Score per Day" pivot_df['Long Alpha Score per Day'] = pivot_df['Long Alpha Score'] / pivot_df['Future Period'] pivot_df['Short Alpha Score per Day'] = pivot_df['Short Alpha Score'] / pivot_df['Future Period'] # 4. Join this back to the original results to get counts, etc. results_df = results_df.set_index(['Run-Up Period', 'Future Period']) pivot_df = pivot_df.set_index(['Run-Up Period', 'Future Period']) # Join the new Alpha Scores to the main results results_df = results_df.join(pivot_df[['Long Alpha Score', 'Short Alpha Score', 'Long Alpha Score per Day', 'Short Alpha Score per Day']]) results_df = results_df.reset_index() # 5. Determine which Alpha Score and strategies to show if side == 'long': score_to_use = 'Long Alpha Score per Day' target_strategies = ['Down -> Up', 'Up -> Up'] else: # short score_to_use = 'Short Alpha Score per Day' target_strategies = ['Up -> Down', 'Down -> Down'] # Filter for the strategies we care about for this 'side' final_df = results_df[results_df['Strategy'].isin(target_strategies)].copy() if final_df.empty: st.warning(f"No valid results found for {side}-biased strategies.") st.session_state.run_markov_optimisation = False # Reset flag return # 6. Sort by the new "Alpha Score per Day" final_df = final_df.sort_values(by=score_to_use, ascending=False) # Get the best row (which will be the one with the highest Alpha per Day) best_setup_series = final_df.iloc[0] st.success(f"Markov Optimisation Complete! Best 'Alpha' strategy found:") st.subheader("Best Markov Setup (by Alpha Score per Day):") # Save the best setup to session state st.session_state.best_markov_setup = best_setup_series.to_dict() save_markov_setup(st.session_state.best_markov_setup) # <-- SAVE TO FILE st.json(st.session_state.best_markov_setup) st.subheader(f"Top 10 Setups ({side.title()}-biased, sorted by {score_to_use}):") display_cols = [ 'Strategy', 'Run-Up Period', 'Future Period', score_to_use, 'Avg. P/L', 'Total Occurrences' ] st.dataframe(final_df.head(10)[display_cols].style.format({ score_to_use: "{:.4%}", # Format as percentage "Avg. P/L": "{:.4%}", })) # --- [NEW] HEATMAP INFOGRAPHIC --- st.subheader(f"Markov '{side.title()}' Alpha Score per Day Heatmap") st.caption("This heatmap shows the most *efficient* signals (highest predictive value per day held).") try: # We use the pivot_df we created earlier, which has the scores heatmap_data = pivot_df.pivot_table( index='Run-Up Period', columns='Future Period', values=score_to_use # Plot the Alpha Score per Day ) # Create the heatmap st.dataframe( heatmap_data.style .background_gradient(cmap='RdYlGn', axis=None) # Red-Yellow-Green colormap .format("{:.4%}", na_rep='-') # Format as percentage ) except Exception as e: st.warning(f"Could not generate heatmap. Error: {e}") # --- [END NEW] --- st.session_state.markov_results_df = final_df # Save the sorted results st.session_state.run_markov_optimisation = False # Reset flag # --- Corrected: Ensures rsi_logic AND primary_driver are passed --- def run_confidence_optimisation(optimise_for, find_mode, master_df, main_content_placeholder, veto_factors): st.session_state.summary_df = None st.session_state.single_ticker_results = None st.session_state.open_trades_df = None st.session_state.best_params = None st.session_state.advisor_df = None st.session_state.worst_confidence_setups_list = [] with main_content_placeholder.container(): num_cores = cpu_count() st.info(f"Starting to find **{find_mode.upper()}** {optimise_for.upper()} setups on {num_cores} CPU cores...") st.caption("Note: This process runs in parallel for maximum speed. The status below updates as each strategy combination completes.") # --- [NEW] Added Markov --- factors = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov'] num_factors = len(factors) if find_mode == 'worst': if veto_factors is None or len(veto_factors) != num_factors: st.error("Internal error: Veto factors not provided correctly.") return target_combo = veto_factors on_off_combos = [c for c in itertools.product([False, True], repeat=num_factors) if c == target_combo] if not on_off_combos or not any(on_off_combos[0]): st.warning("Please select at least one factor for the Veto search."); return else: on_off_combos = [c for c in itertools.product([False, True], repeat=num_factors) if any(c)] thresholds_to_test = [20, 25, 30, 35, 40, 45, 50] tasks = list(itertools.product(on_off_combos, thresholds_to_test, [1.0])) total_tasks = len(tasks) base_params = { "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, "bband_std_dev": st.session_state.bb_std, "long_entry_threshold_pct": st.session_state.long_entry / 100, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, "long_delay_days": st.session_state.long_delay, "short_entry_threshold_pct": st.session_state.short_entry / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, "short_trailing_stop_loss_pct": st.session_state.short_sl / 100, "short_delay_days": st.session_state.short_delay, } tickers_to_run = sorted([col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))]) date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) factor_weights = { "rsi": st.session_state.rsi_w, "vol": st.session_state.vol_w, "trend": st.session_state.trend_w, "volume": st.session_state.volume_w, "macd": st.session_state.macd_w, "ma_slope": st.session_state.ma_slope_w, "markov": st.session_state.markov_w, "use_adx": st.session_state.use_adx_filter, "adx_thresh": st.session_state.adx_threshold, "adx_period": st.session_state.adx_period, "rsi_logic": st.session_state.rsi_logic, "primary_driver": st.session_state.primary_driver, 'markov_setup': st.session_state.get('best_markov_setup'), 'exit_logic': st.session_state.exit_logic_type, 'exit_thresh': st.session_state.exit_confidence_threshold, 'smart_trailing_stop': st.session_state.smart_trailing_stop_pct / 100.0, 'smart_exit_atr_period': st.session_state.smart_exit_atr_period, 'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier, 'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0 } worker_func = partial(run_single_confidence_test, base_params=base_params, master_df=master_df, date_range=date_range, tickers_to_run=tickers_to_run, optimise_for=optimise_for, factor_weights=factor_weights) results_list = [] # --- UI Feedback Elements --- progress_bar = st.progress(0, text="Initializing parallel engines...") status_text = st.empty() with Pool(processes=num_cores) as pool: try: iterator = pool.imap_unordered(worker_func, tasks) for i, result in enumerate(iterator, 1): if isinstance(result, dict) and "Trade G/B Ratio" in result: results_list.append(result) # --- DYNAMIC STATUS UPDATE --- # This creates the "Screaming Past" effect for strategies active = [] if result['RSI']: active.append("RSI") if result['Volatility']: active.append("Vol") if result['TREND']: active.append("Trend") if result['Volume']: active.append("VolSpike") if result['MACD']: active.append("MACD") if result['MA Slope']: active.append("Slope") if result['Markov']: active.append("Markov") status_msg = f"Analyzed: [{' + '.join(active)}] @ Thresh {result['Conf. Threshold']} -> Profit: {result['Avg Profit/Trade']:.2%} ({result['Total Trades']} trades)" status_text.text(status_msg) else: print(f"Warning: Worker returned invalid result format: {result}") # Update progress bar progress_bar.progress(i / total_tasks, text=f"Optimising... {i}/{total_tasks} combinations complete.") except Exception as e: st.error(f"An error occurred during multiprocessing: {e}") progress_bar.empty(); return if results_list: results_df = pd.DataFrame(results_list) # --- [NEW] FILTER OUT "INFINITY" RESULTS --- # We exclude results where Trade G/B Ratio is > 50000 (the placeholder for 0 losses). # This prevents low-volume "perfect" trades from gaming the Uncapped Scoring system. if 'Trade G/B Ratio' in results_df.columns: results_df = results_df[results_df['Trade G/B Ratio'] < 50000].copy() if results_df.empty: st.warning("No valid setups found after removing infinite ratio outliers.") st.session_state.run_confidence_optimisation = False return # --- Sort by the NEW 3-Factor Weighted Score --- sort_col = "Good Score" if find_mode == 'best' else "Bad Score" if sort_col in results_df.columns: fill_value = -np.inf if find_mode == 'best' else 0 results_df[sort_col] = results_df[sort_col].fillna(fill_value) results_df = results_df.sort_values(by=sort_col, ascending=False).reset_index(drop=True) else: st.error(f"Sorting column '{sort_col}' not found."); return for factor in factors: if factor in results_df.columns: results_df[factor] = results_df[factor].apply(lambda x: "On" if x else "Off") # Save all results to state so they persist st.session_state.confidence_results_df = results_df if find_mode == 'best': st.subheader(f"📊 Top 60 Confidence Setups ({optimise_for.title()} Trades)") if not results_df.empty: best_setup = results_df.iloc[0] st.session_state.best_confidence_setup = best_setup.to_dict() # Save TOP setups using the same DF (which is now sorted by Weighted Score) save_top_setups(results_df, optimise_for) else: st.warning("No valid 'best' setups found.") st.session_state.best_confidence_setup = None else: # 'worst' mode valid_worst_df = results_df[results_df['Trade G/B Ratio'] < 99999.0].copy() if not valid_worst_df.empty: valid_worst_df['Bad Score'] = valid_worst_df['Bad Score'].fillna(0) valid_worst_df = valid_worst_df.sort_values(by="Bad Score", ascending=False).reset_index(drop=True) st.session_state.worst_confidence_setups_list = valid_worst_df.head(4).to_dict('records') st.info(f"Top {len(st.session_state.worst_confidence_setups_list)} valid 'worst' setups ready.") st.subheader(f"🏆 Top 60 Valid Worst Setups ({optimise_for.title()} Trades)") results_df = valid_worst_df # Use this for display else: st.warning("No valid 'worst' setups found.") st.session_state.worst_confidence_setups_list = [] # --- Display the table --- # [CHANGE] Rename the column for display purposes display_df_conf = results_df.head(60).rename(columns={'Net % Return': 'Moneypile Score'}) # [CHANGE] Update formatter key to 'Moneypile Score' and remove '%' from format string conf_formatters = { "Avg Profit/Trade": "{:.2%}", "Ticker G/B Ratio": "{:.2f}", "Trade G/B Ratio": "{:.2f}", "Moneypile Score": "{:.1f}", "Avg Entry Conf.": "{:.1f}%", "Good Score": "{:.4f}", "Bad Score": "{:.4f}", "Norm. Score %": "{:.2f}%" } if 'MACD' in display_df_conf.columns: conf_formatters['MACD'] = '{}' if 'MA Slope' in display_df_conf.columns: conf_formatters['MA Slope'] = '{}' if 'Markov' in display_df_conf.columns: conf_formatters['Markov'] = '{}' valid_conf_formatters = {k: (lambda val, fmt=v: fmt.format(val) if pd.notna(val) else '-') for k, v in conf_formatters.items() if k in display_df_conf.columns} st.dataframe(display_df_conf.style.format(valid_conf_formatters, na_rep='-')) else: st.warning("Optimisation completed but no results were generated."); st.session_state.confidence_results_df = None st.session_state.run_confidence_optimisation = False # Reset flag def generate_user_advisor_ui_and_run(main_df): st.header("⚙️ User-Defined Advisor Setups") data_key = "user_setups_data" widget_key = "user_setups_editor_state" def load_saved_setups_callback(): st.session_state[data_key] = load_user_setups() if widget_key in st.session_state: del st.session_state[widget_key] def clear_all_setups_callback(): blank_setups = [load_user_setups()[0].copy() for _ in range(20)] st.session_state[data_key] = blank_setups if widget_key in st.session_state: del st.session_state[widget_key] st.caption("Add advice/notes in the 'Notes' column. Check 'Run' to test.") col_config = { "Run": st.column_config.CheckboxColumn("Run", width="small"), # Added Notes Column "Notes": st.column_config.TextColumn("Notes", help="Advice/Description for this setup", width="medium"), "RSI": st.column_config.TextColumn("RSI", width="small"), "Volatility": st.column_config.TextColumn("Volatility", width="small"), "TREND": st.column_config.TextColumn("TREND", width="small"), "Volume": st.column_config.TextColumn("Volume", width="small"), "MACD": st.column_config.TextColumn("MACD", width="small"), "MA Slope": st.column_config.TextColumn("MA Slope", width="small"), "Markov": st.column_config.TextColumn("Markov", width="small"), "ADX Filter": st.column_config.TextColumn("ADX Filter", width="small"), "Conf. Threshold": st.column_config.NumberColumn("Conf.", width="small"), "Large MA Period": st.column_config.NumberColumn("MA", width="small"), "Bollinger Band Period": st.column_config.NumberColumn("BB", width="small"), "Bollinger Band Std Dev": st.column_config.NumberColumn("Std", format="%.1f", width="small"), "Catcher Offset (%)": st.column_config.NumberColumn("Catcher %", format="%.1f", width="small"), "Long Entry Threshold (%)": st.column_config.NumberColumn("L Entry", format="%.1f", width="small"), "Long Exit Threshold (%)": st.column_config.NumberColumn("L Exit", format="%.1f", width="small"), "Long Stop Loss (%)": st.column_config.NumberColumn("L TSL", format="%.1f", width="small"), "Long Delay (Days)": st.column_config.NumberColumn("L Delay", width="small"), "Short Entry Threshold (%)": st.column_config.NumberColumn("S Entry", format="%.1f", width="small"), "Short Exit Threshold (%)": st.column_config.NumberColumn("S Exit", format="%.1f", width="small"), "Short Stop Loss (%)": st.column_config.NumberColumn("S TSL", format="%.1f", width="small"), "Short Delay (Days)": st.column_config.NumberColumn("S Delay", width="small"), "Z_Avg_Profit": st.column_config.NumberColumn("Avg %", format="%.2f%%", disabled=True, width="small"), "Z_Num_Trades": st.column_config.NumberColumn("Trades", disabled=True, width="small"), "Z_WL_Ratio": st.column_config.NumberColumn("W/L", format="%.2f", disabled=True, width="small"), } edited_setups_list = st.data_editor( st.session_state[data_key], column_config=col_config, num_rows="dynamic", key=widget_key, column_order=[ "Run", "Notes", # <--- Notes column appears here (2nd) "RSI", "Volatility", "TREND", "Volume", "MACD", "MA Slope", "Markov", "ADX Filter", "Conf. Threshold", "Large MA Period", "Bollinger Band Period", "Bollinger Band Std Dev","Catcher Offset (%)", "Long Entry Threshold (%)", "Long Exit Threshold (%)", "Long Stop Loss (%)", "Long Delay (Days)", "Short Entry Threshold (%)", "Short Exit Threshold (%)", "Short Stop Loss (%)", "Short Delay (Days)", "Z_Avg_Profit", "Z_Num_Trades", "Z_WL_Ratio" ] ) col1, col2, col3, col4, col5 = st.columns([1, 1, 1, 2, 2]) if col1.button("💾 Save Setups"): save_user_setups(edited_setups_list[:20]) st.session_state[data_key] = edited_setups_list[:20] col2.button("🔄 Load Saved", on_click=load_saved_setups_callback) col3.button("🗑️ Clear All Setups", on_click=clear_all_setups_callback) if col4.button("Scan with User Setups", type="primary"): current_setups_list = edited_setups_list[:20] setups_to_run = [s for s in current_setups_list if s.get("Run") == True] if len(setups_to_run) == 0: st.error("No setups selected.") elif len(setups_to_run) > 8: st.error(f"Selected {len(setups_to_run)} setups. Max is 8.") else: st.session_state.run_scan_user_setups = True st.session_state.run_user_advisor_setup = False st.session_state.advisor_df = None st.session_state.setups_to_scan = setups_to_run st.rerun() if col5.button("Back to Main Analysis"): st.session_state.run_user_advisor_setup = False; st.session_state.advisor_df = None; st.rerun() def generate_advisor_report(main_df): """Handles the UI for running the 'Top Setups' advisor.""" st.header("📈 Advanced Advisor Report (Top 8 Setups)") top_setups = load_top_setups() if not top_setups: st.warning("No saved top setups found. Run 'Find Best Confidence' first.") if st.button("Back"): st.session_state.run_advanced_advisor = False st.rerun() return side = st.radio("Generate report for which saved setups?", ("Long", "Short"), horizontal=True, key="advisor_side_select_top") setups_to_run_all = top_setups.get(side.lower()) if not setups_to_run_all: st.warning(f"No saved top {side.lower()} setups found.") if st.button("Back"): st.session_state.run_advanced_advisor = False st.rerun() return if st.button(f"Scan using Top {side} Setups", type="primary"): st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None run_advisor_scan(main_df, setups_to_run_all, "Top Setups") # Call the scanner if st.button("Back"): st.session_state.run_advanced_advisor = False st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None st.rerun() def apply_best_weights_to_widgets(): """Loads optimised weights from st.session_state.best_weights into the sidebar widgets.""" if 'best_weights' in st.session_state and st.session_state.best_weights: weights = st.session_state.best_weights if 'rsi_w' in weights: st.session_state.rsi_w = weights['rsi_w'] if 'vol_w' in weights: st.session_state.vol_w = weights['vol_w'] if 'trend_w' in weights: st.session_state.trend_w = weights['trend_w'] if 'volume_w' in weights: st.session_state.volume_w = weights['volume_w'] if 'macd_w' in weights: st.session_state.macd_w = weights['macd_w'] if 'ma_slope_w' in weights: st.session_state.ma_slope_w = weights['ma_slope_w'] st.sidebar.success("Optimal weights loaded into sidebar!") st.rerun() else: st.sidebar.error("No optimal weights found in session state.") def apply_best_params_to_widgets(): """Loads parameters from st.session_state.best_params into the sidebar widgets.""" if 'best_params' in st.session_state and st.session_state.best_params: params = st.session_state.best_params if 'large_ma_period' in params: st.session_state.ma_period = params['large_ma_period'] if 'bband_period' in params: st.session_state.bb_period = params['bband_period'] if 'bband_std_dev' in params: st.session_state.bb_std = params['bband_std_dev'] if 'confidence_threshold' in params: st.session_state.confidence_slider = params['confidence_threshold'] if 'long_entry_threshold_pct' in params: st.session_state.long_entry = params['long_entry_threshold_pct'] * 100 if 'long_exit_ma_threshold_pct' in params: st.session_state.long_exit = params['long_exit_ma_threshold_pct'] * 100 if 'long_trailing_stop_loss_pct' in params: st.session_state.long_sl = params['long_trailing_stop_loss_pct'] * 100 if 'long_delay_days' in params: st.session_state.long_delay = params['long_delay_days'] if 'long_entry_threshold_pct' in params: st.session_state.short_entry = params['long_entry_threshold_pct'] * 100 if 'long_exit_ma_threshold_pct' in params: st.session_state.short_exit = params['long_exit_ma_threshold_pct'] * 100 if 'long_trailing_stop_loss_pct' in params: st.session_state.short_sl = params['long_trailing_stop_loss_pct'] * 100 if 'long_delay_days' in params: st.session_state.long_delay = params['long_delay_days'] st.sidebar.success("Optimal parameters loaded into sidebar!") st.rerun() else: st.sidebar.error("No optimal parameters found in session state.") # --- 5. Streamlit User Interface --- def update_state(): """ Callback to update the main session state from the 'widget_' keys. This synchronizes all widgets. """ # Get all keys from session state keys = list(st.session_state.keys()) # Loop through all keys that start with 'widget_' for widget_key in keys: if widget_key.startswith('widget_'): # Find the main key (e.g., 'widget_ma_period' -> 'ma_period') main_key = widget_key[len('widget_'):] # Update the main key with the widget's value if main_key in st.session_state: st.session_state[main_key] = st.session_state[widget_key] def convert_results_to_csv(summary_df, params): """ Combines the Performance Summary and the Configuration Settings into a single CSV string. """ if summary_df is None or summary_df.empty: return "" # 1. Get Performance Data (First Row) perf_data = summary_df.iloc[0].to_dict() # 2. Get Settings Data (From Session State/Params) settings_data = { "--- SETTINGS ---": "", # Separator "Start Date": params.get('start_date'), "End Date": params.get('end_date'), "Primary Driver": params.get('primary_driver'), "Confidence Threshold": params.get('confidence_threshold'), "RSI Weight": params.get('rsi_w') if params.get('use_rsi') else "Off", "Volatility Weight": params.get('vol_w') if params.get('use_volatility') else "Off", "Trend Weight": params.get('trend_w') if params.get('use_trend') else "Off", "Volume Weight": params.get('vol_w_val') if params.get('use_volume') else "Off", "MACD Weight": params.get('macd_w') if params.get('use_macd') else "Off", "MA Slope Weight": params.get('ma_slope_w') if params.get('use_ma_slope') else "Off", "Markov Weight": params.get('markov_w') if params.get('use_markov') else "Off", "MFI Weight": params.get('mfi_w') if params.get('use_mfi') else "Off", "SuperTrend Weight": params.get('supertrend_w') if params.get('use_supertrend') else "Off", "ADX Filter": f"On (< {params.get('adx_threshold')})" if params.get('use_adx_filter') else "Off", "BB Period": params.get('bband_period'), "BB Std Dev": params.get('bband_std_dev'), "MA Period": params.get('large_ma_period'), "Long Entry Thresh %": params.get('long_entry_threshold_pct'), "Short Entry Thresh %": params.get('short_entry_threshold_pct'), "Exit Logic": params.get('exit_logic_type'), "Smart TSL %": params.get('smart_trailing_stop_pct') } # 3. Combine into one dictionary combined_data = {**perf_data, **settings_data} # 4. Create DataFrame and convert to CSV export_df = pd.DataFrame([combined_data]) return export_df.to_csv(index=False).encode('utf-8') def main(): if 'run_advanced_advisor' not in st.session_state: st.session_state.run_advanced_advisor = False if 'run_user_advisor_setup' not in st.session_state: st.session_state.run_user_advisor_setup = False # 2. Define ALL Defaults (Prevents "AttributeError" crashes) defaults = { 'start_date': date(2024, 1, 1), 'end_date': date.today(), 'bband_period': 20, 'bband_std_dev': 2.0, 'long_entry_threshold_pct': 0.0, 'short_entry_threshold_pct': 0.0, 'long_exit_ma_threshold_pct': 0.0, 'short_exit_ma_threshold_pct': 0.0, 'confidence_threshold': 50, 'max_long_duration': 60, 'max_short_duration': 60, 'long_trailing_stop_loss_pct': 0.0, 'short_trailing_stop_loss_pct': 0.0, 'primary_driver': 'Bollinger Bands', 'use_rsi': True, 'rsi_w': 0.5, 'use_volatility': True, 'vol_w': 0.5, 'use_trend': True, 'trend_w': 0.5, 'use_volume': True, 'vol_w_val': 0.5, 'use_macd': False, 'macd_w': 0.5, 'use_ma_slope': False, 'ma_slope_w': 0.5, 'use_markov': False, 'markov_w': 0.5, # New Indicators 'use_mfi': False, 'mfi_w': 0.5, 'use_supertrend': False, 'supertrend_w': 0.5, # Filters & Exits 'use_adx_filter': False, 'adx_threshold': 25, 'rsi_logic': 'Level', 'exit_logic_type': 'Standard (Price-Based)', 'smart_trailing_stop_pct': 0.05, 'smart_exit_atr_period': 14, 'smart_exit_atr_multiplier': 3.0, 'intelligent_tsl_pct': 1.0, 'use_ma_floor_filter': False, 'catcher_stop_pct': 0.03, 'long_delay_days': 0, 'short_delay_days': 0, 'long_score_95_percentile': None, 'short_score_95_percentile': None, 'veto_setup_list': None, 'advisor_df': None, 'confidence_results_df': None, 'single_ticker_results': None, 'summary_df': None, 'open_trades_df': None, 'load_message': None, 'outlier_report': None, 'best_markov_setup': None, # Optimization 'ma_period': 50, 'bb_period': 20, 'bb_std': 2.0, 'confidence_slider': 50, 'long_entry': 0.0, 'long_exit': 0.0, 'long_sl': 0.0, 'long_delay': 0, 'short_entry': 0.0, 'short_exit': 0.0, 'short_sl': 0.0, 'short_delay': 0, 'max_duration': 60, 'volume_w': 0.5 # fallback for vol_w_val naming differences } # 3. Apply Defaults if Missing for key, val in defaults.items(): if key not in st.session_state: st.session_state[key] = val # 4. Helper to update state from widgets (Fixed) def update_state(): keys = list(st.session_state.keys()) for widget_key in keys: if widget_key.startswith('widget_'): main_key = widget_key[len('widget_'):] if main_key in st.session_state: st.session_state[main_key] = st.session_state[widget_key] st.set_page_config(page_title="Quant Trader", page_icon="🔴", layout="wide") # --- [FIX 1: Robust Initialization] --- # We check for 'run_mode' explicitly to fix the crash if session state is stale if 'first_run' not in st.session_state or 'run_mode' not in st.session_state: st.session_state.first_run = True defaults = load_settings() st.session_state.widget_defaults = defaults st.session_state.veto_setup_list = load_veto_setup() st.session_state.use_ma_floor_filter = defaults.get("use_ma_floor_filter", True) # Unpack all widget defaults into session state st.session_state.ticker_select = None st.session_state.run_mode = "Analyse Full List" # <--- This was missing in your stale state st.session_state.start_date = None st.session_state.end_date = None # Section 2 Defaults st.session_state.use_rsi = defaults.get('use_rsi', True) st.session_state.rsi_w = defaults.get('rsi_w', 1.0) st.session_state.rsi_logic = defaults.get('rsi_logic', 'Crossover') st.session_state.primary_driver = defaults.get('primary_driver', 'Bollinger Bands') st.session_state.exit_logic_type = defaults.get('exit_logic_type', 'Standard (Price-Based)') st.session_state.exit_confidence_threshold = defaults.get('exit_confidence_threshold', 50) st.session_state.smart_trailing_stop_pct = defaults.get('smart_trailing_stop_pct', 5.0) st.session_state.use_vol = defaults.get('use_vol', True) st.session_state.vol_w = defaults.get('vol_w', 1.0) st.session_state.use_trend = defaults.get('use_trend', True) st.session_state.trend_w = defaults.get('trend_w', 1.5) st.session_state.use_volume = defaults.get('use_volume', True) st.session_state.volume_w = defaults.get('volume_w', 1.0) st.session_state.use_adx_filter = defaults.get('use_adx_filter', True) st.session_state.adx_threshold = defaults.get('adx_threshold', 25.0) st.session_state.adx_period = defaults.get('adx_period', 14) st.session_state.use_macd = defaults.get('use_macd', True) st.session_state.macd_w = defaults.get('macd_w', 1.0) st.session_state.use_ma_slope = defaults.get('use_ma_slope', True) st.session_state.ma_slope_w = defaults.get('ma_slope_w', 0.5) st.session_state.use_markov = defaults.get('use_markov', False) st.session_state.markov_w = defaults.get('markov_w', 1.0) st.session_state.confidence_slider = defaults.get("confidence_threshold", 50) st.session_state.smart_exit_atr_period = defaults.get("smart_exit_atr_period", 14) st.session_state.smart_exit_atr_multiplier = defaults.get("smart_exit_atr_multiplier", 3.0) st.session_state.intelligent_tsl_pct = defaults.get("intelligent_tsl_pct", 0.60) * 100 st.session_state.catcher_stop_pct = defaults.get("catcher_stop_pct", 0.0) * 100 # Section 3 Defaults st.session_state.ma_period = defaults.get("large_ma_period", 50) st.session_state.bb_period = defaults.get("bband_period", 20) st.session_state.bb_std = defaults.get("bband_std_dev", 2.0) st.session_state.long_entry = defaults.get("long_entry_threshold_pct", 0.0) * 100 st.session_state.long_exit = defaults.get("long_exit_ma_threshold_pct", 0.0) * 100 st.session_state.long_sl = defaults.get("long_trailing_stop_loss_pct", 8.0) * 100 st.session_state.long_delay = defaults.get("long_delay_days", 0) st.session_state.short_entry = defaults.get("short_entry_threshold_pct", 0.0) * 100 st.session_state.short_exit = defaults.get("short_exit_ma_threshold_pct", 0.0) * 100 st.session_state.short_sl = defaults.get("short_trailing_stop_loss_pct", 8.0) * 100 st.session_state.short_delay = defaults.get("short_delay_days", 0) st.session_state.max_duration = defaults.get("max_trading_days", 60) st.session_state.max_long_duration = defaults.get("max_long_duration", 60) st.session_state.max_short_duration = defaults.get("max_short_duration", 10) # Section 4/5/6/7 Init (Optimisation variables) st.session_state.sq_params_toggle = False st.session_state.opt_ma_cb = False; st.session_state.opt_bb_cb = False; st.session_state.opt_std_cb = False st.session_state.opt_conf_cb = False; st.session_state.opt_sl_cb = False; st.session_state.opt_delay_cb = False st.session_state.opt_entry_cb = False; st.session_state.opt_exit_cb = False; st.session_state.opt_duration_cb = False # Init Opt Ranges (Defaults) st.session_state.ma_start = 50; st.session_state.ma_end = 55; st.session_state.ma_step = 5 st.session_state.bb_start = 20; st.session_state.bb_end = 25; st.session_state.bb_step = 5 st.session_state.std_start = 2.0; st.session_state.std_end = 2.1; st.session_state.std_step = 0.1 st.session_state.conf_start = 50; st.session_state.conf_end = 60; st.session_state.conf_step = 5 st.session_state.sl_start = 0.0; st.session_state.sl_end = 2.0; st.session_state.sl_step = 0.5 st.session_state.delay_start = 0; st.session_state.delay_end = 1; st.session_state.delay_step = 1 st.session_state.entry_start = 0.0; st.session_state.entry_end = 0.5; st.session_state.entry_step = 0.1 st.session_state.exit_start = 0.0; st.session_state.exit_end = 0.5; st.session_state.exit_step = 0.1 st.session_state.dur_start = 30; st.session_state.dur_end = 90; st.session_state.dur_step = 10 st.session_state.veto_rsi_cb = True; st.session_state.veto_vol_cb = True; st.session_state.veto_trend_cb = True st.session_state.veto_volume_cb = True; st.session_state.veto_macd_cb = False; st.session_state.veto_ma_slope_cb = False st.session_state.sq_weights_toggle = False # Markov Init st.session_state.markov_run_up_start = 5; st.session_state.markov_run_up_end = 20; st.session_state.markov_run_up_step = 2 st.session_state.markov_future_start = 3; st.session_state.markov_future_end = 10; st.session_state.markov_future_step = 1 st.session_state.best_markov_setup = load_markov_setup() # Data Editor Init st.session_state["user_setups_data"] = [] loaded_setups = load_user_setups() processed = [] factor_cols = ["RSI", "Volatility", "TREND", "Volume", "MACD", "MA Slope", "Markov"] for s in loaded_setups: new_s = s.copy() for k, v in s.items(): if k in factor_cols: new_s[k] = str(v) elif k == "Run": new_s[k] = bool(v) processed.append(new_s) st.session_state["user_setups_data"] = processed # --- Password Auth Init --- st.session_state.dev_authenticated = False # --- [Load Data Logic] --- if 'master_df' not in st.session_state or st.session_state.master_df is None: with st.spinner("Loading and cleaning data..."): master_df, load_message = load_all_data('csv_data') if master_df is None: st.error(load_message); st.stop() master_df, outlier_report = clean_data_and_report_outliers(master_df) st.session_state.master_df = master_df st.session_state.ticker_list = sorted([col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))]) min_date = master_df.index.min().date() max_date = master_df.index.max().date() # --- NEW: Set Default Start to 1 Year Ago from TODAY --- target_start_date = date.today() - relativedelta(years=1) # Safety Check: Ensure the target date is valid within your CSV data range if target_start_date < min_date: st.session_state.start_date = min_date elif target_start_date > max_date: st.session_state.start_date = min_date # Fallback if data is older than 1 year ago else: st.session_state.start_date = target_start_date st.session_state.end_date = max_date if st.session_state.ticker_select is None and st.session_state.ticker_list: st.session_state.ticker_select = st.session_state.ticker_list[0] st.session_state.load_message = load_message st.session_state.outlier_report = outlier_report master_df = st.session_state.master_df ticker_list = st.session_state.ticker_list # --- HEADER / GREETING --- st.title("🔴 🚀 Quant Trader") current_hour = datetime.now().hour if 5 <= current_hour < 12: greeting = "Good morning!" elif 12 <= current_hour < 19: greeting = "Good afternoon!" else: greeting = "Good evening!" today = date.today() day_str = today.strftime('%A, %d %B %Y') proverbs = load_proverbs() day_of_year = today.timetuple().tm_yday proverb_index = (day_of_year - 1) % len(proverbs) if proverbs else 0 daily_proverb = proverbs[proverb_index] if proverbs else "Have a profitable day." st.success(f"{greeting} Today is {day_str}. | *{daily_proverb}*") main_content_placeholder = st.empty() # --- SIDEBAR TOGGLE & PASSWORD LOGIC --- # Default is TRUE (Production) production_mode_toggle = st.sidebar.toggle("Production Mode", value=True, key="production_mode_toggle") if not production_mode_toggle: if not st.session_state.dev_authenticated: st.sidebar.warning("Restricted Access") pwd = st.sidebar.text_input("Enter Developer Password:", type="password", key="dev_password_input") if pwd == "492211": st.session_state.dev_authenticated = True st.rerun() else: production_mode = True else: production_mode = False else: production_mode = True if production_mode: st.markdown("""""", unsafe_allow_html=True) st.markdown("""""", unsafe_allow_html=True) # --- [MOVED] RUN ANALYSIS BUTTON (PRODUCTION MODE) --- if production_mode: # Note: We do NOT use st.rerun() here. We let the script flow down. if st.sidebar.button("Run Analysis", type="primary", key="run_analysis_prod_top", help="Runs the test using the current settings below."): # 1. Force a sync of all widgets to session state update_state() # 2. Set the flag to TRUE so the engine at the bottom runs st.session_state.run_analysis_button = True # 3. Clear previous results to ensure a fresh report st.session_state.run_advanced_advisor = False st.session_state.run_user_advisor_setup = False st.session_state.run_scan_user_setups = False st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None st.session_state.param_results_df = None; st.session_state.confidence_results_df = None st.session_state.summary_df = None; st.session_state.single_ticker_results = None st.session_state.open_trades_df = None; st.session_state.last_run_stats = None st.session_state.markov_results_df = None st.sidebar.markdown("---") # --- SIDEBAR SECTION 1: Select Test Mode & Dates (Common to both) --- st.sidebar.header("1. Select Test Mode & Dates") full_list_label = f"Analyse Full List ({len(ticker_list)} Tickers)" if st.session_state.run_mode == "Analyse Full List": st.session_state.run_mode = full_list_label st.sidebar.radio("Mode:", ("Analyse Single Ticker", full_list_label), key='run_mode') if st.session_state.get('run_mode') == "Analyse Single Ticker": if not ticker_list: st.sidebar.warning("No tickers loaded.") else: # Ensure valid selection logic (only reset if invalid/None) if st.session_state.ticker_select not in ticker_list: st.session_state.ticker_select = ticker_list[0] try: ticker_index = ticker_list.index(st.session_state.ticker_select) except ValueError: ticker_index = 0 # [FIXED] Removed the unconditional reset line that was here st.sidebar.selectbox("Select a Ticker:", ticker_list, index=ticker_index, key='widget_ticker_select', on_change=update_state ) # --- UPDATED: Date Inputs (Fully Unrestricted 2000-2030) --- st.sidebar.date_input( "Start Date", value=st.session_state.start_date, min_value=date(2005, 1, 1), max_value=date(2030, 12, 31), # Allow future dates to prevent locking key='widget_start_date', on_change=update_state ) st.sidebar.date_input( "End Date", value=st.session_state.end_date, min_value=date(2005, 1, 1), max_value=date(2030, 12, 31), key='widget_end_date', on_change=update_state ) # [NEW] Benchmark Settings Sliders (With Tooltips) st.sidebar.markdown("---") st.sidebar.write("**Benchmark Settings**") # 1. Lookback Slider - VISIBLE IN ALL MODES norm_lookback_years = st.sidebar.slider( "Lookback (Yrs)", min_value=1, max_value=30, value=st.session_state.get('norm_lookback_years', 1), # Load from Config help=( "How many years of history to use when calculating the 'Confidence Score' baseline.\n\n" "• **30 Years (Recommended):** Stable, long-term benchmark. Ensures trades are judged against all-time history.\n" "• **1-2 Years:** Adaptive. Judges trades only against recent market volatility (good for changing regimes)." "• **Default is set to 1 year as we are usually most interested in the recent year.") ) use_rolling_benchmark = st.sidebar.checkbox( "Use Rolling Benchmark (Adaptive)", value=st.session_state.get('use_rolling_benchmark', False), key='widget_use_rolling_benchmark', on_change=update_state, help="CHECKED: Adaptive Mode. Benchmarks adapt to recent history. Realistic backtest.\nUNCHECKED: Global Mode (Crystal Ball). Uses ALL history to set one strict benchmark. Good for filtering Open Trades." ) # 2. Grade Benchmark - HIDDEN IN PRODUCTION MODE if not production_mode: benchmark_percentile_setting = st.sidebar.slider( "Grade Benchmark (%)", min_value=50, max_value=99, value=st.session_state.get('benchmark_rank', 99), # Load from Config help=( "Sets the 'Bar' for the Strategy Score.\n\n" "• **99% (Default):** The top 1% of historical trades set the standard for 'Perfection'.\n" "• **80%:** Lowers the bar. Trades only need to be in the top 20% to get a high score (Useful for low volatility).") ) else: # In Production, keep the value but hide the slider benchmark_percentile_setting = st.session_state.get('benchmark_rank', 99) st.sidebar.markdown("---") # --- PRIMARY TRIGGER (Hidden in Production, Forced to BBands) --- if production_mode: st.session_state.primary_driver = "Bollinger Bands" # Force logic # No UI element shown else: st.sidebar.selectbox( "Select Primary Trigger:", ("Bollinger Bands", "RSI Crossover", "MACD Crossover", "MA Slope", "Markov State"), key='widget_primary_driver', on_change=update_state, help="Select the main indicator that will trigger a trade." ) st.sidebar.markdown("---") # --- RUN ACTIONS (Dual Mode Logic) --- st.sidebar.subheader("Run Actions") if production_mode: # --- PRODUCTION BUTTONS --- # [REMOVED] Old "Run Analysis" button was here. # 2. Dropdown for User Setups (With "Default" option) user_setups_raw = st.session_state.get("user_setups_data", []) valid_user_setups = [s for s in user_setups_raw if not is_row_blank(s)] setup_options = {"Default (Reset to Original)": -1} for i, s in enumerate(valid_user_setups): # Build Label with Notes note = s.get('Notes', '') if note: if len(note) > 50: note = note[:47] + "..." note_display = f" - {note}" else: note_display = "" label = f"Setup {i+1}{note_display}" setup_options[label] = i # --- CALLBACK: AUTO-POPULATE SETTINGS ON SELECTION --- def on_user_setup_change(): """Callback to populate settings immediately when dropdown changes.""" selected_label = st.session_state.widget_user_setup_select_key idx = setup_options[selected_label] def sync_param(main_key, value): st.session_state[main_key] = value st.session_state[f"widget_{main_key}"] = value # --- CASE A: DEFAULT (RESET) --- if idx == -1: defaults = st.session_state.get('widget_defaults', {}) or load_settings() # Reset Factors sync_param('use_rsi', defaults.get('use_rsi', True)) sync_param('rsi_w', defaults.get('rsi_w', 1.0)) sync_param('use_vol', defaults.get('use_vol', True)) sync_param('vol_w', defaults.get('vol_w', 1.0)) sync_param('use_trend', defaults.get('use_trend', True)) sync_param('trend_w', defaults.get('trend_w', 1.5)) sync_param('use_volume', defaults.get('use_volume', True)) sync_param('volume_w', defaults.get('volume_w', 1.0)) sync_param('use_macd', defaults.get('use_macd', True)) sync_param('macd_w', defaults.get('macd_w', 1.0)) sync_param('use_ma_slope', defaults.get('use_ma_slope', True)) sync_param('ma_slope_w', defaults.get('ma_slope_w', 0.5)) sync_param('use_markov', defaults.get('use_markov', False)) sync_param('markov_w', defaults.get('markov_w', 1.0)) # Reset ADX (With Clamping for Production Mode) sync_param('use_adx_filter', defaults.get('use_adx_filter', True)) raw_adx = defaults.get('adx_threshold', 25.0) clamped_adx = max(20.0, min(30.0, raw_adx)) sync_param('adx_threshold', clamped_adx) # Reset Strategies sync_param('ma_period', defaults.get("large_ma_period", 50)) sync_param('bb_period', defaults.get("bband_period", 20)) sync_param('bb_std', defaults.get("bband_std_dev", 2.0)) sync_param('confidence_slider', defaults.get("confidence_threshold", 50)) sync_param('long_entry', defaults.get("long_entry_threshold_pct", 0.0) * 100) sync_param('long_exit', defaults.get("long_exit_ma_threshold_pct", 0.0) * 100) sync_param('long_sl', defaults.get("long_trailing_stop_loss_pct", 8.0) * 100) sync_param('long_delay', defaults.get("long_delay_days", 0)) sync_param('short_entry', defaults.get("short_entry_threshold_pct", 0.0) * 100) sync_param('short_exit', defaults.get("short_exit_ma_threshold_pct", 0.0) * 100) sync_param('short_sl', defaults.get("short_trailing_stop_loss_pct", 8.0) * 100) sync_param('short_delay', defaults.get("short_delay_days", 0)) st.toast("Settings reset to Default.", icon="🔄") # --- CASE B: USER SETUP --- else: selected_setup_data = valid_user_setups[idx] def apply_factor_loading(key, main_key, weight_key): val = selected_setup_data.get(key, 'Off') if str(val).lower() in ['off', '0', '0.0', 'false']: sync_param(main_key, False) else: sync_param(main_key, True) try: w = float(val); sync_param(weight_key, w) except: sync_param(weight_key, 1.0) apply_factor_loading('RSI', 'use_rsi', 'rsi_w') apply_factor_loading('Volatility', 'use_vol', 'vol_w') apply_factor_loading('TREND', 'use_trend', 'trend_w') apply_factor_loading('Volume', 'use_volume', 'volume_w') apply_factor_loading('MACD', 'use_macd', 'macd_w') apply_factor_loading('MA Slope', 'use_ma_slope', 'ma_slope_w') apply_factor_loading('Markov', 'use_markov', 'markov_w') # --- ADX FILTER LOGIC (With Clamping) --- adx_val = selected_setup_data.get('ADX Filter', 'Off') if str(adx_val).lower() in ['off', '0', '0.0', 'false']: sync_param('use_adx_filter', False) else: sync_param('use_adx_filter', True) try: thresh = float(adx_val) clamped_thresh = max(20.0, min(30.0, thresh)) sync_param('adx_threshold', clamped_thresh) except: sync_param('adx_threshold', 25.0) def get_num(key, default, type_func): try: return type_func(selected_setup_data.get(key, default)) except: return default sync_param('ma_period', get_num('Large MA Period', st.session_state.ma_period, int)) sync_param('bb_period', get_num('Bollinger Band Period', st.session_state.bb_period, int)) sync_param('bb_std', get_num('Bollinger Band Std Dev', st.session_state.bb_std, float)) sync_param('confidence_slider', get_num('Conf. Threshold', st.session_state.confidence_slider, int)) sync_param('long_entry', get_num('Long Entry Threshold (%)', st.session_state.long_entry, float)) sync_param('long_exit', get_num('Long Exit Threshold (%)', st.session_state.long_exit, float)) sync_param('long_sl', get_num('Long Stop Loss (%)', st.session_state.long_sl, float)) sync_param('long_delay', get_num('Long Delay (Days)', st.session_state.long_delay, int)) sync_param('short_entry', get_num('Short Entry Threshold (%)', st.session_state.short_entry, float)) sync_param('short_exit', get_num('Short Exit Threshold (%)', st.session_state.short_exit, float)) sync_param('short_sl', get_num('Short Stop Loss (%)', st.session_state.short_sl, float)) sync_param('short_delay', get_num('Short Delay (Days)', st.session_state.short_delay, int)) st.toast(f"Populated settings for {selected_label}", icon="✅") # Selectbox with on_change callback st.sidebar.selectbox( "Select User Setup:", list(setup_options.keys()), key="widget_user_setup_select_key", on_change=on_user_setup_change, help="Selecting a setup will immediately populate the sidebar settings below." ) else: # --- DEVELOPER BUTTONS (Original) --- if st.sidebar.button("🚀 Run Analysis (Developer)", type="primary", key="run_analysis_dev"): st.session_state.run_analysis_button = True st.session_state.run_advanced_advisor = False st.session_state.run_user_advisor_setup = False st.session_state.run_scan_user_setups = False st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None st.session_state.param_results_df = None; st.session_state.confidence_results_df = None st.session_state.summary_df = None; st.session_state.single_ticker_results = None st.session_state.open_trades_df = None; st.session_state.last_run_stats = None st.session_state.markov_results_df = None st.rerun() if st.sidebar.button("👨‍💼 Run Advanced Advisor (Find Trades)", key="run_adv_find_trades"): st.session_state.run_advanced_advisor = True st.session_state.run_analysis_button = False st.session_state.run_user_advisor_setup = False st.session_state.run_scan_user_setups = False st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None st.session_state.param_results_df = None; st.session_state.confidence_results_df = None st.session_state.summary_df = None st.rerun() if st.sidebar.button("⚙️ Run User-Defined Advisor (Find Trades)", key="run_user_find_trades"): st.session_state.run_user_advisor_setup = True st.session_state.run_analysis_button = False st.session_state.run_advanced_advisor = False st.session_state.run_scan_user_setups = False st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None st.session_state.param_results_df = None; st.session_state.confidence_results_df = None st.session_state.summary_df = None st.rerun() st.sidebar.markdown("---") # --- SECTION 2: Confidence Score Factors --- st.sidebar.header("2. Confidence Score Factors") if production_mode: # --- PRODUCTION VIEW (Simplified) --- # Force hidden factors OFF st.session_state.use_vol = False st.session_state.use_trend = False st.session_state.use_ma_slope = False st.session_state.use_markov = False st.session_state.rsi_logic = "Level" # RSI (Momentum) if 'widget_use_rsi' not in st.session_state: st.session_state.widget_use_rsi = st.session_state.use_rsi st.sidebar.toggle("Use Momentum (RSI)", key='widget_use_rsi', on_change=update_state, help="Enable Relative Strength Index (Momentum) factor.") rsi_val = st.session_state.rsi_w if st.session_state.rsi_w != 1.0 else 0.5 if 'widget_rsi_w' not in st.session_state: st.session_state.widget_rsi_w = rsi_val st.sidebar.number_input("RSI Weight", 0.1, 5.0, step=0.1, key='widget_rsi_w', on_change=update_state, disabled=not st.session_state.get('use_rsi', True)) # Volume Spike if 'widget_use_volume' not in st.session_state: st.session_state.widget_use_volume = st.session_state.use_volume st.sidebar.toggle("Use Volume Spike", key='widget_use_volume', on_change=update_state, help="Enable Volume Spike detection.") vol_val = st.session_state.volume_w if st.session_state.volume_w != 1.0 else 0.5 if 'widget_volume_w' not in st.session_state: st.session_state.widget_volume_w = vol_val st.sidebar.number_input("Volume Weight", 0.1, 5.0, step=0.1, key='widget_volume_w', on_change=update_state, disabled=not st.session_state.get('use_volume', True)) # ADX Filter if 'widget_use_adx_filter' not in st.session_state: st.session_state.widget_use_adx_filter = st.session_state.use_adx_filter st.sidebar.toggle("Use ADX Filter (<)", key='widget_use_adx_filter', on_change=update_state, help="Enable Trend Strength filter (ADX). Good for hunting Shorts, bad for Longs") curr_adx = max(20.0, min(30.0, st.session_state.adx_threshold)) if 'widget_adx_threshold' not in st.session_state: st.session_state.widget_adx_threshold = curr_adx st.sidebar.number_input("ADX Threshold", 20.0, 30.0, step=1.0, key='widget_adx_threshold', on_change=update_state, disabled=not st.session_state.get('use_adx_filter', True)) # MACD if 'widget_use_macd' not in st.session_state: st.session_state.widget_use_macd = st.session_state.use_macd st.sidebar.toggle("Use MACD Signals", key='widget_use_macd', on_change=update_state, help="Enable MACD crossover/histogram signals.") macd_val = st.session_state.macd_w if st.session_state.macd_w != 1.0 else 2.0 if 'widget_macd_w' not in st.session_state: st.session_state.widget_macd_w = macd_val st.sidebar.number_input("MACD Weight", 0.1, 5.0, step=0.1, key='widget_macd_w', on_change=update_state, disabled=not st.session_state.get('use_macd', True)) # Confidence Slider # [FIX] Force the widget key to match the stored variable BEFORE creating the widget st.session_state['widget_confidence_slider'] = st.session_state.confidence_slider # [FIX] Remove 'value=' argument. Streamlit will use the key we just populated. st.sidebar.slider( "Minimum Confidence Threshold (%)", 0, 100, step=5, key='widget_confidence_slider', on_change=update_state, help="Acts as a quality filter. Higher values increase trade quality but reduce the number of trades found." ) else: # --- DEVELOPER VIEW (Original) --- if 'widget_use_rsi' not in st.session_state: st.session_state.widget_use_rsi = st.session_state.use_rsi st.sidebar.toggle("Use Momentum (RSI)", key='widget_use_rsi', on_change=update_state) if 'widget_rsi_w' not in st.session_state: st.session_state.widget_rsi_w = st.session_state.rsi_w st.sidebar.number_input("RSI Weight", 0.1, 5.0, step=0.1, key='widget_rsi_w', on_change=update_state, disabled=not st.session_state.get('use_rsi', True)) rsi_logic_options = ["Crossover", "Level"] rsi_logic_index = rsi_logic_options.index(st.session_state.rsi_logic) if st.session_state.rsi_logic in rsi_logic_options else 0 st.sidebar.radio("RSI Entry Logic:", rsi_logic_options, index=rsi_logic_index, key='widget_rsi_logic', on_change=update_state, help="Level: Enter <= 30/>= 70. Crossover: Enter on cross back.", disabled=not st.session_state.get('use_rsi', True)) if 'widget_use_vol' not in st.session_state: st.session_state.widget_use_vol = st.session_state.use_vol st.sidebar.toggle("Use Volatility", key='widget_use_vol', on_change=update_state) if 'widget_vol_w' not in st.session_state: st.session_state.widget_vol_w = st.session_state.vol_w st.sidebar.number_input("Volatility Weight", 0.1, 5.0, step=0.1, key='widget_vol_w', on_change=update_state, disabled=not st.session_state.get('use_vol', True)) if 'widget_use_trend' not in st.session_state: st.session_state.widget_use_trend = st.session_state.use_trend st.sidebar.toggle("Use Trend (200d MA)", key='widget_use_trend', on_change=update_state) if 'widget_trend_w' not in st.session_state: st.session_state.widget_trend_w = st.session_state.trend_w st.sidebar.number_input("Trend Weight", 0.1, 5.0, step=0.1, key='widget_trend_w', on_change=update_state, disabled=not st.session_state.get('use_trend', True)) if 'widget_use_volume' not in st.session_state: st.session_state.widget_use_volume = st.session_state.use_volume st.sidebar.toggle("Use Volume Spike", key='widget_use_volume', on_change=update_state) if 'widget_volume_w' not in st.session_state: st.session_state.widget_volume_w = st.session_state.volume_w st.sidebar.number_input("Volume Weight", 0.1, 5.0, step=0.1, key='widget_volume_w', on_change=update_state, disabled=not st.session_state.get('use_volume', True)) if 'widget_use_adx_filter' not in st.session_state: st.session_state.widget_use_adx_filter = st.session_state.use_adx_filter st.sidebar.toggle("Use ADX Filter (<)", key='widget_use_adx_filter', on_change=update_state) if 'widget_adx_threshold' not in st.session_state: st.session_state.widget_adx_threshold = st.session_state.adx_threshold st.sidebar.number_input("ADX Threshold", 10.0, 50.0, step=1.0, key='widget_adx_threshold', on_change=update_state, disabled=not st.session_state.get('use_adx_filter', True), help="Allow entries only when ADX is BELOW this value.") if 'widget_adx_period' not in st.session_state: st.session_state.widget_adx_period = st.session_state.adx_period st.sidebar.number_input("ADX Period", 5, 50, step=1, key='widget_adx_period', on_change=update_state, help="The lookback period for the ADX calculation.") if 'widget_use_macd' not in st.session_state: st.session_state.widget_use_macd = st.session_state.use_macd st.sidebar.toggle("Use MACD Signals", key='widget_use_macd', on_change=update_state) if 'widget_macd_w' not in st.session_state: st.session_state.widget_macd_w = st.session_state.macd_w st.sidebar.number_input("MACD Weight", 0.1, 5.0, step=0.1, key='widget_macd_w', on_change=update_state, disabled=not st.session_state.get('use_macd', True)) if 'widget_use_ma_slope' not in st.session_state: st.session_state.widget_use_ma_slope = st.session_state.use_ma_slope st.sidebar.toggle("Use MA Slope", key='widget_use_ma_slope', on_change=update_state) if 'widget_ma_slope_w' not in st.session_state: st.session_state.widget_ma_slope_w = st.session_state.ma_slope_w st.sidebar.number_input("MA Slope Weight", 0.1, 5.0, step=0.1, key='widget_ma_slope_w', on_change=update_state, disabled=not st.session_state.get('use_ma_slope', True)) if 'widget_use_markov' not in st.session_state: st.session_state.widget_use_markov = st.session_state.use_markov st.sidebar.toggle("Use Markov State", key='widget_use_markov', on_change=update_state, help="Use the best-found Markov state as a confidence factor. You must run 'Find Best Markov Setup' (Section 7) first.") if 'widget_markov_w' not in st.session_state: st.session_state.widget_markov_w = st.session_state.markov_w st.sidebar.number_input("Markov Weight", 0.1, 5.0, step=0.1, key='widget_markov_w', on_change=update_state, disabled=not st.session_state.get('use_markov', True)) # --- NEW: MFI Controls --- if 'widget_use_mfi' not in st.session_state: st.session_state.widget_use_mfi = st.session_state.use_mfi st.sidebar.toggle("Use Money Flow (MFI)", key='widget_use_mfi', on_change=update_state) if 'widget_mfi_w' not in st.session_state: st.session_state.widget_mfi_w = st.session_state.mfi_w st.sidebar.number_input("MFI Weight", 0.1, 5.0, step=0.1, key='widget_mfi_w', on_change=update_state, disabled=not st.session_state.get('use_mfi', True)) # --- NEW: SuperTrend Controls --- if 'widget_use_supertrend' not in st.session_state: st.session_state.widget_use_supertrend = st.session_state.use_supertrend st.sidebar.toggle("Use SuperTrend", key='widget_use_supertrend', on_change=update_state) if 'widget_supertrend_w' not in st.session_state: st.session_state.widget_supertrend_w = st.session_state.supertrend_w st.sidebar.number_input("SuperTrend Weight", 0.1, 5.0, step=0.1, key='widget_supertrend_w', on_change=update_state, disabled=not st.session_state.get('use_supertrend', True)) # [FIX] Force the widget key to match the stored variable st.session_state['widget_confidence_slider'] = st.session_state.confidence_slider # [FIX] Remove 'value=' argument. st.sidebar.slider( "Minimum Confidence Threshold (%)", 0, 100, step=5, key='widget_confidence_slider', on_change=update_state ) st.sidebar.markdown("---") st.sidebar.header("3. Strategy Parameters") if production_mode: # --- PRODUCTION VIEW (Simplified) --- # Large MA and BB Period are HIDDEN here (values persist in session_state) # Dropdown for Std Dev (1.4, 1.6, 2.2) std_options = [1.4, 1.6, 2.2] current_std = st.session_state.bb_std std_index = std_options.index(current_std) if current_std in std_options else 2 st.sidebar.selectbox( "Bollinger Band Std Dev", std_options, index=std_index, key='widget_bb_std', on_change=update_state, help="Controls volatility sensitivity. Lower (1.4/1.6) = more trades, Higher (2.2) = fewer, stricter trades." ) else: # --- DEVELOPER VIEW (Original) --- st.sidebar.number_input("Large MA Period", 10, 200, value=st.session_state.ma_period, step=1, key='widget_ma_period', on_change=update_state) st.sidebar.number_input("Bollinger Band Period", 10, 100, value=st.session_state.bb_period, step=1, key='widget_bb_period', on_change=update_state) st.sidebar.number_input("Bollinger Band Std Dev", 1.0, 5.0, value=st.session_state.bb_std, step=0.1, key='widget_bb_std', on_change=update_state) st.sidebar.subheader("Long Trade Logic") st.sidebar.slider("Entry Threshold (%)", 0.0, 10.0, value=st.session_state.long_entry, step=0.1, key='widget_long_entry', on_change=update_state) st.sidebar.slider("Exit MA Threshold (%)", 0.0, 10.0, value=st.session_state.long_exit, step=0.1, key='widget_long_exit', on_change=update_state) st.sidebar.slider("Trailing Stop Loss (%)", 0.0, 30.0, value=st.session_state.long_sl, step=0.5, key='widget_long_sl', on_change=update_state, help="A trailing stop-loss set from the trade's high-water mark. Set to 0 to disable.") st.sidebar.number_input("Delay Entry (days)", 0, 10, value=st.session_state.long_delay, step=1, key='widget_long_delay', on_change=update_state) st.sidebar.markdown("---") st.sidebar.subheader("Exit Logic") exit_options = ["Standard (Price-Based)", "Intelligent (ADX/MACD/ATR)"] if st.session_state.exit_logic_type not in exit_options: st.session_state.exit_logic_type = "Standard (Price-Based)" exit_index = exit_options.index(st.session_state.exit_logic_type) st.sidebar.selectbox( "Exit Logic Type:", exit_options, index=exit_index, key='widget_exit_logic_type', on_change=update_state, help="Standard: Exit on MA/BB cross. Intelligent: ADX/MACD profit-take with an optional TSL." ) st.sidebar.slider( "Intelligent: Trailing Stop Loss (%)", 0.0, 60.0, value=st.session_state.intelligent_tsl_pct, step=1.0, key='widget_intelligent_tsl_pct', on_change=update_state, format="%.1f%%", disabled=(st.session_state.exit_logic_type != 'Intelligent (ADX/MACD/ATR)'), help="The TSL for the 'Intelligent' exit. Set to 60.0% to replicate the 'Profit-Take or Time-Out' strategy." ) st.sidebar.toggle("Apply MA/BB Price Lock Floor (Catcher)", value=st.session_state.use_ma_floor_filter, key='widget_use_ma_floor_filter', on_change=update_state, help="If ON, the Trailing Stop Loss is prevented from dropping below the Mean Reversion price.") # [NEW SLIDER] Catcher Offset # Only visible in Developer Mode. Disabled if Catcher is OFF. st.sidebar.slider( "Catcher Offset (%)", min_value=-10.0, max_value=10.0, value=st.session_state.catcher_stop_pct, step=0.1, key='widget_catcher_stop_pct', on_change=update_state, format="%.1f%%", disabled=not st.session_state.use_ma_floor_filter, help="Adjusts the hard floor level relative to Entry Price.\nPositive = Secure Profit (e.g. +1% Floor).\nNegative = Allow wiggle room (e.g. -1% Floor)." ) st.sidebar.markdown("---") st.sidebar.subheader("Short Trade Logic") st.sidebar.slider("Entry Threshold (%)", 0.0, 10.0, value=st.session_state.short_entry, step=0.1, key='widget_short_entry', on_change=update_state) st.sidebar.slider("Exit MA Threshold (%)", 0.0, 10.0, value=st.session_state.short_exit, step=0.1, key='widget_short_exit', on_change=update_state) st.sidebar.slider("Trailing Stop Loss (%)", 0.0, 30.0, value=st.session_state.short_sl, step=0.5, key='widget_short_sl', on_change=update_state, help="A trailing stop-loss set from the trade's low-water mark. Set to 0 to disable.") st.sidebar.number_input("Delay Entry (days)", 0, 10, value=st.session_state.short_delay, step=1, key='widget_short_delay', on_change=update_state) st.sidebar.markdown("---") st.sidebar.subheader("Time Limits (Days)") c_time1, c_time2 = st.sidebar.columns(2) st.session_state.max_long_duration = c_time1.number_input("Max Long Duration", min_value=1, max_value=365, value=st.session_state.get('max_long_duration', 60), step=1, key='widget_max_long_duration', on_change=update_state) st.session_state.max_short_duration = c_time2.number_input("Max Short Duration", min_value=1, max_value=365, value=st.session_state.get('max_short_duration', 10), step=1, key='widget_max_short_duration', on_change=update_state) if 'best_params' not in st.session_state: st.session_state.best_params = None # --- SECTIONS 4, 5, 6, 7 (HIDDEN IN PRODUCTION) --- if not production_mode: st.sidebar.markdown("---") st.sidebar.header("4. Find Best Parameters") with st.sidebar.expander("Set Optimisation Ranges"): use_squared_weighting = st.toggle("Prioritise Profit per Trade (Params)", value=st.session_state.sq_params_toggle, key="widget_sq_params_toggle", on_change=update_state) st.markdown("---") optimise_ma = st.checkbox("Optimise MA Period", value=st.session_state.opt_ma_cb, key="widget_opt_ma_cb", on_change=update_state); c1,c2,c3 = st.columns(3) c1.number_input("MA Start", 10, 200, value=st.session_state.ma_start, step=5, disabled=not optimise_ma, key='widget_ma_start', on_change=update_state) c2.number_input("MA End", 10, 200, value=st.session_state.ma_end, step=5, disabled=not optimise_ma, key='widget_ma_end', on_change=update_state) c3.number_input("MA Step", 1, 20, value=st.session_state.ma_step, step=1, disabled=not optimise_ma, key='widget_ma_step', on_change=update_state) optimise_bb = st.checkbox("Optimise BB Period", value=st.session_state.opt_bb_cb, key="widget_opt_bb_cb", on_change=update_state); c1,c2,c3 = st.columns(3) c1.number_input("BB Start", 10, 100, value=st.session_state.bb_start, step=5, disabled=not optimise_bb, key='widget_bb_start', on_change=update_state) c2.number_input("BB End", 10, 100, value=st.session_state.bb_end, step=5, disabled=not optimise_bb, key='widget_bb_end', on_change=update_state) c3.number_input("BB Step", 1, 10, value=st.session_state.bb_step, step=1, disabled=not optimise_bb, key='widget_bb_step', on_change=update_state) optimise_std = st.checkbox("Optimise BB Std Dev", value=st.session_state.opt_std_cb, key="widget_opt_std_cb", on_change=update_state); c1,c2,c3 = st.columns(3) c1.number_input("Std Start", 1.0, 5.0, value=st.session_state.std_start, step=0.1, format="%.1f", disabled=not optimise_std, key='widget_std_start', on_change=update_state) c2.number_input("Std End", 1.0, 5.0, value=st.session_state.std_end, step=0.1, format="%.1f", disabled=not optimise_std, key='widget_std_end', on_change=update_state) c3.number_input("Std Step", 0.1, 1.0, value=st.session_state.std_step, step=0.1, format="%.1f", disabled=not optimise_std, key='widget_std_step', on_change=update_state) st.markdown("---") optimise_conf = st.checkbox("Optimise Confidence Threshold", value=st.session_state.opt_conf_cb, key="widget_opt_conf_cb", on_change=update_state); c1,c2,c3 = st.columns(3) c1.number_input("Conf Start", 0, 100, value=st.session_state.conf_start, step=5, disabled=not optimise_conf, key='widget_conf_start', on_change=update_state) c2.number_input("Conf End", 0, 100, value=st.session_state.conf_end, step=5, disabled=not optimise_conf, key='widget_conf_end', on_change=update_state) c3.number_input("Conf Step", 5, 25, value=st.session_state.conf_step, step=5, disabled=not optimise_conf, key='widget_conf_step', on_change=update_state) optimise_sl = st.checkbox("Optimise Stop Loss %", value=st.session_state.opt_sl_cb, key="widget_opt_sl_cb", on_change=update_state); c1,c2,c3 = st.columns(3) c1.number_input("SL Start", 0.0, 30.0, value=st.session_state.sl_start, step=0.5, disabled=not optimise_sl, key='widget_sl_start', on_change=update_state) c2.number_input("SL End", 0.0, 30.0, value=st.session_state.sl_end, step=0.5, disabled=not optimise_sl, key='widget_sl_end', on_change=update_state) c3.number_input("SL Step", 0.1, 5.0, value=st.session_state.sl_step, step=0.5, disabled=not optimise_sl, key='widget_sl_step', on_change=update_state) optimise_delay = st.checkbox("Optimise Delay Days", value=st.session_state.opt_delay_cb, key="widget_opt_delay_cb", on_change=update_state); c1,c2,c3 = st.columns(3) c1.number_input("Delay Start", 0, 10, value=st.session_state.delay_start, step=1, disabled=not optimise_delay, key='widget_delay_start', on_change=update_state) c2.number_input("Delay End", 0, 10, value=st.session_state.delay_end, step=1, disabled=not optimise_delay, key='widget_delay_end', on_change=update_state) c3.number_input("Delay Step", 1, 5, value=st.session_state.delay_step, step=1, disabled=not optimise_delay, key='widget_delay_step', on_change=update_state) optimise_entry = st.checkbox("Optimise Entry %", value=st.session_state.opt_entry_cb, key="widget_opt_entry_cb", on_change=update_state); c1,c2,c3 = st.columns(3) c1.number_input("Entry Start", 0.0, 10.0, value=st.session_state.entry_start, step=0.1, disabled=not optimise_entry, key='widget_entry_start', on_change=update_state) c2.number_input("Entry End", 0.0, 10.0, value=st.session_state.entry_end, step=0.1, disabled=not optimise_entry, key='widget_entry_end', on_change=update_state) c3.number_input("Entry Step", 0.1, 1.0, value=st.session_state.entry_step, step=0.1, disabled=not optimise_entry, key='widget_entry_step', on_change=update_state) optimise_exit = st.checkbox("Optimise Exit MA %", value=st.session_state.opt_exit_cb, key="widget_opt_exit_cb", on_change=update_state); c1,c2,c3 = st.columns(3) c1.number_input("Exit Start", 0.0, 10.0, value=st.session_state.exit_start, step=0.1, disabled=not optimise_exit, key='widget_exit_start', on_change=update_state) c2.number_input("Exit End", 0.0, 10.0, value=st.session_state.exit_end, step=0.1, disabled=not optimise_exit, key='widget_exit_end', on_change=update_state) c3.number_input("Exit Step", 0.1, 1.0, value=st.session_state.exit_step, step=0.1, disabled=not optimise_exit, key='widget_exit_step', on_change=update_state) st.markdown("---") optimise_dur = st.checkbox("Optimise Max Duration", value=st.session_state.opt_duration_cb, key="widget_opt_duration_cb", on_change=update_state); c1,c2,c3 = st.columns(3) c1.number_input("Dur Start", 1, 365, value=st.session_state.dur_start, step=1, disabled=not optimise_dur, key='widget_dur_start', on_change=update_state) c2.number_input("Dur End", 1, 365, value=st.session_state.dur_end, step=1, disabled=not optimise_dur, key='widget_dur_end', on_change=update_state) c3.number_input("Dur Step", 1, 30, value=st.session_state.dur_step, step=1, disabled=not optimise_dur, key='widget_dur_step', on_change=update_state) st.markdown("---") col1, col2 = st.columns(2) if col1.button("💡 Find Best Long"): generate_and_run_optimisation(master_df, main_content_placeholder, 'long', use_squared_weighting) if col2.button("💡 Find Best Short"): generate_and_run_optimisation(master_df, main_content_placeholder, 'short', use_squared_weighting) st.markdown("---") col_comb_1, col_comb_2 = st.columns(2) if col_comb_1.button("🔥 Find Combined Best Long", key="find_combined_long", use_container_width=True): st.session_state.run_combined_optimisation = True; st.session_state.optimise_side = 'long'; st.rerun() if col_comb_2.button("🔥 Find Combined Best Short", key="find_combined_short", use_container_width=True): st.session_state.run_combined_optimisation = True; st.session_state.optimise_side = 'short'; st.rerun() st.sidebar.markdown("---") st.sidebar.header("5. Find Best/Worst Confidence Setup") with st.sidebar.expander("Optimise Confidence Factors"): st.info("Finds good setups (using Section 2 factors) or bad setups (using the factors below).") st.write("**Find Best Setups (High Profit)**"); c1, c2 = st.columns(2) if c1.button("💡 Find Best Long Confidence", key="find_conf_long"): st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'long'; st.session_state.confidence_optimise_mode = 'best'; st.session_state.confidence_optimise_veto_factors = None; st.rerun() if c2.button("💡 Find Best Short Confidence", key="find_conf_short"): st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'short'; st.session_state.confidence_optimise_mode = 'best'; st.session_state.confidence_optimise_veto_factors = None; st.rerun() st.markdown("---") st.write("**Find Worst Setups (for Veto Filter)**") st.caption("Select the factors to test for the Veto signal:") c1_veto, c2_veto = st.columns(2) veto_rsi = c1_veto.toggle("Veto RSI", value=st.session_state.veto_rsi_cb, key="widget_veto_rsi_cb", on_change=update_state) veto_vol = c2_veto.toggle("Veto Volatility", value=st.session_state.veto_vol_cb, key="widget_veto_vol_cb", on_change=update_state) veto_trend = c1_veto.toggle("Veto Trend", value=st.session_state.veto_trend_cb, key="widget_veto_trend_cb", on_change=update_state) veto_volume = c2_veto.toggle("Veto Volume", value=st.session_state.veto_volume_cb, key="widget_veto_volume_cb", on_change=update_state) veto_macd = c1_veto.toggle("Veto MACD", value=st.session_state.veto_macd_cb, key="widget_veto_macd_cb", on_change=update_state) veto_ma_slope = c2_veto.toggle("Veto MA Slope", value=st.session_state.veto_ma_slope_cb, key="widget_veto_ma_slope_cb", on_change=update_state) veto_markov = c1_veto.toggle("Veto Markov", value=st.session_state.get('veto_markov_cb', False), key="widget_veto_markov_cb", on_change=update_state) veto_factors = (veto_rsi, veto_vol, veto_trend, veto_volume, veto_macd, veto_ma_slope, veto_markov) c1_veto_btn, c2_veto_btn = st.columns(2) if c1_veto_btn.button("❌ Find Worst Long", key="find_worst_long"): st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'long'; st.session_state.confidence_optimise_mode = 'worst'; st.session_state.confidence_optimise_veto_factors = veto_factors; st.rerun() if c2_veto_btn.button("❌ Find Worst Short", key="find_worst_short"): st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'short'; st.session_state.confidence_optimise_mode = 'worst'; st.session_state.confidence_optimise_veto_factors = veto_factors; st.rerun() st.sidebar.markdown("---") st.sidebar.header("6. Find Best Weights") with st.sidebar.expander("Optimise Specific Factor Weights"): st.info("Dynamically optimises weights (0.5 to 2.0) for all active, non-primary factors. Uses other sidebar settings as fixed values.") use_sq_weighting_for_weights = st.toggle("Prioritise Profit per Trade (Weights)", value=st.session_state.sq_weights_toggle, key="widget_sq_weights_toggle", on_change=update_state) col1_w, col2_w = st.columns(2) if col1_w.button("⚖️ Find Best Long Weights", key="find_weights_long"): st.session_state.run_weight_optimisation = True; st.session_state.weight_optimise_side = 'long'; st.rerun() if col2_w.button("⚖️ Find Best Short Weights", key="find_weights_short"): st.session_state.run_weight_optimisation = True; st.session_state.weight_optimise_side = 'short'; st.rerun() st.sidebar.markdown("---") st.sidebar.header("7. Find Best Markov Setup") with st.sidebar.expander("Optimise Markov Probabilities"): st.info("Brute-forces Run-Up/Future periods to find the highest-probability trading states.") st.write("Run-Up Period (Past)") c1, c2, c3 = st.columns(3) c1.number_input("Start", 1, 100, value=st.session_state.markov_run_up_start, key='widget_markov_run_up_start', on_change=update_state) c2.number_input("End", 1, 100, value=st.session_state.markov_run_up_end, key='widget_markov_run_up_end', on_change=update_state) c3.number_input("Step", 1, 10, value=st.session_state.markov_run_up_step, key='widget_markov_run_up_step', on_change=update_state) st.write("Future Period (Prediction)") c4, c5, c6 = st.columns(3) c4.number_input("Start", 1, 100, value=st.session_state.markov_future_start, key='widget_markov_future_start', on_change=update_state) c5.number_input("End", 1, 100, value=st.session_state.markov_future_end, key='widget_markov_future_end', on_change=update_state) c6.number_input("Step", 1, 10, value=st.session_state.markov_future_step, key='widget_markov_future_step', on_change=update_state) c1_markov, c2_markov = st.columns(2) if c1_markov.button("🔮 Find Best Long Markov", key="markov_long_button"): st.session_state.run_markov_optimisation = True; st.session_state.markov_side = 'long'; st.rerun() if c2_markov.button("🔮 Find Best Short Markov", key="markov_short_button"): st.session_state.run_markov_optimisation = True; st.session_state.markov_side = 'short'; st.rerun() # --- Veto / Save Settings (Only in Developer Mode) --- st.sidebar.markdown("---") current_veto_list = st.session_state.get('veto_setup_list', []) if current_veto_list: st.sidebar.header("Veto Filter(s)") st.sidebar.success(f"{len(current_veto_list)} Veto filter(s) ACTIVE.") with st.sidebar.expander("Show Veto Filters"): st.json(current_veto_list) if st.sidebar.button("💾 Save Veto Filters as Default"): save_veto_setup(current_veto_list) if st.sidebar.button("Clear Veto Filters"): st.session_state.veto_setup_list = []; if os.path.exists(VETO_CONFIG_FILE): try: os.remove(VETO_CONFIG_FILE) except OSError as e: print(f"Error removing veto file: {e}") st.rerun() st.sidebar.markdown("---") # --- SAVE SETTINGS (Hidden in Production Mode) --- if not production_mode: if st.sidebar.button("💾 Save Settings as Default"): settings_to_save = { "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, "bband_std_dev": st.session_state.bb_std, "confidence_threshold": st.session_state.confidence_slider, "long_entry_threshold_pct": st.session_state.long_entry / 100, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, "long_delay_days": st.session_state.long_delay, "short_entry_threshold_pct": st.session_state.short_entry / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, "short_trailing_stop_loss_pct": st.session_state.short_sl / 100, "short_delay_days": st.session_state.short_delay, "use_rsi": st.session_state.use_rsi, "rsi_w": st.session_state.rsi_w, "rsi_logic": st.session_state.get('rsi_logic', 'Crossover'), "primary_driver": st.session_state.get('primary_driver', 'Bollinger Bands'), "exit_logic_type": st.session_state.get('exit_logic_type', 'Standard (Price-Based)'), "use_ma_floor_filter": st.session_state.use_ma_floor_filter, "catcher_stop_pct": st.session_state.catcher_stop_pct / 100.0, "exit_confidence_threshold": st.session_state.get('exit_confidence_threshold', 50), "smart_trailing_stop_pct": st.session_state.get('smart_trailing_stop_pct', 5.0), "smart_exit_atr_period": st.session_state.get('smart_exit_atr_period', 14), "smart_exit_atr_multiplier": st.session_state.get('smart_exit_atr_multiplier', 3.0), "intelligent_tsl_pct": st.session_state.intelligent_tsl_pct / 100.0, "norm_lookback_years": norm_lookback_years, "use_rolling_benchmark": use_rolling_benchmark, "benchmark_rank": benchmark_percentile_setting, "use_vol": st.session_state.use_vol, "vol_w": st.session_state.vol_w, "use_trend": st.session_state.use_trend, "trend_w": st.session_state.trend_w, "use_volume": st.session_state.use_volume, "volume_w": st.session_state.volume_w, "use_adx_filter": st.session_state.use_adx_filter, "adx_threshold": st.session_state.adx_threshold, "adx_period": st.session_state.adx_period, "use_macd": st.session_state.use_macd, "macd_w": st.session_state.macd_w, "use_ma_slope": st.session_state.use_ma_slope, "ma_slope_w": st.session_state.ma_slope_w, "use_markov": st.session_state.use_markov, "markov_w": st.session_state.markov_w, "max_trading_days": st.session_state.max_duration, "max_long_duration": st.session_state.max_long_duration, "max_short_duration": st.session_state.max_short_duration } save_settings(settings_to_save) # --- End of Sidebar Definitions --- # --- MAIN CONTENT AREA LOGIC --- with main_content_placeholder.container(): # 1. User Defined Advisor Setup UI (Developer Only) if st.session_state.get('run_user_advisor_setup'): st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None; st.session_state.single_ticker_results = None st.session_state.open_trades_df = None; st.session_state.confidence_results_df = None st.session_state.best_params = None; st.session_state.last_run_stats = None generate_user_advisor_ui_and_run(master_df) # 2. Run User Defined Advisor Scan (Used by BOTH Dev "Scan with User Setups" AND Prod "Run User-Selected Option") elif st.session_state.get('run_scan_user_setups'): st.session_state.run_scan_user_setups = False st.session_state.last_run_stats = None setups_for_scan = st.session_state.get('setups_to_scan', []) if setups_for_scan: with st.spinner("Running advisor scan..."): # Determine label based on count label = "User-Defined" if len(setups_for_scan) > 1 else "Selected-Setup" run_advisor_scan(master_df, setups_for_scan, label) else: st.warning("No setup selected for scanning.") # 3. Confidence Optimisation (Developer Only) elif st.session_state.get('run_confidence_optimisation'): st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False st.session_state.run_markov_optimisation = False; st.session_state.run_weight_optimisation = False st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None st.session_state.markov_results_df = None; st.session_state.summary_df = None; st.session_state.last_run_stats = None side_to_run = st.session_state.get('confidence_optimise_side', 'long') mode_to_run = st.session_state.get('confidence_optimise_mode', 'best') veto_factors_to_run = st.session_state.get('confidence_optimise_veto_factors', None) run_confidence_optimisation(side_to_run, mode_to_run, master_df, main_content_placeholder, veto_factors_to_run) # 4. Weight Optimisation (Developer Only) elif st.session_state.get('run_weight_optimisation'): st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False st.session_state.run_markov_optimisation = False; st.session_state.advisor_df = None st.session_state.raw_df = None; st.session_state.deduped_df = None; st.session_state.markov_results_df = None st.session_state.summary_df = None; st.session_state.confidence_results_df = None; st.session_state.last_run_stats = None side_to_run = st.session_state.get('weight_optimise_side', 'long') use_sq = st.session_state.get('sq_weights_toggle', False) generate_and_run_weight_optimisation(master_df, main_content_placeholder, side_to_run, use_sq) # 5. Parameter Optimisation Results Display (Developer Only) elif st.session_state.get('param_results_df') is not None: st.subheader("🏆 Parameter Optimisation Results") st.caption("Results are pre-sorted by Score (High to Low).") if st.session_state.get('best_params'): st.subheader("Optimal Parameters Found:") st.json(st.session_state.best_params) c_view1, c_view2 = st.columns([1, 2]) dedupe_params = c_view1.checkbox("De-duplicate (Show unique scores only)", value=True) results_df = st.session_state.param_results_df.copy() if dedupe_params: results_df = results_df.drop_duplicates(subset=['Strategy Score', 'Avg Profit/Trade'], keep='first') display_cols = ["Strategy Score", "Avg Profit/Trade", "Ticker G/B Ratio", "Total Trades", "Avg Entry Conf."] if 'max_trading_days' in results_df.columns: display_cols.append('max_trading_days') optional_cols = ["confidence_threshold", "bband_period", "large_ma_period", "long_entry_threshold_pct", "long_exit_ma_threshold_pct", "long_delay_days", "long_trailing_stop_loss_pct"] for col in optional_cols: if col in results_df.columns: if results_df[col].nunique() > 1 or st.session_state.get(f"opt_{col[:3]}_cb", True): display_cols.append(col) st.dataframe(results_df.head(100)[display_cols].style.format({ "Strategy Score": "{:.2f}%", "Avg Profit/Trade": "{:.2%}", "Ticker G/B Ratio": "{:.2f}", "Avg Entry Conf.": "{:.1f}%", "max_trading_days": "{:.0f}" }, na_rep='-')) csv_data = results_df.to_csv(index=False).encode('utf-8') st.download_button(label="⬇️ Download Full Results (CSV)", data=csv_data, file_name="parameter_optimisation_results.csv", mime="text/csv") if st.button("Close Results"): st.session_state.param_results_df = None st.rerun() # 6. Combined Optimisation (Developer Only) elif st.session_state.get('run_combined_optimisation'): st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False st.session_state.run_markov_optimisation = False; st.session_state.run_weight_optimisation = False st.session_state.run_confidence_optimisation = False; st.session_state.advisor_df = None st.session_state.raw_df = None; st.session_state.deduped_df = None; st.session_state.markov_results_df = None st.session_state.summary_df = None; st.session_state.confidence_results_df = None; st.session_state.last_run_stats = None side_to_run = st.session_state.get('optimise_side', 'long') generate_and_run_combined_optimisation(master_df, main_content_placeholder, side_to_run) st.session_state.run_combined_optimisation = False # 7. Markov Optimisation (Developer Only) elif st.session_state.get('run_markov_optimisation'): st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None st.session_state.last_run_stats = None side_to_run = st.session_state.get('markov_side', 'long') generate_and_run_markov_optimisation(master_df, main_content_placeholder, side_to_run) # 8. Top Setups Advisor UI (Developer Only) elif st.session_state.get('run_advanced_advisor'): st.session_state.summary_df = None; st.session_state.single_ticker_results = None st.session_state.open_trades_df = None; st.session_state.confidence_results_df = None st.session_state.best_params = None; st.session_state.last_run_stats = None generate_advisor_report(master_df) # 9. RUN ANALYSIS (Used by BOTH Dev "Rocket" button AND Prod "Run Analysis" button) elif st.session_state.get("run_analysis_button"): st.session_state.run_analysis_button = False # Reset flag # Clear all results st.session_state.summary_df = None; st.session_state.single_ticker_results = None st.session_state.open_trades_df = None; st.session_state.confidence_results_df = None st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None st.session_state.best_params = None; st.session_state.worst_confidence_setup = None st.session_state.worst_confidence_setups_list = []; st.session_state.markov_results_df = None st.session_state.last_run_stats = None with st.spinner("Running analysis..."): # --- VETO LOGIC FIX --- if production_mode: veto_list_to_use = [] # Disable veto in simplified mode else: veto_list_to_use = st.session_state.get('veto_setup_list', []) if veto_list_to_use: st.info(f"{len(veto_list_to_use)} Veto filter(s) active.") # Gather parameters manual_params = { "large_ma_period": st.session_state.get('ma_period', 50), "bband_period": st.session_state.get('bb_period', 20), "bband_std_dev": st.session_state.get('bb_std', 2.0), "confidence_threshold": st.session_state.get('confidence_slider', 50), "long_entry_threshold_pct": st.session_state.get('long_entry', 0.0) / 100, "long_exit_ma_threshold_pct": st.session_state.get('long_exit', 0.0) / 100, "long_trailing_stop_loss_pct": st.session_state.get('long_sl', 8.0) / 100, "long_delay_days": st.session_state.get('long_delay', 0), "short_entry_threshold_pct": st.session_state.get('short_entry', 0.0) / 100, "short_exit_ma_threshold_pct": st.session_state.get('short_exit', 0.0) / 100, "short_trailing_stop_loss_pct": st.session_state.get('short_sl', 8.0) / 100, "short_delay_days": st.session_state.get('short_delay', 0), "use_ma_floor_filter": st.session_state.get('use_ma_floor_filter', True), "catcher_stop_pct": st.session_state.get('catcher_stop_pct', 0.0) / 100.0, "max_long_duration": st.session_state.get('max_long_duration', 60), "max_short_duration": st.session_state.get('max_short_duration', 10) } markov_setup_to_use = None if st.session_state.primary_driver == 'Markov State' or st.session_state.use_markov: if 'best_markov_setup' in st.session_state and st.session_state.best_markov_setup: markov_setup_to_use = st.session_state.best_markov_setup elif not production_mode: st.error("Markov State selected but no setup found. Run Section 7."); st.stop() exit_logic = st.session_state.exit_logic_type exit_thresh = st.session_state.exit_confidence_threshold smart_trailing_stop = st.session_state.smart_trailing_stop_pct / 100.0 smart_atr_p = st.session_state.smart_exit_atr_period smart_atr_m = st.session_state.smart_exit_atr_multiplier intelligent_tsl = st.session_state.intelligent_tsl_pct / 100.0 # --- NEW: Date Buffer Logic --- # We load data starting 365 days BEFORE the user selected start date # to ensure indicators (like 200MA) are fully calculated by the time the analysis starts. user_start_date = pd.Timestamp(st.session_state.start_date) end_date = pd.Timestamp(st.session_state.end_date) warmup_delta = timedelta(days=365) # Safe buffer data_load_start = user_start_date - warmup_delta # A) Single Ticker Analysis if st.session_state.run_mode == "Analyse Single Ticker": # --- [FIX START] Logic integrated from old_but_working_app.py --- selected_ticker = st.session_state.get('ticker_select', ticker_list[0] if ticker_list else None) if not selected_ticker: st.error("No ticker selected.") st.stop() # Define Start Date for Single Ticker user_start_date = pd.Timestamp(st.session_state.start_date) cols_to_use = [selected_ticker] if f'{selected_ticker}_High' in master_df.columns: cols_to_use.append(f'{selected_ticker}_High') if f'{selected_ticker}_Low' in master_df.columns: cols_to_use.append(f'{selected_ticker}_Low') if f'{selected_ticker}_Volume' in master_df.columns: cols_to_use.append(f'{selected_ticker}_Volume') existing_cols = [col for col in cols_to_use if col in master_df.columns] if selected_ticker not in existing_cols: st.error(f"Ticker '{selected_ticker}' not found.") st.stop() # [FIX] Smart Lookback Logic: Uses the wider of (User Range) or (Slider Lookback) user_start = pd.Timestamp(st.session_state.start_date) user_end = pd.Timestamp(st.session_state.end_date) # Calculate the slider-based start date slider_start = user_end - pd.DateOffset(years=norm_lookback_years) # 1. Logic: Start loading from the EARLIER of the two dates (Wider Window) effective_start = min(user_start, slider_start) # 2. Safety: Add 365 days EXTRA buffer so MA-200 is ready on Day 1 buffer_start = effective_start - pd.DateOffset(days=365) # Ensure we don't go before available data if not master_df.empty: buffer_start = max(buffer_start, master_df.index[0]) data_for_backtest = master_df.loc[buffer_start:user_end, existing_cols].copy() rename_dict = {selected_ticker: 'Close', f'{selected_ticker}_High': 'High', f'{selected_ticker}_Low': 'Low', f'{selected_ticker}_Volume': 'Volume'} rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} data_for_backtest = data_for_backtest.rename(columns=rename_dict_filtered) if not data_for_backtest.empty and 'Close' in data_for_backtest.columns and not data_for_backtest['Close'].isna().all(): # --- [FIX END] --- long_pnl, short_pnl, avg_long_trade, avg_short_trade, results_df, trades, open_trades, trade_counts, durations, trade_dates, exit_breakdown = run_backtest( data_for_backtest, manual_params, st.session_state.use_rsi, st.session_state.use_vol, st.session_state.use_trend, st.session_state.use_volume, st.session_state.use_macd, st.session_state.use_ma_slope, st.session_state.use_markov, st.session_state.use_mfi, st.session_state.use_supertrend, st.session_state.rsi_w, st.session_state.vol_w, st.session_state.trend_w, st.session_state.volume_w, st.session_state.macd_w, st.session_state.ma_slope_w, st.session_state.markov_w, st.session_state.mfi_w, st.session_state.supertrend_w, st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.get('rsi_logic', 'Crossover'), st.session_state.adx_period, veto_setups_list=veto_list_to_use, primary_driver=st.session_state.primary_driver, markov_setup=markov_setup_to_use, exit_logic_type=exit_logic, exit_confidence_threshold=exit_thresh, smart_trailing_stop_pct=smart_trailing_stop, smart_exit_atr_period=smart_atr_p, smart_exit_atr_multiplier=smart_atr_m, intelligent_tsl_pct=intelligent_tsl, benchmark_rank=benchmark_percentile_setting / 100.0, analysis_start_date=user_start_date, analysis_end_date=end_date, benchmark_lookback_years=norm_lookback_years, use_rolling_benchmark=use_rolling_benchmark ) st.session_state.single_ticker_results = {"long_pnl": long_pnl, "short_pnl": short_pnl, "avg_long_trade": avg_long_trade, "avg_short_trade": avg_short_trade, "results_df": results_df, "trades": trades} st.session_state.open_trades_df = pd.DataFrame(open_trades) if open_trades else pd.DataFrame() st.session_state.exit_breakdown_totals = {'long_profit_take_count': exit_breakdown[0], 'long_tsl_count': exit_breakdown[1], 'long_time_exit_count': exit_breakdown[2], 'short_profit_take_count': exit_breakdown[3], 'short_tsl_count': exit_breakdown[4], 'short_time_exit_count': exit_breakdown[5]} else: st.warning("No data for ticker.") # B) Full List Analysis elif st.session_state.run_mode.startswith("Analyse Full List"): # [FIX] Define Start Date for Full List (Critical Fix) user_start_date = pd.Timestamp(st.session_state.start_date) summary_results, all_open_trades = [], [] total_long_wins, total_long_losses, total_short_wins, total_short_losses = 0, 0, 0, 0 all_long_durations = []; all_short_durations = [] total_exit_breakdown = [0, 0, 0, 0, 0, 0] PROFIT_THRESHOLD = 1.0; excluded_analysis_tickers = [] progress_bar = st.progress(0, text="Starting analysis...") num_tickers = len(ticker_list) for i, ticker_symbol in enumerate(ticker_list): progress_bar.progress((i + 1) / num_tickers, text=f"Analysing {ticker_symbol}...") cols_to_use = [ticker_symbol] if f'{ticker_symbol}_High' in master_df.columns: cols_to_use.append(f'{ticker_symbol}_High') if f'{ticker_symbol}_Low' in master_df.columns: cols_to_use.append(f'{ticker_symbol}_Low') if f'{ticker_symbol}_Volume' in master_df.columns: cols_to_use.append(f'{ticker_symbol}_Volume') existing_cols = [col for col in cols_to_use if col in master_df.columns] if ticker_symbol not in existing_cols: continue # [FIX] Smart Lookback Logic: Uses the wider of (User Range) or (Slider Lookback) user_start = pd.Timestamp(st.session_state.start_date) user_end = pd.Timestamp(st.session_state.end_date) slider_start = user_end - pd.DateOffset(years=norm_lookback_years) # 1. Logic: Start loading from the EARLIER of the two dates effective_start = min(user_start, slider_start) # 2. Safety: Add 365 days EXTRA buffer buffer_start = effective_start - pd.DateOffset(days=365) if not master_df.empty: buffer_start = max(buffer_start, master_df.index[0]) ticker_data_series = master_df.loc[buffer_start:user_end, existing_cols] rename_dict = {ticker_symbol: 'Close', f'{ticker_symbol}_High': 'High', f'{ticker_symbol}_Low': 'Low', f'{ticker_symbol}_Volume': 'Volume'} ticker_data_series = ticker_data_series.rename(columns={k:v for k,v in rename_dict.items() if k in existing_cols}) if not ticker_data_series.empty and 'Close' in ticker_data_series.columns and not ticker_data_series['Close'].isna().all(): long_pnl, short_pnl, avg_long_trade, avg_short_trade, results_df, trades, open_trades, trade_counts, durations, trade_dates, exit_breakdown = run_backtest( ticker_data_series, manual_params, st.session_state.use_rsi, st.session_state.use_vol, st.session_state.use_trend, st.session_state.use_volume, st.session_state.use_macd, st.session_state.use_ma_slope, st.session_state.use_markov, st.session_state.use_mfi, st.session_state.use_supertrend, st.session_state.rsi_w, st.session_state.vol_w, st.session_state.trend_w, st.session_state.volume_w, st.session_state.macd_w, st.session_state.ma_slope_w, st.session_state.markov_w, st.session_state.mfi_w, st.session_state.supertrend_w, st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.get('rsi_logic', 'Crossover'), st.session_state.adx_period, veto_setups_list=veto_list_to_use, primary_driver=st.session_state.primary_driver, markov_setup=markov_setup_to_use, exit_logic_type=exit_logic, exit_confidence_threshold=exit_thresh, smart_trailing_stop_pct=smart_trailing_stop, smart_exit_atr_period=smart_atr_p, smart_exit_atr_multiplier=smart_atr_m, intelligent_tsl_pct=intelligent_tsl, benchmark_rank=benchmark_percentile_setting / 100.0, analysis_start_date=user_start_date, analysis_end_date=end_date, benchmark_lookback_years=norm_lookback_years, use_rolling_benchmark=use_rolling_benchmark ) if abs(long_pnl) > PROFIT_THRESHOLD or abs(short_pnl) > PROFIT_THRESHOLD or \ (avg_long_trade is not None and pd.notna(avg_long_trade) and abs(avg_long_trade) > PROFIT_THRESHOLD) or \ (avg_short_trade is not None and pd.notna(avg_short_trade) and abs(avg_short_trade) > PROFIT_THRESHOLD): excluded_analysis_tickers.append(ticker_symbol); continue total_exit_breakdown = [sum(x) for x in zip(total_exit_breakdown, exit_breakdown)] long_wins, long_losses, short_wins, short_losses = trade_counts long_durations, short_durations = durations first_long_entry, last_long_exit, first_short_entry, last_short_exit = trade_dates total_long_wins += long_wins; total_long_losses += long_losses total_short_wins += short_wins; total_short_losses += short_losses all_long_durations.extend(long_durations); all_short_durations.extend(short_durations) long_conf = np.mean([t['confidence'] for t in trades[0] if pd.notna(t.get('confidence'))]) if trades[0] else 0 short_conf = np.mean([t['confidence'] for t in trades[2] if pd.notna(t.get('confidence'))]) if trades[2] else 0 avg_long_dur_ticker = np.mean(long_durations) if long_durations else 0 avg_short_dur_ticker = np.mean(short_durations) if short_durations else 0 summary_results.append({ "Ticker": ticker_symbol, "Cumulative Long P&L": long_pnl, "Avg Long Profit per Trade": avg_long_trade, "Num Long Trades": len(trades[0]), "Avg Long Confidence": long_conf, "Avg Long Duration (Days)": avg_long_dur_ticker, "First Long Entry": first_long_entry, "Last Long Exit": last_long_exit, "Cumulative Short P&L": short_pnl, "Avg Short Profit per Trade": avg_short_trade, "Num Short Trades": len(trades[2]), "Avg Short Confidence": short_conf, "Avg Short Duration (Days)": avg_short_dur_ticker, "First Short Entry": first_short_entry, "Last Short Exit": last_short_exit }) if open_trades: for trade in open_trades: trade['Ticker'] = ticker_symbol; all_open_trades.append(trade) progress_bar.empty() if excluded_analysis_tickers and not production_mode: st.warning(f"Excluded {len(excluded_analysis_tickers)} tickers due to unrealistic profit.") if summary_results: st.session_state.summary_df = pd.DataFrame(summary_results).set_index('Ticker') st.session_state.trade_counts = {"long_wins": total_long_wins, "long_losses": total_long_losses, "short_wins": total_short_wins, "short_losses": total_short_losses} st.session_state.trade_durations = { "avg_long_duration": np.mean(all_long_durations) if all_long_durations else 0, "max_long_duration": np.max(all_long_durations) if all_long_durations else 0, "avg_short_duration": np.mean(all_short_durations) if all_short_durations else 0, "max_short_duration": np.max(all_short_durations) if all_short_durations else 0 } st.session_state.exit_breakdown_totals = { 'long_profit_take_count': total_exit_breakdown[0], 'long_tsl_count': total_exit_breakdown[1], 'long_time_exit_count': total_exit_breakdown[2], 'short_profit_take_count': total_exit_breakdown[3], 'short_tsl_count': total_exit_breakdown[4], 'short_time_exit_count': total_exit_breakdown[5] } else: st.warning("No trades found.") st.session_state.trade_durations = {} st.session_state.exit_breakdown_totals = {} st.session_state.open_trades_df = pd.DataFrame(all_open_trades) if all_open_trades else pd.DataFrame() # 10. Display Advisor Scan Results (raw_df) elif 'raw_df' in st.session_state and st.session_state.raw_df is not None: advisor_type = st.session_state.get('advisor_type', 'Advisor') st.subheader(f"👨‍💼 {advisor_type} Advisor: Trade Signals") c_view1, c_view2 = st.columns(2) dedupe_view = c_view1.checkbox("De-duplicate trades", value=True, key="advisor_dedupe_check") filter_trades = c_view2.checkbox("Hide Empty Tickers", value=True, key="advisor_filter_check") display_df = st.session_state.deduped_df if dedupe_view else st.session_state.raw_df if display_df is not None and not display_df.empty: if filter_trades and 'Status' in display_df.columns: display_df = display_df[display_df['Status'].notna()] format_dict = { "Final % P/L": lambda x: f"{x:.2%}" if pd.notna(x) else '-', "Date Open": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-', "Date Closed": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-', "Start Confidence": lambda x: f"{x:.0f}%" if pd.notna(x) else '-' } if "Setup G/B Ratio" in display_df.columns: format_dict["Setup G/B Ratio"] = lambda x: f"{x:.2f}" if pd.notna(x) else '-' for col in display_df.columns: if any(x in col for x in ["Threshold", "Std Dev", "Ratio"]) and col not in format_dict: format_dict[col] = lambda x: f"{x:.2f}" if pd.notna(x) and isinstance(x, (int, float)) else x st.dataframe(display_df.style.format(format_dict, na_rep='-')) else: st.info("No trades found.") if st.button("Back to Main Analysis"): st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None st.session_state.run_advanced_advisor = False; st.session_state.run_user_advisor_setup = False st.session_state.run_scan_user_setups = False; st.rerun() # 11. Display Confidence Optimisation Results elif st.session_state.get('confidence_results_df') is not None and not st.session_state.confidence_results_df.empty: st.subheader("📊 Confidence Setup Optimisation Results") # [CHANGE] Rename the column for display display_df_conf = st.session_state.confidence_results_df.head(60).rename(columns={'Net % Return': 'Moneypile Score'}) # [CHANGE] Update formatters to match conf_formatters = { "Avg Profit/Trade": "{:.2%}", "Ticker G/B Ratio": "{:.2f}", "Trade G/B Ratio": "{:.2f}", "Moneypile Score": "{:.1f}", "Avg Entry Conf.": "{:.1f}%", "Good Score": "{:.4f}", "Bad Score": "{:.4f}", "Norm. Score %": "{:.2f}%" } if 'MACD' in display_df_conf.columns: conf_formatters['MACD'] = '{}' if 'MA Slope' in display_df_conf.columns: conf_formatters['MA Slope'] = '{}' if 'Markov' in display_df_conf.columns: conf_formatters['Markov'] = '{}' valid_conf_formatters = {k: (lambda val, fmt=v: fmt.format(val) if pd.notna(val) else '-') for k, v in conf_formatters.items() if k in display_df_conf.columns} st.dataframe(display_df_conf.style.format(valid_conf_formatters, na_rep='-')) # 12. Display Single Ticker Results # [FIX] Changed 'elif' to 'if' so this block runs immediately after analysis if st.session_state.get('single_ticker_results') is not None: res = st.session_state.single_ticker_results st.subheader(f"Results for {st.session_state.get('ticker_select')}") c1, c2, c3, c4 = st.columns(4) c1.metric("Cumulative Long P&L", f"{res.get('long_pnl', 0):.2%}") c2.metric("Avg Long Trade P&L", f"{res.get('avg_long_trade', 0):.2%}") c3.metric("Cumulative Short P&L", f"{res.get('short_pnl', 0):.2%}") c4.metric("Avg Short Trade P&L", f"{res.get('avg_short_trade', 0):.2%}") if res.get('results_df') is not None and not res['results_df'].empty: try: st.plotly_chart(generate_long_plot(res['results_df'], res['trades'], st.session_state.get('ticker_select')), use_container_width=True) st.plotly_chart(generate_short_plot(res['results_df'], res['trades'], st.session_state.get('ticker_select')), use_container_width=True) except Exception as e: st.error(f"Error generating plot: {e}") else: st.info("No chart data available.") # 13. Display Full Analysis Summary if st.session_state.get('summary_df') is not None and not st.session_state.summary_df.empty: # 1. Top Cards display_summary_analytics(st.session_state.summary_df) # 2. Histogram 1: Profit Distribution st.markdown("---") try: if 'generate_profit_distribution_chart' in globals(): dist_fig = generate_profit_distribution_chart(st.session_state.summary_df) if dist_fig: st.plotly_chart(dist_fig, use_container_width=True) except Exception: pass # 3. Histogram 2: Trades Over Time (NEW) try: if 'generate_trades_timeline_histogram' in globals() and st.session_state.get('open_trades_df') is not None: timeline_fig = generate_trades_timeline_histogram( st.session_state.open_trades_df, st.session_state.start_date, st.session_state.end_date ) if timeline_fig: st.plotly_chart(timeline_fig, use_container_width=True) except Exception as e: st.error(f"Chart Error: {e}") st.markdown("---") # 4. Results Per Ticker Table st.subheader("Results per Ticker") if st.checkbox("Only show tickers with trades", value=True): df_to_display = st.session_state.summary_df[(st.session_state.summary_df['Num Long Trades'] > 0) | (st.session_state.summary_df['Num Short Trades'] > 0)].copy() else: df_to_display = st.session_state.summary_df.copy() date_cols = ["First Long Entry", "Last Long Exit", "First Short Entry", "Last Short Exit"] for col in date_cols: if col in df_to_display.columns: df_to_display[col] = pd.to_datetime(df_to_display[col], errors='coerce').dt.strftime('%Y-%m-%d') df_to_display.fillna('-', inplace=True) st.dataframe(df_to_display.style.format({ "Cumulative Long P&L": "{:.2%}", "Avg Long Profit per Trade": "{:.2%}", "Avg Long Duration (Days)": "{:.1f}", "Cumulative Short P&L": "{:.2%}", "Avg Short Profit per Trade": "{:.2%}", "Avg Short Duration (Days)": "{:.1f}", "Avg Long Confidence": "{:.0f}%", "Avg Short Confidence": "{:.0f}%" }, na_rep='-')) if not production_mode: if st.button("💾 Add these settings to User-Defined List", key="save_setup_from_analysis", on_click=add_setup_to_user_list): pass # 5. Open Positions Table (With Filter) st.subheader("👨🏻‍💼 Open Positions & Recently Closed", help="This table displays all currently ACTIVE trades, plus any trades that closed within the last 30 days.") if st.session_state.get('open_trades_df') is not None and not st.session_state.open_trades_df.empty: full_df = st.session_state.open_trades_df.copy() # --- FILTER: Show 'Open' OR 'Closed in last 30 days' --- cutoff = pd.Timestamp.now() - pd.Timedelta(days=30) full_df['Date Closed'] = pd.to_datetime(full_df['Date Closed'], errors='coerce') mask_open = (full_df['Status'] == 'Open') mask_recent = (full_df['Status'] == 'Closed') & (full_df['Date Closed'] >= cutoff) display_open_df = full_df[mask_open | mask_recent].copy() # ------------------------------------------------------- # [FIX] Sort strictly by Date Open (Newest First) display_open_df.sort_values(by='Date Open', ascending=False, inplace=True) cols_order_manual = ['Ticker', 'Status', 'Final % P/L', 'Side', 'Date Open', 'Date Closed', 'Start Confidence'] existing_cols_open = [col for col in cols_order_manual if col in display_open_df.columns] if existing_cols_open and not display_open_df.empty: st.dataframe(display_open_df[existing_cols_open].style.format({ "Final % P/L": lambda x: f"{x:.2%}" if pd.notna(x) else '-', "Date Open": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-', "Date Closed": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-', "Start Confidence": lambda x: f"{x:.0f}%" if pd.notna(x) else '-' }, na_rep='-')) else: st.info("No Open or Recent trades found (older trades are hidden).") else: st.info("No trades found.") # --- NEW: SAVE RESULTS BUTTON --- st.markdown("---") st.subheader("💾 Save Analysis") current_config = { 'start_date': st.session_state.start_date, 'end_date': st.session_state.end_date, 'primary_driver': st.session_state.primary_driver, 'confidence_threshold': st.session_state.confidence_threshold, 'use_rsi': st.session_state.use_rsi, 'rsi_w': st.session_state.rsi_w, 'use_volatility': st.session_state.use_volatility, 'vol_w': st.session_state.vol_w, 'use_trend': st.session_state.use_trend, 'trend_w': st.session_state.trend_w, 'use_volume': st.session_state.use_volume, 'vol_w_val': st.session_state.vol_w_val, 'use_macd': st.session_state.use_macd, 'macd_w': st.session_state.macd_w, 'use_ma_slope': st.session_state.use_ma_slope, 'ma_slope_w': st.session_state.ma_slope_w, 'use_markov': st.session_state.use_markov, 'markov_w': st.session_state.markov_w, 'use_mfi': st.session_state.use_mfi, 'mfi_w': st.session_state.mfi_w, 'use_supertrend': st.session_state.use_supertrend, 'supertrend_w': st.session_state.supertrend_w, 'use_adx_filter': st.session_state.use_adx_filter, 'adx_threshold': st.session_state.adx_threshold, 'bband_period': st.session_state.bband_period, 'bband_std_dev': st.session_state.bband_std_dev, 'large_ma_period': st.session_state.get('large_ma_period', 50), 'long_entry_threshold_pct': st.session_state.long_entry_threshold_pct, 'short_entry_threshold_pct': st.session_state.short_entry_threshold_pct, 'exit_logic_type': st.session_state.exit_logic_type, 'smart_trailing_stop_pct': st.session_state.smart_trailing_stop_pct } # Generate CSV if 'convert_results_to_csv' in globals(): csv_data = convert_results_to_csv(st.session_state.summary_df, current_config) timestamp = datetime.now().strftime("%Y%m%d_%H%M") file_name = f"Backtest_Results_{timestamp}.csv" st.download_button( label="⬇️ Download Results & Settings as CSV", data=csv_data, file_name=file_name, mime='text/csv', use_container_width=True ) # 14. Default Message else: if not any([st.session_state.get(k) for k in ['run_advanced_advisor','run_user_advisor_setup','advisor_df','confidence_results_df','single_ticker_results','summary_df','load_message']]): st.info("Click a 'Run' button in the sidebar to start.") # 15. Footer Buttons (Hidden in Production) if not production_mode: st.markdown("---") col_load1, col_load2, col_load3 = st.columns(3) with col_load1: if 'apply_best_params_to_widgets' in locals() and st.session_state.get('best_params'): st.button("⬇️ Load Optimal Parameters", on_click=apply_best_params_to_widgets, use_container_width=True) with col_load2: if 'apply_best_weights_to_widgets' in locals() and st.session_state.get('best_weights'): st.button("⬇️ Load Optimal Weights", on_click=apply_best_weights_to_widgets, use_container_width=True) with col_load3: if st.session_state.get('worst_confidence_setups_list'): if st.button("Apply Top Worst Setups as Veto Filter", use_container_width=True): st.session_state.veto_setup_list = st.session_state.worst_confidence_setups_list st.session_state.worst_confidence_setups_list = None st.sidebar.info(f"Applying {len(st.session_state.veto_setup_list)} Veto filters.") st.rerun() col_load4 = st.columns(1)[0] with col_load4: is_markov_relevant = (st.session_state.primary_driver == 'Markov State') or st.session_state.use_markov if st.session_state.get('best_markov_setup') and is_markov_relevant: if st.button("⬇️ Load Best Markov Setup", on_click=None, use_container_width=True): st.session_state.primary_driver = "Markov State" st.session_state.use_markov = True st.sidebar.success("Best Markov setup loaded!") st.rerun() # --- LOGIC HANDLERS (Hidden execution) --- if st.session_state.run_advanced_advisor: with st.spinner("Running Optimization..."): run_optimization() st.session_state.run_advanced_advisor = False # --- CHART FUNCTION (Must be outside main) --- def generate_trades_timeline_histogram(trades_df, start_date, end_date): """ Creates a stacked histogram showing trade results over time. 4 Colors: Long Win, Long Loss, Short Win, Short Loss. """ if trades_df is None or trades_df.empty: return None # Filter by date range mask = (trades_df['Date Closed'] >= pd.to_datetime(start_date)) & (trades_df['Date Closed'] <= pd.to_datetime(end_date)) df = trades_df[mask].copy() if df.empty: return None # Categorize df = df[df['Status'] == 'Closed'] long_wins = df[(df['Side'] == 'Long') & (df['Final % P/L'] > 0)] long_loss = df[(df['Side'] == 'Long') & (df['Final % P/L'] <= 0)] short_wins = df[(df['Side'] == 'Short') & (df['Final % P/L'] > 0)] short_loss = df[(df['Side'] == 'Short') & (df['Final % P/L'] <= 0)] fig = go.Figure() # Add Stacked Traces (Shorts hidden by default via visible='legendonly') fig.add_trace(go.Histogram(x=long_wins['Date Closed'], name='Long Winners', marker_color='green')) fig.add_trace(go.Histogram(x=long_loss['Date Closed'], name='Long Losers', marker_color='red')) fig.add_trace(go.Histogram(x=short_wins['Date Closed'], name='Short Winners', marker_color='blue', visible='legendonly')) fig.add_trace(go.Histogram(x=short_loss['Date Closed'], name='Short Losers', marker_color='orange', visible='legendonly')) fig.update_layout( barmode='stack', title="Trades Over Time (Win/Loss Stacked)", xaxis_title="Date", yaxis_title="Number of Trades", height=400, template="plotly_white", legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), # [FIX] Force the X-axis range to match the user's selected date settings xaxis=dict(range=[start_date, end_date]) ) return fig if __name__ == "__main__": main()