Spaces:
Running
Running
| # New version --- | |
| import streamlit as st | |
| import pandas as pd | |
| import os | |
| import numpy as np | |
| from datetime import date, datetime | |
| import plotly.graph_objects as go | |
| import itertools | |
| import json | |
| # --- CORRECTED IMPORTS: Moved MACD --- | |
| from ta.volatility import BollingerBands | |
| from ta.momentum import RSIIndicator # Removed MACD from here | |
| from ta.trend import ADXIndicator, MACD | |
| from ta.volume import MFIIndicator # Added MFI here | |
| # --- [NEW] ADDED ATR FOR INTELLIGENT EXIT --- | |
| from ta.volatility import AverageTrueRange | |
| # --- [END NEW] --- | |
| from multiprocessing import Pool, cpu_count | |
| from functools import partial | |
| from dateutil.relativedelta import relativedelta | |
| from datetime import timedelta | |
| # --- 0. Settings Management Functions --- | |
| CONFIG_FILE = "config.json" | |
| VETO_CONFIG_FILE = "veto_config.json" | |
| TOP_SETUPS_FILE = "top_setups.json" | |
| USER_SETUPS_FILE = "user_advisor_setups.json" | |
| MARKOV_SETUP_FILE = "best_markov.json" | |
| def save_settings(params_to_save): | |
| with open(CONFIG_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(params_to_save, f, indent=4) | |
| st.sidebar.success("Settings saved as default!") | |
| # --- Replacement for load_settings function --- | |
| def load_settings(): | |
| default_structure = { | |
| "large_ma_period": 70, | |
| "bband_period": 32, | |
| "bband_std_dev": 1.4, | |
| "confidence_threshold": 95, | |
| "long_entry_threshold_pct": 0.01, | |
| "long_exit_ma_threshold_pct": 0.0, | |
| "long_trailing_stop_loss_pct": 0.2, | |
| "long_delay_days": 0, | |
| "short_entry_threshold_pct": 0.0, | |
| "short_exit_ma_threshold_pct": 0.0, | |
| "short_trailing_stop_loss_pct": 0.2, | |
| "short_delay_days": 0, | |
| "use_rsi": True, "rsi_w": 1.5, | |
| "rsi_logic": "Level", | |
| "primary_driver": "Bollinger Bands", | |
| "exit_logic_type": "Intelligent (ADX/MACD/ATR)", | |
| "exit_confidence_threshold": 40, | |
| "smart_exit_atr_period": 14, | |
| "smart_exit_atr_multiplier": 3.0, | |
| "intelligent_tsl_pct": 0.2, | |
| "norm_lookback_years": 1, | |
| 'use_rolling_benchmark': False, | |
| "benchmark_rank": 99, | |
| "use_ma_floor_filter": True, | |
| "catcher_stop_pct": 0.03, | |
| "use_vol": False, "vol_w": 0.5, | |
| "use_trend": False, "trend_w": 2.0, | |
| "use_volume": False, "volume_w": 0.5, | |
| "use_adx_filter": False, "adx_threshold": 10.0, | |
| "adx_period": 14, | |
| "use_macd": False, "macd_w": 2.0, | |
| "use_ma_slope": False, "ma_slope_w": 0.5, | |
| "use_markov": False, "markov_w": 1.0, | |
| "max_trading_days": 60, | |
| "max_long_duration": 60, | |
| "max_short_duration": 3 | |
| } | |
| if os.path.exists(CONFIG_FILE): | |
| try: | |
| with open(CONFIG_FILE, 'r', encoding='utf-8') as f: | |
| loaded = json.load(f) | |
| for key in default_structure: | |
| if key in loaded: | |
| default_structure[key] = loaded[key] | |
| except (json.JSONDecodeError, Exception) as e: | |
| print(f"Error loading config.json: {e}. Using default settings.") | |
| return default_structure | |
| # --- UPDATED: Save a list of veto setups --- | |
| def save_veto_setup(veto_setups_list): # Takes a list now | |
| with open(VETO_CONFIG_FILE, 'w', encoding='utf-8') as f: | |
| # Save the list directly | |
| json.dump(veto_setups_list, f, indent=4) | |
| st.sidebar.success(f"Saved {len(veto_setups_list)} Veto filter(s) as default!") | |
| # --- UPDATED: Load a list of veto setups, robustly --- | |
| def load_veto_setup(): | |
| veto_list = [] # Default to empty list | |
| if os.path.exists(VETO_CONFIG_FILE): | |
| try: | |
| with open(VETO_CONFIG_FILE, 'r', encoding='utf-8') as f: | |
| loaded_data = json.load(f) | |
| # Ensure it's loaded as a list | |
| if isinstance(loaded_data, list): | |
| veto_list = loaded_data | |
| elif isinstance(loaded_data, dict): # Handle old single dict format | |
| veto_list = [loaded_data] | |
| except (json.JSONDecodeError, Exception) as e: | |
| print(f"Error loading veto config: {e}. Using empty list.") | |
| # Keep veto_list as empty | |
| pass | |
| return veto_list | |
| # --- UPDATED: Includes new factors, num_setups=8, removed infinity filter --- | |
| def save_top_setups(results_df, side, num_setups=8): # Default is now 8 | |
| df = results_df.copy() | |
| if df.empty: | |
| st.sidebar.warning(f"No valid setups found for {side.title()} to save.") | |
| return | |
| deduplication_cols = [ | |
| 'Conf. Threshold', 'Avg Profit/Trade', 'Ticker G/B Ratio', 'Trade G/B Ratio', | |
| 'Winning Tickers', 'Losing Tickers', 'Avg Entry Conf.', | |
| 'Good Score', 'Bad Score', 'Norm. Score %', 'Total Trades' | |
| ] | |
| factor_cols = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope'] | |
| existing_factor_cols = [col for col in factor_cols if col in df.columns] | |
| if existing_factor_cols: | |
| df['FactorsOn'] = df[existing_factor_cols].apply(lambda row: (row == 'On').sum(), axis=1) | |
| else: | |
| df['FactorsOn'] = 0 | |
| # --- CONSISTENCY FIX: Sort by the Weighted Score ("Norm. Score %") --- | |
| # If Norm. Score % exists, use it. Otherwise fall back to Strategy Score or Trade G/B Ratio. | |
| sort_col = 'Trade G/B Ratio' # Default fallback | |
| if 'Norm. Score %' in df.columns: | |
| sort_col = 'Norm. Score %' | |
| elif 'Strategy Score' in df.columns: | |
| sort_col = 'Strategy Score' | |
| if sort_col in df.columns: | |
| df[sort_col] = df[sort_col].fillna(-np.inf) | |
| # Sort by Score (Descending), then by Complexity (Ascending - simpler is better) | |
| df = df.sort_values( | |
| by=[sort_col, 'FactorsOn'], | |
| ascending=[False, True] | |
| ) | |
| else: | |
| st.sidebar.error(f"Could not sort top setups: '{sort_col}' column missing.") | |
| return | |
| existing_cols_for_dedup = [col for col in deduplication_cols if col in df.columns] | |
| deduplicated_df = df.drop_duplicates(subset=existing_cols_for_dedup, keep='first') | |
| top_setups = deduplicated_df.head(num_setups).to_dict('records') | |
| if os.path.exists(TOP_SETUPS_FILE): | |
| try: | |
| with open(TOP_SETUPS_FILE, 'r', encoding='utf-8') as f: | |
| all_top_setups = json.load(f) | |
| except json.JSONDecodeError: | |
| all_top_setups = {} | |
| else: | |
| all_top_setups = {} | |
| all_top_setups[side] = top_setups | |
| with open(TOP_SETUPS_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(all_top_setups, f, indent=4) | |
| st.sidebar.success(f"Top {len(top_setups)} unique {side.title()} setups saved! (Sorted by {sort_col})") | |
| def load_top_setups(): | |
| if os.path.exists(TOP_SETUPS_FILE): | |
| try: | |
| with open(TOP_SETUPS_FILE, 'r', encoding='utf-8') as f: | |
| return json.load(f) | |
| except json.JSONDecodeError: | |
| return None # Return None if file is corrupt | |
| return None | |
| # Use caching for efficiency | |
| def load_proverbs(): | |
| # Default to an empty list | |
| proverbs_list = [] | |
| if os.path.exists("proverbs.json"): | |
| try: | |
| with open("proverbs.json", 'r', encoding='utf-8') as f: | |
| loaded_data = json.load(f) | |
| # Ensure it's a list | |
| if isinstance(loaded_data, list): | |
| proverbs_list = loaded_data | |
| except (json.JSONDecodeError, Exception) as e: | |
| print(f"Error loading proverbs.json: {e}. Using fallback.") | |
| # Keep proverbs_list empty or provide a default | |
| # proverbs_list = ["Error loading proverbs."] | |
| # Add a fallback if the list is empty after trying to load | |
| if not proverbs_list: | |
| proverbs_list = ["Have a great trading day!"] # Fallback message | |
| return proverbs_list | |
| def load_user_setups(): | |
| """Loads user-defined advisor setups from a JSON file.""" | |
| # 1. Update Template to include Notes | |
| default_row_template = { | |
| "Run": False, | |
| "Notes": "", # <--- NEW FIELD FOR ADVICE | |
| "RSI": "Off", "Volatility": "Off", "TREND": "Off", "Volume": "Off", | |
| "MACD": "Off", "MA Slope": "Off", "Markov": "Off", | |
| "ADX Filter": "Off", | |
| "Conf. Threshold": 50, | |
| "Large MA Period": 50, "Bollinger Band Period": 20, "Bollinger Band Std Dev": 2.0, | |
| "Catcher Offset (%)": 3.0, | |
| "Long Entry Threshold (%)": 0.0, "Long Exit Threshold (%)": 0.0, "Long Stop Loss (%)": 8.0, "Long Delay (Days)": 0, | |
| "Short Entry Threshold (%)": 0.0, "Short Exit Threshold (%)": 0.0, "Short Stop Loss (%)": 8.0, "Short Delay (Days)": 0, | |
| "Z_Avg_Profit": 0.0, "Z_Num_Trades": 0, "Z_WL_Ratio": 0.0 | |
| } | |
| # Default Examples | |
| default_setup = [default_row_template.copy()] | |
| # Pad with defaults | |
| while len(default_setup) < 20: | |
| default_setup.append(default_row_template.copy()) | |
| if os.path.exists(USER_SETUPS_FILE): | |
| try: | |
| with open(USER_SETUPS_FILE, 'r', encoding='utf-8') as f: | |
| user_setups = json.load(f) | |
| if isinstance(user_setups, list): | |
| processed_setups = [] | |
| for setup in user_setups: | |
| full_setup = default_row_template.copy() | |
| full_setup.update(setup) # Merge loaded data | |
| processed_setups.append(full_setup) | |
| while len(processed_setups) < 20: | |
| processed_setups.append(default_row_template.copy()) | |
| return processed_setups[:20] | |
| else: | |
| return default_setup | |
| except Exception as e: | |
| print(f"Error loading {USER_SETUPS_FILE}: {e}. Using defaults.") | |
| return default_setup | |
| return default_setup | |
| # --- UPDATED: Callback to Save Setup (Added Notes) --- | |
| def add_setup_to_user_list(): | |
| try: | |
| stats = st.session_state.get('last_run_stats', {}) | |
| if not stats: | |
| st.toast("⚠️ No stats found. Run analysis first.", icon="⚠️") | |
| return | |
| def get_weight_or_off(toggle_key, weight_key): | |
| if st.session_state.get(toggle_key, False): | |
| return round(st.session_state.get(weight_key, 1.0), 2) | |
| return "Off" | |
| adx_val = "Off" | |
| if st.session_state.get("use_adx_filter", False): | |
| val = st.session_state.get("adx_threshold", 25.0) | |
| adx_val = max(20.0, min(30.0, val)) | |
| new_setup = { | |
| "Run": True, | |
| "Notes": "Auto-Saved Setup", # <--- Default note for auto-saves | |
| "RSI": get_weight_or_off('use_rsi', 'rsi_w'), | |
| "Volatility": get_weight_or_off('use_vol', 'vol_w'), | |
| "TREND": get_weight_or_off('use_trend', 'trend_w'), | |
| "Volume": get_weight_or_off('use_volume', 'volume_w'), | |
| "MACD": get_weight_or_off('use_macd', 'macd_w'), | |
| "MA Slope": get_weight_or_off('use_ma_slope', 'ma_slope_w'), | |
| "Markov": get_weight_or_off('use_markov', 'markov_w'), | |
| "ADX Filter": adx_val, | |
| "Conf. Threshold": st.session_state.confidence_slider, | |
| "Large MA Period": st.session_state.ma_period, | |
| "Bollinger Band Period": st.session_state.bb_period, | |
| "Bollinger Band Std Dev": st.session_state.bb_std, | |
| "Long Entry Threshold (%)": st.session_state.long_entry, | |
| "Long Exit Threshold (%)": st.session_state.long_exit, | |
| "Long Stop Loss (%)": st.session_state.long_sl, | |
| "Long Delay (Days)": st.session_state.long_delay, | |
| "Short Entry Threshold (%)": st.session_state.short_entry, | |
| "Short Exit Threshold (%)": st.session_state.short_exit, | |
| "Short Stop Loss (%)": st.session_state.short_sl, | |
| "Short Delay (Days)": st.session_state.short_delay, | |
| "Z_Avg_Profit": stats.get("Z_Avg_Profit", 0.0) * 100.0, | |
| "Z_Num_Trades": stats.get("Z_Num_Trades", 0), | |
| "Z_WL_Ratio": stats.get("Z_WL_Ratio", 0.0) | |
| } | |
| current_setups = st.session_state.get("user_setups_data", []) | |
| non_empty_setups = [s for s in current_setups if not is_row_blank(s)] | |
| default_row_template = {k:v for k,v in load_user_setups()[0].items()} | |
| non_empty_setups.append(new_setup) | |
| while len(non_empty_setups) < 20: non_empty_setups.append(default_row_template.copy()) | |
| final_setups = non_empty_setups[:20] | |
| save_user_setups(final_setups) | |
| # Refresh State | |
| processed_setups = [] | |
| for s in final_setups: | |
| processed_setups.append(s.copy()) # Simple copy is enough now | |
| st.session_state["user_setups_data"] = processed_setups | |
| st.toast("✅ Setup saved!", icon="✅") | |
| st.session_state.run_user_advisor_setup = True | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Could not save setup: {e}") | |
| def save_user_setups(setups_list): | |
| """Saves user-defined advisor setups to a JSON file.""" | |
| try: | |
| # Ensure we only save 20 | |
| with open(USER_SETUPS_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(setups_list[:20], f, indent=4) # <-- CHANGED TO 20 | |
| st.success("User-defined setups saved!") | |
| except Exception as e: | |
| st.error(f"Error saving user setups: {e}") | |
| def save_markov_setup(setup_dict): | |
| """Saves the best Markov setup to a JSON file.""" | |
| try: | |
| with open(MARKOV_SETUP_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(setup_dict, f, indent=4) | |
| st.sidebar.success("Best Markov setup saved as default!") | |
| except Exception as e: | |
| st.sidebar.error(f"Error saving Markov setup: {e}") | |
| def load_markov_setup(): | |
| """Loads the best Markov setup from a JSON file.""" | |
| if os.path.exists(MARKOV_SETUP_FILE): | |
| try: | |
| with open(MARKOV_SETUP_FILE, 'r', encoding='utf-8') as f: | |
| return json.load(f) | |
| except Exception as e: | |
| print(f"Error loading {MARKOV_SETUP_FILE}: {e}") | |
| return None | |
| return None | |
| # --- UPDATED: Helper to accept filename (Replaces the old load_completed_setups) --- | |
| def load_completed_setups(filename): | |
| """Reads the existing CSV and returns a set of completed configurations.""" | |
| completed_configs = set() | |
| if os.path.exists(filename): | |
| try: | |
| # We only need to read the config columns: Factors and Weights | |
| df_completed = pd.read_csv(filename, usecols=['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov', | |
| 'RSI W', 'Volatility W', 'TREND W', 'Volume W', 'MACD W', 'MA Slope W', 'Markov W'], | |
| dtype={'RSI': str, 'Volatility': str, 'TREND': str, 'Volume': str, 'MACD': str, 'MA Slope': str, 'Markov': str}) | |
| for index, row in df_completed.iterrows(): | |
| toggles = tuple(row[['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov']].values) | |
| weights = tuple(row[['RSI W', 'Volatility W', 'TREND W', 'Volume W', 'MACD W', 'MA Slope W', 'Markov W']].values) | |
| completed_configs.add((toggles, weights)) | |
| except Exception as e: | |
| print(f"Warning: Error loading CSV for checkpointing: {e}. Starting from scratch.") | |
| return completed_configs | |
| # --- 1. Data Loading and Cleaning Functions --- | |
| # Added TTL to auto-refresh cache every 5 mins | |
| def load_all_data(folder_path): | |
| if not os.path.exists(folder_path): | |
| st.error(f"Folder '{folder_path}' not found.") | |
| return None, None | |
| all_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')] | |
| all_files.sort() # Ensure we load in chronological order (e.g. 2023, then 2024) | |
| if not all_files: | |
| st.error("No CSV files found in the 'csv_data' folder.") | |
| return None, None | |
| df_list = [] | |
| error_messages = [] | |
| for file_name in all_files: | |
| file_path = os.path.join(folder_path, file_name) | |
| try: | |
| # 1. Read CSV without parsing dates yet (load as strings to be safe) | |
| df = pd.read_csv(file_path, header=0, index_col=0, encoding='utf-8') | |
| # 2. FORCE ISO PARSING (YYYY-MM-DD) | |
| # We disable 'dayfirst' because ISO puts Year first. | |
| df.index = pd.to_datetime(df.index, format='%Y-%m-%d', errors='coerce') | |
| # 3. Drop rows with invalid dates | |
| df = df[df.index.notna()] | |
| if df.empty: | |
| error_messages.append(f"Warning: Skipped {file_name}, no valid dates found.") | |
| continue | |
| df_list.append(df) | |
| except Exception as e: | |
| error_messages.append(f"Could not read {file_name}. Error: {e}") | |
| if not df_list: | |
| return None, "No data could be loaded successfully." | |
| # 4. CONSOLIDATE | |
| try: | |
| master_df = pd.concat(df_list) | |
| # 5. DEDUPLICATE (Keep the newest version of any overlapping date) | |
| # This prevents "Double Dots" if 2024-2025.csv and 2025-NOW.csv overlap | |
| if master_df.index.has_duplicates: | |
| master_df = master_df[~master_df.index.duplicated(keep='last')] | |
| # 6. FORCE SORT (The Final "Zig-Zag" Killer) | |
| # Ensures Jan 1st always comes before Jan 2nd | |
| master_df.sort_index(inplace=True) | |
| # 7. NUMERIC CONVERSION (From your original code) | |
| # Ensures all price columns are numbers, not strings | |
| for col in master_df.columns: | |
| master_df[col] = pd.to_numeric(master_df[col], errors='coerce') | |
| for msg in error_messages: st.warning(msg) | |
| return master_df, f"Successfully combined data from {len(df_list)} files." | |
| except Exception as e: | |
| return None, f"Critical Error merging data: {e}" | |
| def clean_data_and_report_outliers(df): | |
| """ | |
| Cleans data using a 'Rolling Median' filter (User Defined: 1 Month Window). | |
| Adapts to long-term trends while catching sudden 'Pence vs Pound' glitches. | |
| """ | |
| outlier_report = [] | |
| # Identify price columns (exclude _Volume, _High, _Low) | |
| price_columns = [col for col in df.columns if not any(x in str(col) for x in ['_Volume', '_High', '_Low'])] | |
| for ticker in price_columns: | |
| if ticker in df.columns: | |
| # Ensure numeric | |
| series = pd.to_numeric(df[ticker], errors='coerce') | |
| # --- ROLLING MEDIAN FILTER (1 Month / 20 Days) --- | |
| # center=True looks at 10 days before and 10 days after (if available) to find the 'true' level. | |
| # min_periods=1 ensures it works even at the start of the file. | |
| rolling_median = series.rolling(window=20, center=True, min_periods=1).median() | |
| # 1. Catch the 'Pence vs Pounds' Crash (e.g., 154 -> 1.56) | |
| # Logic: If price is < 20% of the recent median (an 80% drop). | |
| low_threshold = rolling_median * 0.20 | |
| # 2. Catch massive data spikes (e.g., 1.56 -> 154 if logic reversed) | |
| # Logic: If price is > 5x the recent median (500% spike). | |
| high_threshold = rolling_median * 5.0 | |
| bad_data_mask = (series < low_threshold) | (series > high_threshold) | |
| bad_days = series[bad_data_mask].index | |
| if not bad_days.empty: | |
| df.loc[bad_days, ticker] = np.nan | |
| outlier_report.append({'Ticker': ticker, 'Type': 'Rolling Filter', 'Count': len(bad_days)}) | |
| return df, outlier_report | |
| # --- 2. Custom Backtesting Engine --- | |
| # --- [UPDATED] 9-Factor Version (with MFI & SuperTrend) --- | |
| def calculate_confidence_score(df, primary_driver, | |
| use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov, | |
| use_mfi, use_supertrend, | |
| rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w, | |
| mfi_w, supertrend_w, | |
| bband_params, | |
| best_markov_setup=None): | |
| long_score = pd.Series(0.0, index=df.index) | |
| short_score = pd.Series(0.0, index=df.index) | |
| # --- Pre-calculate Markov State if needed --- | |
| if use_markov and best_markov_setup and 'RunUp_State' not in df.columns: | |
| run_up_period = best_markov_setup.get('Run-Up Period', 10) | |
| df['RunUp_Return'] = df['Close'].pct_change(periods=run_up_period) | |
| df['RunUp_State'] = df['RunUp_Return'].apply(lambda x: 'Up' if x > 0 else 'Down') | |
| # --- BBand Factor --- | |
| if primary_driver != 'Bollinger Bands' and 'bband_lower' in df.columns: | |
| bb_weight = 1.0 | |
| long_entry_pct = bband_params.get('long_entry_threshold_pct', 0.0) | |
| short_entry_pct = bband_params.get('short_entry_threshold_pct', 0.0) | |
| long_bb_trigger_price = df['bband_lower'] * (1 - long_entry_pct) | |
| short_bb_trigger_price = df['bband_upper'] * (1 + short_entry_pct) | |
| long_score_range = (df['large_ma'] - long_bb_trigger_price).replace(0, np.nan) | |
| short_score_range = (short_bb_trigger_price - df['large_ma']).replace(0, np.nan) | |
| long_score += ((df['large_ma'] - df['Close']) / long_score_range).clip(0, 1).fillna(0) * bb_weight | |
| short_score += ((df['Close'] - df['large_ma']) / short_score_range).clip(0, 1).fillna(0) * bb_weight | |
| # Factor 1: RSI | |
| if primary_driver != 'RSI Crossover' and use_rsi and 'RSI' in df.columns: | |
| long_score += ((30 - df['RSI']) / 30).clip(0, 1).fillna(0) * rsi_w | |
| short_score += ((df['RSI'] - 70) / 30).clip(0, 1).fillna(0) * rsi_w | |
| # Factor 2: Volatility | |
| if use_volatility and 'Volatility_p' in df.columns: | |
| vol_signal = (df['Volatility_p'] > 0.025).astype(float) * vol_w | |
| long_score += vol_signal; short_score += vol_signal | |
| # Factor 3: Trend | |
| if use_trend and 'SMA_200' in df.columns and 'Close' in df.columns: | |
| valid_sma = (df['SMA_200'] != 0) & df['SMA_200'].notna() | |
| pct_dist = pd.Series(0.0, index=df.index) | |
| pct_dist.loc[valid_sma] = df.loc[valid_sma].apply(lambda row: (row['Close'] - row['SMA_200']) / row['SMA_200'] if row['SMA_200'] != 0 else 0, axis=1) | |
| long_score += (pct_dist / 0.10).clip(0, 1).fillna(0) * trend_w | |
| short_score += (-pct_dist / 0.10).clip(0, 1).fillna(0) * trend_w | |
| # Factor 4: Volume | |
| if use_volume and 'Volume_Ratio' in df.columns: | |
| vol_spike_signal = ((df['Volume_Ratio'] - 1.75) / 2.25).clip(0, 1).fillna(0) * vol_w_val | |
| long_score += vol_spike_signal; short_score += vol_spike_signal | |
| # Factor 5: MACD | |
| if primary_driver != 'MACD Crossover' and use_macd and 'MACD_line' in df.columns: | |
| macd_cross_long = (df['MACD_line'].shift(1) < df['MACD_signal'].shift(1)) & (df['MACD_line'] >= df['MACD_signal']) | |
| macd_cross_short = (df['MACD_line'].shift(1) > df['MACD_signal'].shift(1)) & (df['MACD_line'] <= df['MACD_signal']) | |
| long_score += macd_cross_long.astype(float) * macd_w * 0.6 | |
| short_score += macd_cross_short.astype(float) * macd_w * 0.6 | |
| long_score += (df['MACD_hist'] > 0).astype(float) * macd_w * 0.4 | |
| short_score += (df['MACD_hist'] < 0).astype(float) * macd_w * 0.4 | |
| # Factor 6: MA Slope | |
| if primary_driver != 'MA Slope' and use_ma_slope and 'ma_slope' in df.columns: | |
| long_score += (df['ma_slope'] > 0).astype(float) * ma_slope_w | |
| short_score += (df['ma_slope'] < 0).astype(float) * ma_slope_w | |
| # Factor 7: Markov State | |
| if primary_driver != 'Markov State' and use_markov and best_markov_setup and 'RunUp_State' in df.columns: | |
| strategy = best_markov_setup.get('Strategy') | |
| if strategy == 'Down -> Up': long_score += (df['RunUp_State'] == 'Down').astype(float) * markov_w | |
| elif strategy == 'Up -> Up': long_score += (df['RunUp_State'] == 'Up').astype(float) * markov_w | |
| elif strategy == 'Up -> Down': short_score += (df['RunUp_State'] == 'Up').astype(float) * markov_w | |
| elif strategy == 'Down -> Down': short_score += (df['RunUp_State'] == 'Down').astype(float) * markov_w | |
| # Factor 8: MFI (Money Flow Index) [IMPROVED] | |
| if use_mfi and 'MFI' in df.columns: | |
| # Binary Logic: If MFI < 20 (Oversold), give full weight. | |
| # This fixes the "tiny score" issue. | |
| long_score += (df['MFI'] < 25).astype(float) * mfi_w | |
| short_score += (df['MFI'] > 75).astype(float) * mfi_w | |
| # Factor 9: SuperTrend [IMPROVED] | |
| if use_supertrend and 'SuperTrend' in df.columns: | |
| # Long Score: Price > SuperTrend (Trend is Bullish) | |
| long_score += (df['Close'] > df['SuperTrend']).astype(float) * supertrend_w | |
| # Short Score: Price < SuperTrend (Trend is Bearish) | |
| short_score += (df['Close'] < df['SuperTrend']).astype(float) * supertrend_w | |
| return long_score.fillna(0), short_score.fillna(0) | |
| # --- FULL REPLACEMENT FOR run_backtest FUNCTION --- | |
| def run_backtest(data, params, | |
| use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov, use_mfi, use_supertrend, # Added MFI/ST | |
| rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w, mfi_w, supertrend_w, # Added MFI/ST weights | |
| use_adx_filter, adx_threshold, rsi_logic, | |
| adx_period=14, | |
| veto_setups_list=None, | |
| primary_driver='Bollinger Bands', | |
| markov_setup=None, | |
| exit_logic_type='Standard (Price-Based)', | |
| exit_confidence_threshold=50, | |
| smart_trailing_stop_pct=0.05, | |
| long_score_95_percentile=None, | |
| short_score_95_percentile=None, | |
| benchmark_rank=0.95, | |
| smart_exit_atr_period=14, | |
| smart_exit_atr_multiplier=3.0, | |
| intelligent_tsl_pct=1.0, | |
| analysis_start_date=None, | |
| analysis_end_date=None, | |
| benchmark_lookback_years=None, | |
| use_rolling_benchmark=False): | |
| df = data.copy() | |
| required_cols = ['Close'] | |
| if 'High' not in df.columns: df['High'] = df['Close'] | |
| if 'Low' not in df.columns: df['Low'] = df['Close'] | |
| required_cols.extend(['High', 'Low']) | |
| if 'Volume' in df.columns: required_cols.append('Volume') | |
| df['Close'] = pd.to_numeric(df['Close'], errors='coerce').replace(0, np.nan) | |
| df.dropna(subset=['Close'], inplace=True) | |
| min_ma_period = params.get('large_ma_period', 50) | |
| min_lookback_needed = min_ma_period | |
| if (primary_driver == 'Markov State' or use_markov) and markov_setup is not None: | |
| min_lookback_needed = max(min_ma_period, markov_setup.get('Run-Up Period', 10)) | |
| if len(df) < min_lookback_needed or len(df) < params.get('bband_period', 20) or len(df) < 30: | |
| return 0, 0, 0.0, 0.0, None, ([], [], [], []), [], (0, 0, 0, 0), ([], []), (None, None, None, None), (0, 0, 0, 0, 0, 0) | |
| # --- Indicator Calculations --- | |
| df['large_ma'] = df['Close'].rolling(window=min_ma_period).mean().ffill() | |
| df['ma_slope'] = df['large_ma'].diff(periods=3).ffill() | |
| bband_period = params.get('bband_period', 20) | |
| if len(df) >= bband_period: | |
| try: | |
| indicator_bb = BollingerBands(close=df['Close'], window=bband_period, window_dev=params['bband_std_dev']) | |
| df['bband_lower'] = indicator_bb.bollinger_lband() | |
| df['bband_upper'] = indicator_bb.bollinger_hband() | |
| except Exception: df['bband_lower'], df['bband_upper'] = np.nan, np.nan | |
| else: df['bband_lower'], df['bband_upper'] = np.nan, np.nan | |
| if len(df) >= 14: | |
| try: | |
| indicator_rsi = RSIIndicator(close=df['Close'], window=14) | |
| df['RSI'] = indicator_rsi.rsi() | |
| except Exception: df['RSI'] = np.nan | |
| df['Volatility_p'] = df['Close'].pct_change().rolling(window=14).std() | |
| try: | |
| indicator_adx = ADXIndicator(high=df['High'], low=df['Low'], close=df['Close'], window=adx_period, fillna=True) | |
| df['ADX'] = indicator_adx.adx().ffill() | |
| except Exception: df['ADX'] = np.nan | |
| try: | |
| atr_window = int(smart_exit_atr_period) | |
| indicator_atr = AverageTrueRange(high=df['High'], low=df['Low'], close=df['Close'], window=atr_window, fillna=True) | |
| df['ATR'] = indicator_atr.average_true_range() | |
| except Exception: df['ATR'] = np.nan | |
| else: | |
| df['RSI'], df['Volatility_p'], df['ADX'], df['ATR'] = np.nan, np.nan, np.nan, np.nan | |
| # --- [NEW] MFI & SUPERTREND CALCULATIONS (Fixed) --- | |
| if len(df) >= 14: | |
| # 1. MFI Calculation | |
| try: | |
| mfi_ind = MFIIndicator(high=df['High'], low=df['Low'], close=df['Close'], volume=df['Volume'], window=14, fillna=True) | |
| df['MFI'] = mfi_ind.money_flow_index() | |
| except Exception: df['MFI'] = 50.0 | |
| # 2. Proper Recursive SuperTrend Calculation | |
| try: | |
| st_period = 10 | |
| st_multiplier = 3.0 | |
| # Calculate ATR | |
| high_low = df['High'] - df['Low'] | |
| high_close = np.abs(df['High'] - df['Close'].shift()) | |
| low_close = np.abs(df['Low'] - df['Close'].shift()) | |
| ranges = pd.concat([high_low, high_close, low_close], axis=1) | |
| true_range = np.max(ranges, axis=1) | |
| atr = true_range.rolling(st_period).mean().fillna(0) | |
| # Basic Bands | |
| hl2 = (df['High'] + df['Low']) / 2 | |
| basic_upper = hl2 + (st_multiplier * atr) | |
| basic_lower = hl2 - (st_multiplier * atr) | |
| # Initialize Final Bands | |
| final_upper = basic_upper.copy() | |
| final_lower = basic_lower.copy() | |
| trend = np.zeros(len(df), dtype=int) | |
| supertrend = np.zeros(len(df)) | |
| # Recursive Loop (Accurate Logic) | |
| close = df['Close'].values | |
| bu = basic_upper.values | |
| bl = basic_lower.values | |
| fu = final_upper.values | |
| fl = final_lower.values | |
| # 1 = Uptrend, -1 = Downtrend | |
| curr_trend = 1 | |
| for i in range(1, len(df)): | |
| # Calculate Final Upper Band | |
| if bu[i] < fu[i-1] or close[i-1] > fu[i-1]: | |
| fu[i] = bu[i] | |
| else: | |
| fu[i] = fu[i-1] | |
| # Calculate Final Lower Band | |
| if bl[i] > fl[i-1] or close[i-1] < fl[i-1]: | |
| fl[i] = bl[i] | |
| else: | |
| fl[i] = fl[i-1] | |
| # Determine Trend | |
| if curr_trend == 1 and close[i] < fl[i]: | |
| curr_trend = -1 | |
| elif curr_trend == -1 and close[i] > fu[i]: | |
| curr_trend = 1 | |
| trend[i] = curr_trend | |
| supertrend[i] = fl[i] if curr_trend == 1 else fu[i] | |
| df['SuperTrend'] = supertrend | |
| except Exception as e: | |
| # print(f"ST Error: {e}") # Debug if needed | |
| df['SuperTrend'] = np.nan | |
| else: | |
| df['MFI'], df['SuperTrend'] = 50.0, np.nan | |
| df['SMA_200'] = df['Close'].rolling(window=200, min_periods=1).mean() | |
| if 'Volume' in df.columns: | |
| df['Volume'] = pd.to_numeric(df['Volume'], errors='coerce').fillna(0) | |
| df['Volume_MA50'] = df['Volume'].rolling(window=50, min_periods=1).mean() | |
| df['Volume_Ratio'] = df.apply(lambda row: row['Volume'] / row['Volume_MA50'] if row['Volume_MA50'] > 0 else 0, axis=1) | |
| df['Volume_Ratio'] = df['Volume_Ratio'].replace([np.inf, -np.inf], 0).fillna(0) | |
| else: df['Volume_Ratio'] = 0.0 | |
| if len(df) >= 26: | |
| indicator_macd = MACD(close=df['Close'], window_slow=26, window_fast=12, window_sign=9, fillna=True) | |
| df['MACD_line'] = indicator_macd.macd().ffill() | |
| df['MACD_signal'] = indicator_macd.macd_signal().ffill() | |
| df['MACD_hist'] = indicator_macd.macd_diff().ffill() | |
| else: df['MACD_line'], df['MACD_signal'], df['MACD_hist'] = np.nan, np.nan, np.nan | |
| if (primary_driver == 'Markov State' or use_markov) and markov_setup is not None: | |
| run_up_period = markov_setup.get('Run-Up Period', 10) | |
| df['RunUp_Return'] = df['Close'].pct_change(periods=run_up_period) | |
| df['RunUp_State'] = df['RunUp_Return'].apply(lambda x: 'Up' if x > 0 else 'Down') | |
| bband_params_for_score = { | |
| 'long_entry_threshold_pct': params.get('long_entry_threshold_pct', 0.0), | |
| 'short_entry_threshold_pct': params.get('short_entry_threshold_pct', 0.0) | |
| } | |
| raw_long_score, raw_short_score = calculate_confidence_score(df, | |
| primary_driver, | |
| use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov, | |
| use_mfi, use_supertrend, # <--- NEW | |
| rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w, | |
| mfi_w, supertrend_w, # <--- NEW | |
| bband_params_for_score, | |
| best_markov_setup=markov_setup | |
| ) | |
| # 1. GLOBAL (STATIC) MODE - "The Crystal Ball" | |
| # Used for "Elite Filtering" of open trades. | |
| if not use_rolling_benchmark: | |
| if long_score_95_percentile is None: | |
| long_scores_gt_zero = raw_long_score[raw_long_score > 0] | |
| val = long_scores_gt_zero.quantile(benchmark_rank) if not long_scores_gt_zero.empty else 1.0 | |
| long_95 = pd.Series(val, index=df.index) | |
| else: | |
| long_95 = pd.Series(long_score_95_percentile, index=df.index) | |
| if short_score_95_percentile is None: | |
| short_scores_gt_zero = raw_short_score[raw_short_score > 0] | |
| val = short_scores_gt_zero.quantile(benchmark_rank) if not short_scores_gt_zero.empty else 1.0 | |
| short_95 = pd.Series(val, index=df.index) | |
| else: | |
| short_95 = pd.Series(short_score_95_percentile, index=df.index) | |
| # 2. ADAPTIVE (ROLLING) MODE - "Real World" | |
| # Used for realistic historical simulation. | |
| else: | |
| if long_score_95_percentile is None: | |
| # Default to 1 year if lookback is missing | |
| years = benchmark_lookback_years if (benchmark_lookback_years is not None and benchmark_lookback_years > 0) else 1 | |
| window_days = int(years * 252) | |
| # Calculate Rolling Percentile | |
| long_95 = raw_long_score.rolling(window=window_days, min_periods=50).quantile(benchmark_rank).fillna(method='bfill') | |
| else: | |
| long_95 = pd.Series(long_score_95_percentile, index=df.index) | |
| if short_score_95_percentile is None: | |
| years = benchmark_lookback_years if (benchmark_lookback_years is not None and benchmark_lookback_years > 0) else 1 | |
| window_days = int(years * 252) | |
| short_95 = raw_short_score.rolling(window=window_days, min_periods=50).quantile(benchmark_rank).fillna(method='bfill') | |
| else: | |
| short_95 = pd.Series(short_score_95_percentile, index=df.index) | |
| # Safety: Ensure we don't divide by zero | |
| long_95 = long_95.replace(0, 1.0) | |
| short_95 = short_95.replace(0, 1.0) | |
| # Final Calculation (Removing 'if' check to avoid Series Ambiguity Error) | |
| df['long_confidence_score'] = (raw_long_score / long_95 * 100).clip(0, 100).fillna(0.0) | |
| df['short_confidence_score'] = (raw_short_score / short_95 * 100).clip(0, 100).fillna(0.0) | |
| apply_veto = bool(veto_setups_list) | |
| if apply_veto: | |
| df['any_long_veto_trigger'] = False; df['any_short_veto_trigger'] = False | |
| for veto_setup in veto_setups_list: | |
| veto_threshold = veto_setup.get('Conf. Threshold', 0) | |
| veto_rsi_on = veto_setup.get('RSI') == 'On'; veto_vol_on = veto_setup.get('Volatility') == 'On' | |
| veto_trend_on = veto_setup.get('TREND') == 'On'; veto_volume_on = veto_setup.get('Volume') == 'On' | |
| veto_macd_on = veto_setup.get('MACD', 'Off') == 'On'; veto_ma_slope_on = veto_setup.get('MA Slope', 'Off') == 'On' | |
| veto_markov_on = False | |
| veto_long_raw, veto_short_raw = calculate_confidence_score(df, | |
| 'Bollinger Bands', | |
| veto_rsi_on, veto_vol_on, veto_trend_on, veto_volume_on, veto_macd_on, veto_ma_slope_on, veto_markov_on, | |
| 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, | |
| bband_params_for_score, | |
| best_markov_setup=markov_setup | |
| ) | |
| veto_long_norm = (veto_long_raw / 3.0) * 100 | |
| current_long_veto_trigger = (veto_long_norm >= veto_threshold); current_short_veto_trigger = (veto_long_norm >= veto_threshold) | |
| df['any_long_veto_trigger'] |= current_long_veto_trigger; df['any_short_veto_trigger'] |= current_long_veto_trigger | |
| # --- Entry Trigger Logic --- | |
| bb_long_signal = df['Close'] < (df['bband_lower'] * (1 - params['long_entry_threshold_pct'])) | |
| bb_short_signal = df['Close'] > (df['bband_upper'] * (1 + params['short_entry_threshold_pct'])) | |
| if rsi_logic == "Crossover": | |
| rsi_long_signal = (df['RSI'].shift(1) < 30) & (df['RSI'] >= 30) & df['RSI'].notna() | |
| rsi_short_signal = (df['RSI'].shift(1) > 70) & (df['RSI'] <= 70) & df['RSI'].notna() | |
| else: # Level | |
| rsi_long_signal = (df['RSI'] <= 30) & df['RSI'].notna() | |
| rsi_short_signal = (df['RSI'] >= 70) & df['RSI'].notna() | |
| macd_long_signal = (df['MACD_line'].shift(1) < df['MACD_signal'].shift(1)) & (df['MACD_line'] >= df['MACD_signal']) | |
| macd_short_signal = (df['MACD_line'].shift(1) > df['MACD_signal'].shift(1)) & (df['MACD_line'] <= df['MACD_signal']) | |
| ma_slope_long_signal = (df['ma_slope'].shift(1) <= 0) & (df['ma_slope'] > 0) | |
| ma_slope_short_signal = (df['ma_slope'].shift(1) >= 0) & (df['ma_slope'] < 0) | |
| if primary_driver == 'RSI Crossover': | |
| base_long_trigger = rsi_long_signal; base_short_trigger = rsi_short_signal | |
| elif primary_driver == 'MACD Crossover': | |
| base_long_trigger = macd_long_signal; base_short_trigger = macd_short_signal | |
| elif primary_driver == 'MA Slope': | |
| base_long_trigger = ma_slope_long_signal; base_short_trigger = ma_slope_short_signal | |
| elif primary_driver == 'Markov State' and markov_setup is not None: | |
| strategy = markov_setup.get('Strategy') | |
| if strategy == 'Down -> Up': | |
| base_long_trigger = (df['RunUp_State'] == 'Down'); base_short_trigger = pd.Series(False, index=df.index) | |
| elif strategy == 'Up -> Up': | |
| base_long_trigger = (df['RunUp_State'] == 'Up'); base_short_trigger = pd.Series(False, index=df.index) | |
| elif strategy == 'Up -> Down': | |
| base_long_trigger = pd.Series(False, index=df.index); base_short_trigger = (df['RunUp_State'] == 'Up') | |
| elif strategy == 'Down -> Down': | |
| base_long_trigger = pd.Series(False, index=df.index); base_short_trigger = (df['RunUp_State'] == 'Down') | |
| else: | |
| base_long_trigger = pd.Series(False, index=df.index); base_short_trigger = pd.Series(False, index=df.index) | |
| else: | |
| base_long_trigger = bb_long_signal; base_short_trigger = bb_short_signal | |
| if use_adx_filter and 'ADX' in df.columns and df['ADX'].notna().any(): | |
| adx_allows_entry = (df['ADX'] < adx_threshold).fillna(False) | |
| else: adx_allows_entry = True | |
| ma_is_valid = pd.notna(df['large_ma']) | |
| # [SURGICAL PATCH START] -------------------------------------------------- | |
| # Check if ALL confidence indicators are disabled | |
| # CORRECTED: Added use_mfi and use_supertrend to this list so the score isn't ignored when they are On. | |
| all_indicators_off = not any([ | |
| use_rsi, use_volatility, use_trend, use_volume, use_macd, use_ma_slope, use_markov, | |
| use_mfi, use_supertrend | |
| ]) | |
| # If indicators are OFF, we bypass the confidence check (allow raw signal) | |
| # Otherwise, we enforce the confidence threshold as usual | |
| long_entry_trigger = base_long_trigger & adx_allows_entry & ((df['long_confidence_score'] >= params['confidence_threshold']) | all_indicators_off) & ma_is_valid | |
| short_entry_trigger = base_short_trigger & adx_allows_entry & ((df['short_confidence_score'] >= params['confidence_threshold']) | all_indicators_off) & ma_is_valid | |
| # [SURGICAL PATCH END] ---------------------------------------------------- | |
| # [FIX] Filter entries by BOTH Start and End date | |
| if analysis_start_date is not None: | |
| start_mask = df.index >= pd.Timestamp(analysis_start_date) | |
| long_entry_trigger &= start_mask | |
| short_entry_trigger &= start_mask | |
| if analysis_end_date is not None: | |
| end_mask = df.index <= pd.Timestamp(analysis_end_date) | |
| long_entry_trigger &= end_mask | |
| short_entry_trigger &= end_mask | |
| if apply_veto: | |
| long_entry_trigger &= ~df['any_long_veto_trigger'] | |
| short_entry_trigger &= ~df['any_short_veto_trigger'] | |
| potential_long_price_exit = (df['Close'] >= (df['large_ma'] * (1 + params['long_exit_ma_threshold_pct']))) | (df['Close'] >= df['bband_upper']) | |
| potential_short_price_exit = (df['Close'] <= (df['large_ma'] * (1 - params['short_exit_ma_threshold_pct']))) | (df['Close'] <= df['bband_lower']) | |
| long_entry_prices = df['Close'].where(long_entry_trigger).ffill() | |
| short_entry_prices = df['Close'].where(short_entry_trigger).ffill() | |
| if exit_logic_type == 'Intelligent (ADX/MACD/ATR)': | |
| adx_rising = (df['ADX'] > df['ADX'].shift(1)).fillna(False) | |
| adx_strong = (df['ADX'] > adx_threshold) | |
| macd_bullish = (df['MACD_line'] > df['MACD_signal']) | |
| stay_in_long_trade = (adx_rising | adx_strong) & macd_bullish | |
| base_long_exit_trigger = potential_long_price_exit & (~stay_in_long_trade) | |
| macd_bearish = (df['MACD_line'] < df['MACD_signal']) | |
| stay_in_short_trade = (adx_rising | adx_strong) & macd_bearish | |
| base_short_exit_trigger = potential_short_price_exit & (~stay_in_short_trade) | |
| else: | |
| long_is_in_profit = (df['Close'] > long_entry_prices).fillna(False) | |
| short_is_in_profit = (df['Close'] < short_entry_prices).fillna(False) | |
| base_long_exit_trigger = potential_long_price_exit & long_is_in_profit | |
| base_short_exit_trigger = potential_short_price_exit & short_is_in_profit | |
| df['long_signal'] = np.nan; df.loc[long_entry_trigger, 'long_signal'] = 1; df.loc[base_long_exit_trigger, 'long_signal'] = 0 | |
| df['short_signal'] = np.nan; df.loc[short_entry_trigger, 'short_signal'] = -1; df.loc[base_short_exit_trigger, 'short_signal'] = 0 | |
| if primary_driver == 'Markov State' and markov_setup is not None: | |
| limit_long = markov_setup.get('Future Period', 5); limit_short = markov_setup.get('Future Period', 5) | |
| else: | |
| limit_long = params.get('max_long_duration', 120); limit_short = params.get('max_short_duration', 120) | |
| temp_long_pos = df['long_signal'].ffill().fillna(0) | |
| temp_short_pos = df['short_signal'].ffill().fillna(0) | |
| long_groups = (~(temp_long_pos == 1)).cumsum(); df['days_in_long_trade'] = df.groupby(long_groups).cumcount(); df.loc[temp_long_pos == 0, 'days_in_long_trade'] = 0 | |
| short_groups = (~(temp_short_pos == -1)).cumsum(); df['days_in_short_trade'] = df.groupby(short_groups).cumcount(); df.loc[temp_short_pos == 0, 'days_in_short_trade'] = 0 | |
| long_time_exit_trigger = (df['days_in_long_trade'] > limit_long) & (temp_long_pos == 1) | |
| short_time_exit_trigger = (df['days_in_short_trade'] > limit_short) & (temp_short_pos == -1) | |
| df.loc[long_time_exit_trigger, 'long_signal'] = 0; df.loc[short_time_exit_trigger, 'short_signal'] = 0 | |
| df['long_entry_price_static'] = df['Close'].where(df['long_signal'].shift(1) == 0).ffill().bfill() | |
| df['short_entry_price_static'] = df['Close'].where(df['short_signal'].shift(1) == 0).ffill().bfill() | |
| # [SURGICAL UPDATE START] --- Catcher Offset Logic --- | |
| # We retrieve the offset percentage from params (default to 0.0 if missing) | |
| catcher_offset = params.get('catcher_stop_pct', 0.0) | |
| # Calculate the Adjusted Floor/Ceiling | |
| # Positive Offset = Higher Floor (Profit for Long, Profit for Short) | |
| # Negative Offset = Lower Floor (Loss allowance for Long, Loss allowance for Short) | |
| long_breakeven_floor = df['long_entry_price_static'] * (1 + catcher_offset) | |
| short_breakeven_floor = df['short_entry_price_static'] * (1 - catcher_offset) | |
| # [SURGICAL UPDATE END] ----------------------------- | |
| use_ma_floor_filter = params.get('use_ma_floor_filter', False) | |
| standard_tsl_pct_long = params.get('long_trailing_stop_loss_pct', 0) | |
| standard_tsl_pct_short = params.get('short_trailing_stop_loss_pct', 0) | |
| intelligent_tsl_pct_long = intelligent_tsl_pct if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else 0.0 | |
| intelligent_tsl_pct_short = intelligent_tsl_pct if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else 0.0 | |
| long_tsl_exit = pd.Series(False, index=df.index); short_tsl_exit = pd.Series(False, index=df.index) | |
| if primary_driver != 'Markov State': | |
| # --- LONG TSL --- | |
| has_hit_target = potential_long_price_exit | |
| # Determine which TSL percentage to use (Standard vs Intelligent) | |
| tsl_to_use_long = np.where(has_hit_target, intelligent_tsl_pct_long, standard_tsl_pct_long) if use_ma_floor_filter else (intelligent_tsl_pct_long if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else standard_tsl_pct_long) | |
| if not isinstance(tsl_to_use_long, pd.Series): tsl_to_use_long = pd.Series(tsl_to_use_long, index=df.index) | |
| # Only run calculation if TSL is active (> 0) | |
| if (tsl_to_use_long > 0).any(): | |
| in_long_trade = (df['long_signal'].ffill().fillna(0) == 1) | |
| long_high_water_mark = df['High'].where(in_long_trade).groupby((~in_long_trade).cumsum()).cummax() | |
| tsl_from_hwm = long_high_water_mark * (1 - tsl_to_use_long) | |
| # [FIX] Apply Catcher floor ONLY if target has been hit (Persistent during trade) | |
| if use_ma_floor_filter: | |
| trade_groups = (~in_long_trade).cumsum() | |
| # [FIX] Cast to float before cummax to avoid Object dtype error | |
| target_hit_persistent = has_hit_target.where(in_long_trade).astype(float).groupby(trade_groups).cummax().fillna(0).astype(bool) | |
| long_tsl_price = np.where(target_hit_persistent, np.maximum(tsl_from_hwm, long_breakeven_floor), tsl_from_hwm) | |
| else: | |
| long_tsl_price = tsl_from_hwm | |
| long_tsl_exit = in_long_trade & (df['Close'] < long_tsl_price) | |
| df.loc[long_tsl_exit, 'long_signal'] = 0 | |
| # --- SHORT TSL --- | |
| has_hit_target = potential_short_price_exit | |
| # Determine which TSL percentage to use (Standard vs Intelligent) | |
| tsl_to_use_short = np.where(has_hit_target, intelligent_tsl_pct_short, standard_tsl_pct_short) if use_ma_floor_filter else (intelligent_tsl_pct_short if exit_logic_type == 'Intelligent (ADX/MACD/ATR)' else standard_tsl_pct_short) | |
| if not isinstance(tsl_to_use_short, pd.Series): tsl_to_use_short = pd.Series(tsl_to_use_short, index=df.index) | |
| # Only run calculation if TSL is active (> 0) | |
| if (tsl_to_use_short > 0).any(): | |
| in_short_trade = (df['short_signal'].ffill().fillna(0) == -1) | |
| short_low_water_mark = df['Low'].where(in_short_trade).groupby((~in_short_trade).cumsum()).cummin() | |
| tsl_from_lwm = short_low_water_mark * (1 + tsl_to_use_short) | |
| # [FIX] Apply Catcher floor ONLY if target has been hit (Persistent during trade) | |
| if use_ma_floor_filter: | |
| trade_groups = (~in_short_trade).cumsum() | |
| # [FIX] Cast to float before cummax to avoid Object dtype error | |
| target_hit_persistent = has_hit_target.where(in_short_trade).astype(float).groupby(trade_groups).cummax().fillna(0).astype(bool) | |
| short_tsl_price = np.where(target_hit_persistent, np.minimum(tsl_from_lwm, short_breakeven_floor), tsl_from_lwm) | |
| else: | |
| short_tsl_price = tsl_from_lwm | |
| short_tsl_exit = in_short_trade & (df['Close'] > short_tsl_price) | |
| df.loc[short_tsl_exit, 'short_signal'] = 0 | |
| df['long_position'] = df['long_signal'].ffill().fillna(0) | |
| df['short_position'] = df['short_signal'].ffill().fillna(0) | |
| if params['long_delay_days'] > 0: df['long_position'] = df['long_position'].shift(params['long_delay_days']).fillna(0) | |
| if params['short_delay_days'] > 0: df['short_position'] = df['short_position'].shift(params['short_delay_days']).fillna(0) | |
| df['daily_return'] = df['Close'].pct_change() | |
| df['long_strategy_return'] = df['long_position'].shift(1) * df['daily_return'] | |
| df['short_strategy_return'] = df['short_position'].shift(1) * df['daily_return'] | |
| final_long_pnl = (1 + df['long_strategy_return'].fillna(0)).prod(skipna=True) - 1 | |
| final_short_pnl = (1 + df['short_strategy_return'].fillna(0)).prod(skipna=True) - 1 | |
| long_entries = df[(df['long_position'] == 1) & (df['long_position'].shift(1) == 0)] | |
| long_exits = df[(df['long_position'] == 0) & (df['long_position'].shift(1) == 1)] | |
| short_entries = df[(df['short_position'] == -1) & (df['short_position'].shift(1) == 0)] | |
| short_exits = df[(df['short_position'] == 0) & (df['short_position'].shift(1) == -1)] | |
| if not df.empty: | |
| end_date = df.index.max(); | |
| else: end_date = pd.NaT; | |
| # --- CHANGED: COLLECT ALL HISTORY (Removed "30 day" filter) --- | |
| all_historical_trades = [] | |
| long_trade_profits, long_durations, first_long_entry_date, last_long_exit_date = [], [], None, None | |
| short_trade_profits, short_durations, first_short_entry_date, last_short_exit_date = [], [], None, None | |
| long_profit_take_count, long_tsl_count, long_time_exit_count = 0, 0, 0 | |
| short_profit_take_count, short_tsl_count, short_time_exit_count = 0, 0, 0 | |
| df_indices = pd.Series(range(len(df)), index=df.index) | |
| for idx, row in long_entries.iterrows(): | |
| if first_long_entry_date is None: first_long_entry_date = idx | |
| future_exits = long_exits[long_exits.index > idx] | |
| if not future_exits.empty: | |
| exit_row = future_exits.iloc[0]; last_long_exit_date = exit_row.name | |
| exit_date = exit_row.name | |
| is_tsl = long_tsl_exit.loc[exit_row.name]; is_time = long_time_exit_trigger.loc[exit_row.name] | |
| if is_tsl: long_tsl_count += 1 | |
| elif is_time: long_time_exit_count += 1 | |
| else: long_profit_take_count += 1 | |
| profit = (exit_row['Close'] / row['Close']) - 1 if pd.notna(exit_row['Close']) and pd.notna(row['Close']) and row['Close'] != 0 else np.nan | |
| long_trade_profits.append(profit) | |
| # APPEND ALL TRADES (No Date Filter) | |
| all_historical_trades.append({'Side': 'Long', 'Date Open': idx, 'Date Closed': exit_date, 'Start Confidence': row.get('long_confidence_score', np.nan), 'Final % P/L': profit, 'Status': 'Closed', 'Exit Reason': 'TSL' if is_tsl else ('Time' if is_time else 'Profit')}) | |
| try: long_durations.append(df_indices.loc[exit_row.name] - df_indices.loc[idx]) | |
| except KeyError: long_durations.append(np.nan) | |
| avg_long_profit_per_trade = np.nanmean(long_trade_profits) if long_trade_profits else 0.0 | |
| for idx, row in short_entries.iterrows(): | |
| if first_short_entry_date is None: first_short_entry_date = idx | |
| future_exits = short_exits[short_exits.index > idx] | |
| if not future_exits.empty: | |
| exit_row = future_exits.iloc[0]; last_short_exit_date = exit_row.name | |
| exit_date = exit_row.name | |
| is_tsl = short_tsl_exit.loc[exit_row.name]; is_time = short_time_exit_trigger.loc[exit_row.name] | |
| if is_tsl: short_tsl_count += 1 | |
| elif is_time: short_time_exit_count += 1 | |
| else: short_profit_take_count += 1 | |
| profit = ((exit_row['Close'] / row['Close']) - 1) * -1 if pd.notna(exit_row['Close']) and pd.notna(row['Close']) and row['Close'] != 0 else np.nan | |
| short_trade_profits.append(profit) | |
| # APPEND ALL TRADES (No Date Filter) | |
| all_historical_trades.append({'Side': 'Short', 'Date Open': idx, 'Date Closed': exit_date, 'Start Confidence': row.get('short_confidence_score', np.nan), 'Final % P/L': profit, 'Status': 'Closed', 'Exit Reason': 'TSL' if is_tsl else ('Time' if is_time else 'Profit')}) | |
| try: short_durations.append(df_indices.loc[exit_row.name] - df_indices.loc[idx]) | |
| except KeyError: short_durations.append(np.nan) | |
| avg_short_profit_per_trade = np.nanmean(short_trade_profits) if short_trade_profits else 0.0 | |
| long_wins = sum(1 for p in long_trade_profits if pd.notna(p) and p > 0); long_losses = sum(1 for p in long_trade_profits if pd.notna(p) and p < 0) | |
| short_wins = sum(1 for p in short_trade_profits if pd.notna(p) and p > 0); short_losses = sum(1 for p in short_trade_profits if pd.notna(p) and p < 0) | |
| long_trades_log = [{'date': idx, 'price': row['Close'], 'confidence': row.get('long_confidence_score', np.nan)} for idx, row in long_entries.iterrows()] | |
| short_trades_log = [{'date': idx, 'price': row['Close'], 'confidence': row.get('short_confidence_score', np.nan)} for idx, row in short_entries.iterrows()] | |
| # --- ROBUST OPEN TRADE CAPTURE --- | |
| open_trades = [] | |
| if not df.empty and 'Close' in df.columns: | |
| last_close = df['Close'].iloc[-1] | |
| # 1. Check Long | |
| if df['long_position'].iloc[-1] == 1: | |
| if not long_entries.empty: | |
| last_entry_time = long_entries.index[-1] | |
| last_entry = long_entries.loc[last_entry_time] | |
| entry_price = last_entry['Close'] | |
| entry_conf = last_entry.get('long_confidence_score', np.nan) | |
| else: | |
| last_entry_time = pd.NaT | |
| entry_price = df['long_entry_price_static'].iloc[-1] | |
| entry_conf = np.nan | |
| if pd.notna(last_close) and pd.notna(entry_price) and entry_price != 0: | |
| pnl = (last_close / entry_price) - 1 | |
| open_trades.append({ | |
| 'Side': 'Long', | |
| 'Date Open': last_entry_time, | |
| 'Date Closed': pd.NaT, | |
| 'Start Confidence': entry_conf, | |
| 'Final % P/L': pnl, | |
| 'Status': 'Open', | |
| 'Exit Reason': 'N/A (Open)' | |
| }) | |
| # 2. Check Short | |
| if df['short_position'].iloc[-1] == -1: | |
| if not short_entries.empty: | |
| last_entry_time = short_entries.index[-1] | |
| last_entry = short_entries.loc[last_entry_time] | |
| entry_price = last_entry['Close'] | |
| entry_conf = last_entry.get('short_confidence_score', np.nan) | |
| else: | |
| last_entry_time = pd.NaT | |
| entry_price = df['short_entry_price_static'].iloc[-1] | |
| entry_conf = np.nan | |
| if pd.notna(last_close) and pd.notna(entry_price) and entry_price != 0: | |
| pnl = ((last_close / entry_price) - 1) * -1 | |
| open_trades.append({ | |
| 'Side': 'Short', | |
| 'Date Open': last_entry_time, | |
| 'Date Closed': pd.NaT, | |
| 'Start Confidence': entry_conf, | |
| 'Final % P/L': pnl, | |
| 'Status': 'Open', | |
| 'Exit Reason': 'N/A (Open)' | |
| }) | |
| # --------------------------------- | |
| # Combine: [All Historical Closed] + [Current Open] | |
| open_trades.extend(all_historical_trades) | |
| df.sort_index(inplace=True) | |
| trade_dates = (first_long_entry_date, last_long_exit_date, first_short_entry_date, last_short_exit_date) | |
| long_durations = [d for d in long_durations if pd.notna(d)]; short_durations = [d for d in short_durations if pd.notna(d)] | |
| avg_long_profit = float(avg_long_profit_per_trade) if pd.notna(avg_long_profit_per_trade) else 0.0 | |
| avg_short_profit = float(avg_short_profit_per_trade) if pd.notna(avg_short_profit_per_trade) else 0.0 | |
| final_long_pnl_float = float(final_long_pnl) if pd.notna(final_long_pnl) else 0.0 | |
| final_short_pnl_float = float(final_short_pnl) if pd.notna(final_short_pnl) else 0.0 | |
| final_trade_logs = (long_trades_log, long_exits.index, short_trades_log, short_exits.index) | |
| exit_breakdown = (long_profit_take_count, long_tsl_count, long_time_exit_count, short_profit_take_count, short_tsl_count, short_time_exit_count) | |
| return final_long_pnl_float, final_short_pnl_float, avg_long_profit, avg_short_profit, df, final_trade_logs, open_trades, (long_wins, long_losses, short_wins, short_losses), (long_durations, short_durations), trade_dates, exit_breakdown | |
| # --- 3. Charting and Display Functions --- | |
| def generate_long_plot(df, trades, ticker): | |
| fig = go.Figure() | |
| # Add Price and MA Lines | |
| fig.add_trace(go.Scatter(x=df.index, y=df['Close'], mode='lines', name='Close Price', line=dict(color='blue'))) | |
| if 'large_ma' in df.columns: | |
| fig.add_trace(go.Scatter(x=df.index, y=df['large_ma'], mode='lines', name='Large MA', line=dict(color='orange', dash='dash'))) | |
| # Add Bollinger Bands | |
| if 'bband_upper' in df.columns and 'bband_lower' in df.columns: | |
| fig.add_trace(go.Scatter(x=df.index, y=df['bband_upper'], mode='lines', name='Upper Band', line=dict(color='gray', width=0.5))) | |
| fig.add_trace(go.Scatter(x=df.index, y=df['bband_lower'], mode='lines', name='Lower Band', line=dict(color='gray', width=0.5), fill='tonexty', fillcolor='rgba(211,211,211,0.2)')) | |
| # Unpack Trades Tuple: (Long Entries Log, Long Exits Index, Short Entries Log, Short Exits Index) | |
| long_entries_log, long_exits, _, _ = trades | |
| # 1. Plot Long Entries | |
| if long_entries_log: | |
| dates = [t['date'] for t in long_entries_log] | |
| prices = [t['price'] for t in long_entries_log] | |
| scores = [f"Confidence: {t['confidence']:.0f}%" for t in long_entries_log] | |
| # Filter out entries that fall outside the dataframe's date range (just in case) | |
| valid_points = [(d, p, s) for d, p, s in zip(dates, prices, scores) if d in df.index] | |
| if valid_points: | |
| v_dates, v_prices, v_scores = zip(*valid_points) | |
| fig.add_trace(go.Scatter(x=v_dates, y=v_prices, mode='markers', name='Long Entry', | |
| marker=dict(color='green', symbol='triangle-up', size=12), | |
| text=v_scores, hoverinfo='text')) | |
| # 2. Plot Long Exits | |
| # Ensure long_exits is not empty and contains valid dates found in df | |
| if not long_exits.empty: | |
| valid_exits = [date for date in long_exits if date in df.index] | |
| if valid_exits: | |
| exit_prices = df.loc[valid_exits, 'Close'] | |
| fig.add_trace(go.Scatter(x=exit_prices.index, y=exit_prices, mode='markers', name='Long Exit', | |
| marker=dict(color='darkgreen', symbol='x', size=8))) | |
| fig.update_layout(title=f'Long Trades for {ticker}', xaxis_title='Date', yaxis_title='Price', legend_title="Indicator") | |
| return fig | |
| def generate_short_plot(df, trades, ticker): | |
| fig = go.Figure() | |
| # Add Price and MA Lines | |
| fig.add_trace(go.Scatter(x=df.index, y=df['Close'], mode='lines', name='Close Price', line=dict(color='blue'))) | |
| if 'large_ma' in df.columns: | |
| fig.add_trace(go.Scatter(x=df.index, y=df['large_ma'], mode='lines', name='Large MA', line=dict(color='orange', dash='dash'))) | |
| # Add Bollinger Bands | |
| if 'bband_upper' in df.columns and 'bband_lower' in df.columns: | |
| fig.add_trace(go.Scatter(x=df.index, y=df['bband_upper'], mode='lines', name='Upper Band', line=dict(color='gray', width=0.5))) | |
| fig.add_trace(go.Scatter(x=df.index, y=df['bband_lower'], mode='lines', name='Lower Band', line=dict(color='gray', width=0.5), fill='tonexty', fillcolor='rgba(211,211,211,0.2)')) | |
| # Add Trades | |
| _, _, short_entries_log, short_exits = trades | |
| if short_entries_log: | |
| dates = [t['date'] for t in short_entries_log] | |
| prices = [t['price'] for t in short_entries_log] | |
| scores = [f"Confidence: {t['confidence']:.0f}%" for t in short_entries_log] | |
| fig.add_trace(go.Scatter(x=dates, y=prices, mode='markers', name='Short Entry', marker=dict(color='red', symbol='triangle-down', size=12), text=scores, hoverinfo='text')) | |
| if not short_exits.empty and 'Close' in df.columns: # Check if Close exists | |
| exit_prices = df.loc[short_exits,'Close'].dropna() # Drop exits where price might be NaN | |
| fig.add_trace(go.Scatter(x=exit_prices.index, y=exit_prices, mode='markers', name='Short Exit', marker=dict(color='darkred', symbol='x', size=8))) | |
| fig.update_layout(title=f'Short Trades for {ticker}', xaxis_title='Date', yaxis_title='Price', legend_title="Indicator"); return fig | |
| def normalise_strategy_score(raw_score, benchmark_for_100_percent=0.25): | |
| if not np.isfinite(raw_score) or raw_score <= 0: return 0.0 # Handle NaN/inf | |
| return min((raw_score / benchmark_for_100_percent) * 100, 100.0) | |
| def calculate_strategy_score(avg_profit, gb_ratio, total_trades): | |
| """ | |
| Calculates a Weighted Strategy Score with a 'Quality Gate'. | |
| 1. Weights: Profit (42.5%), Win/Loss (42.5%), Trades (15%). | |
| 2. Targets: Profit 3.0%, Win/Loss 5.0, Trades 3000. | |
| 3. QUALITY GATE: If Win/Loss Ratio < 2.0, Profit Score is CAPPED at 1.0. | |
| (You cannot get 'extra credit' for high profit if the structure is risky). | |
| """ | |
| # 1. Define Targets | |
| target_profit = 0.03 # 3.0% per trade | |
| target_gb = 5.0 # 5.0 Win/Loss Ratio | |
| target_trades = 3000.0 # Perfect trade count | |
| # 2. Define Weights (Sum = 1.0) | |
| w_profit = 0.425 | |
| w_gb = 0.425 | |
| w_trades = 0.15 | |
| # 3. Calculate Scores | |
| # Profit: Uncapped Base | |
| s_profit = avg_profit / target_profit | |
| if s_profit < 0: s_profit = 0 | |
| # G/B Ratio: Uncapped Base | |
| s_gb = gb_ratio / target_gb | |
| if s_gb < 0: s_gb = 0 | |
| # --- QUALITY GATE --- | |
| # If the strategy has a poor Win/Loss ratio (< 2.0), | |
| # we cap the Profit score at 1.0 (100%). | |
| # This prevents high volatility/lucky profit from masking a bad ratio. | |
| if gb_ratio < 2.0: | |
| s_profit = min(s_profit, 1.0) | |
| # Trade Count: Pyramid Penalty | |
| dist = abs(total_trades - target_trades) | |
| s_trades = 1.0 - (dist / target_trades) | |
| if s_trades < 0: s_trades = 0 | |
| # 4. Final Weighted Score | |
| final_score = (s_profit * w_profit) + (s_gb * w_gb) + (s_trades * w_trades) | |
| return final_score * 100 | |
| def display_summary_analytics(summary_df): | |
| st.subheader("Overall Strategy Performance") | |
| trade_counts = st.session_state.get('trade_counts', {}) | |
| trade_durations = st.session_state.get('trade_durations', {}) | |
| exit_totals = st.session_state.get('exit_breakdown_totals', {}) | |
| # --- Clear last run stats before calculating --- | |
| st.session_state.last_run_stats = {} | |
| col1, col2 = st.columns(2) | |
| for side in ["Long", "Short"]: | |
| # Ensure summary_df exists and has the required columns | |
| req_cols = [f'Num {side} Trades', f'Avg {side} Profit per Trade', f'Cumulative {side} P&L', f'Avg {side} Confidence'] | |
| if summary_df is None or not all(col in summary_df.columns for col in req_cols): | |
| st.warning(f"Summary data for {side} trades is missing or incomplete.") | |
| continue | |
| active_trades_df = summary_df[summary_df[f'Num {side} Trades'] > 0] | |
| container = col1 if side == "Long" else col2 | |
| with container: | |
| st.subheader(f"{side} Trades") | |
| if not active_trades_df.empty: | |
| total_trades = active_trades_df[f'Num {side} Trades'].sum() | |
| avg_trade_profit = (active_trades_df[f'Avg {side} Profit per Trade'].fillna(0) * active_trades_df[f'Num {side} Trades']).sum() / total_trades if total_trades > 0 else 0 | |
| avg_cumulative_profit = active_trades_df[f'Cumulative {side} P&L'].mean() | |
| avg_confidence = active_trades_df[f'Avg {side} Confidence'].mean() | |
| if pd.isna(avg_confidence): avg_confidence = 0 | |
| good_tickers = (active_trades_df[f'Cumulative {side} P&L'] > 0).sum() | |
| bad_tickers = (active_trades_df[f'Cumulative {side} P&L'] < 0).sum() | |
| ticker_good_bad_ratio = good_tickers / bad_tickers if bad_tickers > 0 else 99999.0 | |
| display_score = calculate_strategy_score(avg_trade_profit, ticker_good_bad_ratio, total_trades) | |
| st.metric("Strategy Score", f"{display_score:.2f}%") | |
| st.metric("Avg Profit per Trade (Active Tickers)", f"{avg_trade_profit:.2%}") | |
| net_return_pct = avg_trade_profit * total_trades * 100 | |
| st.metric("Moneypile Score", f"{net_return_pct:.1f}", help="The total 'Pile of Money'. Sum of all trade percentages (Winners - Losers).") | |
| st.text(f"Profitable Tickers: {good_tickers}") | |
| st.text(f"Losing Tickers: {bad_tickers}") | |
| st.text(f"Ticker Good/Bad Ratio: {ticker_good_bad_ratio:.2f}") | |
| side_lower = side.lower() | |
| wins = trade_counts.get(f"{side_lower}_wins", 0) | |
| losses = trade_counts.get(f"{side_lower}_losses", 0) | |
| # --- NEW: Calculate Open Trades explicitly --- | |
| open_trades_count = int(total_trades) - (wins + losses) | |
| # --- TRADING STATS --- | |
| st.markdown("---") | |
| st.text(f"Total Individual Trades: {int(total_trades)}") | |
| st.text(f"Winning Trades: {wins}") | |
| st.text(f"Losing Trades: {losses}") | |
| st.text(f"Open Trades: {open_trades_count}") | |
| trade_win_loss_ratio = 0.0 | |
| if wins > 0 or losses > 0: | |
| trade_win_loss_ratio = wins / losses if losses > 0 else 99999.0 | |
| st.text(f"Trade Win/Loss Ratio: {trade_win_loss_ratio:.2f}") | |
| if side == "Long": | |
| st.session_state.last_run_stats = { | |
| "Z_Avg_Profit": avg_trade_profit, | |
| "Z_Num_Trades": int(total_trades), | |
| "Z_WL_Ratio": trade_win_loss_ratio | |
| } | |
| # --- EXIT BREAKDOWN --- | |
| if exit_totals: | |
| st.markdown("---") | |
| st.subheader("Exit Breakdown") | |
| if side == "Long": | |
| profit_count = exit_totals.get('long_profit_take_count', 0) | |
| tsl_count = exit_totals.get('long_tsl_count', 0) | |
| time_count = exit_totals.get('long_time_exit_count', 0) | |
| # Get Long Timeout Setting | |
| limit_days = st.session_state.get('max_long_duration', 60) | |
| else: | |
| profit_count = exit_totals.get('short_profit_take_count', 0) | |
| tsl_count = exit_totals.get('short_tsl_count', 0) | |
| time_count = exit_totals.get('short_time_exit_count', 0) | |
| # Get Short Timeout Setting | |
| limit_days = st.session_state.get('max_short_duration', 10) | |
| exit_total = profit_count + tsl_count + time_count | |
| if exit_total > 0: | |
| st.markdown(f"**Profit Take:** {profit_count} ({profit_count/exit_total:.1%})") | |
| st.markdown(f"**Stop Loss:** {tsl_count} ({tsl_count/exit_total:.1%})") | |
| # Dynamic Label | |
| st.markdown(f"**Time Out ({limit_days}d):** {time_count} ({time_count/exit_total:.1%})") | |
| # --- DURATION STATS --- | |
| avg_duration = trade_durations.get(f"avg_{side_lower}_duration", 0) | |
| max_duration = trade_durations.get(f"max_{side_lower}_duration", 0) | |
| if avg_duration > 0 or max_duration > 0: | |
| st.text(f"Avg Trade Duration: {avg_duration:.1f} days") | |
| st.text(f"Longest Trade: {max_duration:.0f} days") | |
| else: st.info("No trades found for this side with current settings.") | |
| def generate_profit_distribution_chart(summary_df): | |
| """ | |
| Creates two histograms showing the distribution of Average Profit per Trade | |
| for Long and Short strategies. | |
| [UPDATED] Now weighted by 'Num Trades' so the Y-axis shows Total Trades, | |
| not just Total Tickers. | |
| """ | |
| if summary_df is None or summary_df.empty: return None | |
| fig = go.Figure() | |
| # 1. Long Distribution | |
| if 'Avg Long Profit per Trade' in summary_df.columns and 'Num Long Trades' in summary_df.columns: | |
| # Filter out tickers that didn't trade | |
| mask = summary_df['Num Long Trades'] > 0 | |
| long_data = summary_df.loc[mask, 'Avg Long Profit per Trade'] | |
| long_weights = summary_df.loc[mask, 'Num Long Trades'] # Use trade count as weight | |
| if not long_data.empty: | |
| fig.add_trace(go.Histogram( | |
| x=long_data, | |
| y=long_weights, # [FIX] Weight by number of trades | |
| histfunc='sum', # [FIX] Sum the weights to get Total Trades per bin | |
| name='Long Strategy', | |
| marker_color='green', | |
| opacity=0.7, | |
| nbinsx=50, | |
| histnorm='' | |
| )) | |
| # 2. Short Distribution | |
| if 'Avg Short Profit per Trade' in summary_df.columns and 'Num Short Trades' in summary_df.columns: | |
| mask = summary_df['Num Short Trades'] > 0 | |
| short_data = summary_df.loc[mask, 'Avg Short Profit per Trade'] | |
| short_weights = summary_df.loc[mask, 'Num Short Trades'] # Use trade count as weight | |
| if not short_data.empty: | |
| fig.add_trace(go.Histogram( | |
| x=short_data, | |
| y=short_weights, # [FIX] Weight by number of trades | |
| histfunc='sum', # [FIX] Sum the weights | |
| name='Short Strategy', | |
| marker_color='red', | |
| opacity=0.7, | |
| nbinsx=50, | |
| visible='legendonly' | |
| )) | |
| # 3. Add Zero Line | |
| fig.add_vline(x=0, line_width=2, line_dash="dash", line_color="black", annotation_text="Break Even") | |
| # 4. Layout | |
| fig.update_layout( | |
| title="Distribution of Average Profit per Trade (Weighted by Trade Count)", | |
| xaxis_title="Average Profit per Trade (decimal, e.g. 0.01 = 1%)", | |
| yaxis_title="Number of Trades", # [FIX] Updated Label | |
| barmode='overlay', | |
| bargap=0.1, | |
| template="plotly_white", | |
| legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), | |
| height=400 | |
| ) | |
| # Format X axis as percentage | |
| fig.update_xaxes(tickformat=".1%") | |
| return fig | |
| # --- 4. Optimisation Functions (Parallelised) --- | |
| # --- FULL REPLACEMENT WORKER: ACCEPTS A SINGLE DICTIONARY ARGUMENT --- | |
| def run_single_parameter_test(task_data): | |
| # Unpack all arguments from the single dictionary provided by the multiprocessing pool | |
| params = task_data['params'] | |
| confidence_settings = task_data['confidence_settings'] | |
| master_df = task_data['master_df'] | |
| optimise_for = task_data['optimise_for'] | |
| tickers = task_data['tickers'] | |
| date_range = task_data['date_range'] | |
| power = task_data['power'] | |
| # --- FIX: Correctly unpack ALL 7 items from the list --- | |
| toggles_list = list(confidence_settings['toggles']) | |
| weights_list = list(confidence_settings['weights']) | |
| # Unpack all 7 factors (matches the order sent by generate_and_run_optimisation) | |
| use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov = toggles_list | |
| rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w = weights_list | |
| # --- END FIX --- | |
| # --- Rest of Unpack --- | |
| use_adx_filter, adx_threshold, adx_period = confidence_settings['adx_settings'] | |
| rsi_logic = confidence_settings['rsi_logic'] | |
| primary_driver = confidence_settings['primary_driver'] | |
| markov_setup = confidence_settings['markov_setup'] | |
| exit_logic = confidence_settings['exit_logic'] | |
| exit_thresh = confidence_settings['exit_thresh'] | |
| smart_trailing_stop = confidence_settings['smart_trailing_stop'] | |
| smart_exit_atr_p = confidence_settings['smart_exit_atr_period'] | |
| smart_exit_atr_m = confidence_settings['smart_exit_atr_multiplier'] | |
| intelligent_tsl_pct = confidence_settings['intelligent_tsl_pct'] | |
| long_95_percentile = confidence_settings['long_95_percentile'] | |
| short_95_percentile = confidence_settings['short_95_percentile'] | |
| veto_list = confidence_settings['veto_list'] | |
| # --- End Unpack --- | |
| total_profit_weighted_avg, total_trades, winning_tickers, losing_tickers = 0, 0, 0, 0 | |
| total_wins, total_losses = 0, 0 | |
| all_confidences = [] | |
| PROFIT_THRESHOLD = 1.0 | |
| excluded_tickers = [] | |
| # Initialize exit breakdown counters (Long and Short) | |
| total_exit_breakdown = [0, 0, 0, 0, 0, 0] # LP, LT, LE, SP, ST, SE | |
| if not isinstance(tickers, list): tickers = [tickers] | |
| for ticker in tickers: | |
| cols_to_use = [ticker] | |
| if f'{ticker}_High' in master_df.columns: cols_to_use.append(f'{ticker}_High') | |
| if f'{ticker}_Low' in master_df.columns: cols_to_use.append(f'{ticker}_Low') | |
| if f'{ticker}_Volume' in master_df.columns: cols_to_use.append(f'{ticker}_Volume') | |
| existing_cols = [col for col in cols_to_use if col in master_df.columns] | |
| if ticker not in existing_cols: continue | |
| ticker_data_full = master_df.loc[:, existing_cols] | |
| ticker_data = ticker_data_full.loc[date_range[0]:date_range[1]] | |
| rename_dict = { | |
| ticker: 'Close', f'{ticker}_High': 'High', | |
| f'{ticker}_Low': 'Low', f'{ticker}_Volume': 'Volume' | |
| } | |
| rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} | |
| ticker_data = ticker_data.rename(columns=rename_dict_filtered) | |
| if not ticker_data.empty and 'Close' in ticker_data.columns and not ticker_data['Close'].isna().all(): | |
| # --- CAPTURE FIX: run_backtest now returns exit_breakdown (11th return value) --- | |
| long_pnl, short_pnl, avg_long_trade, avg_short_trade, _, trades, _, trade_counts, _, _, exit_breakdown = run_backtest( | |
| ticker_data, params, | |
| use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov, | |
| rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w, | |
| use_adx_filter, adx_threshold, rsi_logic, | |
| adx_period, | |
| veto_setups_list=veto_list, | |
| primary_driver=primary_driver, | |
| markov_setup=markov_setup, | |
| exit_logic_type=exit_logic, | |
| exit_confidence_threshold=exit_thresh, | |
| smart_trailing_stop_pct=smart_trailing_stop, | |
| smart_exit_atr_period=smart_exit_atr_p, | |
| smart_exit_atr_multiplier=smart_exit_atr_m, | |
| intelligent_tsl_pct=intelligent_tsl_pct, | |
| long_score_95_percentile=long_95_percentile, | |
| short_score_95_percentile=short_95_percentile | |
| ) | |
| if abs(long_pnl) > PROFIT_THRESHOLD or abs(short_pnl) > PROFIT_THRESHOLD or \ | |
| (avg_long_trade is not None and pd.notna(avg_long_trade) and abs(avg_long_trade) > PROFIT_THRESHOLD) or \ | |
| (avg_short_trade is not None and pd.notna(avg_short_trade) and abs(avg_short_trade) > PROFIT_THRESHOLD): | |
| excluded_tickers.append(ticker) | |
| continue | |
| # Aggregate exit breakdown counters | |
| total_exit_breakdown = [sum(x) for x in zip(total_exit_breakdown, exit_breakdown)] | |
| if optimise_for == 'long': | |
| pnl, avg_trade_profit, trade_log = long_pnl, avg_long_trade, trades[0] | |
| total_wins += trade_counts[0]; total_losses += trade_counts[1] | |
| else: | |
| pnl, avg_trade_profit, trade_log = short_pnl, avg_short_trade, trades[2] | |
| total_wins += trade_counts[2]; total_losses += trade_counts[3] | |
| num_trades = len(trade_log) | |
| if num_trades > 0 and avg_trade_profit is not None and pd.notna(avg_trade_profit): | |
| total_trades += num_trades | |
| total_profit_weighted_avg += avg_trade_profit * num_trades | |
| if pnl > 0: winning_tickers += 1 | |
| elif pnl < 0: losing_tickers += 1 | |
| all_confidences.extend([trade['confidence'] for trade in trade_log if pd.notna(trade.get('confidence'))]) | |
| overall_avg_profit = 0.0 | |
| good_bad_ratio = 0.0 | |
| trade_good_bad_ratio = 0.0 | |
| if total_trades > 0: | |
| overall_avg_profit = total_profit_weighted_avg / total_trades | |
| if losing_tickers > 0: | |
| good_bad_ratio = winning_tickers / losing_tickers | |
| elif winning_tickers > 0: | |
| good_bad_ratio = 99999.0 | |
| if total_losses > 0: | |
| trade_good_bad_ratio = total_wins / total_losses | |
| elif total_wins > 0: | |
| trade_good_bad_ratio = 99999.0 | |
| avg_entry_confidence = np.mean(all_confidences) if all_confidences else 0.0 | |
| # Return a single dictionary containing all calculated metrics and the configuration data | |
| return { | |
| # CORE METRICS | |
| "Avg Profit/Trade": overall_avg_profit, | |
| "Ticker G/B Ratio": good_bad_ratio, | |
| "Trade G/B Ratio": trade_good_bad_ratio, | |
| "Total Trades": total_trades, | |
| "Net % Return": total_profit_weighted_avg * 100, | |
| "Avg Entry Conf.": avg_entry_confidence, | |
| "Winning Tickers": winning_tickers, | |
| "Losing Tickers": losing_tickers, | |
| "Exit Breakdown": total_exit_breakdown, # NEW: Return the aggregated breakdown | |
| # CONFIG DATA (for Orchestrator unpacking) | |
| "params": params, | |
| "confidence_settings": confidence_settings | |
| } | |
| # --- UPDATED: Calculation Only (Saves to Session State) --- | |
| def generate_and_run_optimisation(master_df, main_content_placeholder, optimise_for, use_squared_weighting): | |
| # Clear previous results to avoid confusion | |
| st.session_state.param_results_df = None | |
| st.session_state.best_params = None | |
| # Clear other sections | |
| st.session_state.summary_df = None | |
| st.session_state.single_ticker_results = None | |
| st.session_state.confidence_results_df = None | |
| st.session_state.open_trades_df = None | |
| st.session_state.advisor_df = None | |
| with main_content_placeholder.container(): | |
| veto_list_to_use = st.session_state.get('veto_setup_list', []) | |
| if veto_list_to_use: | |
| st.info(f"{len(veto_list_to_use)} Veto filter(s) are ACTIVE for this optimisation run.") | |
| st.info("Calibrating confidence scores (0-100%)...") | |
| # --- 1. Gather Settings & Run Calibration (Same as before) --- | |
| use_rsi = st.session_state.use_rsi; use_vol = st.session_state.use_vol; use_trend = st.session_state.use_trend | |
| use_volume = st.session_state.use_volume; use_macd = st.session_state.use_macd | |
| use_ma_slope = st.session_state.use_ma_slope; use_markov = st.session_state.use_markov | |
| rsi_w = st.session_state.rsi_w; vol_w = st.session_state.vol_w; trend_w = st.session_state.trend_w | |
| vol_w_val = st.session_state.volume_w; macd_w = st.session_state.macd_w | |
| ma_slope_w = st.session_state.ma_slope_w; markov_w = st.session_state.markov_w | |
| calibration_params = { | |
| "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, | |
| "bband_std_dev": st.session_state.bb_std, "confidence_threshold": st.session_state.confidence_slider, | |
| "long_entry_threshold_pct": st.session_state.long_entry / 100.0, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100.0, | |
| "long_trailing_stop_loss_pct": st.session_state.long_sl / 100.0, "long_delay_days": st.session_state.long_delay, | |
| "short_entry_threshold_pct": st.session_state.short_entry / 100.0, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100.0, | |
| "short_trailing_stop_loss_pct": st.session_state.short_sl / 100.0, "short_delay_days": st.session_state.short_delay, | |
| "max_trading_days": st.session_state.max_duration | |
| } | |
| markov_setup = st.session_state.get('best_markov_setup') | |
| primary_driver = st.session_state.primary_driver | |
| calib_adx_filter = st.session_state.use_adx_filter | |
| calib_adx_thresh = st.session_state.adx_threshold | |
| calib_adx_period = st.session_state.adx_period | |
| calib_rsi_logic = st.session_state.rsi_logic | |
| calib_exit_logic = st.session_state.exit_logic_type | |
| calib_exit_thresh = st.session_state.exit_confidence_threshold | |
| calib_smart_trailing_stop = st.session_state.smart_trailing_stop_pct / 100.0 | |
| calib_smart_atr_p = st.session_state.smart_exit_atr_period | |
| calib_smart_atr_m = st.session_state.smart_exit_atr_multiplier | |
| calib_intelligent_tsl = st.session_state.intelligent_tsl_pct / 100.0 | |
| all_long_scores = []; all_short_scores = [] | |
| tickers_to_run_calib = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))] | |
| date_range_calib = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) | |
| for ticker_symbol in tickers_to_run_calib: | |
| cols_to_use = [ticker_symbol, f'{ticker_symbol}_High', f'{ticker_symbol}_Low', f'{ticker_symbol}_Volume'] | |
| existing_cols = [col for col in cols_to_use if col in master_df.columns] | |
| if ticker_symbol not in existing_cols: continue | |
| ticker_data_full = master_df.loc[:, existing_cols] | |
| ticker_data = ticker_data_full.loc[date_range_calib[0]:date_range_calib[1]] | |
| rename_dict = {ticker_symbol: 'Close', f'{ticker_symbol}_High': 'High', f'{ticker_symbol}_Low': 'Low', f'{ticker_symbol}_Volume': 'Volume'} | |
| rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} | |
| ticker_data = ticker_data.rename(columns=rename_dict_filtered) | |
| l_pnl, s_pnl, al, as_, _, _, _, _, _, _, _ = run_backtest( | |
| data=ticker_data, params=calibration_params, | |
| use_rsi=use_rsi, use_volatility=use_vol, use_trend=use_trend, use_volume=use_volume, | |
| use_macd=use_macd, use_ma_slope=use_ma_slope, use_markov=use_markov, | |
| rsi_w=rsi_w, vol_w=vol_w, trend_w=trend_w, vol_w_val=vol_w_val, macd_w=macd_w, ma_slope_w=ma_slope_w, markov_w=markov_w, | |
| use_adx_filter=calib_adx_filter, adx_threshold=calib_adx_thresh, rsi_logic=calib_rsi_logic, adx_period=calib_adx_period, | |
| veto_setups_list=veto_list_to_use, primary_driver=primary_driver, markov_setup=markov_setup, | |
| exit_logic_type=calib_exit_logic, exit_confidence_threshold=calib_exit_thresh, | |
| smart_trailing_stop_pct=calib_smart_trailing_stop, smart_exit_atr_period=calib_smart_atr_p, | |
| smart_exit_atr_multiplier=calib_smart_atr_m, intelligent_tsl_pct=calib_intelligent_tsl, | |
| long_score_95_percentile=1.0, short_score_95_percentile=1.0 | |
| ) | |
| raw_long, raw_short = calculate_confidence_score( | |
| ticker_data, primary_driver, | |
| use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov, | |
| rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w, | |
| calibration_params, markov_setup | |
| ) | |
| all_long_scores.append(raw_long[raw_long > 0]) | |
| all_short_scores.append(raw_short[raw_short > 0]) | |
| long_95 = pd.concat(all_long_scores).quantile(0.95) if all_long_scores else 1.0 | |
| short_95 = pd.concat(all_short_scores).quantile(0.95) if all_short_scores else 1.0 | |
| if pd.isna(long_95) or long_95 == 0: long_95 = 1.0 | |
| if pd.isna(short_95) or short_95 == 0: short_95 = 1.0 | |
| st.info(f"Confidence calibrated: Long 95th percentile = {long_95:.2f}, Short 95th percentile = {short_95:.2f}") | |
| # --- 2. Build Combinations (Same as before) --- | |
| ma_range = range(st.session_state.ma_start, st.session_state.ma_end + 1, st.session_state.ma_step) if st.session_state.opt_ma_cb else [st.session_state.ma_period] | |
| bb_range = range(st.session_state.bb_start, st.session_state.bb_end + 1, st.session_state.bb_step) if st.session_state.opt_bb_cb else [st.session_state.bb_period] | |
| std_range = np.arange(st.session_state.std_start, st.session_state.std_end + 0.001, st.session_state.std_step) if st.session_state.opt_std_cb else [st.session_state.bb_std] | |
| sl_range = np.arange(st.session_state.sl_start, st.session_state.sl_end + 0.001, st.session_state.sl_step) / 100 if st.session_state.opt_sl_cb else [st.session_state.long_sl / 100] | |
| delay_range = range(st.session_state.delay_start, st.session_state.delay_end + 1, st.session_state.delay_step) if st.session_state.opt_delay_cb else [st.session_state.long_delay] | |
| entry_range = np.arange(st.session_state.entry_start, st.session_state.entry_end + 0.001, st.session_state.entry_step) / 100 if st.session_state.opt_entry_cb else [st.session_state.long_entry / 100] | |
| exit_range = np.arange(st.session_state.exit_start, st.session_state.exit_end + 0.001, st.session_state.exit_step) / 100 if st.session_state.opt_exit_cb else [st.session_state.long_exit / 100] | |
| conf_range = range(st.session_state.conf_start, st.session_state.conf_end + 1, st.session_state.conf_step) if st.session_state.opt_conf_cb else [st.session_state.confidence_slider] | |
| dur_range = range(st.session_state.dur_start, st.session_state.dur_end + 1, st.session_state.dur_step) if st.session_state.opt_duration_cb else [st.session_state.max_duration] | |
| param_product = itertools.product(ma_range, bb_range, std_range, sl_range, delay_range, entry_range, exit_range, conf_range, dur_range) | |
| param_combinations = [{ | |
| "large_ma_period": p[0], "bband_period": p[1], "bband_std_dev": p[2], | |
| "long_trailing_stop_loss_pct": p[3], "short_trailing_stop_loss_pct": p[3], | |
| "long_delay_days": p[4], "short_delay_days": p[4], | |
| "long_entry_threshold_pct": p[5], "short_entry_threshold_pct": p[5], | |
| "long_exit_ma_threshold_pct": p[6], "short_exit_ma_threshold_pct": p[6], | |
| "confidence_threshold": p[7], "max_trading_days": p[8] | |
| } for p in param_product] | |
| total_combinations = len(param_combinations) | |
| if total_combinations <= 1: | |
| st.warning("No optimisation parameters selected."); return | |
| confidence_settings = { | |
| 'toggles': (use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov), | |
| 'weights': (rsi_w, vol_w, trend_w, vol_w_val, macd_w, ma_slope_w, markov_w), | |
| 'adx_settings': (st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.adx_period), | |
| 'rsi_logic': st.session_state.rsi_logic, 'primary_driver': primary_driver, 'markov_setup': markov_setup, | |
| 'exit_logic': st.session_state.exit_logic_type, 'exit_thresh': st.session_state.exit_confidence_threshold, | |
| 'smart_trailing_stop': st.session_state.smart_trailing_stop_pct / 100.0, | |
| 'smart_exit_atr_period': st.session_state.smart_exit_atr_period, 'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier, | |
| 'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0, | |
| 'long_95_percentile': long_95, 'short_95_percentile': short_95, 'veto_list': veto_list_to_use | |
| } | |
| # --- 3. Execution --- | |
| num_cores = cpu_count() | |
| st.info(f"Starting {optimise_for.upper()} optimisation on {num_cores} cores... Testing {total_combinations} combinations.") | |
| if st.session_state.run_mode.startswith("Analyse Full List"): | |
| tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))] | |
| else: | |
| tickers_to_run = [st.session_state.ticker_select] | |
| date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) | |
| power = 2 if use_squared_weighting else 1 | |
| status_text = st.empty(); status_text.text("Optimisation starting...") | |
| progress_bar = st.progress(0) | |
| tasks = [] | |
| for p in param_combinations: | |
| tasks.append({ | |
| 'params': p, 'confidence_settings': confidence_settings, 'master_df': master_df, | |
| 'optimise_for': optimise_for, 'tickers': tickers_to_run, 'date_range': date_range, 'power': power | |
| }) | |
| results_list = [] | |
| with Pool(processes=num_cores) as pool: | |
| try: | |
| iterator = pool.imap_unordered(run_single_parameter_test, tasks) | |
| for i, result_dict in enumerate(iterator, 1): | |
| results_list.append(result_dict) | |
| progress_bar.progress(i / total_combinations, text=f"Optimising... {i}/{total_combinations} combinations complete.") | |
| except Exception as e: | |
| st.error(f"An error occurred during multiprocessing: {e}") | |
| status_text.text("Optimisation failed due to an error."); return | |
| status_text.text("Optimisation complete. Processing results...") | |
| if not results_list: | |
| st.warning("Optimisation finished, but no valid results were found."); return | |
| flattened_results = [] | |
| for r in results_list: | |
| flat_row = r.copy() | |
| del flat_row['params']; del flat_row['confidence_settings'] | |
| flat_row.update(r['params']) | |
| flattened_results.append(flat_row) | |
| results_df = pd.DataFrame(flattened_results) | |
| min_trades_threshold = 10 | |
| if 'Total Trades' in results_df.columns: | |
| results_df = results_df[results_df['Total Trades'] >= min_trades_threshold].copy() | |
| if results_df.empty: | |
| st.warning(f"No results found with at least {min_trades_threshold} trades. Try a smaller threshold or different settings."); return | |
| results_df['Strategy Score'] = results_df.apply( | |
| lambda row: calculate_strategy_score(row['Avg Profit/Trade'], row['Ticker G/B Ratio'], row['Total Trades']), axis=1 | |
| ) | |
| # --- 4. PRE-SORT (Descending Duration) --- | |
| # We sort by Score (Desc) AND Duration (Desc) so the "Biggest Number" comes first. | |
| if 'max_trading_days' in results_df.columns: | |
| results_df = results_df.sort_values(by=['Strategy Score', 'max_trading_days'], ascending=[False, False]) | |
| else: | |
| results_df = results_df.sort_values(by='Strategy Score', ascending=False) | |
| # Save the full results to session state so `main()` can display them | |
| st.session_state.param_results_df = results_df | |
| # Save best params (first row is best score + highest duration) | |
| best_setup_series = results_df.iloc[0] | |
| st.session_state.best_params = {k: best_setup_series[k] for k in param_combinations[0].keys()} | |
| status_text.empty() | |
| st.success(f"Optimisation Complete! Best Strategy Score: {best_setup_series['Strategy Score']:.2f}%") | |
| st.subheader("Optimal Parameters Found (based on Strategy Score):") | |
| st.json(st.session_state.best_params) | |
| # Note: We do NOT display the table here anymore. main() handles it. | |
| # --- 4.5. NEW COMBINED OPTIMISATION FUNCTION (Optimized: 32k -> 3k Tasks) --- | |
| def generate_and_run_combined_optimisation(master_df, main_content_placeholder, optimise_for): | |
| st.session_state.summary_df = None | |
| st.session_state.single_ticker_results = None | |
| st.session_state.confidence_results_df = None | |
| st.session_state.open_trades_df = None | |
| st.session_state.advisor_df = None | |
| st.session_state.best_params = None | |
| st.session_state.best_weights = None | |
| # Define side-specific filename to separate Long/Short results | |
| results_file = f"combined_optimisation_results_{optimise_for}.csv" | |
| with main_content_placeholder.container(): | |
| # --- Visual Feedback --- | |
| st.header(f"⚡ Running Combined Factor & Weight Optimisation ({optimise_for.title()})") | |
| st.caption(f"Results will be saved to: {results_file}") | |
| # --- FIXED PARAMETERS --- | |
| fixed_params = { | |
| "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, | |
| "bband_std_dev": st.session_state.bb_std, "confidence_threshold": 50, | |
| "long_entry_threshold_pct": st.session_state.long_entry / 100, "short_entry_threshold_pct": st.session_state.short_entry / 100, | |
| "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, | |
| "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, "short_trailing_stop_loss_pct": st.session_state.short_sl / 100, | |
| "long_delay_days": st.session_state.long_delay, "short_delay_days": st.session_state.short_delay | |
| } | |
| # --- CHECKPOINT LOADING --- | |
| completed_configs = load_completed_setups(results_file) | |
| st.info(f"Loaded {len(completed_configs)} completed combinations from {results_file}. Resuming job...") | |
| # --- DYNAMIC FACTOR/WEIGHT GENERATION (OPTIMIZED) --- | |
| # Instead of looping Toggles and Weights separately (which creates duplicates), | |
| # we treat 0.0 as "Off" and any other value as "On + Weight". | |
| # 5 States per Factor: [Off, 0.5, 1.0, 1.5, 2.0] | |
| factors = ['RSI', 'Volatility', 'Volume', 'MACD', 'MA Slope'] | |
| possible_states = [0.0, 0.5, 1.0, 1.5, 2.0] # 0.0 = Off | |
| # Generate all 3,125 unique combinations (5^5) | |
| all_combos = list(itertools.product(possible_states, repeat=len(factors))) | |
| all_tasks = [] | |
| fixed_adx_settings = (st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.adx_period) | |
| fixed_markov_setup = st.session_state.get('best_markov_setup') | |
| fixed_exit_settings = { | |
| 'rsi_logic': st.session_state.rsi_logic, 'primary_driver': st.session_state.primary_driver, | |
| 'exit_logic': st.session_state.exit_logic_type, 'exit_thresh': st.session_state.exit_confidence_threshold, | |
| 'smart_trailing_stop': st.session_state.smart_trailing_stop_pct / 100.0, | |
| 'smart_exit_atr_period': st.session_state.smart_exit_atr_period, | |
| 'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier, | |
| 'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0 | |
| } | |
| tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))] | |
| date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) | |
| for combo in all_combos: | |
| weights_5 = list(combo) | |
| toggles_5 = [w > 0.0 for w in weights_5] # If weight > 0, it's On. If 0.0, it's Off. | |
| # --- EXPAND TO 7 ITEMS (Insert False/0.0 for Trend and Markov) --- | |
| # Standard Order: RSI, Vol, TREND, Volume, MACD, MA Slope, Markov | |
| toggles_7 = ( | |
| toggles_5[0], toggles_5[1], False, toggles_5[2], toggles_5[3], toggles_5[4], False | |
| ) | |
| weights_7 = ( | |
| weights_5[0], weights_5[1], 0.0, weights_5[2], weights_5[3], weights_5[4], 0.0 | |
| ) | |
| # --- CHECKPOINT LOGIC --- | |
| toggles_str = tuple(["On" if t else "Off" for t in toggles_7]) | |
| weights_round = tuple([round(w, 2) for w in weights_7]) | |
| task_key = (toggles_str, weights_round) | |
| if task_key not in completed_configs: | |
| confidence_settings = { | |
| 'toggles': toggles_7, 'weights': weights_7, | |
| 'adx_settings': fixed_adx_settings, | |
| 'markov_setup': fixed_markov_setup, 'confidence_threshold': fixed_params['confidence_threshold'], | |
| 'long_95_percentile': 1.0, 'short_95_percentile': 1.0, 'veto_list': st.session_state.get('veto_setup_list', []), | |
| } | |
| confidence_settings.update(fixed_exit_settings) | |
| all_tasks.append({ | |
| 'params': fixed_params, | |
| 'confidence_settings': confidence_settings, | |
| 'master_df': master_df, | |
| 'optimise_for': optimise_for, | |
| 'tickers': tickers_to_run, | |
| 'date_range': date_range, | |
| 'power': 1 | |
| }) | |
| # --- Multiprocessing Execution Setup --- | |
| total_combinations_theoretical = 3125 # Corrected to 5^5 | |
| tasks_remaining = len(all_tasks) | |
| if tasks_remaining == 0: | |
| st.success(f"Optimisation complete. All {total_combinations_theoretical} combinations for {optimise_for.title()} have been processed."); | |
| pass | |
| current_progress_base = total_combinations_theoretical - tasks_remaining | |
| BATCH_SIZE = 50 # Smaller batch for smoother progress | |
| if tasks_remaining > 0: | |
| num_cores = cpu_count() | |
| st.info(f"Starting COMBINED optimisation ({optimise_for.title()}) on {num_cores} cores... {tasks_remaining} unique tasks remaining.") | |
| results_list = [] | |
| status_text = st.empty(); status_text.text("Optimisation starting...") | |
| progress_bar = st.progress(0) | |
| with Pool(processes=cpu_count()) as pool: | |
| try: | |
| iterator = pool.imap_unordered(run_single_parameter_test, all_tasks) | |
| for i, result_dict in enumerate(iterator, 1): | |
| results_list.append(result_dict) | |
| total_progress = (current_progress_base + i) / total_combinations_theoretical | |
| progress_bar.progress(total_progress, text=f"Optimising {optimise_for.title()}... {i}/{tasks_remaining} processed.") | |
| if i % BATCH_SIZE == 0 or i == tasks_remaining: | |
| factors_csv_order = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov'] | |
| def unpack_csv_config(row): | |
| conf_settings = row['confidence_settings'] | |
| result = {} | |
| togs = conf_settings['toggles'] | |
| wgts = conf_settings['weights'] | |
| for idx, name in enumerate(factors_csv_order): | |
| result[name] = "On" if togs[idx] else "Off" | |
| result[name + ' W'] = wgts[idx] | |
| return pd.Series(result) | |
| config_df = pd.DataFrame(results_list).apply(unpack_csv_config, axis=1) | |
| results_df_batch = pd.concat([pd.DataFrame(results_list).drop(['params', 'confidence_settings'], axis=1), config_df], axis=1) | |
| results_df_batch['Strategy Score'] = results_df_batch.apply( | |
| lambda row: calculate_strategy_score(row['Avg Profit/Trade'], row['Trade G/B Ratio'], row['Total Trades']), axis=1 | |
| ) | |
| write_header = not os.path.exists(results_file) | |
| results_df_batch.to_csv(results_file, mode='a', header=write_header, index=False) | |
| status_text.text(f"CHECKPOINT: Saved {i} new {optimise_for} combinations. Total: {current_progress_base + i} / {total_combinations_theoretical}") | |
| results_list = [] | |
| except Exception as e: | |
| st.error(f"FATAL ERROR during multiprocessing. Checkpoint saved. Error: {e}") | |
| status_text.text("Optimization stopped.") | |
| return | |
| status_text.empty() | |
| st.success(f"Optimization finished. All {optimise_for.title()} results saved.") | |
| # --- FINAL DISPLAY --- | |
| if os.path.exists(results_file): | |
| final_df = pd.read_csv(results_file) | |
| final_df = final_df.sort_values(by='Strategy Score', ascending=False) | |
| st.subheader(f"Top 20 Complete {optimise_for.title()} Setups:") | |
| display_cols = ['Strategy Score', 'Avg Profit/Trade', 'Trade G/B Ratio', 'Total Trades'] | |
| factors_display = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov'] | |
| display_cols.extend(factors_display) | |
| display_cols.extend([f + ' W' for f in factors_display]) | |
| display_cols = [c for c in display_cols if c in final_df.columns] | |
| display_df = final_df.head(20) | |
| st.dataframe(display_df[display_cols].style.format({ | |
| "Strategy Score": "{:.2f}%", | |
| "Avg Profit/Trade": "{:.2%}", | |
| "Trade G/B Ratio": "{:.2f}", | |
| })) | |
| else: | |
| st.info(f"No results found yet for {optimise_for.title()}.") | |
| # --- CORRECTED: Ensures ALL arguments are correctly passed to run_backtest --- | |
| def run_single_confidence_test(task, base_params, master_df, date_range, tickers_to_run, optimise_for, factor_weights): | |
| # Unpack the factor combo tuple (now has 7 elements) | |
| combo, threshold, _ = task | |
| use_rsi_combo, use_volatility_combo, use_trend_combo, use_volume_combo, use_macd_combo, use_ma_slope_combo, use_markov_combo = combo | |
| test_params = base_params.copy() | |
| test_params["confidence_threshold"] = threshold | |
| total_profit_weighted_avg, total_trades, winning_tickers, losing_tickers = 0, 0, 0, 0 | |
| all_confidences = [] | |
| total_wins, total_losses = 0, 0 | |
| PROFIT_THRESHOLD = 1.0 | |
| excluded_tickers_conf = [] | |
| # --- Get STATIC settings from factor_weights --- | |
| rsi_logic = factor_weights.get('rsi_logic', 'Crossover') | |
| use_adx_filter = factor_weights.get('use_adx', True) | |
| adx_threshold = factor_weights.get('adx_thresh', 25.0) | |
| adx_period = factor_weights.get('adx_period', 14) | |
| primary_driver = factor_weights.get('primary_driver', 'Bollinger Bands') | |
| markov_setup = factor_weights.get('markov_setup') | |
| exit_logic = factor_weights.get('exit_logic') | |
| exit_thresh = factor_weights.get('exit_thresh') | |
| smart_trailing_stop = factor_weights.get('smart_trailing_stop') | |
| smart_exit_atr_p = factor_weights.get('smart_exit_atr_period', 14) | |
| smart_exit_atr_m = factor_weights.get('smart_exit_atr_multiplier', 3.0) | |
| intelligent_tsl_pct = factor_weights.get('intelligent_tsl_pct', 0.60) | |
| # Get weights | |
| rsi_w = factor_weights.get('rsi', 1.0); vol_w = factor_weights.get('vol', 1.0) | |
| trend_w = factor_weights.get('trend', 1.0); volume_w = factor_weights.get('volume', 1.0) | |
| macd_w = factor_weights.get('macd', 1.0); ma_slope_w = factor_weights.get('ma_slope', 1.0) | |
| markov_w = factor_weights.get('markov', 1.0) | |
| for ticker in tickers_to_run: | |
| cols_to_use = [ticker] | |
| if f'{ticker}_High' in master_df.columns: cols_to_use.append(f'{ticker}_High') | |
| if f'{ticker}_Low' in master_df.columns: cols_to_use.append(f'{ticker}_Low') | |
| if f'{ticker}_Volume' in master_df.columns: cols_to_use.append(f'{ticker}_Volume') | |
| existing_cols = [col for col in cols_to_use if col in master_df.columns] | |
| if ticker not in existing_cols: continue | |
| ticker_data_full = master_df.loc[:, existing_cols] | |
| ticker_data = ticker_data_full.loc[date_range[0]:date_range[1]] | |
| rename_dict = {ticker: 'Close', f'{ticker}_High': 'High', f'{ticker}_Low': 'Low', f'{ticker}_Volume': 'Volume'} | |
| rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} | |
| ticker_data = ticker_data.rename(columns=rename_dict_filtered) | |
| if not ticker_data.empty and 'Close' in ticker_data.columns and not ticker_data['Close'].isna().all(): | |
| # --- CORRECTED run_backtest CALL --- | |
| long_pnl, short_pnl, avg_long_trade, avg_short_trade, _, trades, _, trade_counts, _, _ = run_backtest( | |
| ticker_data, test_params, | |
| use_rsi_combo, use_volatility_combo, use_trend_combo, use_volume_combo, use_macd_combo, use_ma_slope_combo, use_markov_combo, | |
| rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w, | |
| use_adx_filter, adx_threshold, | |
| rsi_logic, | |
| adx_period, | |
| veto_setups_list=None, | |
| primary_driver=primary_driver, | |
| markov_setup=markov_setup, | |
| exit_logic_type=exit_logic, | |
| exit_confidence_threshold=exit_thresh, | |
| smart_trailing_stop_pct=smart_trailing_stop, | |
| smart_exit_atr_period=smart_exit_atr_p, | |
| smart_exit_atr_multiplier=smart_exit_atr_m, | |
| intelligent_tsl_pct=intelligent_tsl_pct | |
| ) | |
| if abs(long_pnl) > PROFIT_THRESHOLD or abs(short_pnl) > PROFIT_THRESHOLD or \ | |
| (avg_long_trade is not None and pd.notna(avg_long_trade) and abs(avg_long_trade) > PROFIT_THRESHOLD) or \ | |
| (avg_short_trade is not None and pd.notna(avg_short_trade) and abs(avg_short_trade) > PROFIT_THRESHOLD): | |
| excluded_tickers_conf.append(ticker); continue | |
| if optimise_for == 'long': | |
| pnl, avg_trade_profit, trade_log = long_pnl, avg_long_trade, trades[0] | |
| total_wins += trade_counts[0]; total_losses += trade_counts[1] | |
| else: | |
| pnl, avg_trade_profit, trade_log = short_pnl, avg_short_trade, trades[2] | |
| total_wins += trade_counts[2]; total_losses += trade_counts[3] | |
| num_trades = len(trade_log) | |
| if num_trades > 0 and avg_trade_profit is not None and pd.notna(avg_trade_profit): | |
| total_trades += num_trades | |
| total_profit_weighted_avg += avg_trade_profit * num_trades | |
| if pnl > 0: winning_tickers += 1 | |
| elif pnl < 0: losing_tickers += 1 | |
| all_confidences.extend([trade['confidence'] for trade in trade_log if pd.notna(trade.get('confidence'))]) | |
| # --- [NEW SCORING LOGIC] --- | |
| overall_avg_profit = 0.0 | |
| ticker_good_bad_ratio = 0.0 | |
| badness_score = 0.0 | |
| if total_trades > 0: | |
| overall_avg_profit = total_profit_weighted_avg / total_trades | |
| if losing_tickers > 0: | |
| ticker_good_bad_ratio = winning_tickers / losing_tickers | |
| elif winning_tickers > 0: | |
| ticker_good_bad_ratio = 99999.0 | |
| if winning_tickers > 0 and overall_avg_profit < 0: | |
| badness_score = (losing_tickers / winning_tickers) * abs(overall_avg_profit) | |
| avg_entry_confidence = np.mean(all_confidences) if all_confidences else 0.0 | |
| trade_good_bad_ratio = total_wins / total_losses if total_losses > 0 else 99999.0 | |
| if pd.isna(avg_entry_confidence): avg_entry_confidence = 0.0 | |
| # Use the new 3-Factor Score | |
| # We pass TRADE G/B ratio as the second argument because it is more granular/reliable | |
| norm_score = calculate_strategy_score(overall_avg_profit, trade_good_bad_ratio, total_trades) | |
| # For sorting consistency, we set "Good Score" to the same value as Norm Score | |
| raw_score = norm_score | |
| return { | |
| "RSI": use_rsi_combo, "Volatility": use_volatility_combo, "TREND": use_trend_combo, "Volume": use_volume_combo, | |
| "MACD": use_macd_combo, "MA Slope": use_ma_slope_combo, "Markov": use_markov_combo, | |
| "Conf. Threshold": threshold, "Avg Profit/Trade": overall_avg_profit if pd.notna(overall_avg_profit) else 0.0, | |
| "Ticker G/B Ratio": ticker_good_bad_ratio if pd.notna(ticker_good_bad_ratio) else 0.0, | |
| "Trade G/B Ratio": trade_good_bad_ratio if pd.notna(trade_good_bad_ratio) else 0.0, | |
| "Winning Tickers": winning_tickers, "Losing Tickers": losing_tickers, | |
| "Avg Entry Conf.": avg_entry_confidence, | |
| "Good Score": raw_score if pd.notna(raw_score) else 0.0, | |
| "Bad Score": badness_score if pd.notna(badness_score) else 0.0, | |
| "Norm. Score %": norm_score, | |
| "Total Trades": total_trades | |
| } | |
| def run_single_weight_test(confidence_settings, base_params, master_df, optimise_for, tickers, date_range, power): | |
| """ | |
| Worker function for weight optimisation. | |
| 'base_params' (MA, BB, etc.) are fixed. | |
| 'confidence_settings' (weights) are variable. | |
| """ | |
| total_profit_weighted_avg, total_trades, winning_tickers, losing_tickers = 0, 0, 0, 0 | |
| total_wins, total_losses = 0, 0 | |
| all_confidences = [] | |
| # --- UNPACK all settings from the variable dictionary --- | |
| use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov = confidence_settings['toggles'] | |
| rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w = confidence_settings['weights'] | |
| use_adx_filter, adx_threshold, adx_period = confidence_settings['adx_settings'] # <-- MODIFIED | |
| rsi_logic = confidence_settings['rsi_logic'] | |
| primary_driver = confidence_settings['primary_driver'] | |
| markov_setup = confidence_settings['markov_setup'] | |
| exit_logic = confidence_settings['exit_logic'] | |
| exit_thresh = confidence_settings['exit_thresh'] | |
| smart_trailing_stop = confidence_settings['smart_trailing_stop'] | |
| smart_exit_atr_p = confidence_settings['smart_exit_atr_period'] | |
| smart_exit_atr_m = confidence_settings['smart_exit_atr_multiplier'] | |
| intelligent_tsl_pct = confidence_settings['intelligent_tsl_pct'] # <-- ADDED | |
| # --- End Unpack --- | |
| PROFIT_THRESHOLD = 1.0 | |
| if not isinstance(tickers, list): tickers = [tickers] | |
| for ticker in tickers: | |
| cols_to_use = [ticker] | |
| if f'{ticker}_High' in master_df.columns: cols_to_use.append(f'{ticker}_High') | |
| if f'{ticker}_Low' in master_df.columns: cols_to_use.append(f'{ticker}_Low') | |
| if f'{ticker}_Volume' in master_df.columns: cols_to_use.append(f'{ticker}_Volume') | |
| existing_cols = [col for col in cols_to_use if col in master_df.columns] | |
| if ticker not in existing_cols: continue | |
| ticker_data_full = master_df.loc[:, existing_cols] | |
| ticker_data = ticker_data_full.loc[date_range[0]:date_range[1]] | |
| rename_dict = { | |
| ticker: 'Close', f'{ticker}_High': 'High', | |
| f'{ticker}_Low': 'Low', f'{ticker}_Volume': 'Volume' | |
| } | |
| rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} | |
| ticker_data = ticker_data.rename(columns=rename_dict_filtered) | |
| if not ticker_data.empty and 'Close' in ticker_data.columns and not ticker_data['Close'].isna().all(): | |
| long_pnl, short_pnl, avg_long_trade, avg_short_trade, _, trades, _, trade_counts, _, _ = run_backtest( | |
| ticker_data, base_params, | |
| use_rsi, use_vol, use_trend, use_volume, use_macd, use_ma_slope, use_markov, | |
| rsi_w, vol_w, trend_w, volume_w, macd_w, ma_slope_w, markov_w, | |
| use_adx_filter, adx_threshold, rsi_logic, | |
| adx_period, # <-- ADDED | |
| veto_setups_list=None, # Veto is OFF during optimisation | |
| primary_driver=primary_driver, | |
| markov_setup=markov_setup, | |
| exit_logic_type=exit_logic, | |
| exit_confidence_threshold=exit_thresh, | |
| smart_trailing_stop_pct=smart_trailing_stop, | |
| smart_exit_atr_period=smart_exit_atr_p, | |
| smart_exit_atr_multiplier=smart_exit_atr_m, | |
| intelligent_tsl_pct=intelligent_tsl_pct # <-- ADDED | |
| ) | |
| if abs(long_pnl) > PROFIT_THRESHOLD or abs(short_pnl) > PROFIT_THRESHOLD or \ | |
| (avg_long_trade is not None and pd.notna(avg_long_trade) and abs(avg_long_trade) > PROFIT_THRESHOLD) or \ | |
| (avg_short_trade is not None and pd.notna(avg_short_trade) and abs(avg_short_trade) > PROFIT_THRESHOLD): | |
| continue # Skip outlier tickers | |
| if optimise_for == 'long': | |
| pnl, avg_trade_profit, trade_log = long_pnl, avg_long_trade, trades[0] | |
| total_wins += trade_counts[0]; total_losses += trade_counts[1] | |
| else: | |
| pnl, avg_trade_profit, trade_log = short_pnl, avg_short_trade, trades[2] | |
| total_wins += trade_counts[2]; total_losses += trade_counts[3] | |
| num_trades = len(trade_log) | |
| if num_trades > 0 and avg_trade_profit is not None and pd.notna(avg_trade_profit): | |
| total_trades += num_trades | |
| total_profit_weighted_avg += avg_trade_profit * num_trades | |
| if pnl > 0: winning_tickers += 1 | |
| elif pnl < 0: losing_tickers += 1 | |
| all_confidences.extend([trade['confidence'] for trade in trade_log if pd.notna(trade.get('confidence'))]) | |
| overall_avg_profit = 0.0 | |
| good_bad_ratio = 0.0 | |
| trade_good_bad_ratio = 0.0 | |
| if total_trades > 0: | |
| overall_avg_profit = total_profit_weighted_avg / total_trades | |
| if losing_tickers > 0: | |
| good_bad_ratio = winning_tickers / losing_tickers | |
| elif winning_tickers > 0: | |
| good_bad_ratio = 99999.0 | |
| if total_losses > 0: | |
| trade_good_bad_ratio = total_wins / total_losses | |
| elif total_wins > 0: | |
| trade_good_bad_ratio = 99999.0 | |
| avg_entry_confidence = np.mean(all_confidences) if all_confidences else 0.0 | |
| weights_tested = { | |
| "rsi_w": rsi_w, "vol_w": vol_w, "trend_w": trend_w, | |
| "volume_w": volume_w, "macd_w": macd_w, "ma_slope_w": ma_slope_w, | |
| "markov_w": markov_w | |
| } | |
| return { | |
| "weights": weights_tested, | |
| "Avg Profit/Trade": overall_avg_profit, | |
| "Ticker G/B Ratio": good_bad_ratio, | |
| "Trade G/B Ratio": trade_good_bad_ratio, | |
| "Total Trades": total_trades, | |
| "Avg Entry Conf.": avg_entry_confidence, | |
| "Winning Tickers": winning_tickers, | |
| "Losing Tickers": losing_tickers | |
| } | |
| def run_advisor_scan(main_df, setups_to_run, advisor_type="Advisor"): | |
| """ | |
| Scans tickers for open trades using a list of setups. | |
| Updated to provide real-time granular feedback per ticker. | |
| """ | |
| st.info(f"Scanning tickers for open trades based on {len(setups_to_run)} {advisor_type} setups...") | |
| base_params = { | |
| "large_ma_period": st.session_state.ma_period, | |
| "bband_period": st.session_state.bb_period, | |
| "bband_std_dev": st.session_state.bb_std, | |
| "long_entry_threshold_pct": st.session_state.long_entry / 100, | |
| "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, | |
| "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, | |
| "long_delay_days": st.session_state.long_delay, | |
| "short_entry_threshold_pct": st.session_state.short_entry / 100, | |
| "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, | |
| "short_trailing_stop_loss_pct": st.session_state.short_sl / 100, | |
| "short_delay_days": st.session_state.short_delay | |
| } | |
| lookbacks = [ | |
| base_params.get('large_ma_period', 50), | |
| base_params.get('bband_period', 20), | |
| 200, 50, 26, 14, | |
| st.session_state.adx_period | |
| ] | |
| markov_setup_to_use = None | |
| if st.session_state.primary_driver == 'Markov State' or st.session_state.use_markov: | |
| if 'best_markov_setup' in st.session_state and st.session_state.best_markov_setup: | |
| markov_setup_to_use = st.session_state.best_markov_setup | |
| if st.session_state.primary_driver == 'Markov State': | |
| st.info("Using saved Best Markov Setup as Primary Driver.") | |
| lookbacks.append(markov_setup_to_use.get('Run-Up Period', 10)) | |
| else: | |
| st.error("Markov State is active, but no Best Markov Setup found. Run Section 7 first.") | |
| st.stop() | |
| if advisor_type == "User-Defined": | |
| for setup in setups_to_run: | |
| try: | |
| lookbacks.append(int(setup.get("Large MA Period", 50))) | |
| lookbacks.append(int(setup.get("Bollinger Band Period", 20))) | |
| except (ValueError, TypeError): | |
| lookbacks.append(50) | |
| lookbacks.append(20) | |
| max_lookback_period = max(lookbacks) | |
| active_scan_days = 120 | |
| buffer_days = 10 | |
| total_days_needed = max_lookback_period + active_scan_days + buffer_days | |
| scan_end_date = pd.Timestamp(st.session_state.end_date) | |
| scan_start_date = scan_end_date - timedelta(days=total_days_needed) | |
| earliest_data_date = main_df.index.min() | |
| if scan_start_date < earliest_data_date: scan_start_date = earliest_data_date | |
| st.caption(f"Analysing data from {scan_start_date.date()} to {scan_end_date.date()}.") | |
| factor_settings = { | |
| "use_adx": st.session_state.use_adx_filter, "adx_thresh": st.session_state.adx_threshold, | |
| "adx_period": st.session_state.adx_period, | |
| "rsi_logic": st.session_state.get('rsi_logic', 'Crossover'), | |
| "primary_driver": st.session_state.get('primary_driver', 'Bollinger Bands'), | |
| "markov_setup": markov_setup_to_use, | |
| "exit_logic": st.session_state.exit_logic_type, | |
| "exit_thresh": st.session_state.exit_confidence_threshold, | |
| "smart_trailing_stop": st.session_state.smart_trailing_stop_pct / 100.0, | |
| "rsi_w": st.session_state.rsi_w, "vol_w": st.session_state.vol_w, | |
| "trend_w": st.session_state.trend_w, "volume_w": st.session_state.volume_w, | |
| "macd_w": st.session_state.macd_w, "ma_slope_w": st.session_state.ma_slope_w, | |
| "markov_w": st.session_state.markov_w, | |
| 'smart_exit_atr_period': st.session_state.smart_exit_atr_period, | |
| 'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier, | |
| 'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0 | |
| } | |
| all_advisor_trades = [] | |
| ticker_list = sorted([col for col in main_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))]) | |
| # --- [NEW] Better Progress Tracking --- | |
| num_setups = len(setups_to_run) | |
| num_tickers = len(ticker_list) | |
| total_ops = num_setups * num_tickers | |
| current_op = 0 | |
| progress_bar = st.progress(0, text="Initializing scan...") | |
| status_text = st.empty() # Placeholder for rolling ticker names | |
| for i, setup in enumerate(setups_to_run): | |
| if advisor_type == "Top Setups": | |
| params_for_run = base_params.copy() | |
| params_for_run['confidence_threshold'] = setup.get('Conf. Threshold', 50) | |
| setup_use_rsi = setup.get('RSI', 'Off') == 'On' | |
| setup_use_vol = setup.get('Volatility', 'Off') == 'On' | |
| setup_use_trend = setup.get('TREND', 'Off') == 'On' | |
| setup_use_volume = setup.get('Volume', 'Off') == 'On' | |
| setup_use_macd = setup.get('MACD', 'Off') == 'On' | |
| setup_use_ma_slope = setup.get('MA Slope', 'Off') == 'On' | |
| setup_use_markov = setup.get('Markov', 'Off') == 'On' | |
| scan_rsi_w = factor_settings['rsi_w']; scan_vol_w = factor_settings['vol_w'] | |
| scan_trend_w = factor_settings['trend_w']; scan_volume_w = factor_settings['volume_w'] | |
| scan_macd_w = factor_settings['macd_w']; scan_ma_slope_w = factor_settings['ma_slope_w'] | |
| scan_markov_w = factor_settings['markov_w'] | |
| elif advisor_type == "User-Defined": | |
| try: ma_p = int(setup.get("Large MA Period", 50)) | |
| except: ma_p = 50 | |
| try: bb_p = int(setup.get("Bollinger Band Period", 20)) | |
| except: bb_p = 20 | |
| try: long_d = int(setup.get("Long Delay (Days)", 0)) | |
| except: long_d = 0 | |
| try: short_d = int(setup.get("Short Delay (Days)", 0)) | |
| except: short_d = 0 | |
| catcher_pct = setup.get("Catcher Offset (%)", 3.0) | |
| try: catcher_decimal = float(catcher_pct) / 100.0 | |
| except: catcher_decimal = 0.03 | |
| params_for_run = { | |
| "large_ma_period": ma_p, "bband_period": bb_p, "long_delay_days": long_d, "short_delay_days": short_d, | |
| "bband_std_dev": setup.get("Bollinger Band Std Dev", 2.0), | |
| "confidence_threshold": setup.get("Conf. Threshold", 50), | |
| "catcher_stop_pct": catcher_decimal, | |
| "long_entry_threshold_pct": setup.get("Long Entry Threshold (%)", 0.0) / 100.0, | |
| "long_exit_ma_threshold_pct": setup.get("Long Exit Threshold (%)", 0.0) / 100.0, | |
| "long_trailing_stop_loss_pct": setup.get("Long Stop Loss (%)", 0.0) / 100.0, | |
| "short_entry_threshold_pct": setup.get("Short Entry Threshold (%)", 0.0) / 100.0, | |
| "short_exit_ma_threshold_pct": setup.get("Short Exit Threshold (%)", 0.0) / 100.0, | |
| "short_trailing_stop_loss_pct": setup.get("Short Stop Loss (%)", 0.0) / 100.0, | |
| "use_ma_floor_filter": st.session_state.use_ma_floor_filter # Pass this through | |
| } | |
| def get_weight_toggle(val): | |
| try: | |
| w = float(val) | |
| if w > 0: return w, True | |
| except: pass | |
| return 0.0, False | |
| scan_rsi_w, setup_use_rsi = get_weight_toggle(setup.get('RSI', 'Off')) | |
| scan_vol_w, setup_use_vol = get_weight_toggle(setup.get('Volatility', 'Off')) | |
| scan_trend_w, setup_use_trend = get_weight_toggle(setup.get('TREND', 'Off')) | |
| scan_volume_w, setup_use_volume = get_weight_toggle(setup.get('Volume', 'Off')) | |
| scan_macd_w, setup_use_macd = get_weight_toggle(setup.get('MACD', 'Off')) | |
| scan_ma_slope_w, setup_use_ma_slope = get_weight_toggle(setup.get('MA Slope', 'Off')) | |
| scan_markov_w, setup_use_markov = get_weight_toggle(setup.get('Markov', 'Off')) | |
| else: | |
| continue | |
| # --- INNER LOOP --- | |
| for ticker_symbol in ticker_list: | |
| # Update progress per ticker | |
| current_op += 1 | |
| if current_op % 10 == 0: # Update visual every 10 items to be faster | |
| progress_bar.progress(current_op / total_ops, text=f"Setup {i+1}/{num_setups}: Scanning {ticker_symbol}...") | |
| cols_to_use = [ticker_symbol] | |
| if f'{ticker_symbol}_High' in main_df.columns: cols_to_use.append(f'{ticker_symbol}_High') | |
| if f'{ticker_symbol}_Low' in main_df.columns: cols_to_use.append(f'{ticker_symbol}_Low') | |
| if f'{ticker_symbol}_Volume' in main_df.columns: cols_to_use.append(f'{ticker_symbol}_Volume') | |
| existing_cols = [col for col in cols_to_use if col in main_df.columns] | |
| if ticker_symbol not in existing_cols: continue | |
| data_for_backtest_full = main_df.loc[:, existing_cols] | |
| data_for_scan = data_for_backtest_full.loc[scan_start_date:scan_end_date] | |
| rename_dict = {ticker_symbol: 'Close', f'{ticker_symbol}_High': 'High', f'{ticker_symbol}_Low': 'Low', f'{ticker_symbol}_Volume': 'Volume'} | |
| rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} | |
| data_for_scan = data_for_scan.rename(columns=rename_dict_filtered) | |
| if advisor_type == "User-Defined": | |
| current_lookback = max(params_for_run.get('large_ma_period', 50), params_for_run.get('bband_period', 20), 200, 50, 26, 14, factor_settings['adx_period']) | |
| else: | |
| current_lookback = max_lookback_period | |
| if (factor_settings['primary_driver'] == 'Markov State' or setup_use_markov) and factor_settings['markov_setup']: | |
| current_lookback = max(current_lookback, factor_settings['markov_setup'].get('Run-Up Period', 10)) | |
| if not data_for_scan.empty and 'Close' in data_for_scan.columns and not data_for_scan['Close'].isna().all() and len(data_for_scan) >= current_lookback : | |
| try: | |
| # Added missing MFI and SuperTrend arguments (set to False/1.0) --- | |
| _, _, _, _, _, _, open_trades, _, _, _, _ = run_backtest( | |
| data_for_scan, params_for_run, | |
| setup_use_rsi, setup_use_vol, setup_use_trend, setup_use_volume, setup_use_macd, setup_use_ma_slope, setup_use_markov, | |
| False, False, # use_mfi, use_supertrend (Default Off for Advisor scans) | |
| scan_rsi_w, scan_vol_w, scan_trend_w, scan_volume_w, scan_macd_w, scan_ma_slope_w, scan_markov_w, | |
| 1.0, 1.0, # mfi_w, supertrend_w (Default 1.0) | |
| factor_settings['use_adx'], factor_settings['adx_thresh'], factor_settings['rsi_logic'], factor_settings['adx_period'], | |
| veto_setups_list=None, | |
| primary_driver=factor_settings['primary_driver'], markov_setup=factor_settings['markov_setup'], | |
| exit_logic_type=factor_settings['exit_logic'], exit_confidence_threshold=factor_settings['exit_thresh'], | |
| smart_trailing_stop_pct=factor_settings['smart_trailing_stop'], | |
| smart_exit_atr_period=factor_settings['smart_exit_atr_period'], | |
| smart_exit_atr_multiplier=factor_settings['smart_exit_atr_multiplier'], | |
| intelligent_tsl_pct=factor_settings['intelligent_tsl_pct'] | |
| ) | |
| except Exception as e: | |
| print(f"Error during advisor backtest for {ticker_symbol} with setup {i+1}: {e}") | |
| continue | |
| if open_trades: | |
| for trade in open_trades: | |
| trade['Ticker'] = ticker_symbol | |
| trade['Setup Rank'] = i + 1 | |
| if advisor_type == "Top Setups": | |
| trade['Setup G/B Ratio'] = setup.get('Ticker G/B Ratio', np.nan) | |
| all_advisor_trades.append(trade) | |
| progress_bar.empty() | |
| status_text.empty() | |
| print(f"\nFound {len(all_advisor_trades)} total trades before de-duplication.") | |
| best_trades = {} | |
| for trade in all_advisor_trades: | |
| try: | |
| trade_key = (trade.get('Ticker'), trade.get('Entry Date'), trade.get('Trade Type')) | |
| except Exception as e: | |
| print(f"Warning: Could not create trade key for {trade}. Error: {e}") | |
| continue | |
| try: | |
| current_rank = int(trade.get('Setup Rank', 999)) | |
| except (ValueError, TypeError): | |
| current_rank = 999 | |
| if trade_key not in best_trades: | |
| best_trades[trade_key] = trade | |
| else: | |
| existing_rank = int(best_trades[trade_key].get('Setup Rank', 999)) | |
| if current_rank < existing_rank: | |
| best_trades[trade_key] = trade | |
| deduplicated_trades = list(best_trades.values()) | |
| if all_advisor_trades: | |
| raw_advisor_df = pd.DataFrame(all_advisor_trades) | |
| deduped_advisor_df = pd.DataFrame(deduplicated_trades) | |
| cols_order = ['Ticker', 'Status', 'Setup Rank', 'Final % P/L', 'Side', 'Date Open', 'Date Closed', 'Start Confidence'] | |
| if advisor_type == "Top Setups": | |
| cols_order.append('Setup G/B Ratio') | |
| if advisor_type == "User-Defined": | |
| param_keys = [k for k in setups_to_run[0].keys() if k not in ["RSI", "Volatility", "TREND", "Volume", "MACD", "MA Slope", "Conf. Threshold", "Markov"]] | |
| cols_order.extend(param_keys) | |
| setups_df = pd.DataFrame(setups_to_run) | |
| setups_df['Setup Rank'] = setups_df.index + 1 | |
| raw_advisor_df = pd.merge(raw_advisor_df, setups_df, on='Setup Rank', how='left') | |
| deduped_advisor_df = pd.merge(deduped_advisor_df, setups_df, on='Setup Rank', how='left') | |
| existing_cols_final = [col for col in cols_order if col in raw_advisor_df.columns] | |
| st.session_state.raw_df = raw_advisor_df[existing_cols_final].sort_values(by=['Status', 'Date Open'], ascending=[True, False]) | |
| st.session_state.deduped_df = deduped_advisor_df[existing_cols_final].sort_values(by=['Status', 'Date Open'], ascending=[True, False]) | |
| st.session_state.advisor_type = advisor_type | |
| else: | |
| st.session_state.raw_df = None | |
| st.session_state.deduped_df = None | |
| st.session_state.advisor_type = advisor_type | |
| st.session_state.run_advanced_advisor = False | |
| st.session_state.run_user_advisor_setup = False | |
| st.session_state.run_scan_user_setups = False | |
| st.rerun() | |
| def generate_and_run_weight_optimisation(master_df, main_content_placeholder, side, use_sq_weighting): | |
| """ | |
| Runs weight optimisation using multiprocessing. | |
| """ | |
| st.session_state.summary_df = None | |
| st.session_state.single_ticker_results = None | |
| st.session_state.confidence_results_df = None | |
| st.session_state.open_trades_df = None | |
| st.session_state.advisor_df = None | |
| st.session_state.best_params = None | |
| st.session_state.best_weights = None | |
| with main_content_placeholder.container(): | |
| # Get FIXED strategy parameters | |
| base_params = { | |
| "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, | |
| "bband_std_dev": st.session_state.bb_std, "long_entry_threshold_pct": st.session_state.long_entry / 100, | |
| "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, | |
| "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, | |
| "long_delay_days": st.session_state.long_delay, "short_entry_threshold_pct": st.session_state.short_entry / 100, | |
| "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, | |
| "short_trailing_stop_loss_pct": st.session_state.short_sl / 100, | |
| "short_delay_days": st.session_state.short_delay, "confidence_threshold": st.session_state.confidence_slider | |
| } | |
| # Get FIXED confidence settings | |
| fixed_toggles = (st.session_state.use_rsi, st.session_state.use_vol, st.session_state.use_trend, | |
| st.session_state.use_volume, st.session_state.use_macd, st.session_state.use_ma_slope, st.session_state.use_markov) | |
| fixed_adx_settings = (st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.adx_period) | |
| fixed_rsi_logic = st.session_state.rsi_logic | |
| fixed_primary_driver = st.session_state.primary_driver | |
| fixed_markov_setup = st.session_state.get('best_markov_setup') | |
| fixed_exit_logic = st.session_state.exit_logic_type | |
| fixed_exit_thresh = st.session_state.exit_confidence_threshold | |
| fixed_smart_trailing_stop = st.session_state.smart_trailing_stop_pct / 100.0 | |
| smart_atr_p = st.session_state.smart_exit_atr_period | |
| smart_atr_m = st.session_state.smart_exit_atr_multiplier | |
| # Check if key exists, default to 0.6 if not (backward compatibility) | |
| fixed_intelligent_tsl = st.session_state.get('intelligent_tsl_pct', 60.0) / 100.0 | |
| # Dynamic Weight Optimisation Logic | |
| weight_step = 0.5 | |
| weight_range = np.arange(0.5, 2.0 + weight_step, weight_step) | |
| all_factor_info = { | |
| 'RSI': {'toggle_key': 'use_rsi', 'weight_key': 'rsi_w'}, | |
| 'Volatility': {'toggle_key': 'use_vol', 'weight_key': 'vol_w'}, | |
| 'TREND': {'toggle_key': 'use_trend', 'weight_key': 'trend_w'}, | |
| 'Volume': {'toggle_key': 'use_volume', 'weight_key': 'volume_w'}, | |
| 'MACD': {'toggle_key': 'use_macd', 'weight_key': 'macd_w'}, | |
| 'MA Slope': {'toggle_key': 'use_ma_slope', 'weight_key': 'ma_slope_w'}, | |
| 'Markov': {'toggle_key': 'use_markov', 'weight_key': 'markov_w'} | |
| } | |
| driver_map = {'RSI Crossover': 'RSI', 'MACD Crossover': 'MACD', 'MA Slope': 'MA Slope', 'Markov State': 'Markov', 'Bollinger Bands': None} | |
| primary_factor_key = driver_map.get(fixed_primary_driver) | |
| factors_to_optimise = [] | |
| for factor_key, info in all_factor_info.items(): | |
| is_active = st.session_state.get(info['toggle_key'], False) | |
| is_primary = (factor_key == primary_factor_key) | |
| if factor_key == 'Markov' and not fixed_markov_setup: | |
| st.warning("Skipping Markov weight optimisation: No 'Best Markov Setup' found. Please run Section 7 first.") | |
| is_active = False | |
| if is_active and not is_primary: | |
| factors_to_optimise.append(factor_key) | |
| if not factors_to_optimise: | |
| st.warning("No active, non-primary factors to optimise. Toggle some factors 'On' in Section 2.") | |
| st.session_state.run_weight_optimisation = False | |
| return | |
| st.info(f"Optimising weights for: {', '.join(factors_to_optimise)}") | |
| weight_product = itertools.product(weight_range, repeat=len(factors_to_optimise)) | |
| base_weight_keys = ('rsi_w', 'vol_w', 'trend_w', 'volume_w', 'macd_w', 'ma_slope_w', 'markov_w') | |
| base_weights_tuple = (st.session_state.rsi_w, st.session_state.vol_w, st.session_state.trend_w, st.session_state.volume_w, st.session_state.macd_w, st.session_state.ma_slope_w, st.session_state.markov_w) | |
| confidence_combinations = [] | |
| for weight_tuple in weight_product: | |
| current_weights_map = dict(zip(base_weight_keys, base_weights_tuple)) | |
| for factor_key, new_weight_value in zip(factors_to_optimise, weight_tuple): | |
| weight_key = all_factor_info[factor_key]['weight_key'] | |
| current_weights_map[weight_key] = new_weight_value | |
| final_weights_tuple = (current_weights_map['rsi_w'], current_weights_map['vol_w'], current_weights_map['trend_w'], current_weights_map['volume_w'], current_weights_map['macd_w'], current_weights_map['ma_slope_w'], current_weights_map['markov_w']) | |
| confidence_combinations.append({ | |
| 'toggles': fixed_toggles, 'weights': final_weights_tuple, 'adx_settings': fixed_adx_settings, | |
| 'rsi_logic': fixed_rsi_logic, 'primary_driver': fixed_primary_driver, 'markov_setup': fixed_markov_setup, | |
| 'exit_logic': fixed_exit_logic, 'exit_thresh': fixed_exit_thresh, 'smart_trailing_stop': fixed_smart_trailing_stop, | |
| 'smart_exit_atr_period': smart_atr_p, 'smart_exit_atr_multiplier': smart_atr_m, 'intelligent_tsl_pct': fixed_intelligent_tsl | |
| }) | |
| total_combinations = len(confidence_combinations) | |
| num_cores = cpu_count() | |
| st.info(f"Starting {side.upper()} weight optimisation on {num_cores} cores... Testing {total_combinations} weight combinations.") | |
| if st.session_state.run_mode.startswith("Analyse Full List"): | |
| tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))] | |
| else: | |
| tickers_to_run = [st.session_state.ticker_select] | |
| date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) | |
| power = 2 if use_sq_weighting else 1 | |
| status_text = st.empty(); status_text.text("Optimisation starting...") | |
| progress_bar = st.progress(0) | |
| worker_func = partial(run_single_weight_test, base_params=base_params, master_df=master_df, optimise_for=side, tickers=tickers_to_run, date_range=date_range, power=power) | |
| results_list = [] | |
| with Pool(processes=num_cores) as pool: | |
| try: | |
| iterator = pool.imap_unordered(worker_func, confidence_combinations) | |
| for i, result_dict in enumerate(iterator, 1): | |
| results_list.append(result_dict) | |
| progress_bar.progress(i / total_combinations, text=f"Optimising... {i}/{total_combinations} combinations complete.") | |
| except Exception as e: | |
| st.error(f"An error occurred during multiprocessing: {e}") | |
| status_text.text("Optimisation failed due to an error."); st.session_state.run_weight_optimisation = False; return | |
| status_text.text("Optimisation complete. Calculating scores...") | |
| if not results_list: | |
| st.warning("Optimisation finished, but no valid results were found."); st.session_state.run_weight_optimisation = False; return | |
| results_df = pd.DataFrame(results_list) | |
| weights_df = results_df['weights'].apply(pd.Series) | |
| results_df = pd.concat([results_df.drop('weights', axis=1), weights_df], axis=1) | |
| min_trades_threshold = 10 | |
| if 'Total Trades' in results_df.columns: results_df = results_df[results_df['Total Trades'] >= min_trades_threshold].copy() | |
| if results_df.empty: | |
| st.warning(f"No results found with at least {min_trades_threshold} trades."); st.session_state.run_weight_optimisation = False; return | |
| results_df['Strategy Score'] = results_df.apply(lambda row: calculate_strategy_score(row['Avg Profit/Trade'], row['Ticker G/B Ratio'], row['Total Trades']), axis=1) | |
| results_df = results_df.sort_values(by='Strategy Score', ascending=False) | |
| best_setup_series = results_df.iloc[0] | |
| best_metric = best_setup_series['Strategy Score'] | |
| best_weights_dict = {key: float(best_setup_series[key]) for key in base_weight_keys} | |
| status_text.empty() | |
| st.success(f"Weight Optimisation Complete! Best Strategy Score: {best_metric:.2f}%") | |
| st.subheader("Optimal Weights Found (based on Strategy Score):"); st.json(best_weights_dict) | |
| st.session_state.best_weights = best_weights_dict | |
| st.subheader("Full Weight Optimisation Results:") | |
| display_cols = ["Strategy Score", "Avg Profit/Trade", "Ticker G/B Ratio", "Total Trades"] + list(base_weight_keys) | |
| display_cols = [col for col in display_cols if col in results_df.columns] | |
| formatters = {"Strategy Score": "{:.2f}%", "Avg Profit/Trade": "{:.2%}", "Ticker G/B Ratio": "{:.2f}"} | |
| for w_key in base_weight_keys: formatters[w_key] = "{:.1f}" | |
| st.dataframe(results_df[display_cols].style.format(formatters)) | |
| st.session_state.run_weight_optimisation = False | |
| def apply_best_weights_to_widgets(): | |
| """Loads optimised weights from st.session_state.best_weights into the sidebar widgets.""" | |
| if 'best_weights' in st.session_state and st.session_state.best_weights: | |
| weights = st.session_state.best_weights | |
| if 'rsi_w' in weights: st.session_state.rsi_w = weights['rsi_w'] | |
| if 'vol_w' in weights: st.session_state.vol_w = weights['vol_w'] | |
| if 'trend_w' in weights: st.session_state.trend_w = weights['trend_w'] | |
| if 'volume_w' in weights: st.session_state.volume_w = weights['volume_w'] | |
| if 'macd_w' in weights: st.session_state.macd_w = weights['macd_w'] | |
| if 'ma_slope_w' in weights: st.session_state.ma_slope_w = weights['ma_slope_w'] | |
| st.sidebar.success("Optimal weights loaded into sidebar!") | |
| st.rerun() | |
| else: | |
| st.sidebar.error("No optimal weights found in session state.") | |
| def apply_best_params_to_widgets(): | |
| """Loads parameters from st.session_state.best_params into the sidebar widgets.""" | |
| if 'best_params' in st.session_state and st.session_state.best_params: | |
| params = st.session_state.best_params | |
| if 'large_ma_period' in params: st.session_state.ma_period = params['large_ma_period'] | |
| if 'bband_period' in params: st.session_state.bb_period = params['bband_period'] | |
| if 'bband_std_dev' in params: st.session_state.bb_std = params['bband_std_dev'] | |
| if 'confidence_threshold' in params: st.session_state.confidence_slider = params['confidence_threshold'] | |
| if 'long_entry_threshold_pct' in params: st.session_state.long_entry = params['long_entry_threshold_pct'] * 100 | |
| if 'long_exit_ma_threshold_pct' in params: st.session_state.long_exit = params['long_exit_ma_threshold_pct'] * 100 | |
| if 'long_trailing_stop_loss_pct' in params: st.session_state.long_sl = params['long_trailing_stop_loss_pct'] * 100 | |
| if 'long_delay_days' in params: st.session_state.long_delay = params['long_delay_days'] | |
| if 'long_entry_threshold_pct' in params: st.session_state.short_entry = params['long_entry_threshold_pct'] * 100 | |
| if 'long_exit_ma_threshold_pct' in params: st.session_state.short_exit = params['long_exit_ma_threshold_pct'] * 100 | |
| if 'long_trailing_stop_loss_pct' in params: st.session_state.short_sl = params['long_trailing_stop_loss_pct'] * 100 | |
| if 'long_delay_days' in params: st.session_state.short_delay = params['long_delay_days'] | |
| # --- NEW: Load Max Duration --- | |
| if 'max_trading_days' in params: st.session_state.max_duration = params['max_trading_days'] | |
| # --- NEW: Load Catcher Offset --- | |
| if 'catcher_stop_pct' in params: st.session_state.catcher_stop_pct = params['catcher_stop_pct'] * 100 | |
| st.sidebar.success("Optimal parameters loaded into sidebar!") | |
| st.rerun() | |
| else: | |
| st.sidebar.error("No optimal parameters found in session state.") | |
| # --- 5. Streamlit User Interface --- | |
| def update_state(): | |
| """ | |
| Callback to update the main session state from the 'widget_' keys. | |
| This synchronizes all widgets. | |
| """ | |
| # Get all keys from session state | |
| keys = list(st.session_state.keys()) | |
| # Loop through all keys that start with 'widget_' | |
| for widget_key in keys: | |
| if widget_key.startswith('widget_'): | |
| # Find the main key (e.g., 'widget_ma_period' -> 'ma_period') | |
| main_key = widget_key[len('widget_'):] | |
| # Update the main key with the widget's value | |
| if main_key in st.session_state: | |
| st.session_state[main_key] = st.session_state[widget_key] | |
| # --- UPDATED: Helper to check for blank rows (Checks Factors, Stats, Notes, AND Run status) --- | |
| def is_row_blank(s): | |
| """Checks if a row from the user-defined table is blank.""" | |
| factors = ["RSI", "Volatility", "TREND", "Volume", "MACD", "MA Slope", "Markov", "ADX Filter"] | |
| # Check 1: Are all factors Off? | |
| all_factors_off = all(str(s.get(f, "Off")).lower() in ["off", "0", "0.0"] for f in factors) | |
| # Check 2: Are stats zero? | |
| stats_are_zero = s.get("Z_Num_Trades", 0) == 0 | |
| # Check 3: Is the note empty? | |
| note_is_empty = str(s.get("Notes", "")).strip() == "" | |
| # Check 4: Is the Run box unchecked? | |
| run_is_unchecked = s.get("Run", False) is False | |
| # The row is considered blank ONLY if ALL conditions are met | |
| return all_factors_off and stats_are_zero and note_is_empty and run_is_unchecked | |
| # --- [NEW] 6. Markov Chain Optimisation Functions --- | |
| def run_single_markov_test(params, master_df, tickers, date_range): | |
| """ | |
| Worker function for the Markov optimisation. | |
| Tests a single combination of (run_up_period, future_period). | |
| """ | |
| run_up_period = params['run_up'] | |
| future_period = params['future'] | |
| total_results = { | |
| 'Up -> Up': {'profit': 0.0, 'count': 0}, | |
| 'Up -> Down': {'profit': 0.0, 'count': 0}, | |
| 'Down -> Up': {'profit': 0.0, 'count': 0}, | |
| 'Down -> Down': {'profit': 0.0, 'count': 0} | |
| } | |
| for ticker in tickers: | |
| if ticker not in master_df.columns: | |
| continue | |
| # Get only the 'Close' price for this ticker | |
| ticker_data = master_df[ticker].loc[date_range[0]:date_range[1]].to_frame(name='Close') | |
| # --- [NEW] CLEANING LOGIC TO PREVENT INFINITY --- | |
| ticker_data['Close'] = pd.to_numeric(ticker_data['Close'], errors='coerce').replace(0, np.nan) | |
| ticker_data.dropna(subset=['Close'], inplace=True) | |
| # --- [END NEW] --- | |
| if ticker_data.empty or ticker_data['Close'].isna().all(): | |
| continue | |
| # 1. Calculate Run-Up state (past) | |
| # pct_change(N) calculates (price[t] / price[t-N]) - 1 | |
| ticker_data['RunUp_Return'] = ticker_data['Close'].pct_change(periods=run_up_period) | |
| ticker_data['RunUp_State'] = ticker_data['RunUp_Return'].apply(lambda x: 'Up' if x > 0 else 'Down') | |
| # 2. Calculate Future state (what *actually* happened) | |
| # shift(-N) looks *forward* N periods | |
| ticker_data['Future_Return'] = (ticker_data['Close'].shift(-future_period) / ticker_data['Close']) - 1 | |
| # 3. Drop NaNs created by the shifts/pct_change | |
| ticker_data.dropna(subset=['RunUp_State', 'Future_Return'], inplace=True) | |
| if ticker_data.empty: | |
| continue | |
| # 4. Tally results | |
| # We use .values for speed | |
| runup_states = ticker_data['RunUp_State'].values | |
| future_returns = ticker_data['Future_Return'].values | |
| for i in range(len(runup_states)): | |
| state = runup_states[i] | |
| ret = future_returns[i] | |
| # --- [NEW] Check for infinity returns --- | |
| if not np.isfinite(ret): | |
| continue | |
| # --- [END NEW] --- | |
| if state == 'Up': | |
| # Strategy: Bet on Up | |
| total_results['Up -> Up']['profit'] += ret | |
| total_results['Up -> Up']['count'] += 1 | |
| # Strategy: Bet on Down | |
| total_results['Up -> Down']['profit'] += (ret * -1) | |
| total_results['Up -> Down']['count'] += 1 | |
| else: # State is 'Down' | |
| # Strategy: Bet on Up | |
| total_results['Down -> Up']['profit'] += ret | |
| total_results['Down -> Up']['count'] += 1 | |
| # Strategy: Bet on Down | |
| total_results['Down -> Down']['profit'] += (ret * -1) | |
| total_results['Down -> Down']['count'] += 1 | |
| # 5. Compile final metrics | |
| final_report = [] | |
| for strategy, results in total_results.items(): | |
| if results['count'] > 0: | |
| avg_pnl = (results['profit'] / results['count']) | |
| # We use a simple score: Avg P&L * log(count) to value consistency | |
| score = avg_pnl * np.log10(results['count'] + 1) | |
| else: | |
| avg_pnl = 0 | |
| score = 0 | |
| final_report.append({ | |
| 'Run-Up Period': run_up_period, | |
| 'Future Period': future_period, | |
| 'Strategy': strategy, | |
| 'Avg. P/L': avg_pnl, | |
| 'Total Occurrences': results['count'], | |
| 'Total P/L': results['profit'], | |
| 'Score': score | |
| }) | |
| return final_report | |
| def generate_and_run_markov_optimisation(master_df, main_content_placeholder, side): | |
| """ | |
| Main UI and orchestrator for Markov Chain optimisation. | |
| [NEW] This version calculates an 'Alpha Score per Day' to find the | |
| most efficient predictive edge. | |
| """ | |
| st.session_state.summary_df = None | |
| st.session_state.single_ticker_results = None | |
| st.session_state.confidence_results_df = None | |
| st.session_state.open_trades_df = None | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None | |
| st.session_state.best_params = None | |
| st.session_state.best_weights = None | |
| st.session_state.markov_results_df = None # Clear previous Markov results | |
| st.session_state.best_markov_setup = None | |
| with main_content_placeholder.container(): | |
| st.header(f"🔮 Finding Best Markov Probabilities ({side.title()})") | |
| # --- Get UI inputs from session state --- | |
| run_up_start = st.session_state.markov_run_up_start | |
| run_up_end = st.session_state.markov_run_up_end | |
| run_up_step = st.session_state.markov_run_up_step | |
| future_start = st.session_state.markov_future_start | |
| future_end = st.session_state.markov_future_end | |
| future_step = st.session_state.markov_future_step | |
| # --- Create parameter combinations --- | |
| run_up_range = range(run_up_start, run_up_end + 1, run_up_step) | |
| future_range = range(future_start, future_end + 1, future_step) | |
| param_product = itertools.product(run_up_range, future_range) | |
| param_combinations = [{ | |
| "run_up": p[0], "future": p[1] | |
| } for p in param_product] | |
| total_combinations = len(param_combinations) | |
| if total_combinations == 0: | |
| st.warning("No combinations to test. Check your ranges in the sidebar.") | |
| st.session_state.run_markov_optimisation = False # Reset flag | |
| st.stop() | |
| num_cores = cpu_count() | |
| st.info(f"Starting Markov optimisation on {num_cores} cores... Testing {total_combinations} period combinations.") | |
| tickers_to_run = [col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))] | |
| date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) | |
| status_text = st.empty(); status_text.text("Optimisation starting...") | |
| progress_bar = st.progress(0) # <-- The progress bar | |
| worker_func = partial(run_single_markov_test, master_df=master_df, tickers=tickers_to_run, date_range=date_range) | |
| results_list = [] | |
| with Pool(processes=num_cores) as pool: | |
| try: | |
| iterator = pool.imap_unordered(worker_func, param_combinations) | |
| for i, result_group in enumerate(iterator, 1): | |
| results_list.extend(result_group) # Add the 4 strategies | |
| progress_bar.progress(i / total_combinations, text=f"Optimising... {i}/{total_combinations} combinations complete.") | |
| except Exception as e: | |
| st.error(f"An error occurred during multiprocessing: {e}") | |
| status_text.text("Optimisation failed due to an error."); | |
| st.session_state.run_markov_optimisation = False # Reset flag | |
| return | |
| status_text.text("Optimisation complete. Compiling results...") | |
| if not results_list: | |
| st.warning("Optimisation finished, but no valid results were found.") | |
| st.session_state.run_markov_optimisation = False # Reset flag | |
| return | |
| results_df = pd.DataFrame(results_list) | |
| # --- [NEW ALPHA SCORE PER DAY LOGIC] --- | |
| # 1. Pivot the data to get all 4 strategies in one row per time period | |
| pivot_df = results_df.pivot_table( | |
| index=['Run-Up Period', 'Future Period'], | |
| columns='Strategy', | |
| values='Avg. P/L' # We use Avg. P/L for the alpha calculation | |
| ).reset_index() | |
| # 2. Calculate the "Alpha Scores" | |
| pivot_df['Long Alpha Score'] = pivot_df.get('Down -> Up', 0) - pivot_df.get('Up -> Up', 0) | |
| pivot_df['Short Alpha Score'] = pivot_df.get('Up -> Down', 0) - pivot_df.get('Down -> Down', 0) | |
| # 3. Calculate the "Alpha Score per Day" | |
| pivot_df['Long Alpha Score per Day'] = pivot_df['Long Alpha Score'] / pivot_df['Future Period'] | |
| pivot_df['Short Alpha Score per Day'] = pivot_df['Short Alpha Score'] / pivot_df['Future Period'] | |
| # 4. Join this back to the original results to get counts, etc. | |
| results_df = results_df.set_index(['Run-Up Period', 'Future Period']) | |
| pivot_df = pivot_df.set_index(['Run-Up Period', 'Future Period']) | |
| # Join the new Alpha Scores to the main results | |
| results_df = results_df.join(pivot_df[['Long Alpha Score', 'Short Alpha Score', 'Long Alpha Score per Day', 'Short Alpha Score per Day']]) | |
| results_df = results_df.reset_index() | |
| # 5. Determine which Alpha Score and strategies to show | |
| if side == 'long': | |
| score_to_use = 'Long Alpha Score per Day' | |
| target_strategies = ['Down -> Up', 'Up -> Up'] | |
| else: # short | |
| score_to_use = 'Short Alpha Score per Day' | |
| target_strategies = ['Up -> Down', 'Down -> Down'] | |
| # Filter for the strategies we care about for this 'side' | |
| final_df = results_df[results_df['Strategy'].isin(target_strategies)].copy() | |
| if final_df.empty: | |
| st.warning(f"No valid results found for {side}-biased strategies.") | |
| st.session_state.run_markov_optimisation = False # Reset flag | |
| return | |
| # 6. Sort by the new "Alpha Score per Day" | |
| final_df = final_df.sort_values(by=score_to_use, ascending=False) | |
| # Get the best row (which will be the one with the highest Alpha per Day) | |
| best_setup_series = final_df.iloc[0] | |
| st.success(f"Markov Optimisation Complete! Best 'Alpha' strategy found:") | |
| st.subheader("Best Markov Setup (by Alpha Score per Day):") | |
| # Save the best setup to session state | |
| st.session_state.best_markov_setup = best_setup_series.to_dict() | |
| save_markov_setup(st.session_state.best_markov_setup) # <-- SAVE TO FILE | |
| st.json(st.session_state.best_markov_setup) | |
| st.subheader(f"Top 10 Setups ({side.title()}-biased, sorted by {score_to_use}):") | |
| display_cols = [ | |
| 'Strategy', 'Run-Up Period', 'Future Period', score_to_use, 'Avg. P/L', 'Total Occurrences' | |
| ] | |
| st.dataframe(final_df.head(10)[display_cols].style.format({ | |
| score_to_use: "{:.4%}", # Format as percentage | |
| "Avg. P/L": "{:.4%}", | |
| })) | |
| # --- [NEW] HEATMAP INFOGRAPHIC --- | |
| st.subheader(f"Markov '{side.title()}' Alpha Score per Day Heatmap") | |
| st.caption("This heatmap shows the most *efficient* signals (highest predictive value per day held).") | |
| try: | |
| # We use the pivot_df we created earlier, which has the scores | |
| heatmap_data = pivot_df.pivot_table( | |
| index='Run-Up Period', | |
| columns='Future Period', | |
| values=score_to_use # Plot the Alpha Score per Day | |
| ) | |
| # Create the heatmap | |
| st.dataframe( | |
| heatmap_data.style | |
| .background_gradient(cmap='RdYlGn', axis=None) # Red-Yellow-Green colormap | |
| .format("{:.4%}", na_rep='-') # Format as percentage | |
| ) | |
| except Exception as e: | |
| st.warning(f"Could not generate heatmap. Error: {e}") | |
| # --- [END NEW] --- | |
| st.session_state.markov_results_df = final_df # Save the sorted results | |
| st.session_state.run_markov_optimisation = False # Reset flag | |
| # --- Corrected: Ensures rsi_logic AND primary_driver are passed --- | |
| def run_confidence_optimisation(optimise_for, find_mode, master_df, main_content_placeholder, veto_factors): | |
| st.session_state.summary_df = None | |
| st.session_state.single_ticker_results = None | |
| st.session_state.open_trades_df = None | |
| st.session_state.best_params = None | |
| st.session_state.advisor_df = None | |
| st.session_state.worst_confidence_setups_list = [] | |
| with main_content_placeholder.container(): | |
| num_cores = cpu_count() | |
| st.info(f"Starting to find **{find_mode.upper()}** {optimise_for.upper()} setups on {num_cores} CPU cores...") | |
| st.caption("Note: This process runs in parallel for maximum speed. The status below updates as each strategy combination completes.") | |
| # --- [NEW] Added Markov --- | |
| factors = ['RSI', 'Volatility', 'TREND', 'Volume', 'MACD', 'MA Slope', 'Markov'] | |
| num_factors = len(factors) | |
| if find_mode == 'worst': | |
| if veto_factors is None or len(veto_factors) != num_factors: | |
| st.error("Internal error: Veto factors not provided correctly.") | |
| return | |
| target_combo = veto_factors | |
| on_off_combos = [c for c in itertools.product([False, True], repeat=num_factors) if c == target_combo] | |
| if not on_off_combos or not any(on_off_combos[0]): | |
| st.warning("Please select at least one factor for the Veto search."); return | |
| else: | |
| on_off_combos = [c for c in itertools.product([False, True], repeat=num_factors) if any(c)] | |
| thresholds_to_test = [20, 25, 30, 35, 40, 45, 50] | |
| tasks = list(itertools.product(on_off_combos, thresholds_to_test, [1.0])) | |
| total_tasks = len(tasks) | |
| base_params = { "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, "bband_std_dev": st.session_state.bb_std, "long_entry_threshold_pct": st.session_state.long_entry / 100, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, "long_delay_days": st.session_state.long_delay, "short_entry_threshold_pct": st.session_state.short_entry / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, "short_trailing_stop_loss_pct": st.session_state.short_sl / 100, "short_delay_days": st.session_state.short_delay, } | |
| tickers_to_run = sorted([col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))]) | |
| date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date)) | |
| factor_weights = { | |
| "rsi": st.session_state.rsi_w, "vol": st.session_state.vol_w, | |
| "trend": st.session_state.trend_w, "volume": st.session_state.volume_w, | |
| "macd": st.session_state.macd_w, "ma_slope": st.session_state.ma_slope_w, | |
| "markov": st.session_state.markov_w, | |
| "use_adx": st.session_state.use_adx_filter, | |
| "adx_thresh": st.session_state.adx_threshold, | |
| "adx_period": st.session_state.adx_period, | |
| "rsi_logic": st.session_state.rsi_logic, | |
| "primary_driver": st.session_state.primary_driver, | |
| 'markov_setup': st.session_state.get('best_markov_setup'), | |
| 'exit_logic': st.session_state.exit_logic_type, | |
| 'exit_thresh': st.session_state.exit_confidence_threshold, | |
| 'smart_trailing_stop': st.session_state.smart_trailing_stop_pct / 100.0, | |
| 'smart_exit_atr_period': st.session_state.smart_exit_atr_period, | |
| 'smart_exit_atr_multiplier': st.session_state.smart_exit_atr_multiplier, | |
| 'intelligent_tsl_pct': st.session_state.intelligent_tsl_pct / 100.0 | |
| } | |
| worker_func = partial(run_single_confidence_test, base_params=base_params, master_df=master_df, date_range=date_range, tickers_to_run=tickers_to_run, optimise_for=optimise_for, factor_weights=factor_weights) | |
| results_list = [] | |
| # --- UI Feedback Elements --- | |
| progress_bar = st.progress(0, text="Initializing parallel engines...") | |
| status_text = st.empty() | |
| with Pool(processes=num_cores) as pool: | |
| try: | |
| iterator = pool.imap_unordered(worker_func, tasks) | |
| for i, result in enumerate(iterator, 1): | |
| if isinstance(result, dict) and "Trade G/B Ratio" in result: | |
| results_list.append(result) | |
| # --- DYNAMIC STATUS UPDATE --- | |
| # This creates the "Screaming Past" effect for strategies | |
| active = [] | |
| if result['RSI']: active.append("RSI") | |
| if result['Volatility']: active.append("Vol") | |
| if result['TREND']: active.append("Trend") | |
| if result['Volume']: active.append("VolSpike") | |
| if result['MACD']: active.append("MACD") | |
| if result['MA Slope']: active.append("Slope") | |
| if result['Markov']: active.append("Markov") | |
| status_msg = f"Analyzed: [{' + '.join(active)}] @ Thresh {result['Conf. Threshold']} -> Profit: {result['Avg Profit/Trade']:.2%} ({result['Total Trades']} trades)" | |
| status_text.text(status_msg) | |
| else: print(f"Warning: Worker returned invalid result format: {result}") | |
| # Update progress bar | |
| progress_bar.progress(i / total_tasks, text=f"Optimising... {i}/{total_tasks} combinations complete.") | |
| except Exception as e: | |
| st.error(f"An error occurred during multiprocessing: {e}") | |
| progress_bar.empty(); return | |
| if results_list: | |
| results_df = pd.DataFrame(results_list) | |
| # --- [NEW] FILTER OUT "INFINITY" RESULTS --- | |
| # We exclude results where Trade G/B Ratio is > 50000 (the placeholder for 0 losses). | |
| # This prevents low-volume "perfect" trades from gaming the Uncapped Scoring system. | |
| if 'Trade G/B Ratio' in results_df.columns: | |
| results_df = results_df[results_df['Trade G/B Ratio'] < 50000].copy() | |
| if results_df.empty: | |
| st.warning("No valid setups found after removing infinite ratio outliers.") | |
| st.session_state.run_confidence_optimisation = False | |
| return | |
| # --- Sort by the NEW 3-Factor Weighted Score --- | |
| sort_col = "Good Score" if find_mode == 'best' else "Bad Score" | |
| if sort_col in results_df.columns: | |
| fill_value = -np.inf if find_mode == 'best' else 0 | |
| results_df[sort_col] = results_df[sort_col].fillna(fill_value) | |
| results_df = results_df.sort_values(by=sort_col, ascending=False).reset_index(drop=True) | |
| else: st.error(f"Sorting column '{sort_col}' not found."); return | |
| for factor in factors: | |
| if factor in results_df.columns: | |
| results_df[factor] = results_df[factor].apply(lambda x: "On" if x else "Off") | |
| # Save all results to state so they persist | |
| st.session_state.confidence_results_df = results_df | |
| if find_mode == 'best': | |
| st.subheader(f"📊 Top 60 Confidence Setups ({optimise_for.title()} Trades)") | |
| if not results_df.empty: | |
| best_setup = results_df.iloc[0] | |
| st.session_state.best_confidence_setup = best_setup.to_dict() | |
| # Save TOP setups using the same DF (which is now sorted by Weighted Score) | |
| save_top_setups(results_df, optimise_for) | |
| else: | |
| st.warning("No valid 'best' setups found.") | |
| st.session_state.best_confidence_setup = None | |
| else: # 'worst' mode | |
| valid_worst_df = results_df[results_df['Trade G/B Ratio'] < 99999.0].copy() | |
| if not valid_worst_df.empty: | |
| valid_worst_df['Bad Score'] = valid_worst_df['Bad Score'].fillna(0) | |
| valid_worst_df = valid_worst_df.sort_values(by="Bad Score", ascending=False).reset_index(drop=True) | |
| st.session_state.worst_confidence_setups_list = valid_worst_df.head(4).to_dict('records') | |
| st.info(f"Top {len(st.session_state.worst_confidence_setups_list)} valid 'worst' setups ready.") | |
| st.subheader(f"🏆 Top 60 Valid Worst Setups ({optimise_for.title()} Trades)") | |
| results_df = valid_worst_df # Use this for display | |
| else: | |
| st.warning("No valid 'worst' setups found.") | |
| st.session_state.worst_confidence_setups_list = [] | |
| # --- Display the table --- | |
| # [CHANGE] Rename the column for display purposes | |
| display_df_conf = results_df.head(60).rename(columns={'Net % Return': 'Moneypile Score'}) | |
| # [CHANGE] Update formatter key to 'Moneypile Score' and remove '%' from format string | |
| conf_formatters = { | |
| "Avg Profit/Trade": "{:.2%}", | |
| "Ticker G/B Ratio": "{:.2f}", | |
| "Trade G/B Ratio": "{:.2f}", | |
| "Moneypile Score": "{:.1f}", | |
| "Avg Entry Conf.": "{:.1f}%", | |
| "Good Score": "{:.4f}", | |
| "Bad Score": "{:.4f}", | |
| "Norm. Score %": "{:.2f}%" | |
| } | |
| if 'MACD' in display_df_conf.columns: conf_formatters['MACD'] = '{}' | |
| if 'MA Slope' in display_df_conf.columns: conf_formatters['MA Slope'] = '{}' | |
| if 'Markov' in display_df_conf.columns: conf_formatters['Markov'] = '{}' | |
| valid_conf_formatters = {k: (lambda val, fmt=v: fmt.format(val) if pd.notna(val) else '-') for k, v in conf_formatters.items() if k in display_df_conf.columns} | |
| st.dataframe(display_df_conf.style.format(valid_conf_formatters, na_rep='-')) | |
| else: | |
| st.warning("Optimisation completed but no results were generated."); st.session_state.confidence_results_df = None | |
| st.session_state.run_confidence_optimisation = False # Reset flag | |
| def generate_user_advisor_ui_and_run(main_df): | |
| st.header("⚙️ User-Defined Advisor Setups") | |
| data_key = "user_setups_data" | |
| widget_key = "user_setups_editor_state" | |
| def load_saved_setups_callback(): | |
| st.session_state[data_key] = load_user_setups() | |
| if widget_key in st.session_state: del st.session_state[widget_key] | |
| def clear_all_setups_callback(): | |
| blank_setups = [load_user_setups()[0].copy() for _ in range(20)] | |
| st.session_state[data_key] = blank_setups | |
| if widget_key in st.session_state: del st.session_state[widget_key] | |
| st.caption("Add advice/notes in the 'Notes' column. Check 'Run' to test.") | |
| col_config = { | |
| "Run": st.column_config.CheckboxColumn("Run", width="small"), | |
| # Added Notes Column | |
| "Notes": st.column_config.TextColumn("Notes", help="Advice/Description for this setup", width="medium"), | |
| "RSI": st.column_config.TextColumn("RSI", width="small"), | |
| "Volatility": st.column_config.TextColumn("Volatility", width="small"), | |
| "TREND": st.column_config.TextColumn("TREND", width="small"), | |
| "Volume": st.column_config.TextColumn("Volume", width="small"), | |
| "MACD": st.column_config.TextColumn("MACD", width="small"), | |
| "MA Slope": st.column_config.TextColumn("MA Slope", width="small"), | |
| "Markov": st.column_config.TextColumn("Markov", width="small"), | |
| "ADX Filter": st.column_config.TextColumn("ADX Filter", width="small"), | |
| "Conf. Threshold": st.column_config.NumberColumn("Conf.", width="small"), | |
| "Large MA Period": st.column_config.NumberColumn("MA", width="small"), | |
| "Bollinger Band Period": st.column_config.NumberColumn("BB", width="small"), | |
| "Bollinger Band Std Dev": st.column_config.NumberColumn("Std", format="%.1f", width="small"), | |
| "Catcher Offset (%)": st.column_config.NumberColumn("Catcher %", format="%.1f", width="small"), | |
| "Long Entry Threshold (%)": st.column_config.NumberColumn("L Entry", format="%.1f", width="small"), | |
| "Long Exit Threshold (%)": st.column_config.NumberColumn("L Exit", format="%.1f", width="small"), | |
| "Long Stop Loss (%)": st.column_config.NumberColumn("L TSL", format="%.1f", width="small"), | |
| "Long Delay (Days)": st.column_config.NumberColumn("L Delay", width="small"), | |
| "Short Entry Threshold (%)": st.column_config.NumberColumn("S Entry", format="%.1f", width="small"), | |
| "Short Exit Threshold (%)": st.column_config.NumberColumn("S Exit", format="%.1f", width="small"), | |
| "Short Stop Loss (%)": st.column_config.NumberColumn("S TSL", format="%.1f", width="small"), | |
| "Short Delay (Days)": st.column_config.NumberColumn("S Delay", width="small"), | |
| "Z_Avg_Profit": st.column_config.NumberColumn("Avg %", format="%.2f%%", disabled=True, width="small"), | |
| "Z_Num_Trades": st.column_config.NumberColumn("Trades", disabled=True, width="small"), | |
| "Z_WL_Ratio": st.column_config.NumberColumn("W/L", format="%.2f", disabled=True, width="small"), | |
| } | |
| edited_setups_list = st.data_editor( | |
| st.session_state[data_key], | |
| column_config=col_config, | |
| num_rows="dynamic", | |
| key=widget_key, | |
| column_order=[ | |
| "Run", "Notes", # <--- Notes column appears here (2nd) | |
| "RSI", "Volatility", "TREND", "Volume", "MACD", "MA Slope", "Markov", "ADX Filter", | |
| "Conf. Threshold", | |
| "Large MA Period", "Bollinger Band Period", "Bollinger Band Std Dev","Catcher Offset (%)", | |
| "Long Entry Threshold (%)", "Long Exit Threshold (%)", "Long Stop Loss (%)", "Long Delay (Days)", | |
| "Short Entry Threshold (%)", "Short Exit Threshold (%)", "Short Stop Loss (%)", "Short Delay (Days)", | |
| "Z_Avg_Profit", "Z_Num_Trades", "Z_WL_Ratio" | |
| ] | |
| ) | |
| col1, col2, col3, col4, col5 = st.columns([1, 1, 1, 2, 2]) | |
| if col1.button("💾 Save Setups"): | |
| save_user_setups(edited_setups_list[:20]) | |
| st.session_state[data_key] = edited_setups_list[:20] | |
| col2.button("🔄 Load Saved", on_click=load_saved_setups_callback) | |
| col3.button("🗑️ Clear All Setups", on_click=clear_all_setups_callback) | |
| if col4.button("Scan with User Setups", type="primary"): | |
| current_setups_list = edited_setups_list[:20] | |
| setups_to_run = [s for s in current_setups_list if s.get("Run") == True] | |
| if len(setups_to_run) == 0: st.error("No setups selected.") | |
| elif len(setups_to_run) > 8: st.error(f"Selected {len(setups_to_run)} setups. Max is 8.") | |
| else: | |
| st.session_state.run_scan_user_setups = True | |
| st.session_state.run_user_advisor_setup = False | |
| st.session_state.advisor_df = None | |
| st.session_state.setups_to_scan = setups_to_run | |
| st.rerun() | |
| if col5.button("Back to Main Analysis"): | |
| st.session_state.run_user_advisor_setup = False; st.session_state.advisor_df = None; st.rerun() | |
| def generate_advisor_report(main_df): | |
| """Handles the UI for running the 'Top Setups' advisor.""" | |
| st.header("📈 Advanced Advisor Report (Top 8 Setups)") | |
| top_setups = load_top_setups() | |
| if not top_setups: | |
| st.warning("No saved top setups found. Run 'Find Best Confidence' first.") | |
| if st.button("Back"): | |
| st.session_state.run_advanced_advisor = False | |
| st.rerun() | |
| return | |
| side = st.radio("Generate report for which saved setups?", ("Long", "Short"), horizontal=True, key="advisor_side_select_top") | |
| setups_to_run_all = top_setups.get(side.lower()) | |
| if not setups_to_run_all: | |
| st.warning(f"No saved top {side.lower()} setups found.") | |
| if st.button("Back"): | |
| st.session_state.run_advanced_advisor = False | |
| st.rerun() | |
| return | |
| if st.button(f"Scan using Top {side} Setups", type="primary"): | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None | |
| run_advisor_scan(main_df, setups_to_run_all, "Top Setups") # Call the scanner | |
| if st.button("Back"): | |
| st.session_state.run_advanced_advisor = False | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None | |
| st.rerun() | |
| def apply_best_weights_to_widgets(): | |
| """Loads optimised weights from st.session_state.best_weights into the sidebar widgets.""" | |
| if 'best_weights' in st.session_state and st.session_state.best_weights: | |
| weights = st.session_state.best_weights | |
| if 'rsi_w' in weights: st.session_state.rsi_w = weights['rsi_w'] | |
| if 'vol_w' in weights: st.session_state.vol_w = weights['vol_w'] | |
| if 'trend_w' in weights: st.session_state.trend_w = weights['trend_w'] | |
| if 'volume_w' in weights: st.session_state.volume_w = weights['volume_w'] | |
| if 'macd_w' in weights: st.session_state.macd_w = weights['macd_w'] | |
| if 'ma_slope_w' in weights: st.session_state.ma_slope_w = weights['ma_slope_w'] | |
| st.sidebar.success("Optimal weights loaded into sidebar!") | |
| st.rerun() | |
| else: | |
| st.sidebar.error("No optimal weights found in session state.") | |
| def apply_best_params_to_widgets(): | |
| """Loads parameters from st.session_state.best_params into the sidebar widgets.""" | |
| if 'best_params' in st.session_state and st.session_state.best_params: | |
| params = st.session_state.best_params | |
| if 'large_ma_period' in params: st.session_state.ma_period = params['large_ma_period'] | |
| if 'bband_period' in params: st.session_state.bb_period = params['bband_period'] | |
| if 'bband_std_dev' in params: st.session_state.bb_std = params['bband_std_dev'] | |
| if 'confidence_threshold' in params: st.session_state.confidence_slider = params['confidence_threshold'] | |
| if 'long_entry_threshold_pct' in params: st.session_state.long_entry = params['long_entry_threshold_pct'] * 100 | |
| if 'long_exit_ma_threshold_pct' in params: st.session_state.long_exit = params['long_exit_ma_threshold_pct'] * 100 | |
| if 'long_trailing_stop_loss_pct' in params: st.session_state.long_sl = params['long_trailing_stop_loss_pct'] * 100 | |
| if 'long_delay_days' in params: st.session_state.long_delay = params['long_delay_days'] | |
| if 'long_entry_threshold_pct' in params: st.session_state.short_entry = params['long_entry_threshold_pct'] * 100 | |
| if 'long_exit_ma_threshold_pct' in params: st.session_state.short_exit = params['long_exit_ma_threshold_pct'] * 100 | |
| if 'long_trailing_stop_loss_pct' in params: st.session_state.short_sl = params['long_trailing_stop_loss_pct'] * 100 | |
| if 'long_delay_days' in params: st.session_state.long_delay = params['long_delay_days'] | |
| st.sidebar.success("Optimal parameters loaded into sidebar!") | |
| st.rerun() | |
| else: | |
| st.sidebar.error("No optimal parameters found in session state.") | |
| # --- 5. Streamlit User Interface --- | |
| def update_state(): | |
| """ | |
| Callback to update the main session state from the 'widget_' keys. | |
| This synchronizes all widgets. | |
| """ | |
| # Get all keys from session state | |
| keys = list(st.session_state.keys()) | |
| # Loop through all keys that start with 'widget_' | |
| for widget_key in keys: | |
| if widget_key.startswith('widget_'): | |
| # Find the main key (e.g., 'widget_ma_period' -> 'ma_period') | |
| main_key = widget_key[len('widget_'):] | |
| # Update the main key with the widget's value | |
| if main_key in st.session_state: | |
| st.session_state[main_key] = st.session_state[widget_key] | |
| def convert_results_to_csv(summary_df, params): | |
| """ | |
| Combines the Performance Summary and the Configuration Settings into a single CSV string. | |
| """ | |
| if summary_df is None or summary_df.empty: | |
| return "" | |
| # 1. Get Performance Data (First Row) | |
| perf_data = summary_df.iloc[0].to_dict() | |
| # 2. Get Settings Data (From Session State/Params) | |
| settings_data = { | |
| "--- SETTINGS ---": "", # Separator | |
| "Start Date": params.get('start_date'), | |
| "End Date": params.get('end_date'), | |
| "Primary Driver": params.get('primary_driver'), | |
| "Confidence Threshold": params.get('confidence_threshold'), | |
| "RSI Weight": params.get('rsi_w') if params.get('use_rsi') else "Off", | |
| "Volatility Weight": params.get('vol_w') if params.get('use_volatility') else "Off", | |
| "Trend Weight": params.get('trend_w') if params.get('use_trend') else "Off", | |
| "Volume Weight": params.get('vol_w_val') if params.get('use_volume') else "Off", | |
| "MACD Weight": params.get('macd_w') if params.get('use_macd') else "Off", | |
| "MA Slope Weight": params.get('ma_slope_w') if params.get('use_ma_slope') else "Off", | |
| "Markov Weight": params.get('markov_w') if params.get('use_markov') else "Off", | |
| "MFI Weight": params.get('mfi_w') if params.get('use_mfi') else "Off", | |
| "SuperTrend Weight": params.get('supertrend_w') if params.get('use_supertrend') else "Off", | |
| "ADX Filter": f"On (< {params.get('adx_threshold')})" if params.get('use_adx_filter') else "Off", | |
| "BB Period": params.get('bband_period'), | |
| "BB Std Dev": params.get('bband_std_dev'), | |
| "MA Period": params.get('large_ma_period'), | |
| "Long Entry Thresh %": params.get('long_entry_threshold_pct'), | |
| "Short Entry Thresh %": params.get('short_entry_threshold_pct'), | |
| "Exit Logic": params.get('exit_logic_type'), | |
| "Smart TSL %": params.get('smart_trailing_stop_pct') | |
| } | |
| # 3. Combine into one dictionary | |
| combined_data = {**perf_data, **settings_data} | |
| # 4. Create DataFrame and convert to CSV | |
| export_df = pd.DataFrame([combined_data]) | |
| return export_df.to_csv(index=False).encode('utf-8') | |
| def main(): | |
| if 'run_advanced_advisor' not in st.session_state: st.session_state.run_advanced_advisor = False | |
| if 'run_user_advisor_setup' not in st.session_state: st.session_state.run_user_advisor_setup = False | |
| # 2. Define ALL Defaults (Prevents "AttributeError" crashes) | |
| defaults = { | |
| 'start_date': date(2024, 1, 1), 'end_date': date.today(), | |
| 'bband_period': 20, 'bband_std_dev': 2.0, | |
| 'long_entry_threshold_pct': 0.0, 'short_entry_threshold_pct': 0.0, | |
| 'long_exit_ma_threshold_pct': 0.0, 'short_exit_ma_threshold_pct': 0.0, | |
| 'confidence_threshold': 50, 'max_long_duration': 60, 'max_short_duration': 60, | |
| 'long_trailing_stop_loss_pct': 0.0, 'short_trailing_stop_loss_pct': 0.0, | |
| 'primary_driver': 'Bollinger Bands', | |
| 'use_rsi': True, 'rsi_w': 0.5, | |
| 'use_volatility': True, 'vol_w': 0.5, | |
| 'use_trend': True, 'trend_w': 0.5, | |
| 'use_volume': True, 'vol_w_val': 0.5, | |
| 'use_macd': False, 'macd_w': 0.5, | |
| 'use_ma_slope': False, 'ma_slope_w': 0.5, | |
| 'use_markov': False, 'markov_w': 0.5, | |
| # New Indicators | |
| 'use_mfi': False, 'mfi_w': 0.5, | |
| 'use_supertrend': False, 'supertrend_w': 0.5, | |
| # Filters & Exits | |
| 'use_adx_filter': False, 'adx_threshold': 25, 'rsi_logic': 'Level', | |
| 'exit_logic_type': 'Standard (Price-Based)', | |
| 'smart_trailing_stop_pct': 0.05, | |
| 'smart_exit_atr_period': 14, 'smart_exit_atr_multiplier': 3.0, | |
| 'intelligent_tsl_pct': 1.0, | |
| 'use_ma_floor_filter': False, | |
| 'catcher_stop_pct': 0.03, | |
| 'long_delay_days': 0, 'short_delay_days': 0, | |
| 'long_score_95_percentile': None, 'short_score_95_percentile': None, | |
| 'veto_setup_list': None, | |
| 'advisor_df': None, 'confidence_results_df': None, | |
| 'single_ticker_results': None, 'summary_df': None, | |
| 'open_trades_df': None, 'load_message': None, | |
| 'outlier_report': None, | |
| 'best_markov_setup': None, | |
| # Optimization | |
| 'ma_period': 50, 'bb_period': 20, 'bb_std': 2.0, | |
| 'confidence_slider': 50, 'long_entry': 0.0, 'long_exit': 0.0, | |
| 'long_sl': 0.0, 'long_delay': 0, 'short_entry': 0.0, | |
| 'short_exit': 0.0, 'short_sl': 0.0, 'short_delay': 0, 'max_duration': 60, | |
| 'volume_w': 0.5 # fallback for vol_w_val naming differences | |
| } | |
| # 3. Apply Defaults if Missing | |
| for key, val in defaults.items(): | |
| if key not in st.session_state: st.session_state[key] = val | |
| # 4. Helper to update state from widgets (Fixed) | |
| def update_state(): | |
| keys = list(st.session_state.keys()) | |
| for widget_key in keys: | |
| if widget_key.startswith('widget_'): | |
| main_key = widget_key[len('widget_'):] | |
| if main_key in st.session_state: | |
| st.session_state[main_key] = st.session_state[widget_key] | |
| st.set_page_config(page_title="Quant Trader", page_icon="🔴", layout="wide") | |
| # --- [FIX 1: Robust Initialization] --- | |
| # We check for 'run_mode' explicitly to fix the crash if session state is stale | |
| if 'first_run' not in st.session_state or 'run_mode' not in st.session_state: | |
| st.session_state.first_run = True | |
| defaults = load_settings() | |
| st.session_state.widget_defaults = defaults | |
| st.session_state.veto_setup_list = load_veto_setup() | |
| st.session_state.use_ma_floor_filter = defaults.get("use_ma_floor_filter", True) | |
| # Unpack all widget defaults into session state | |
| st.session_state.ticker_select = None | |
| st.session_state.run_mode = "Analyse Full List" # <--- This was missing in your stale state | |
| st.session_state.start_date = None | |
| st.session_state.end_date = None | |
| # Section 2 Defaults | |
| st.session_state.use_rsi = defaults.get('use_rsi', True) | |
| st.session_state.rsi_w = defaults.get('rsi_w', 1.0) | |
| st.session_state.rsi_logic = defaults.get('rsi_logic', 'Crossover') | |
| st.session_state.primary_driver = defaults.get('primary_driver', 'Bollinger Bands') | |
| st.session_state.exit_logic_type = defaults.get('exit_logic_type', 'Standard (Price-Based)') | |
| st.session_state.exit_confidence_threshold = defaults.get('exit_confidence_threshold', 50) | |
| st.session_state.smart_trailing_stop_pct = defaults.get('smart_trailing_stop_pct', 5.0) | |
| st.session_state.use_vol = defaults.get('use_vol', True) | |
| st.session_state.vol_w = defaults.get('vol_w', 1.0) | |
| st.session_state.use_trend = defaults.get('use_trend', True) | |
| st.session_state.trend_w = defaults.get('trend_w', 1.5) | |
| st.session_state.use_volume = defaults.get('use_volume', True) | |
| st.session_state.volume_w = defaults.get('volume_w', 1.0) | |
| st.session_state.use_adx_filter = defaults.get('use_adx_filter', True) | |
| st.session_state.adx_threshold = defaults.get('adx_threshold', 25.0) | |
| st.session_state.adx_period = defaults.get('adx_period', 14) | |
| st.session_state.use_macd = defaults.get('use_macd', True) | |
| st.session_state.macd_w = defaults.get('macd_w', 1.0) | |
| st.session_state.use_ma_slope = defaults.get('use_ma_slope', True) | |
| st.session_state.ma_slope_w = defaults.get('ma_slope_w', 0.5) | |
| st.session_state.use_markov = defaults.get('use_markov', False) | |
| st.session_state.markov_w = defaults.get('markov_w', 1.0) | |
| st.session_state.confidence_slider = defaults.get("confidence_threshold", 50) | |
| st.session_state.smart_exit_atr_period = defaults.get("smart_exit_atr_period", 14) | |
| st.session_state.smart_exit_atr_multiplier = defaults.get("smart_exit_atr_multiplier", 3.0) | |
| st.session_state.intelligent_tsl_pct = defaults.get("intelligent_tsl_pct", 0.60) * 100 | |
| st.session_state.catcher_stop_pct = defaults.get("catcher_stop_pct", 0.0) * 100 | |
| # Section 3 Defaults | |
| st.session_state.ma_period = defaults.get("large_ma_period", 50) | |
| st.session_state.bb_period = defaults.get("bband_period", 20) | |
| st.session_state.bb_std = defaults.get("bband_std_dev", 2.0) | |
| st.session_state.long_entry = defaults.get("long_entry_threshold_pct", 0.0) * 100 | |
| st.session_state.long_exit = defaults.get("long_exit_ma_threshold_pct", 0.0) * 100 | |
| st.session_state.long_sl = defaults.get("long_trailing_stop_loss_pct", 8.0) * 100 | |
| st.session_state.long_delay = defaults.get("long_delay_days", 0) | |
| st.session_state.short_entry = defaults.get("short_entry_threshold_pct", 0.0) * 100 | |
| st.session_state.short_exit = defaults.get("short_exit_ma_threshold_pct", 0.0) * 100 | |
| st.session_state.short_sl = defaults.get("short_trailing_stop_loss_pct", 8.0) * 100 | |
| st.session_state.short_delay = defaults.get("short_delay_days", 0) | |
| st.session_state.max_duration = defaults.get("max_trading_days", 60) | |
| st.session_state.max_long_duration = defaults.get("max_long_duration", 60) | |
| st.session_state.max_short_duration = defaults.get("max_short_duration", 10) | |
| # Section 4/5/6/7 Init (Optimisation variables) | |
| st.session_state.sq_params_toggle = False | |
| st.session_state.opt_ma_cb = False; st.session_state.opt_bb_cb = False; st.session_state.opt_std_cb = False | |
| st.session_state.opt_conf_cb = False; st.session_state.opt_sl_cb = False; st.session_state.opt_delay_cb = False | |
| st.session_state.opt_entry_cb = False; st.session_state.opt_exit_cb = False; st.session_state.opt_duration_cb = False | |
| # Init Opt Ranges (Defaults) | |
| st.session_state.ma_start = 50; st.session_state.ma_end = 55; st.session_state.ma_step = 5 | |
| st.session_state.bb_start = 20; st.session_state.bb_end = 25; st.session_state.bb_step = 5 | |
| st.session_state.std_start = 2.0; st.session_state.std_end = 2.1; st.session_state.std_step = 0.1 | |
| st.session_state.conf_start = 50; st.session_state.conf_end = 60; st.session_state.conf_step = 5 | |
| st.session_state.sl_start = 0.0; st.session_state.sl_end = 2.0; st.session_state.sl_step = 0.5 | |
| st.session_state.delay_start = 0; st.session_state.delay_end = 1; st.session_state.delay_step = 1 | |
| st.session_state.entry_start = 0.0; st.session_state.entry_end = 0.5; st.session_state.entry_step = 0.1 | |
| st.session_state.exit_start = 0.0; st.session_state.exit_end = 0.5; st.session_state.exit_step = 0.1 | |
| st.session_state.dur_start = 30; st.session_state.dur_end = 90; st.session_state.dur_step = 10 | |
| st.session_state.veto_rsi_cb = True; st.session_state.veto_vol_cb = True; st.session_state.veto_trend_cb = True | |
| st.session_state.veto_volume_cb = True; st.session_state.veto_macd_cb = False; st.session_state.veto_ma_slope_cb = False | |
| st.session_state.sq_weights_toggle = False | |
| # Markov Init | |
| st.session_state.markov_run_up_start = 5; st.session_state.markov_run_up_end = 20; st.session_state.markov_run_up_step = 2 | |
| st.session_state.markov_future_start = 3; st.session_state.markov_future_end = 10; st.session_state.markov_future_step = 1 | |
| st.session_state.best_markov_setup = load_markov_setup() | |
| # Data Editor Init | |
| st.session_state["user_setups_data"] = [] | |
| loaded_setups = load_user_setups() | |
| processed = [] | |
| factor_cols = ["RSI", "Volatility", "TREND", "Volume", "MACD", "MA Slope", "Markov"] | |
| for s in loaded_setups: | |
| new_s = s.copy() | |
| for k, v in s.items(): | |
| if k in factor_cols: new_s[k] = str(v) | |
| elif k == "Run": new_s[k] = bool(v) | |
| processed.append(new_s) | |
| st.session_state["user_setups_data"] = processed | |
| # --- Password Auth Init --- | |
| st.session_state.dev_authenticated = False | |
| # --- [Load Data Logic] --- | |
| if 'master_df' not in st.session_state or st.session_state.master_df is None: | |
| with st.spinner("Loading and cleaning data..."): | |
| master_df, load_message = load_all_data('csv_data') | |
| if master_df is None: st.error(load_message); st.stop() | |
| master_df, outlier_report = clean_data_and_report_outliers(master_df) | |
| st.session_state.master_df = master_df | |
| st.session_state.ticker_list = sorted([col for col in master_df.columns if not ('_Volume' in str(col) or '_High' in str(col) or '_Low' in str(col))]) | |
| min_date = master_df.index.min().date() | |
| max_date = master_df.index.max().date() | |
| # --- NEW: Set Default Start to 1 Year Ago from TODAY --- | |
| target_start_date = date.today() - relativedelta(years=1) | |
| # Safety Check: Ensure the target date is valid within your CSV data range | |
| if target_start_date < min_date: | |
| st.session_state.start_date = min_date | |
| elif target_start_date > max_date: | |
| st.session_state.start_date = min_date # Fallback if data is older than 1 year ago | |
| else: | |
| st.session_state.start_date = target_start_date | |
| st.session_state.end_date = max_date | |
| if st.session_state.ticker_select is None and st.session_state.ticker_list: | |
| st.session_state.ticker_select = st.session_state.ticker_list[0] | |
| st.session_state.load_message = load_message | |
| st.session_state.outlier_report = outlier_report | |
| master_df = st.session_state.master_df | |
| ticker_list = st.session_state.ticker_list | |
| # --- HEADER / GREETING --- | |
| st.title("🔴 🚀 Quant Trader") | |
| current_hour = datetime.now().hour | |
| if 5 <= current_hour < 12: greeting = "Good morning!" | |
| elif 12 <= current_hour < 19: greeting = "Good afternoon!" | |
| else: greeting = "Good evening!" | |
| today = date.today() | |
| day_str = today.strftime('%A, %d %B %Y') | |
| proverbs = load_proverbs() | |
| day_of_year = today.timetuple().tm_yday | |
| proverb_index = (day_of_year - 1) % len(proverbs) if proverbs else 0 | |
| daily_proverb = proverbs[proverb_index] if proverbs else "Have a profitable day." | |
| st.success(f"{greeting} Today is {day_str}. | *{daily_proverb}*") | |
| main_content_placeholder = st.empty() | |
| # --- SIDEBAR TOGGLE & PASSWORD LOGIC --- | |
| # Default is TRUE (Production) | |
| production_mode_toggle = st.sidebar.toggle("Production Mode", value=True, key="production_mode_toggle") | |
| if not production_mode_toggle: | |
| if not st.session_state.dev_authenticated: | |
| st.sidebar.warning("Restricted Access") | |
| pwd = st.sidebar.text_input("Enter Developer Password:", type="password", key="dev_password_input") | |
| if pwd == "492211": | |
| st.session_state.dev_authenticated = True | |
| st.rerun() | |
| else: | |
| production_mode = True | |
| else: | |
| production_mode = False | |
| else: | |
| production_mode = True | |
| if production_mode: | |
| st.markdown("""<style>div[data-testid="stSidebar"] { background-color: #f0f2f6; }</style>""", unsafe_allow_html=True) | |
| st.markdown("""<style> | |
| div[data-testid="stSidebar"] button[kind="primary"] { width: 100%; } | |
| div[data-testid="stSidebar"] div[data-testid="stButton"] button { width: 100%; } | |
| </style>""", unsafe_allow_html=True) | |
| # --- [MOVED] RUN ANALYSIS BUTTON (PRODUCTION MODE) --- | |
| if production_mode: | |
| # Note: We do NOT use st.rerun() here. We let the script flow down. | |
| if st.sidebar.button("Run Analysis", type="primary", key="run_analysis_prod_top", | |
| help="Runs the test using the current settings below."): | |
| # 1. Force a sync of all widgets to session state | |
| update_state() | |
| # 2. Set the flag to TRUE so the engine at the bottom runs | |
| st.session_state.run_analysis_button = True | |
| # 3. Clear previous results to ensure a fresh report | |
| st.session_state.run_advanced_advisor = False | |
| st.session_state.run_user_advisor_setup = False | |
| st.session_state.run_scan_user_setups = False | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None | |
| st.session_state.param_results_df = None; st.session_state.confidence_results_df = None | |
| st.session_state.summary_df = None; st.session_state.single_ticker_results = None | |
| st.session_state.open_trades_df = None; st.session_state.last_run_stats = None | |
| st.session_state.markov_results_df = None | |
| st.sidebar.markdown("---") | |
| # --- SIDEBAR SECTION 1: Select Test Mode & Dates (Common to both) --- | |
| st.sidebar.header("1. Select Test Mode & Dates") | |
| full_list_label = f"Analyse Full List ({len(ticker_list)} Tickers)" | |
| if st.session_state.run_mode == "Analyse Full List": | |
| st.session_state.run_mode = full_list_label | |
| st.sidebar.radio("Mode:", ("Analyse Single Ticker", full_list_label), key='run_mode') | |
| if st.session_state.get('run_mode') == "Analyse Single Ticker": | |
| if not ticker_list: | |
| st.sidebar.warning("No tickers loaded.") | |
| else: | |
| # Ensure valid selection logic (only reset if invalid/None) | |
| if st.session_state.ticker_select not in ticker_list: | |
| st.session_state.ticker_select = ticker_list[0] | |
| try: | |
| ticker_index = ticker_list.index(st.session_state.ticker_select) | |
| except ValueError: | |
| ticker_index = 0 | |
| # [FIXED] Removed the unconditional reset line that was here | |
| st.sidebar.selectbox("Select a Ticker:", ticker_list, index=ticker_index, key='widget_ticker_select', on_change=update_state ) | |
| # --- UPDATED: Date Inputs (Fully Unrestricted 2000-2030) --- | |
| st.sidebar.date_input( | |
| "Start Date", | |
| value=st.session_state.start_date, | |
| min_value=date(2005, 1, 1), | |
| max_value=date(2030, 12, 31), # Allow future dates to prevent locking | |
| key='widget_start_date', | |
| on_change=update_state | |
| ) | |
| st.sidebar.date_input( | |
| "End Date", | |
| value=st.session_state.end_date, | |
| min_value=date(2005, 1, 1), | |
| max_value=date(2030, 12, 31), | |
| key='widget_end_date', | |
| on_change=update_state | |
| ) | |
| # [NEW] Benchmark Settings Sliders (With Tooltips) | |
| st.sidebar.markdown("---") | |
| st.sidebar.write("**Benchmark Settings**") | |
| # 1. Lookback Slider - VISIBLE IN ALL MODES | |
| norm_lookback_years = st.sidebar.slider( | |
| "Lookback (Yrs)", | |
| min_value=1, | |
| max_value=30, | |
| value=st.session_state.get('norm_lookback_years', 1), # Load from Config | |
| help=( | |
| "How many years of history to use when calculating the 'Confidence Score' baseline.\n\n" | |
| "• **30 Years (Recommended):** Stable, long-term benchmark. Ensures trades are judged against all-time history.\n" | |
| "• **1-2 Years:** Adaptive. Judges trades only against recent market volatility (good for changing regimes)." | |
| "• **Default is set to 1 year as we are usually most interested in the recent year.") | |
| ) | |
| use_rolling_benchmark = st.sidebar.checkbox( | |
| "Use Rolling Benchmark (Adaptive)", | |
| value=st.session_state.get('use_rolling_benchmark', False), | |
| key='widget_use_rolling_benchmark', | |
| on_change=update_state, | |
| help="CHECKED: Adaptive Mode. Benchmarks adapt to recent history. Realistic backtest.\nUNCHECKED: Global Mode (Crystal Ball). Uses ALL history to set one strict benchmark. Good for filtering Open Trades." | |
| ) | |
| # 2. Grade Benchmark - HIDDEN IN PRODUCTION MODE | |
| if not production_mode: | |
| benchmark_percentile_setting = st.sidebar.slider( | |
| "Grade Benchmark (%)", | |
| min_value=50, | |
| max_value=99, | |
| value=st.session_state.get('benchmark_rank', 99), # Load from Config | |
| help=( | |
| "Sets the 'Bar' for the Strategy Score.\n\n" | |
| "• **99% (Default):** The top 1% of historical trades set the standard for 'Perfection'.\n" | |
| "• **80%:** Lowers the bar. Trades only need to be in the top 20% to get a high score (Useful for low volatility).") | |
| ) | |
| else: | |
| # In Production, keep the value but hide the slider | |
| benchmark_percentile_setting = st.session_state.get('benchmark_rank', 99) | |
| st.sidebar.markdown("---") | |
| # --- PRIMARY TRIGGER (Hidden in Production, Forced to BBands) --- | |
| if production_mode: | |
| st.session_state.primary_driver = "Bollinger Bands" # Force logic | |
| # No UI element shown | |
| else: | |
| st.sidebar.selectbox( | |
| "Select Primary Trigger:", | |
| ("Bollinger Bands", "RSI Crossover", "MACD Crossover", "MA Slope", "Markov State"), | |
| key='widget_primary_driver', | |
| on_change=update_state, | |
| help="Select the main indicator that will trigger a trade." | |
| ) | |
| st.sidebar.markdown("---") | |
| # --- RUN ACTIONS (Dual Mode Logic) --- | |
| st.sidebar.subheader("Run Actions") | |
| if production_mode: | |
| # --- PRODUCTION BUTTONS --- | |
| # [REMOVED] Old "Run Analysis" button was here. | |
| # 2. Dropdown for User Setups (With "Default" option) | |
| user_setups_raw = st.session_state.get("user_setups_data", []) | |
| valid_user_setups = [s for s in user_setups_raw if not is_row_blank(s)] | |
| setup_options = {"Default (Reset to Original)": -1} | |
| for i, s in enumerate(valid_user_setups): | |
| # Build Label with Notes | |
| note = s.get('Notes', '') | |
| if note: | |
| if len(note) > 50: note = note[:47] + "..." | |
| note_display = f" - {note}" | |
| else: | |
| note_display = "" | |
| label = f"Setup {i+1}{note_display}" | |
| setup_options[label] = i | |
| # --- CALLBACK: AUTO-POPULATE SETTINGS ON SELECTION --- | |
| def on_user_setup_change(): | |
| """Callback to populate settings immediately when dropdown changes.""" | |
| selected_label = st.session_state.widget_user_setup_select_key | |
| idx = setup_options[selected_label] | |
| def sync_param(main_key, value): | |
| st.session_state[main_key] = value | |
| st.session_state[f"widget_{main_key}"] = value | |
| # --- CASE A: DEFAULT (RESET) --- | |
| if idx == -1: | |
| defaults = st.session_state.get('widget_defaults', {}) or load_settings() | |
| # Reset Factors | |
| sync_param('use_rsi', defaults.get('use_rsi', True)) | |
| sync_param('rsi_w', defaults.get('rsi_w', 1.0)) | |
| sync_param('use_vol', defaults.get('use_vol', True)) | |
| sync_param('vol_w', defaults.get('vol_w', 1.0)) | |
| sync_param('use_trend', defaults.get('use_trend', True)) | |
| sync_param('trend_w', defaults.get('trend_w', 1.5)) | |
| sync_param('use_volume', defaults.get('use_volume', True)) | |
| sync_param('volume_w', defaults.get('volume_w', 1.0)) | |
| sync_param('use_macd', defaults.get('use_macd', True)) | |
| sync_param('macd_w', defaults.get('macd_w', 1.0)) | |
| sync_param('use_ma_slope', defaults.get('use_ma_slope', True)) | |
| sync_param('ma_slope_w', defaults.get('ma_slope_w', 0.5)) | |
| sync_param('use_markov', defaults.get('use_markov', False)) | |
| sync_param('markov_w', defaults.get('markov_w', 1.0)) | |
| # Reset ADX (With Clamping for Production Mode) | |
| sync_param('use_adx_filter', defaults.get('use_adx_filter', True)) | |
| raw_adx = defaults.get('adx_threshold', 25.0) | |
| clamped_adx = max(20.0, min(30.0, raw_adx)) | |
| sync_param('adx_threshold', clamped_adx) | |
| # Reset Strategies | |
| sync_param('ma_period', defaults.get("large_ma_period", 50)) | |
| sync_param('bb_period', defaults.get("bband_period", 20)) | |
| sync_param('bb_std', defaults.get("bband_std_dev", 2.0)) | |
| sync_param('confidence_slider', defaults.get("confidence_threshold", 50)) | |
| sync_param('long_entry', defaults.get("long_entry_threshold_pct", 0.0) * 100) | |
| sync_param('long_exit', defaults.get("long_exit_ma_threshold_pct", 0.0) * 100) | |
| sync_param('long_sl', defaults.get("long_trailing_stop_loss_pct", 8.0) * 100) | |
| sync_param('long_delay', defaults.get("long_delay_days", 0)) | |
| sync_param('short_entry', defaults.get("short_entry_threshold_pct", 0.0) * 100) | |
| sync_param('short_exit', defaults.get("short_exit_ma_threshold_pct", 0.0) * 100) | |
| sync_param('short_sl', defaults.get("short_trailing_stop_loss_pct", 8.0) * 100) | |
| sync_param('short_delay', defaults.get("short_delay_days", 0)) | |
| st.toast("Settings reset to Default.", icon="🔄") | |
| # --- CASE B: USER SETUP --- | |
| else: | |
| selected_setup_data = valid_user_setups[idx] | |
| def apply_factor_loading(key, main_key, weight_key): | |
| val = selected_setup_data.get(key, 'Off') | |
| if str(val).lower() in ['off', '0', '0.0', 'false']: | |
| sync_param(main_key, False) | |
| else: | |
| sync_param(main_key, True) | |
| try: w = float(val); sync_param(weight_key, w) | |
| except: sync_param(weight_key, 1.0) | |
| apply_factor_loading('RSI', 'use_rsi', 'rsi_w') | |
| apply_factor_loading('Volatility', 'use_vol', 'vol_w') | |
| apply_factor_loading('TREND', 'use_trend', 'trend_w') | |
| apply_factor_loading('Volume', 'use_volume', 'volume_w') | |
| apply_factor_loading('MACD', 'use_macd', 'macd_w') | |
| apply_factor_loading('MA Slope', 'use_ma_slope', 'ma_slope_w') | |
| apply_factor_loading('Markov', 'use_markov', 'markov_w') | |
| # --- ADX FILTER LOGIC (With Clamping) --- | |
| adx_val = selected_setup_data.get('ADX Filter', 'Off') | |
| if str(adx_val).lower() in ['off', '0', '0.0', 'false']: | |
| sync_param('use_adx_filter', False) | |
| else: | |
| sync_param('use_adx_filter', True) | |
| try: | |
| thresh = float(adx_val) | |
| clamped_thresh = max(20.0, min(30.0, thresh)) | |
| sync_param('adx_threshold', clamped_thresh) | |
| except: | |
| sync_param('adx_threshold', 25.0) | |
| def get_num(key, default, type_func): | |
| try: return type_func(selected_setup_data.get(key, default)) | |
| except: return default | |
| sync_param('ma_period', get_num('Large MA Period', st.session_state.ma_period, int)) | |
| sync_param('bb_period', get_num('Bollinger Band Period', st.session_state.bb_period, int)) | |
| sync_param('bb_std', get_num('Bollinger Band Std Dev', st.session_state.bb_std, float)) | |
| sync_param('confidence_slider', get_num('Conf. Threshold', st.session_state.confidence_slider, int)) | |
| sync_param('long_entry', get_num('Long Entry Threshold (%)', st.session_state.long_entry, float)) | |
| sync_param('long_exit', get_num('Long Exit Threshold (%)', st.session_state.long_exit, float)) | |
| sync_param('long_sl', get_num('Long Stop Loss (%)', st.session_state.long_sl, float)) | |
| sync_param('long_delay', get_num('Long Delay (Days)', st.session_state.long_delay, int)) | |
| sync_param('short_entry', get_num('Short Entry Threshold (%)', st.session_state.short_entry, float)) | |
| sync_param('short_exit', get_num('Short Exit Threshold (%)', st.session_state.short_exit, float)) | |
| sync_param('short_sl', get_num('Short Stop Loss (%)', st.session_state.short_sl, float)) | |
| sync_param('short_delay', get_num('Short Delay (Days)', st.session_state.short_delay, int)) | |
| st.toast(f"Populated settings for {selected_label}", icon="✅") | |
| # Selectbox with on_change callback | |
| st.sidebar.selectbox( | |
| "Select User Setup:", | |
| list(setup_options.keys()), | |
| key="widget_user_setup_select_key", | |
| on_change=on_user_setup_change, | |
| help="Selecting a setup will immediately populate the sidebar settings below." | |
| ) | |
| else: | |
| # --- DEVELOPER BUTTONS (Original) --- | |
| if st.sidebar.button("🚀 Run Analysis (Developer)", type="primary", key="run_analysis_dev"): | |
| st.session_state.run_analysis_button = True | |
| st.session_state.run_advanced_advisor = False | |
| st.session_state.run_user_advisor_setup = False | |
| st.session_state.run_scan_user_setups = False | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None | |
| st.session_state.param_results_df = None; st.session_state.confidence_results_df = None | |
| st.session_state.summary_df = None; st.session_state.single_ticker_results = None | |
| st.session_state.open_trades_df = None; st.session_state.last_run_stats = None | |
| st.session_state.markov_results_df = None | |
| st.rerun() | |
| if st.sidebar.button("👨💼 Run Advanced Advisor (Find Trades)", key="run_adv_find_trades"): | |
| st.session_state.run_advanced_advisor = True | |
| st.session_state.run_analysis_button = False | |
| st.session_state.run_user_advisor_setup = False | |
| st.session_state.run_scan_user_setups = False | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None | |
| st.session_state.param_results_df = None; st.session_state.confidence_results_df = None | |
| st.session_state.summary_df = None | |
| st.rerun() | |
| if st.sidebar.button("⚙️ Run User-Defined Advisor (Find Trades)", key="run_user_find_trades"): | |
| st.session_state.run_user_advisor_setup = True | |
| st.session_state.run_analysis_button = False | |
| st.session_state.run_advanced_advisor = False | |
| st.session_state.run_scan_user_setups = False | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None | |
| st.session_state.param_results_df = None; st.session_state.confidence_results_df = None | |
| st.session_state.summary_df = None | |
| st.rerun() | |
| st.sidebar.markdown("---") | |
| # --- SECTION 2: Confidence Score Factors --- | |
| st.sidebar.header("2. Confidence Score Factors") | |
| if production_mode: | |
| # --- PRODUCTION VIEW (Simplified) --- | |
| # Force hidden factors OFF | |
| st.session_state.use_vol = False | |
| st.session_state.use_trend = False | |
| st.session_state.use_ma_slope = False | |
| st.session_state.use_markov = False | |
| st.session_state.rsi_logic = "Level" | |
| # RSI (Momentum) | |
| if 'widget_use_rsi' not in st.session_state: st.session_state.widget_use_rsi = st.session_state.use_rsi | |
| st.sidebar.toggle("Use Momentum (RSI)", key='widget_use_rsi', on_change=update_state, help="Enable Relative Strength Index (Momentum) factor.") | |
| rsi_val = st.session_state.rsi_w if st.session_state.rsi_w != 1.0 else 0.5 | |
| if 'widget_rsi_w' not in st.session_state: st.session_state.widget_rsi_w = rsi_val | |
| st.sidebar.number_input("RSI Weight", 0.1, 5.0, step=0.1, key='widget_rsi_w', on_change=update_state, disabled=not st.session_state.get('use_rsi', True)) | |
| # Volume Spike | |
| if 'widget_use_volume' not in st.session_state: st.session_state.widget_use_volume = st.session_state.use_volume | |
| st.sidebar.toggle("Use Volume Spike", key='widget_use_volume', on_change=update_state, help="Enable Volume Spike detection.") | |
| vol_val = st.session_state.volume_w if st.session_state.volume_w != 1.0 else 0.5 | |
| if 'widget_volume_w' not in st.session_state: st.session_state.widget_volume_w = vol_val | |
| st.sidebar.number_input("Volume Weight", 0.1, 5.0, step=0.1, key='widget_volume_w', on_change=update_state, disabled=not st.session_state.get('use_volume', True)) | |
| # ADX Filter | |
| if 'widget_use_adx_filter' not in st.session_state: st.session_state.widget_use_adx_filter = st.session_state.use_adx_filter | |
| st.sidebar.toggle("Use ADX Filter (<)", key='widget_use_adx_filter', on_change=update_state, help="Enable Trend Strength filter (ADX). Good for hunting Shorts, bad for Longs") | |
| curr_adx = max(20.0, min(30.0, st.session_state.adx_threshold)) | |
| if 'widget_adx_threshold' not in st.session_state: st.session_state.widget_adx_threshold = curr_adx | |
| st.sidebar.number_input("ADX Threshold", 20.0, 30.0, step=1.0, key='widget_adx_threshold', on_change=update_state, disabled=not st.session_state.get('use_adx_filter', True)) | |
| # MACD | |
| if 'widget_use_macd' not in st.session_state: st.session_state.widget_use_macd = st.session_state.use_macd | |
| st.sidebar.toggle("Use MACD Signals", key='widget_use_macd', on_change=update_state, help="Enable MACD crossover/histogram signals.") | |
| macd_val = st.session_state.macd_w if st.session_state.macd_w != 1.0 else 2.0 | |
| if 'widget_macd_w' not in st.session_state: st.session_state.widget_macd_w = macd_val | |
| st.sidebar.number_input("MACD Weight", 0.1, 5.0, step=0.1, key='widget_macd_w', on_change=update_state, disabled=not st.session_state.get('use_macd', True)) | |
| # Confidence Slider | |
| # [FIX] Force the widget key to match the stored variable BEFORE creating the widget | |
| st.session_state['widget_confidence_slider'] = st.session_state.confidence_slider | |
| # [FIX] Remove 'value=' argument. Streamlit will use the key we just populated. | |
| st.sidebar.slider( | |
| "Minimum Confidence Threshold (%)", | |
| 0, 100, | |
| step=5, | |
| key='widget_confidence_slider', | |
| on_change=update_state, | |
| help="Acts as a quality filter. Higher values increase trade quality but reduce the number of trades found." | |
| ) | |
| else: | |
| # --- DEVELOPER VIEW (Original) --- | |
| if 'widget_use_rsi' not in st.session_state: st.session_state.widget_use_rsi = st.session_state.use_rsi | |
| st.sidebar.toggle("Use Momentum (RSI)", key='widget_use_rsi', on_change=update_state) | |
| if 'widget_rsi_w' not in st.session_state: st.session_state.widget_rsi_w = st.session_state.rsi_w | |
| st.sidebar.number_input("RSI Weight", 0.1, 5.0, step=0.1, key='widget_rsi_w', on_change=update_state, disabled=not st.session_state.get('use_rsi', True)) | |
| rsi_logic_options = ["Crossover", "Level"] | |
| rsi_logic_index = rsi_logic_options.index(st.session_state.rsi_logic) if st.session_state.rsi_logic in rsi_logic_options else 0 | |
| st.sidebar.radio("RSI Entry Logic:", rsi_logic_options, index=rsi_logic_index, key='widget_rsi_logic', on_change=update_state, | |
| help="Level: Enter <= 30/>= 70. Crossover: Enter on cross back.", | |
| disabled=not st.session_state.get('use_rsi', True)) | |
| if 'widget_use_vol' not in st.session_state: st.session_state.widget_use_vol = st.session_state.use_vol | |
| st.sidebar.toggle("Use Volatility", key='widget_use_vol', on_change=update_state) | |
| if 'widget_vol_w' not in st.session_state: st.session_state.widget_vol_w = st.session_state.vol_w | |
| st.sidebar.number_input("Volatility Weight", 0.1, 5.0, step=0.1, key='widget_vol_w', on_change=update_state, disabled=not st.session_state.get('use_vol', True)) | |
| if 'widget_use_trend' not in st.session_state: st.session_state.widget_use_trend = st.session_state.use_trend | |
| st.sidebar.toggle("Use Trend (200d MA)", key='widget_use_trend', on_change=update_state) | |
| if 'widget_trend_w' not in st.session_state: st.session_state.widget_trend_w = st.session_state.trend_w | |
| st.sidebar.number_input("Trend Weight", 0.1, 5.0, step=0.1, key='widget_trend_w', on_change=update_state, disabled=not st.session_state.get('use_trend', True)) | |
| if 'widget_use_volume' not in st.session_state: st.session_state.widget_use_volume = st.session_state.use_volume | |
| st.sidebar.toggle("Use Volume Spike", key='widget_use_volume', on_change=update_state) | |
| if 'widget_volume_w' not in st.session_state: st.session_state.widget_volume_w = st.session_state.volume_w | |
| st.sidebar.number_input("Volume Weight", 0.1, 5.0, step=0.1, key='widget_volume_w', on_change=update_state, disabled=not st.session_state.get('use_volume', True)) | |
| if 'widget_use_adx_filter' not in st.session_state: st.session_state.widget_use_adx_filter = st.session_state.use_adx_filter | |
| st.sidebar.toggle("Use ADX Filter (<)", key='widget_use_adx_filter', on_change=update_state) | |
| if 'widget_adx_threshold' not in st.session_state: st.session_state.widget_adx_threshold = st.session_state.adx_threshold | |
| st.sidebar.number_input("ADX Threshold", 10.0, 50.0, step=1.0, key='widget_adx_threshold', on_change=update_state, disabled=not st.session_state.get('use_adx_filter', True), help="Allow entries only when ADX is BELOW this value.") | |
| if 'widget_adx_period' not in st.session_state: st.session_state.widget_adx_period = st.session_state.adx_period | |
| st.sidebar.number_input("ADX Period", 5, 50, step=1, key='widget_adx_period', on_change=update_state, help="The lookback period for the ADX calculation.") | |
| if 'widget_use_macd' not in st.session_state: st.session_state.widget_use_macd = st.session_state.use_macd | |
| st.sidebar.toggle("Use MACD Signals", key='widget_use_macd', on_change=update_state) | |
| if 'widget_macd_w' not in st.session_state: st.session_state.widget_macd_w = st.session_state.macd_w | |
| st.sidebar.number_input("MACD Weight", 0.1, 5.0, step=0.1, key='widget_macd_w', on_change=update_state, disabled=not st.session_state.get('use_macd', True)) | |
| if 'widget_use_ma_slope' not in st.session_state: st.session_state.widget_use_ma_slope = st.session_state.use_ma_slope | |
| st.sidebar.toggle("Use MA Slope", key='widget_use_ma_slope', on_change=update_state) | |
| if 'widget_ma_slope_w' not in st.session_state: st.session_state.widget_ma_slope_w = st.session_state.ma_slope_w | |
| st.sidebar.number_input("MA Slope Weight", 0.1, 5.0, step=0.1, key='widget_ma_slope_w', on_change=update_state, disabled=not st.session_state.get('use_ma_slope', True)) | |
| if 'widget_use_markov' not in st.session_state: st.session_state.widget_use_markov = st.session_state.use_markov | |
| st.sidebar.toggle("Use Markov State", key='widget_use_markov', on_change=update_state, help="Use the best-found Markov state as a confidence factor. You must run 'Find Best Markov Setup' (Section 7) first.") | |
| if 'widget_markov_w' not in st.session_state: st.session_state.widget_markov_w = st.session_state.markov_w | |
| st.sidebar.number_input("Markov Weight", 0.1, 5.0, step=0.1, key='widget_markov_w', on_change=update_state, disabled=not st.session_state.get('use_markov', True)) | |
| # --- NEW: MFI Controls --- | |
| if 'widget_use_mfi' not in st.session_state: st.session_state.widget_use_mfi = st.session_state.use_mfi | |
| st.sidebar.toggle("Use Money Flow (MFI)", key='widget_use_mfi', on_change=update_state) | |
| if 'widget_mfi_w' not in st.session_state: st.session_state.widget_mfi_w = st.session_state.mfi_w | |
| st.sidebar.number_input("MFI Weight", 0.1, 5.0, step=0.1, key='widget_mfi_w', on_change=update_state, disabled=not st.session_state.get('use_mfi', True)) | |
| # --- NEW: SuperTrend Controls --- | |
| if 'widget_use_supertrend' not in st.session_state: st.session_state.widget_use_supertrend = st.session_state.use_supertrend | |
| st.sidebar.toggle("Use SuperTrend", key='widget_use_supertrend', on_change=update_state) | |
| if 'widget_supertrend_w' not in st.session_state: st.session_state.widget_supertrend_w = st.session_state.supertrend_w | |
| st.sidebar.number_input("SuperTrend Weight", 0.1, 5.0, step=0.1, key='widget_supertrend_w', on_change=update_state, disabled=not st.session_state.get('use_supertrend', True)) | |
| # [FIX] Force the widget key to match the stored variable | |
| st.session_state['widget_confidence_slider'] = st.session_state.confidence_slider | |
| # [FIX] Remove 'value=' argument. | |
| st.sidebar.slider( | |
| "Minimum Confidence Threshold (%)", | |
| 0, 100, | |
| step=5, | |
| key='widget_confidence_slider', | |
| on_change=update_state | |
| ) | |
| st.sidebar.markdown("---") | |
| st.sidebar.header("3. Strategy Parameters") | |
| if production_mode: | |
| # --- PRODUCTION VIEW (Simplified) --- | |
| # Large MA and BB Period are HIDDEN here (values persist in session_state) | |
| # Dropdown for Std Dev (1.4, 1.6, 2.2) | |
| std_options = [1.4, 1.6, 2.2] | |
| current_std = st.session_state.bb_std | |
| std_index = std_options.index(current_std) if current_std in std_options else 2 | |
| st.sidebar.selectbox( | |
| "Bollinger Band Std Dev", | |
| std_options, | |
| index=std_index, | |
| key='widget_bb_std', | |
| on_change=update_state, | |
| help="Controls volatility sensitivity. Lower (1.4/1.6) = more trades, Higher (2.2) = fewer, stricter trades." | |
| ) | |
| else: | |
| # --- DEVELOPER VIEW (Original) --- | |
| st.sidebar.number_input("Large MA Period", 10, 200, value=st.session_state.ma_period, step=1, key='widget_ma_period', on_change=update_state) | |
| st.sidebar.number_input("Bollinger Band Period", 10, 100, value=st.session_state.bb_period, step=1, key='widget_bb_period', on_change=update_state) | |
| st.sidebar.number_input("Bollinger Band Std Dev", 1.0, 5.0, value=st.session_state.bb_std, step=0.1, key='widget_bb_std', on_change=update_state) | |
| st.sidebar.subheader("Long Trade Logic") | |
| st.sidebar.slider("Entry Threshold (%)", 0.0, 10.0, value=st.session_state.long_entry, step=0.1, key='widget_long_entry', on_change=update_state) | |
| st.sidebar.slider("Exit MA Threshold (%)", 0.0, 10.0, value=st.session_state.long_exit, step=0.1, key='widget_long_exit', on_change=update_state) | |
| st.sidebar.slider("Trailing Stop Loss (%)", 0.0, 30.0, value=st.session_state.long_sl, step=0.5, key='widget_long_sl', on_change=update_state, help="A trailing stop-loss set from the trade's high-water mark. Set to 0 to disable.") | |
| st.sidebar.number_input("Delay Entry (days)", 0, 10, value=st.session_state.long_delay, step=1, key='widget_long_delay', on_change=update_state) | |
| st.sidebar.markdown("---") | |
| st.sidebar.subheader("Exit Logic") | |
| exit_options = ["Standard (Price-Based)", "Intelligent (ADX/MACD/ATR)"] | |
| if st.session_state.exit_logic_type not in exit_options: st.session_state.exit_logic_type = "Standard (Price-Based)" | |
| exit_index = exit_options.index(st.session_state.exit_logic_type) | |
| st.sidebar.selectbox( | |
| "Exit Logic Type:", exit_options, index=exit_index, key='widget_exit_logic_type', on_change=update_state, | |
| help="Standard: Exit on MA/BB cross. Intelligent: ADX/MACD profit-take with an optional TSL." | |
| ) | |
| st.sidebar.slider( | |
| "Intelligent: Trailing Stop Loss (%)", 0.0, 60.0, value=st.session_state.intelligent_tsl_pct, step=1.0, key='widget_intelligent_tsl_pct', on_change=update_state, | |
| format="%.1f%%", disabled=(st.session_state.exit_logic_type != 'Intelligent (ADX/MACD/ATR)'), | |
| help="The TSL for the 'Intelligent' exit. Set to 60.0% to replicate the 'Profit-Take or Time-Out' strategy." | |
| ) | |
| st.sidebar.toggle("Apply MA/BB Price Lock Floor (Catcher)", value=st.session_state.use_ma_floor_filter, key='widget_use_ma_floor_filter', on_change=update_state, | |
| help="If ON, the Trailing Stop Loss is prevented from dropping below the Mean Reversion price.") | |
| # [NEW SLIDER] Catcher Offset | |
| # Only visible in Developer Mode. Disabled if Catcher is OFF. | |
| st.sidebar.slider( | |
| "Catcher Offset (%)", | |
| min_value=-10.0, max_value=10.0, value=st.session_state.catcher_stop_pct, step=0.1, | |
| key='widget_catcher_stop_pct', on_change=update_state, | |
| format="%.1f%%", | |
| disabled=not st.session_state.use_ma_floor_filter, | |
| help="Adjusts the hard floor level relative to Entry Price.\nPositive = Secure Profit (e.g. +1% Floor).\nNegative = Allow wiggle room (e.g. -1% Floor)." | |
| ) | |
| st.sidebar.markdown("---") | |
| st.sidebar.subheader("Short Trade Logic") | |
| st.sidebar.slider("Entry Threshold (%)", 0.0, 10.0, value=st.session_state.short_entry, step=0.1, key='widget_short_entry', on_change=update_state) | |
| st.sidebar.slider("Exit MA Threshold (%)", 0.0, 10.0, value=st.session_state.short_exit, step=0.1, key='widget_short_exit', on_change=update_state) | |
| st.sidebar.slider("Trailing Stop Loss (%)", 0.0, 30.0, value=st.session_state.short_sl, step=0.5, key='widget_short_sl', on_change=update_state, help="A trailing stop-loss set from the trade's low-water mark. Set to 0 to disable.") | |
| st.sidebar.number_input("Delay Entry (days)", 0, 10, value=st.session_state.short_delay, step=1, key='widget_short_delay', on_change=update_state) | |
| st.sidebar.markdown("---") | |
| st.sidebar.subheader("Time Limits (Days)") | |
| c_time1, c_time2 = st.sidebar.columns(2) | |
| st.session_state.max_long_duration = c_time1.number_input("Max Long Duration", min_value=1, max_value=365, value=st.session_state.get('max_long_duration', 60), step=1, key='widget_max_long_duration', on_change=update_state) | |
| st.session_state.max_short_duration = c_time2.number_input("Max Short Duration", min_value=1, max_value=365, value=st.session_state.get('max_short_duration', 10), step=1, key='widget_max_short_duration', on_change=update_state) | |
| if 'best_params' not in st.session_state: st.session_state.best_params = None | |
| # --- SECTIONS 4, 5, 6, 7 (HIDDEN IN PRODUCTION) --- | |
| if not production_mode: | |
| st.sidebar.markdown("---") | |
| st.sidebar.header("4. Find Best Parameters") | |
| with st.sidebar.expander("Set Optimisation Ranges"): | |
| use_squared_weighting = st.toggle("Prioritise Profit per Trade (Params)", value=st.session_state.sq_params_toggle, key="widget_sq_params_toggle", on_change=update_state) | |
| st.markdown("---") | |
| optimise_ma = st.checkbox("Optimise MA Period", value=st.session_state.opt_ma_cb, key="widget_opt_ma_cb", on_change=update_state); c1,c2,c3 = st.columns(3) | |
| c1.number_input("MA Start", 10, 200, value=st.session_state.ma_start, step=5, disabled=not optimise_ma, key='widget_ma_start', on_change=update_state) | |
| c2.number_input("MA End", 10, 200, value=st.session_state.ma_end, step=5, disabled=not optimise_ma, key='widget_ma_end', on_change=update_state) | |
| c3.number_input("MA Step", 1, 20, value=st.session_state.ma_step, step=1, disabled=not optimise_ma, key='widget_ma_step', on_change=update_state) | |
| optimise_bb = st.checkbox("Optimise BB Period", value=st.session_state.opt_bb_cb, key="widget_opt_bb_cb", on_change=update_state); c1,c2,c3 = st.columns(3) | |
| c1.number_input("BB Start", 10, 100, value=st.session_state.bb_start, step=5, disabled=not optimise_bb, key='widget_bb_start', on_change=update_state) | |
| c2.number_input("BB End", 10, 100, value=st.session_state.bb_end, step=5, disabled=not optimise_bb, key='widget_bb_end', on_change=update_state) | |
| c3.number_input("BB Step", 1, 10, value=st.session_state.bb_step, step=1, disabled=not optimise_bb, key='widget_bb_step', on_change=update_state) | |
| optimise_std = st.checkbox("Optimise BB Std Dev", value=st.session_state.opt_std_cb, key="widget_opt_std_cb", on_change=update_state); c1,c2,c3 = st.columns(3) | |
| c1.number_input("Std Start", 1.0, 5.0, value=st.session_state.std_start, step=0.1, format="%.1f", disabled=not optimise_std, key='widget_std_start', on_change=update_state) | |
| c2.number_input("Std End", 1.0, 5.0, value=st.session_state.std_end, step=0.1, format="%.1f", disabled=not optimise_std, key='widget_std_end', on_change=update_state) | |
| c3.number_input("Std Step", 0.1, 1.0, value=st.session_state.std_step, step=0.1, format="%.1f", disabled=not optimise_std, key='widget_std_step', on_change=update_state) | |
| st.markdown("---") | |
| optimise_conf = st.checkbox("Optimise Confidence Threshold", value=st.session_state.opt_conf_cb, key="widget_opt_conf_cb", on_change=update_state); c1,c2,c3 = st.columns(3) | |
| c1.number_input("Conf Start", 0, 100, value=st.session_state.conf_start, step=5, disabled=not optimise_conf, key='widget_conf_start', on_change=update_state) | |
| c2.number_input("Conf End", 0, 100, value=st.session_state.conf_end, step=5, disabled=not optimise_conf, key='widget_conf_end', on_change=update_state) | |
| c3.number_input("Conf Step", 5, 25, value=st.session_state.conf_step, step=5, disabled=not optimise_conf, key='widget_conf_step', on_change=update_state) | |
| optimise_sl = st.checkbox("Optimise Stop Loss %", value=st.session_state.opt_sl_cb, key="widget_opt_sl_cb", on_change=update_state); c1,c2,c3 = st.columns(3) | |
| c1.number_input("SL Start", 0.0, 30.0, value=st.session_state.sl_start, step=0.5, disabled=not optimise_sl, key='widget_sl_start', on_change=update_state) | |
| c2.number_input("SL End", 0.0, 30.0, value=st.session_state.sl_end, step=0.5, disabled=not optimise_sl, key='widget_sl_end', on_change=update_state) | |
| c3.number_input("SL Step", 0.1, 5.0, value=st.session_state.sl_step, step=0.5, disabled=not optimise_sl, key='widget_sl_step', on_change=update_state) | |
| optimise_delay = st.checkbox("Optimise Delay Days", value=st.session_state.opt_delay_cb, key="widget_opt_delay_cb", on_change=update_state); c1,c2,c3 = st.columns(3) | |
| c1.number_input("Delay Start", 0, 10, value=st.session_state.delay_start, step=1, disabled=not optimise_delay, key='widget_delay_start', on_change=update_state) | |
| c2.number_input("Delay End", 0, 10, value=st.session_state.delay_end, step=1, disabled=not optimise_delay, key='widget_delay_end', on_change=update_state) | |
| c3.number_input("Delay Step", 1, 5, value=st.session_state.delay_step, step=1, disabled=not optimise_delay, key='widget_delay_step', on_change=update_state) | |
| optimise_entry = st.checkbox("Optimise Entry %", value=st.session_state.opt_entry_cb, key="widget_opt_entry_cb", on_change=update_state); c1,c2,c3 = st.columns(3) | |
| c1.number_input("Entry Start", 0.0, 10.0, value=st.session_state.entry_start, step=0.1, disabled=not optimise_entry, key='widget_entry_start', on_change=update_state) | |
| c2.number_input("Entry End", 0.0, 10.0, value=st.session_state.entry_end, step=0.1, disabled=not optimise_entry, key='widget_entry_end', on_change=update_state) | |
| c3.number_input("Entry Step", 0.1, 1.0, value=st.session_state.entry_step, step=0.1, disabled=not optimise_entry, key='widget_entry_step', on_change=update_state) | |
| optimise_exit = st.checkbox("Optimise Exit MA %", value=st.session_state.opt_exit_cb, key="widget_opt_exit_cb", on_change=update_state); c1,c2,c3 = st.columns(3) | |
| c1.number_input("Exit Start", 0.0, 10.0, value=st.session_state.exit_start, step=0.1, disabled=not optimise_exit, key='widget_exit_start', on_change=update_state) | |
| c2.number_input("Exit End", 0.0, 10.0, value=st.session_state.exit_end, step=0.1, disabled=not optimise_exit, key='widget_exit_end', on_change=update_state) | |
| c3.number_input("Exit Step", 0.1, 1.0, value=st.session_state.exit_step, step=0.1, disabled=not optimise_exit, key='widget_exit_step', on_change=update_state) | |
| st.markdown("---") | |
| optimise_dur = st.checkbox("Optimise Max Duration", value=st.session_state.opt_duration_cb, key="widget_opt_duration_cb", on_change=update_state); c1,c2,c3 = st.columns(3) | |
| c1.number_input("Dur Start", 1, 365, value=st.session_state.dur_start, step=1, disabled=not optimise_dur, key='widget_dur_start', on_change=update_state) | |
| c2.number_input("Dur End", 1, 365, value=st.session_state.dur_end, step=1, disabled=not optimise_dur, key='widget_dur_end', on_change=update_state) | |
| c3.number_input("Dur Step", 1, 30, value=st.session_state.dur_step, step=1, disabled=not optimise_dur, key='widget_dur_step', on_change=update_state) | |
| st.markdown("---") | |
| col1, col2 = st.columns(2) | |
| if col1.button("💡 Find Best Long"): generate_and_run_optimisation(master_df, main_content_placeholder, 'long', use_squared_weighting) | |
| if col2.button("💡 Find Best Short"): generate_and_run_optimisation(master_df, main_content_placeholder, 'short', use_squared_weighting) | |
| st.markdown("---") | |
| col_comb_1, col_comb_2 = st.columns(2) | |
| if col_comb_1.button("🔥 Find Combined Best Long", key="find_combined_long", use_container_width=True): | |
| st.session_state.run_combined_optimisation = True; st.session_state.optimise_side = 'long'; st.rerun() | |
| if col_comb_2.button("🔥 Find Combined Best Short", key="find_combined_short", use_container_width=True): | |
| st.session_state.run_combined_optimisation = True; st.session_state.optimise_side = 'short'; st.rerun() | |
| st.sidebar.markdown("---") | |
| st.sidebar.header("5. Find Best/Worst Confidence Setup") | |
| with st.sidebar.expander("Optimise Confidence Factors"): | |
| st.info("Finds good setups (using Section 2 factors) or bad setups (using the factors below).") | |
| st.write("**Find Best Setups (High Profit)**"); c1, c2 = st.columns(2) | |
| if c1.button("💡 Find Best Long Confidence", key="find_conf_long"): | |
| st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'long'; st.session_state.confidence_optimise_mode = 'best'; st.session_state.confidence_optimise_veto_factors = None; st.rerun() | |
| if c2.button("💡 Find Best Short Confidence", key="find_conf_short"): | |
| st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'short'; st.session_state.confidence_optimise_mode = 'best'; st.session_state.confidence_optimise_veto_factors = None; st.rerun() | |
| st.markdown("---") | |
| st.write("**Find Worst Setups (for Veto Filter)**") | |
| st.caption("Select the factors to test for the Veto signal:") | |
| c1_veto, c2_veto = st.columns(2) | |
| veto_rsi = c1_veto.toggle("Veto RSI", value=st.session_state.veto_rsi_cb, key="widget_veto_rsi_cb", on_change=update_state) | |
| veto_vol = c2_veto.toggle("Veto Volatility", value=st.session_state.veto_vol_cb, key="widget_veto_vol_cb", on_change=update_state) | |
| veto_trend = c1_veto.toggle("Veto Trend", value=st.session_state.veto_trend_cb, key="widget_veto_trend_cb", on_change=update_state) | |
| veto_volume = c2_veto.toggle("Veto Volume", value=st.session_state.veto_volume_cb, key="widget_veto_volume_cb", on_change=update_state) | |
| veto_macd = c1_veto.toggle("Veto MACD", value=st.session_state.veto_macd_cb, key="widget_veto_macd_cb", on_change=update_state) | |
| veto_ma_slope = c2_veto.toggle("Veto MA Slope", value=st.session_state.veto_ma_slope_cb, key="widget_veto_ma_slope_cb", on_change=update_state) | |
| veto_markov = c1_veto.toggle("Veto Markov", value=st.session_state.get('veto_markov_cb', False), key="widget_veto_markov_cb", on_change=update_state) | |
| veto_factors = (veto_rsi, veto_vol, veto_trend, veto_volume, veto_macd, veto_ma_slope, veto_markov) | |
| c1_veto_btn, c2_veto_btn = st.columns(2) | |
| if c1_veto_btn.button("❌ Find Worst Long", key="find_worst_long"): | |
| st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'long'; st.session_state.confidence_optimise_mode = 'worst'; st.session_state.confidence_optimise_veto_factors = veto_factors; st.rerun() | |
| if c2_veto_btn.button("❌ Find Worst Short", key="find_worst_short"): | |
| st.session_state.run_confidence_optimisation = True; st.session_state.confidence_optimise_side = 'short'; st.session_state.confidence_optimise_mode = 'worst'; st.session_state.confidence_optimise_veto_factors = veto_factors; st.rerun() | |
| st.sidebar.markdown("---") | |
| st.sidebar.header("6. Find Best Weights") | |
| with st.sidebar.expander("Optimise Specific Factor Weights"): | |
| st.info("Dynamically optimises weights (0.5 to 2.0) for all active, non-primary factors. Uses other sidebar settings as fixed values.") | |
| use_sq_weighting_for_weights = st.toggle("Prioritise Profit per Trade (Weights)", value=st.session_state.sq_weights_toggle, key="widget_sq_weights_toggle", on_change=update_state) | |
| col1_w, col2_w = st.columns(2) | |
| if col1_w.button("⚖️ Find Best Long Weights", key="find_weights_long"): | |
| st.session_state.run_weight_optimisation = True; st.session_state.weight_optimise_side = 'long'; st.rerun() | |
| if col2_w.button("⚖️ Find Best Short Weights", key="find_weights_short"): | |
| st.session_state.run_weight_optimisation = True; st.session_state.weight_optimise_side = 'short'; st.rerun() | |
| st.sidebar.markdown("---") | |
| st.sidebar.header("7. Find Best Markov Setup") | |
| with st.sidebar.expander("Optimise Markov Probabilities"): | |
| st.info("Brute-forces Run-Up/Future periods to find the highest-probability trading states.") | |
| st.write("Run-Up Period (Past)") | |
| c1, c2, c3 = st.columns(3) | |
| c1.number_input("Start", 1, 100, value=st.session_state.markov_run_up_start, key='widget_markov_run_up_start', on_change=update_state) | |
| c2.number_input("End", 1, 100, value=st.session_state.markov_run_up_end, key='widget_markov_run_up_end', on_change=update_state) | |
| c3.number_input("Step", 1, 10, value=st.session_state.markov_run_up_step, key='widget_markov_run_up_step', on_change=update_state) | |
| st.write("Future Period (Prediction)") | |
| c4, c5, c6 = st.columns(3) | |
| c4.number_input("Start", 1, 100, value=st.session_state.markov_future_start, key='widget_markov_future_start', on_change=update_state) | |
| c5.number_input("End", 1, 100, value=st.session_state.markov_future_end, key='widget_markov_future_end', on_change=update_state) | |
| c6.number_input("Step", 1, 10, value=st.session_state.markov_future_step, key='widget_markov_future_step', on_change=update_state) | |
| c1_markov, c2_markov = st.columns(2) | |
| if c1_markov.button("🔮 Find Best Long Markov", key="markov_long_button"): | |
| st.session_state.run_markov_optimisation = True; st.session_state.markov_side = 'long'; st.rerun() | |
| if c2_markov.button("🔮 Find Best Short Markov", key="markov_short_button"): | |
| st.session_state.run_markov_optimisation = True; st.session_state.markov_side = 'short'; st.rerun() | |
| # --- Veto / Save Settings (Only in Developer Mode) --- | |
| st.sidebar.markdown("---") | |
| current_veto_list = st.session_state.get('veto_setup_list', []) | |
| if current_veto_list: | |
| st.sidebar.header("Veto Filter(s)") | |
| st.sidebar.success(f"{len(current_veto_list)} Veto filter(s) ACTIVE.") | |
| with st.sidebar.expander("Show Veto Filters"): st.json(current_veto_list) | |
| if st.sidebar.button("💾 Save Veto Filters as Default"): save_veto_setup(current_veto_list) | |
| if st.sidebar.button("Clear Veto Filters"): | |
| st.session_state.veto_setup_list = []; | |
| if os.path.exists(VETO_CONFIG_FILE): | |
| try: os.remove(VETO_CONFIG_FILE) | |
| except OSError as e: print(f"Error removing veto file: {e}") | |
| st.rerun() | |
| st.sidebar.markdown("---") | |
| # --- SAVE SETTINGS (Hidden in Production Mode) --- | |
| if not production_mode: | |
| if st.sidebar.button("💾 Save Settings as Default"): | |
| settings_to_save = { | |
| "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, | |
| "bband_std_dev": st.session_state.bb_std, "confidence_threshold": st.session_state.confidence_slider, | |
| "long_entry_threshold_pct": st.session_state.long_entry / 100, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, | |
| "long_trailing_stop_loss_pct": st.session_state.long_sl / 100, "long_delay_days": st.session_state.long_delay, | |
| "short_entry_threshold_pct": st.session_state.short_entry / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, | |
| "short_trailing_stop_loss_pct": st.session_state.short_sl / 100, "short_delay_days": st.session_state.short_delay, | |
| "use_rsi": st.session_state.use_rsi, "rsi_w": st.session_state.rsi_w, | |
| "rsi_logic": st.session_state.get('rsi_logic', 'Crossover'), | |
| "primary_driver": st.session_state.get('primary_driver', 'Bollinger Bands'), | |
| "exit_logic_type": st.session_state.get('exit_logic_type', 'Standard (Price-Based)'), | |
| "use_ma_floor_filter": st.session_state.use_ma_floor_filter, | |
| "catcher_stop_pct": st.session_state.catcher_stop_pct / 100.0, | |
| "exit_confidence_threshold": st.session_state.get('exit_confidence_threshold', 50), | |
| "smart_trailing_stop_pct": st.session_state.get('smart_trailing_stop_pct', 5.0), | |
| "smart_exit_atr_period": st.session_state.get('smart_exit_atr_period', 14), | |
| "smart_exit_atr_multiplier": st.session_state.get('smart_exit_atr_multiplier', 3.0), | |
| "intelligent_tsl_pct": st.session_state.intelligent_tsl_pct / 100.0, | |
| "norm_lookback_years": norm_lookback_years, | |
| "use_rolling_benchmark": use_rolling_benchmark, | |
| "benchmark_rank": benchmark_percentile_setting, | |
| "use_vol": st.session_state.use_vol, "vol_w": st.session_state.vol_w, | |
| "use_trend": st.session_state.use_trend, "trend_w": st.session_state.trend_w, | |
| "use_volume": st.session_state.use_volume, "volume_w": st.session_state.volume_w, | |
| "use_adx_filter": st.session_state.use_adx_filter, "adx_threshold": st.session_state.adx_threshold, | |
| "adx_period": st.session_state.adx_period, | |
| "use_macd": st.session_state.use_macd, "macd_w": st.session_state.macd_w, | |
| "use_ma_slope": st.session_state.use_ma_slope, "ma_slope_w": st.session_state.ma_slope_w, | |
| "use_markov": st.session_state.use_markov, "markov_w": st.session_state.markov_w, | |
| "max_trading_days": st.session_state.max_duration, | |
| "max_long_duration": st.session_state.max_long_duration, | |
| "max_short_duration": st.session_state.max_short_duration | |
| } | |
| save_settings(settings_to_save) | |
| # --- End of Sidebar Definitions --- | |
| # --- MAIN CONTENT AREA LOGIC --- | |
| with main_content_placeholder.container(): | |
| # 1. User Defined Advisor Setup UI (Developer Only) | |
| if st.session_state.get('run_user_advisor_setup'): | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None; st.session_state.single_ticker_results = None | |
| st.session_state.open_trades_df = None; st.session_state.confidence_results_df = None | |
| st.session_state.best_params = None; st.session_state.last_run_stats = None | |
| generate_user_advisor_ui_and_run(master_df) | |
| # 2. Run User Defined Advisor Scan (Used by BOTH Dev "Scan with User Setups" AND Prod "Run User-Selected Option") | |
| elif st.session_state.get('run_scan_user_setups'): | |
| st.session_state.run_scan_user_setups = False | |
| st.session_state.last_run_stats = None | |
| setups_for_scan = st.session_state.get('setups_to_scan', []) | |
| if setups_for_scan: | |
| with st.spinner("Running advisor scan..."): | |
| # Determine label based on count | |
| label = "User-Defined" if len(setups_for_scan) > 1 else "Selected-Setup" | |
| run_advisor_scan(master_df, setups_for_scan, label) | |
| else: | |
| st.warning("No setup selected for scanning.") | |
| # 3. Confidence Optimisation (Developer Only) | |
| elif st.session_state.get('run_confidence_optimisation'): | |
| st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False | |
| st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False | |
| st.session_state.run_markov_optimisation = False; st.session_state.run_weight_optimisation = False | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None | |
| st.session_state.markov_results_df = None; st.session_state.summary_df = None; st.session_state.last_run_stats = None | |
| side_to_run = st.session_state.get('confidence_optimise_side', 'long') | |
| mode_to_run = st.session_state.get('confidence_optimise_mode', 'best') | |
| veto_factors_to_run = st.session_state.get('confidence_optimise_veto_factors', None) | |
| run_confidence_optimisation(side_to_run, mode_to_run, master_df, main_content_placeholder, veto_factors_to_run) | |
| # 4. Weight Optimisation (Developer Only) | |
| elif st.session_state.get('run_weight_optimisation'): | |
| st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False | |
| st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False | |
| st.session_state.run_markov_optimisation = False; st.session_state.advisor_df = None | |
| st.session_state.raw_df = None; st.session_state.deduped_df = None; st.session_state.markov_results_df = None | |
| st.session_state.summary_df = None; st.session_state.confidence_results_df = None; st.session_state.last_run_stats = None | |
| side_to_run = st.session_state.get('weight_optimise_side', 'long') | |
| use_sq = st.session_state.get('sq_weights_toggle', False) | |
| generate_and_run_weight_optimisation(master_df, main_content_placeholder, side_to_run, use_sq) | |
| # 5. Parameter Optimisation Results Display (Developer Only) | |
| elif st.session_state.get('param_results_df') is not None: | |
| st.subheader("🏆 Parameter Optimisation Results") | |
| st.caption("Results are pre-sorted by Score (High to Low).") | |
| if st.session_state.get('best_params'): | |
| st.subheader("Optimal Parameters Found:") | |
| st.json(st.session_state.best_params) | |
| c_view1, c_view2 = st.columns([1, 2]) | |
| dedupe_params = c_view1.checkbox("De-duplicate (Show unique scores only)", value=True) | |
| results_df = st.session_state.param_results_df.copy() | |
| if dedupe_params: | |
| results_df = results_df.drop_duplicates(subset=['Strategy Score', 'Avg Profit/Trade'], keep='first') | |
| display_cols = ["Strategy Score", "Avg Profit/Trade", "Ticker G/B Ratio", "Total Trades", "Avg Entry Conf."] | |
| if 'max_trading_days' in results_df.columns: display_cols.append('max_trading_days') | |
| optional_cols = ["confidence_threshold", "bband_period", "large_ma_period", "long_entry_threshold_pct", "long_exit_ma_threshold_pct", "long_delay_days", "long_trailing_stop_loss_pct"] | |
| for col in optional_cols: | |
| if col in results_df.columns: | |
| if results_df[col].nunique() > 1 or st.session_state.get(f"opt_{col[:3]}_cb", True): | |
| display_cols.append(col) | |
| st.dataframe(results_df.head(100)[display_cols].style.format({ | |
| "Strategy Score": "{:.2f}%", "Avg Profit/Trade": "{:.2%}", "Ticker G/B Ratio": "{:.2f}", | |
| "Avg Entry Conf.": "{:.1f}%", "max_trading_days": "{:.0f}" | |
| }, na_rep='-')) | |
| csv_data = results_df.to_csv(index=False).encode('utf-8') | |
| st.download_button(label="⬇️ Download Full Results (CSV)", data=csv_data, file_name="parameter_optimisation_results.csv", mime="text/csv") | |
| if st.button("Close Results"): | |
| st.session_state.param_results_df = None | |
| st.rerun() | |
| # 6. Combined Optimisation (Developer Only) | |
| elif st.session_state.get('run_combined_optimisation'): | |
| st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False | |
| st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False | |
| st.session_state.run_markov_optimisation = False; st.session_state.run_weight_optimisation = False | |
| st.session_state.run_confidence_optimisation = False; st.session_state.advisor_df = None | |
| st.session_state.raw_df = None; st.session_state.deduped_df = None; st.session_state.markov_results_df = None | |
| st.session_state.summary_df = None; st.session_state.confidence_results_df = None; st.session_state.last_run_stats = None | |
| side_to_run = st.session_state.get('optimise_side', 'long') | |
| generate_and_run_combined_optimisation(master_df, main_content_placeholder, side_to_run) | |
| st.session_state.run_combined_optimisation = False | |
| # 7. Markov Optimisation (Developer Only) | |
| elif st.session_state.get('run_markov_optimisation'): | |
| st.session_state.run_advanced_advisor = False; st.session_state.run_analysis_button = False | |
| st.session_state.run_user_advisor_setup = False; st.session_state.run_scan_user_setups = False | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None | |
| st.session_state.last_run_stats = None | |
| side_to_run = st.session_state.get('markov_side', 'long') | |
| generate_and_run_markov_optimisation(master_df, main_content_placeholder, side_to_run) | |
| # 8. Top Setups Advisor UI (Developer Only) | |
| elif st.session_state.get('run_advanced_advisor'): | |
| st.session_state.summary_df = None; st.session_state.single_ticker_results = None | |
| st.session_state.open_trades_df = None; st.session_state.confidence_results_df = None | |
| st.session_state.best_params = None; st.session_state.last_run_stats = None | |
| generate_advisor_report(master_df) | |
| # 9. RUN ANALYSIS (Used by BOTH Dev "Rocket" button AND Prod "Run Analysis" button) | |
| elif st.session_state.get("run_analysis_button"): | |
| st.session_state.run_analysis_button = False # Reset flag | |
| # Clear all results | |
| st.session_state.summary_df = None; st.session_state.single_ticker_results = None | |
| st.session_state.open_trades_df = None; st.session_state.confidence_results_df = None | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None | |
| st.session_state.best_params = None; st.session_state.worst_confidence_setup = None | |
| st.session_state.worst_confidence_setups_list = []; st.session_state.markov_results_df = None | |
| st.session_state.last_run_stats = None | |
| with st.spinner("Running analysis..."): | |
| # --- VETO LOGIC FIX --- | |
| if production_mode: | |
| veto_list_to_use = [] # Disable veto in simplified mode | |
| else: | |
| veto_list_to_use = st.session_state.get('veto_setup_list', []) | |
| if veto_list_to_use: st.info(f"{len(veto_list_to_use)} Veto filter(s) active.") | |
| # Gather parameters | |
| manual_params = { | |
| "large_ma_period": st.session_state.get('ma_period', 50), | |
| "bband_period": st.session_state.get('bb_period', 20), | |
| "bband_std_dev": st.session_state.get('bb_std', 2.0), | |
| "confidence_threshold": st.session_state.get('confidence_slider', 50), | |
| "long_entry_threshold_pct": st.session_state.get('long_entry', 0.0) / 100, | |
| "long_exit_ma_threshold_pct": st.session_state.get('long_exit', 0.0) / 100, | |
| "long_trailing_stop_loss_pct": st.session_state.get('long_sl', 8.0) / 100, | |
| "long_delay_days": st.session_state.get('long_delay', 0), | |
| "short_entry_threshold_pct": st.session_state.get('short_entry', 0.0) / 100, | |
| "short_exit_ma_threshold_pct": st.session_state.get('short_exit', 0.0) / 100, | |
| "short_trailing_stop_loss_pct": st.session_state.get('short_sl', 8.0) / 100, | |
| "short_delay_days": st.session_state.get('short_delay', 0), | |
| "use_ma_floor_filter": st.session_state.get('use_ma_floor_filter', True), | |
| "catcher_stop_pct": st.session_state.get('catcher_stop_pct', 0.0) / 100.0, | |
| "max_long_duration": st.session_state.get('max_long_duration', 60), | |
| "max_short_duration": st.session_state.get('max_short_duration', 10) | |
| } | |
| markov_setup_to_use = None | |
| if st.session_state.primary_driver == 'Markov State' or st.session_state.use_markov: | |
| if 'best_markov_setup' in st.session_state and st.session_state.best_markov_setup: | |
| markov_setup_to_use = st.session_state.best_markov_setup | |
| elif not production_mode: | |
| st.error("Markov State selected but no setup found. Run Section 7."); st.stop() | |
| exit_logic = st.session_state.exit_logic_type | |
| exit_thresh = st.session_state.exit_confidence_threshold | |
| smart_trailing_stop = st.session_state.smart_trailing_stop_pct / 100.0 | |
| smart_atr_p = st.session_state.smart_exit_atr_period | |
| smart_atr_m = st.session_state.smart_exit_atr_multiplier | |
| intelligent_tsl = st.session_state.intelligent_tsl_pct / 100.0 | |
| # --- NEW: Date Buffer Logic --- | |
| # We load data starting 365 days BEFORE the user selected start date | |
| # to ensure indicators (like 200MA) are fully calculated by the time the analysis starts. | |
| user_start_date = pd.Timestamp(st.session_state.start_date) | |
| end_date = pd.Timestamp(st.session_state.end_date) | |
| warmup_delta = timedelta(days=365) # Safe buffer | |
| data_load_start = user_start_date - warmup_delta | |
| # A) Single Ticker Analysis | |
| if st.session_state.run_mode == "Analyse Single Ticker": | |
| # --- [FIX START] Logic integrated from old_but_working_app.py --- | |
| selected_ticker = st.session_state.get('ticker_select', ticker_list[0] if ticker_list else None) | |
| if not selected_ticker: | |
| st.error("No ticker selected.") | |
| st.stop() | |
| # Define Start Date for Single Ticker | |
| user_start_date = pd.Timestamp(st.session_state.start_date) | |
| cols_to_use = [selected_ticker] | |
| if f'{selected_ticker}_High' in master_df.columns: cols_to_use.append(f'{selected_ticker}_High') | |
| if f'{selected_ticker}_Low' in master_df.columns: cols_to_use.append(f'{selected_ticker}_Low') | |
| if f'{selected_ticker}_Volume' in master_df.columns: cols_to_use.append(f'{selected_ticker}_Volume') | |
| existing_cols = [col for col in cols_to_use if col in master_df.columns] | |
| if selected_ticker not in existing_cols: | |
| st.error(f"Ticker '{selected_ticker}' not found.") | |
| st.stop() | |
| # [FIX] Smart Lookback Logic: Uses the wider of (User Range) or (Slider Lookback) | |
| user_start = pd.Timestamp(st.session_state.start_date) | |
| user_end = pd.Timestamp(st.session_state.end_date) | |
| # Calculate the slider-based start date | |
| slider_start = user_end - pd.DateOffset(years=norm_lookback_years) | |
| # 1. Logic: Start loading from the EARLIER of the two dates (Wider Window) | |
| effective_start = min(user_start, slider_start) | |
| # 2. Safety: Add 365 days EXTRA buffer so MA-200 is ready on Day 1 | |
| buffer_start = effective_start - pd.DateOffset(days=365) | |
| # Ensure we don't go before available data | |
| if not master_df.empty: | |
| buffer_start = max(buffer_start, master_df.index[0]) | |
| data_for_backtest = master_df.loc[buffer_start:user_end, existing_cols].copy() | |
| rename_dict = {selected_ticker: 'Close', f'{selected_ticker}_High': 'High', f'{selected_ticker}_Low': 'Low', f'{selected_ticker}_Volume': 'Volume'} | |
| rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in existing_cols} | |
| data_for_backtest = data_for_backtest.rename(columns=rename_dict_filtered) | |
| if not data_for_backtest.empty and 'Close' in data_for_backtest.columns and not data_for_backtest['Close'].isna().all(): | |
| # --- [FIX END] --- | |
| long_pnl, short_pnl, avg_long_trade, avg_short_trade, results_df, trades, open_trades, trade_counts, durations, trade_dates, exit_breakdown = run_backtest( | |
| data_for_backtest, manual_params, | |
| st.session_state.use_rsi, st.session_state.use_vol, st.session_state.use_trend, st.session_state.use_volume, | |
| st.session_state.use_macd, st.session_state.use_ma_slope, st.session_state.use_markov, | |
| st.session_state.use_mfi, st.session_state.use_supertrend, | |
| st.session_state.rsi_w, st.session_state.vol_w, st.session_state.trend_w, st.session_state.volume_w, | |
| st.session_state.macd_w, st.session_state.ma_slope_w, st.session_state.markov_w, | |
| st.session_state.mfi_w, st.session_state.supertrend_w, | |
| st.session_state.use_adx_filter, st.session_state.adx_threshold, | |
| st.session_state.get('rsi_logic', 'Crossover'), | |
| st.session_state.adx_period, | |
| veto_setups_list=veto_list_to_use, | |
| primary_driver=st.session_state.primary_driver, | |
| markov_setup=markov_setup_to_use, | |
| exit_logic_type=exit_logic, | |
| exit_confidence_threshold=exit_thresh, | |
| smart_trailing_stop_pct=smart_trailing_stop, | |
| smart_exit_atr_period=smart_atr_p, | |
| smart_exit_atr_multiplier=smart_atr_m, | |
| intelligent_tsl_pct=intelligent_tsl, | |
| benchmark_rank=benchmark_percentile_setting / 100.0, | |
| analysis_start_date=user_start_date, | |
| analysis_end_date=end_date, | |
| benchmark_lookback_years=norm_lookback_years, | |
| use_rolling_benchmark=use_rolling_benchmark | |
| ) | |
| st.session_state.single_ticker_results = {"long_pnl": long_pnl, "short_pnl": short_pnl, "avg_long_trade": avg_long_trade, "avg_short_trade": avg_short_trade, "results_df": results_df, "trades": trades} | |
| st.session_state.open_trades_df = pd.DataFrame(open_trades) if open_trades else pd.DataFrame() | |
| st.session_state.exit_breakdown_totals = {'long_profit_take_count': exit_breakdown[0], 'long_tsl_count': exit_breakdown[1], 'long_time_exit_count': exit_breakdown[2], 'short_profit_take_count': exit_breakdown[3], 'short_tsl_count': exit_breakdown[4], 'short_time_exit_count': exit_breakdown[5]} | |
| else: st.warning("No data for ticker.") | |
| # B) Full List Analysis | |
| elif st.session_state.run_mode.startswith("Analyse Full List"): | |
| # [FIX] Define Start Date for Full List (Critical Fix) | |
| user_start_date = pd.Timestamp(st.session_state.start_date) | |
| summary_results, all_open_trades = [], [] | |
| total_long_wins, total_long_losses, total_short_wins, total_short_losses = 0, 0, 0, 0 | |
| all_long_durations = []; all_short_durations = [] | |
| total_exit_breakdown = [0, 0, 0, 0, 0, 0] | |
| PROFIT_THRESHOLD = 1.0; excluded_analysis_tickers = [] | |
| progress_bar = st.progress(0, text="Starting analysis...") | |
| num_tickers = len(ticker_list) | |
| for i, ticker_symbol in enumerate(ticker_list): | |
| progress_bar.progress((i + 1) / num_tickers, text=f"Analysing {ticker_symbol}...") | |
| cols_to_use = [ticker_symbol] | |
| if f'{ticker_symbol}_High' in master_df.columns: cols_to_use.append(f'{ticker_symbol}_High') | |
| if f'{ticker_symbol}_Low' in master_df.columns: cols_to_use.append(f'{ticker_symbol}_Low') | |
| if f'{ticker_symbol}_Volume' in master_df.columns: cols_to_use.append(f'{ticker_symbol}_Volume') | |
| existing_cols = [col for col in cols_to_use if col in master_df.columns] | |
| if ticker_symbol not in existing_cols: continue | |
| # [FIX] Smart Lookback Logic: Uses the wider of (User Range) or (Slider Lookback) | |
| user_start = pd.Timestamp(st.session_state.start_date) | |
| user_end = pd.Timestamp(st.session_state.end_date) | |
| slider_start = user_end - pd.DateOffset(years=norm_lookback_years) | |
| # 1. Logic: Start loading from the EARLIER of the two dates | |
| effective_start = min(user_start, slider_start) | |
| # 2. Safety: Add 365 days EXTRA buffer | |
| buffer_start = effective_start - pd.DateOffset(days=365) | |
| if not master_df.empty: | |
| buffer_start = max(buffer_start, master_df.index[0]) | |
| ticker_data_series = master_df.loc[buffer_start:user_end, existing_cols] | |
| rename_dict = {ticker_symbol: 'Close', f'{ticker_symbol}_High': 'High', f'{ticker_symbol}_Low': 'Low', f'{ticker_symbol}_Volume': 'Volume'} | |
| ticker_data_series = ticker_data_series.rename(columns={k:v for k,v in rename_dict.items() if k in existing_cols}) | |
| if not ticker_data_series.empty and 'Close' in ticker_data_series.columns and not ticker_data_series['Close'].isna().all(): | |
| long_pnl, short_pnl, avg_long_trade, avg_short_trade, results_df, trades, open_trades, trade_counts, durations, trade_dates, exit_breakdown = run_backtest( | |
| ticker_data_series, manual_params, | |
| st.session_state.use_rsi, st.session_state.use_vol, st.session_state.use_trend, st.session_state.use_volume, | |
| st.session_state.use_macd, st.session_state.use_ma_slope, st.session_state.use_markov, | |
| st.session_state.use_mfi, st.session_state.use_supertrend, | |
| st.session_state.rsi_w, st.session_state.vol_w, st.session_state.trend_w, st.session_state.volume_w, | |
| st.session_state.macd_w, st.session_state.ma_slope_w, st.session_state.markov_w, | |
| st.session_state.mfi_w, st.session_state.supertrend_w, | |
| st.session_state.use_adx_filter, st.session_state.adx_threshold, st.session_state.get('rsi_logic', 'Crossover'), st.session_state.adx_period, | |
| veto_setups_list=veto_list_to_use, primary_driver=st.session_state.primary_driver, | |
| markov_setup=markov_setup_to_use, exit_logic_type=exit_logic, exit_confidence_threshold=exit_thresh, | |
| smart_trailing_stop_pct=smart_trailing_stop, smart_exit_atr_period=smart_atr_p, | |
| smart_exit_atr_multiplier=smart_atr_m, intelligent_tsl_pct=intelligent_tsl, | |
| benchmark_rank=benchmark_percentile_setting / 100.0, | |
| analysis_start_date=user_start_date, | |
| analysis_end_date=end_date, | |
| benchmark_lookback_years=norm_lookback_years, | |
| use_rolling_benchmark=use_rolling_benchmark | |
| ) | |
| if abs(long_pnl) > PROFIT_THRESHOLD or abs(short_pnl) > PROFIT_THRESHOLD or \ | |
| (avg_long_trade is not None and pd.notna(avg_long_trade) and abs(avg_long_trade) > PROFIT_THRESHOLD) or \ | |
| (avg_short_trade is not None and pd.notna(avg_short_trade) and abs(avg_short_trade) > PROFIT_THRESHOLD): | |
| excluded_analysis_tickers.append(ticker_symbol); continue | |
| total_exit_breakdown = [sum(x) for x in zip(total_exit_breakdown, exit_breakdown)] | |
| long_wins, long_losses, short_wins, short_losses = trade_counts | |
| long_durations, short_durations = durations | |
| first_long_entry, last_long_exit, first_short_entry, last_short_exit = trade_dates | |
| total_long_wins += long_wins; total_long_losses += long_losses | |
| total_short_wins += short_wins; total_short_losses += short_losses | |
| all_long_durations.extend(long_durations); all_short_durations.extend(short_durations) | |
| long_conf = np.mean([t['confidence'] for t in trades[0] if pd.notna(t.get('confidence'))]) if trades[0] else 0 | |
| short_conf = np.mean([t['confidence'] for t in trades[2] if pd.notna(t.get('confidence'))]) if trades[2] else 0 | |
| avg_long_dur_ticker = np.mean(long_durations) if long_durations else 0 | |
| avg_short_dur_ticker = np.mean(short_durations) if short_durations else 0 | |
| summary_results.append({ | |
| "Ticker": ticker_symbol, "Cumulative Long P&L": long_pnl, "Avg Long Profit per Trade": avg_long_trade, | |
| "Num Long Trades": len(trades[0]), "Avg Long Confidence": long_conf, "Avg Long Duration (Days)": avg_long_dur_ticker, | |
| "First Long Entry": first_long_entry, "Last Long Exit": last_long_exit, | |
| "Cumulative Short P&L": short_pnl, "Avg Short Profit per Trade": avg_short_trade, | |
| "Num Short Trades": len(trades[2]), "Avg Short Confidence": short_conf, "Avg Short Duration (Days)": avg_short_dur_ticker, | |
| "First Short Entry": first_short_entry, "Last Short Exit": last_short_exit | |
| }) | |
| if open_trades: | |
| for trade in open_trades: trade['Ticker'] = ticker_symbol; all_open_trades.append(trade) | |
| progress_bar.empty() | |
| if excluded_analysis_tickers and not production_mode: st.warning(f"Excluded {len(excluded_analysis_tickers)} tickers due to unrealistic profit.") | |
| if summary_results: | |
| st.session_state.summary_df = pd.DataFrame(summary_results).set_index('Ticker') | |
| st.session_state.trade_counts = {"long_wins": total_long_wins, "long_losses": total_long_losses, "short_wins": total_short_wins, "short_losses": total_short_losses} | |
| st.session_state.trade_durations = { "avg_long_duration": np.mean(all_long_durations) if all_long_durations else 0, "max_long_duration": np.max(all_long_durations) if all_long_durations else 0, "avg_short_duration": np.mean(all_short_durations) if all_short_durations else 0, "max_short_duration": np.max(all_short_durations) if all_short_durations else 0 } | |
| st.session_state.exit_breakdown_totals = { | |
| 'long_profit_take_count': total_exit_breakdown[0], 'long_tsl_count': total_exit_breakdown[1], 'long_time_exit_count': total_exit_breakdown[2], | |
| 'short_profit_take_count': total_exit_breakdown[3], 'short_tsl_count': total_exit_breakdown[4], 'short_time_exit_count': total_exit_breakdown[5] | |
| } | |
| else: | |
| st.warning("No trades found.") | |
| st.session_state.trade_durations = {} | |
| st.session_state.exit_breakdown_totals = {} | |
| st.session_state.open_trades_df = pd.DataFrame(all_open_trades) if all_open_trades else pd.DataFrame() | |
| # 10. Display Advisor Scan Results (raw_df) | |
| elif 'raw_df' in st.session_state and st.session_state.raw_df is not None: | |
| advisor_type = st.session_state.get('advisor_type', 'Advisor') | |
| st.subheader(f"👨💼 {advisor_type} Advisor: Trade Signals") | |
| c_view1, c_view2 = st.columns(2) | |
| dedupe_view = c_view1.checkbox("De-duplicate trades", value=True, key="advisor_dedupe_check") | |
| filter_trades = c_view2.checkbox("Hide Empty Tickers", value=True, key="advisor_filter_check") | |
| display_df = st.session_state.deduped_df if dedupe_view else st.session_state.raw_df | |
| if display_df is not None and not display_df.empty: | |
| if filter_trades and 'Status' in display_df.columns: display_df = display_df[display_df['Status'].notna()] | |
| format_dict = { | |
| "Final % P/L": lambda x: f"{x:.2%}" if pd.notna(x) else '-', | |
| "Date Open": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-', | |
| "Date Closed": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-', | |
| "Start Confidence": lambda x: f"{x:.0f}%" if pd.notna(x) else '-' | |
| } | |
| if "Setup G/B Ratio" in display_df.columns: format_dict["Setup G/B Ratio"] = lambda x: f"{x:.2f}" if pd.notna(x) else '-' | |
| for col in display_df.columns: | |
| if any(x in col for x in ["Threshold", "Std Dev", "Ratio"]) and col not in format_dict: | |
| format_dict[col] = lambda x: f"{x:.2f}" if pd.notna(x) and isinstance(x, (int, float)) else x | |
| st.dataframe(display_df.style.format(format_dict, na_rep='-')) | |
| else: st.info("No trades found.") | |
| if st.button("Back to Main Analysis"): | |
| st.session_state.advisor_df = None; st.session_state.raw_df = None; st.session_state.deduped_df = None | |
| st.session_state.run_advanced_advisor = False; st.session_state.run_user_advisor_setup = False | |
| st.session_state.run_scan_user_setups = False; st.rerun() | |
| # 11. Display Confidence Optimisation Results | |
| elif st.session_state.get('confidence_results_df') is not None and not st.session_state.confidence_results_df.empty: | |
| st.subheader("📊 Confidence Setup Optimisation Results") | |
| # [CHANGE] Rename the column for display | |
| display_df_conf = st.session_state.confidence_results_df.head(60).rename(columns={'Net % Return': 'Moneypile Score'}) | |
| # [CHANGE] Update formatters to match | |
| conf_formatters = { | |
| "Avg Profit/Trade": "{:.2%}", | |
| "Ticker G/B Ratio": "{:.2f}", | |
| "Trade G/B Ratio": "{:.2f}", | |
| "Moneypile Score": "{:.1f}", | |
| "Avg Entry Conf.": "{:.1f}%", | |
| "Good Score": "{:.4f}", | |
| "Bad Score": "{:.4f}", | |
| "Norm. Score %": "{:.2f}%" | |
| } | |
| if 'MACD' in display_df_conf.columns: conf_formatters['MACD'] = '{}' | |
| if 'MA Slope' in display_df_conf.columns: conf_formatters['MA Slope'] = '{}' | |
| if 'Markov' in display_df_conf.columns: conf_formatters['Markov'] = '{}' | |
| valid_conf_formatters = {k: (lambda val, fmt=v: fmt.format(val) if pd.notna(val) else '-') for k, v in conf_formatters.items() if k in display_df_conf.columns} | |
| st.dataframe(display_df_conf.style.format(valid_conf_formatters, na_rep='-')) | |
| # 12. Display Single Ticker Results | |
| # [FIX] Changed 'elif' to 'if' so this block runs immediately after analysis | |
| if st.session_state.get('single_ticker_results') is not None: | |
| res = st.session_state.single_ticker_results | |
| st.subheader(f"Results for {st.session_state.get('ticker_select')}") | |
| c1, c2, c3, c4 = st.columns(4) | |
| c1.metric("Cumulative Long P&L", f"{res.get('long_pnl', 0):.2%}") | |
| c2.metric("Avg Long Trade P&L", f"{res.get('avg_long_trade', 0):.2%}") | |
| c3.metric("Cumulative Short P&L", f"{res.get('short_pnl', 0):.2%}") | |
| c4.metric("Avg Short Trade P&L", f"{res.get('avg_short_trade', 0):.2%}") | |
| if res.get('results_df') is not None and not res['results_df'].empty: | |
| try: | |
| st.plotly_chart(generate_long_plot(res['results_df'], res['trades'], st.session_state.get('ticker_select')), use_container_width=True) | |
| st.plotly_chart(generate_short_plot(res['results_df'], res['trades'], st.session_state.get('ticker_select')), use_container_width=True) | |
| except Exception as e: st.error(f"Error generating plot: {e}") | |
| else: st.info("No chart data available.") | |
| # 13. Display Full Analysis Summary | |
| if st.session_state.get('summary_df') is not None and not st.session_state.summary_df.empty: | |
| # 1. Top Cards | |
| display_summary_analytics(st.session_state.summary_df) | |
| # 2. Histogram 1: Profit Distribution | |
| st.markdown("---") | |
| try: | |
| if 'generate_profit_distribution_chart' in globals(): | |
| dist_fig = generate_profit_distribution_chart(st.session_state.summary_df) | |
| if dist_fig: st.plotly_chart(dist_fig, use_container_width=True) | |
| except Exception: pass | |
| # 3. Histogram 2: Trades Over Time (NEW) | |
| try: | |
| if 'generate_trades_timeline_histogram' in globals() and st.session_state.get('open_trades_df') is not None: | |
| timeline_fig = generate_trades_timeline_histogram( | |
| st.session_state.open_trades_df, | |
| st.session_state.start_date, | |
| st.session_state.end_date | |
| ) | |
| if timeline_fig: st.plotly_chart(timeline_fig, use_container_width=True) | |
| except Exception as e: st.error(f"Chart Error: {e}") | |
| st.markdown("---") | |
| # 4. Results Per Ticker Table | |
| st.subheader("Results per Ticker") | |
| if st.checkbox("Only show tickers with trades", value=True): | |
| df_to_display = st.session_state.summary_df[(st.session_state.summary_df['Num Long Trades'] > 0) | (st.session_state.summary_df['Num Short Trades'] > 0)].copy() | |
| else: | |
| df_to_display = st.session_state.summary_df.copy() | |
| date_cols = ["First Long Entry", "Last Long Exit", "First Short Entry", "Last Short Exit"] | |
| for col in date_cols: | |
| if col in df_to_display.columns: df_to_display[col] = pd.to_datetime(df_to_display[col], errors='coerce').dt.strftime('%Y-%m-%d') | |
| df_to_display.fillna('-', inplace=True) | |
| st.dataframe(df_to_display.style.format({ "Cumulative Long P&L": "{:.2%}", "Avg Long Profit per Trade": "{:.2%}", "Avg Long Duration (Days)": "{:.1f}", "Cumulative Short P&L": "{:.2%}", "Avg Short Profit per Trade": "{:.2%}", "Avg Short Duration (Days)": "{:.1f}", "Avg Long Confidence": "{:.0f}%", "Avg Short Confidence": "{:.0f}%" }, na_rep='-')) | |
| if not production_mode: | |
| if st.button("💾 Add these settings to User-Defined List", key="save_setup_from_analysis", on_click=add_setup_to_user_list): pass | |
| # 5. Open Positions Table (With Filter) | |
| st.subheader("👨🏻💼 Open Positions & Recently Closed", | |
| help="This table displays all currently ACTIVE trades, plus any trades that closed within the last 30 days.") | |
| if st.session_state.get('open_trades_df') is not None and not st.session_state.open_trades_df.empty: | |
| full_df = st.session_state.open_trades_df.copy() | |
| # --- FILTER: Show 'Open' OR 'Closed in last 30 days' --- | |
| cutoff = pd.Timestamp.now() - pd.Timedelta(days=30) | |
| full_df['Date Closed'] = pd.to_datetime(full_df['Date Closed'], errors='coerce') | |
| mask_open = (full_df['Status'] == 'Open') | |
| mask_recent = (full_df['Status'] == 'Closed') & (full_df['Date Closed'] >= cutoff) | |
| display_open_df = full_df[mask_open | mask_recent].copy() | |
| # ------------------------------------------------------- | |
| # [FIX] Sort strictly by Date Open (Newest First) | |
| display_open_df.sort_values(by='Date Open', ascending=False, inplace=True) | |
| cols_order_manual = ['Ticker', 'Status', 'Final % P/L', 'Side', 'Date Open', 'Date Closed', 'Start Confidence'] | |
| existing_cols_open = [col for col in cols_order_manual if col in display_open_df.columns] | |
| if existing_cols_open and not display_open_df.empty: | |
| st.dataframe(display_open_df[existing_cols_open].style.format({ | |
| "Final % P/L": lambda x: f"{x:.2%}" if pd.notna(x) else '-', | |
| "Date Open": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-', | |
| "Date Closed": lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else '-', | |
| "Start Confidence": lambda x: f"{x:.0f}%" if pd.notna(x) else '-' | |
| }, na_rep='-')) | |
| else: | |
| st.info("No Open or Recent trades found (older trades are hidden).") | |
| else: | |
| st.info("No trades found.") | |
| # --- NEW: SAVE RESULTS BUTTON --- | |
| st.markdown("---") | |
| st.subheader("💾 Save Analysis") | |
| current_config = { | |
| 'start_date': st.session_state.start_date, 'end_date': st.session_state.end_date, | |
| 'primary_driver': st.session_state.primary_driver, 'confidence_threshold': st.session_state.confidence_threshold, | |
| 'use_rsi': st.session_state.use_rsi, 'rsi_w': st.session_state.rsi_w, | |
| 'use_volatility': st.session_state.use_volatility, 'vol_w': st.session_state.vol_w, | |
| 'use_trend': st.session_state.use_trend, 'trend_w': st.session_state.trend_w, | |
| 'use_volume': st.session_state.use_volume, 'vol_w_val': st.session_state.vol_w_val, | |
| 'use_macd': st.session_state.use_macd, 'macd_w': st.session_state.macd_w, | |
| 'use_ma_slope': st.session_state.use_ma_slope, 'ma_slope_w': st.session_state.ma_slope_w, | |
| 'use_markov': st.session_state.use_markov, 'markov_w': st.session_state.markov_w, | |
| 'use_mfi': st.session_state.use_mfi, 'mfi_w': st.session_state.mfi_w, | |
| 'use_supertrend': st.session_state.use_supertrend, 'supertrend_w': st.session_state.supertrend_w, | |
| 'use_adx_filter': st.session_state.use_adx_filter, 'adx_threshold': st.session_state.adx_threshold, | |
| 'bband_period': st.session_state.bband_period, 'bband_std_dev': st.session_state.bband_std_dev, | |
| 'large_ma_period': st.session_state.get('large_ma_period', 50), | |
| 'long_entry_threshold_pct': st.session_state.long_entry_threshold_pct, | |
| 'short_entry_threshold_pct': st.session_state.short_entry_threshold_pct, | |
| 'exit_logic_type': st.session_state.exit_logic_type, | |
| 'smart_trailing_stop_pct': st.session_state.smart_trailing_stop_pct | |
| } | |
| # Generate CSV | |
| if 'convert_results_to_csv' in globals(): | |
| csv_data = convert_results_to_csv(st.session_state.summary_df, current_config) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M") | |
| file_name = f"Backtest_Results_{timestamp}.csv" | |
| st.download_button( | |
| label="⬇️ Download Results & Settings as CSV", | |
| data=csv_data, | |
| file_name=file_name, | |
| mime='text/csv', | |
| use_container_width=True | |
| ) | |
| # 14. Default Message | |
| else: | |
| if not any([st.session_state.get(k) for k in ['run_advanced_advisor','run_user_advisor_setup','advisor_df','confidence_results_df','single_ticker_results','summary_df','load_message']]): | |
| st.info("Click a 'Run' button in the sidebar to start.") | |
| # 15. Footer Buttons (Hidden in Production) | |
| if not production_mode: | |
| st.markdown("---") | |
| col_load1, col_load2, col_load3 = st.columns(3) | |
| with col_load1: | |
| if 'apply_best_params_to_widgets' in locals() and st.session_state.get('best_params'): | |
| st.button("⬇️ Load Optimal Parameters", on_click=apply_best_params_to_widgets, use_container_width=True) | |
| with col_load2: | |
| if 'apply_best_weights_to_widgets' in locals() and st.session_state.get('best_weights'): | |
| st.button("⬇️ Load Optimal Weights", on_click=apply_best_weights_to_widgets, use_container_width=True) | |
| with col_load3: | |
| if st.session_state.get('worst_confidence_setups_list'): | |
| if st.button("Apply Top Worst Setups as Veto Filter", use_container_width=True): | |
| st.session_state.veto_setup_list = st.session_state.worst_confidence_setups_list | |
| st.session_state.worst_confidence_setups_list = None | |
| st.sidebar.info(f"Applying {len(st.session_state.veto_setup_list)} Veto filters.") | |
| st.rerun() | |
| col_load4 = st.columns(1)[0] | |
| with col_load4: | |
| is_markov_relevant = (st.session_state.primary_driver == 'Markov State') or st.session_state.use_markov | |
| if st.session_state.get('best_markov_setup') and is_markov_relevant: | |
| if st.button("⬇️ Load Best Markov Setup", on_click=None, use_container_width=True): | |
| st.session_state.primary_driver = "Markov State" | |
| st.session_state.use_markov = True | |
| st.sidebar.success("Best Markov setup loaded!") | |
| st.rerun() | |
| # --- LOGIC HANDLERS (Hidden execution) --- | |
| if st.session_state.run_advanced_advisor: | |
| with st.spinner("Running Optimization..."): | |
| run_optimization() | |
| st.session_state.run_advanced_advisor = False | |
| # --- CHART FUNCTION (Must be outside main) --- | |
| def generate_trades_timeline_histogram(trades_df, start_date, end_date): | |
| """ | |
| Creates a stacked histogram showing trade results over time. | |
| 4 Colors: Long Win, Long Loss, Short Win, Short Loss. | |
| """ | |
| if trades_df is None or trades_df.empty: return None | |
| # Filter by date range | |
| mask = (trades_df['Date Closed'] >= pd.to_datetime(start_date)) & (trades_df['Date Closed'] <= pd.to_datetime(end_date)) | |
| df = trades_df[mask].copy() | |
| if df.empty: return None | |
| # Categorize | |
| df = df[df['Status'] == 'Closed'] | |
| long_wins = df[(df['Side'] == 'Long') & (df['Final % P/L'] > 0)] | |
| long_loss = df[(df['Side'] == 'Long') & (df['Final % P/L'] <= 0)] | |
| short_wins = df[(df['Side'] == 'Short') & (df['Final % P/L'] > 0)] | |
| short_loss = df[(df['Side'] == 'Short') & (df['Final % P/L'] <= 0)] | |
| fig = go.Figure() | |
| # Add Stacked Traces (Shorts hidden by default via visible='legendonly') | |
| fig.add_trace(go.Histogram(x=long_wins['Date Closed'], name='Long Winners', marker_color='green')) | |
| fig.add_trace(go.Histogram(x=long_loss['Date Closed'], name='Long Losers', marker_color='red')) | |
| fig.add_trace(go.Histogram(x=short_wins['Date Closed'], name='Short Winners', marker_color='blue', visible='legendonly')) | |
| fig.add_trace(go.Histogram(x=short_loss['Date Closed'], name='Short Losers', marker_color='orange', visible='legendonly')) | |
| fig.update_layout( | |
| barmode='stack', | |
| title="Trades Over Time (Win/Loss Stacked)", | |
| xaxis_title="Date", | |
| yaxis_title="Number of Trades", | |
| height=400, | |
| template="plotly_white", | |
| legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), | |
| # [FIX] Force the X-axis range to match the user's selected date settings | |
| xaxis=dict(range=[start_date, end_date]) | |
| ) | |
| return fig | |
| if __name__ == "__main__": | |
| main() |