import streamlit as st
import pandas as pd
import os
import numpy as np
from datetime import date
import plotly.graph_objects as go
import itertools
import json
# --- MODIFIED IMPORTS: Changed to import specific indicator classes from 'ta' ---
from ta.volatility import BollingerBands
from ta.momentum import RSIIndicator
from multiprocessing import Pool, cpu_count
from functools import partial

# --- 0. Settings Management Functions ---
CONFIG_FILE = "config.json"
VETO_CONFIG_FILE = "veto_config.json"
TOP_SETUPS_FILE = "top_setups.json"


def save_settings(params_to_save):
    """Write *params_to_save* to CONFIG_FILE as JSON and confirm in the sidebar."""
    with open(CONFIG_FILE, 'w') as f:
        json.dump(params_to_save, f, indent=4)
    st.sidebar.success("Settings saved as default!")


def load_settings():
    """Return the strategy settings: built-in defaults overlaid with CONFIG_FILE.

    Keys present in the file override the defaults; keys missing from the
    file keep their default value. When the file does not exist the defaults
    are returned unchanged.
    """
    default_structure = {
        "large_ma_period": 50,
        "bband_period": 20,
        "bband_std_dev": 2.0,
        "long_entry_threshold_pct": 0.0,
        "long_exit_ma_threshold_pct": 0.0,
        "long_stop_loss_pct": 0.0,
        "long_delay_days": 0,
        "short_entry_threshold_pct": 0.0,
        "short_exit_ma_threshold_pct": 0.0,
        "short_stop_loss_pct": 0.0,
        "short_delay_days": 0,
        "confidence_threshold": 50,
    }
    if os.path.exists(CONFIG_FILE):
        with open(CONFIG_FILE, 'r') as f:
            loaded = json.load(f)
        default_structure.update(loaded)
    return default_structure


def save_veto_setup(veto_setup):
    """Write *veto_setup* to VETO_CONFIG_FILE as JSON and confirm in the sidebar."""
    with open(VETO_CONFIG_FILE, 'w') as f:
        json.dump(veto_setup, f, indent=4)
    st.sidebar.success("Veto filter saved as default!")


def load_veto_setup():
    """Return the saved veto setup, or None when VETO_CONFIG_FILE does not exist."""
    if os.path.exists(VETO_CONFIG_FILE):
        with open(VETO_CONFIG_FILE, 'r') as f:
            return json.load(f)
    return None


def save_top_setups(results_df, side, num_setups=6):
    """Persist the best *num_setups* unique rows of *results_df* under key *side*.

    Rows are ranked by score (descending) and, on ties, by how few factors
    are switched 'On' (ascending), then de-duplicated on their result columns
    so that setups producing identical outcomes are stored only once. Entries
    already stored in TOP_SETUPS_FILE under other keys are preserved.

    Args:
        results_df: Confidence optimisation results; must contain the factor
            columns ('RSI', 'Volatility', 'TREND', 'Volume') holding
            'On'/'Off' and the result columns listed below.
        side: Key to store the setups under (e.g. 'long' or 'short').
        num_setups: Maximum number of setups to keep.
    """
    df = results_df.copy()
    deduplication_cols = [
        'Conf. Threshold', 'Avg Profit/Trade', 'Good/Bad Ratio',
        'Winning Tickers', 'Losing Tickers', 'Avg Entry Conf.',
        'Good Score', 'Bad Score', 'Norm. Score %', 'Total Trades',
    ]
    df['FactorsOn'] = df[['RSI', 'Volatility', 'TREND', 'Volume']].eq('On').sum(axis=1)

    # 'short' used to fall through to 'Bad Score', so the "top" short setups
    # that were saved were in fact the worst-performing ones. Both trade
    # directions are ranked by 'Good Score'; any other key keeps the
    # previous 'Bad Score' ordering.
    sort_col = 'Good Score' if side in ('long', 'short', 'best') else 'Bad Score'
    sorted_df = df.sort_values(by=[sort_col, 'FactorsOn'], ascending=[False, True])
    deduplicated_df = sorted_df.drop_duplicates(subset=deduplication_cols, keep='first')
    top_setups = deduplicated_df.head(num_setups).to_dict('records')

    if os.path.exists(TOP_SETUPS_FILE):
        with open(TOP_SETUPS_FILE, 'r') as f:
            all_top_setups = json.load(f)
    else:
        all_top_setups = {}
    all_top_setups[side] = top_setups
    with open(TOP_SETUPS_FILE, 'w') as f:
        json.dump(all_top_setups, f, indent=4)
    st.sidebar.success(f"Top {len(top_setups)} unique {side.title()} setups saved!")


def load_top_setups():
    """Return the contents of TOP_SETUPS_FILE, or None when it does not exist."""
    if os.path.exists(TOP_SETUPS_FILE):
        with open(TOP_SETUPS_FILE, 'r') as f:
            return json.load(f)
    return None


# --- 1. Data Loading and Cleaning Functions ---
@st.cache_data
def load_all_data(folder_path):
    """Load every CSV in *folder_path* into one date-indexed DataFrame.

    Each file is read with its first column as a day-first date index. The
    frames are stacked, rows whose index cannot be parsed as a date are
    dropped, duplicate dates keep the last occurrence, and the result is
    sorted by date.

    Returns:
        ``(master_df, message)`` on success, or ``(None, error_message)`` when
        the folder is missing, holds no CSV files, or a file cannot be read.
    """
    try:
        # Sorted so that the 'keep last' duplicate resolution below does not
        # depend on the arbitrary order returned by os.listdir.
        all_files = sorted(f for f in os.listdir(folder_path) if f.endswith('.csv'))
    except FileNotFoundError:
        return None, f"Data folder '{folder_path}' was not found."
    if not all_files:
        # Return the message (instead of None) so the caller can display it.
        return None, f"No CSV files found in the '{folder_path}' folder."

    df_list = []
    for file_name in all_files:
        file_path = os.path.join(folder_path, file_name)
        try:
            df = pd.read_csv(file_path, header=0, index_col=0, dayfirst=True, parse_dates=True)
            df_list.append(df)
        except Exception as e:
            return None, f"Could not read or process {file_name}. Error: {e}"
    if not df_list:
        return None, "No data could be loaded from the CSV files."

    master_df = pd.concat(df_list)
    master_df.index = pd.to_datetime(master_df.index, errors='coerce')
    master_df = master_df[master_df.index.notna()]
    if master_df.index.has_duplicates:
        master_df = master_df.loc[~master_df.index.duplicated(keep='last')]
    master_df.sort_index(inplace=True)
    return master_df, f"Successfully combined data from {len(all_files)} files."
def _price_columns(df):
    """Return the columns of *df* whose name does not contain '_volume' (case-insensitive)."""
    return [col for col in df.columns if '_volume' not in str(col).lower()]


def _factor_enabled(flag):
    """Interpret a factor toggle given either as a bool or as an 'On'/'Off' style string."""
    if isinstance(flag, str):
        return flag.strip().lower() not in ('', 'off', 'false', 'no', '0')
    return bool(flag)


def _extract_ticker_frame(master_df, ticker, date_range=None):
    """Slice one ticker (and its '<ticker>_Volume' column, if present) as Close/Volume columns."""
    volume_col = f'{ticker}_Volume'
    cols_to_use = [ticker]
    if volume_col in master_df.columns:
        cols_to_use.append(volume_col)
    if date_range is None:
        frame = master_df[cols_to_use]
    else:
        frame = master_df.loc[date_range[0]:date_range[1], cols_to_use]
    return frame.rename(columns={ticker: 'Close', volume_col: 'Volume'})


def _base_params_from_session():
    """Build the backtest parameter dict (without confidence threshold) from session widgets."""
    state = st.session_state
    return {
        "large_ma_period": state.ma_period,
        "bband_period": state.bb_period,
        "bband_std_dev": state.bb_std,
        "long_entry_threshold_pct": state.long_entry / 100,
        "long_exit_ma_threshold_pct": state.long_exit / 100,
        "long_stop_loss_pct": state.long_sl / 100,
        "long_delay_days": state.long_delay,
        "short_entry_threshold_pct": state.short_entry / 100,
        "short_exit_ma_threshold_pct": state.short_exit / 100,
        "short_stop_loss_pct": state.short_sl / 100,
        "short_delay_days": state.short_delay,
    }


def _factor_weights_from_session():
    """Return the four confidence-factor weights currently set in the session."""
    state = st.session_state
    return {"rsi": state.rsi_w, "vol": state.vol_w, "trend": state.trend_w, "volume": state.volume_w}


def clean_data_and_report_outliers(df):
    """Blank out single-day price moves larger than 100% and report them.

    For every non-volume column, any day whose absolute percentage change
    versus the previous row exceeds 1.0 is set to NaN. *df* is modified in
    place and also returned.

    Returns:
        ``(df, outlier_report)`` where the report is a list of
        ``{'Ticker': ..., 'Outliers Removed': n}`` dicts, one per affected column.
    """
    outlier_report = []
    for ticker in _price_columns(df):
        numeric_prices = pd.to_numeric(df[ticker], errors='coerce')
        daily_pct_change = numeric_prices.pct_change().abs()
        outlier_days = daily_pct_change[daily_pct_change > 1.0].index
        if not outlier_days.empty:
            outlier_report.append({'Ticker': ticker, 'Outliers Removed': len(outlier_days)})
            df.loc[outlier_days, ticker] = np.nan
    return df, outlier_report


def normalise_strategy_score(raw_score, benchmark_for_100_percent=0.25):
    """Map a raw strategy score onto 0-100, where the benchmark value maps to 100.

    Non-positive scores give 0.0; scores at or above the benchmark are capped at 100.0.
    """
    if raw_score <= 0:
        return 0.0
    return min((raw_score / benchmark_for_100_percent) * 100, 100.0)


# --- 2. Custom Backtesting Engine ---
def calculate_confidence_score(df, use_rsi, use_volatility, use_trend, use_volume,
                               rsi_w, vol_w, trend_w, vol_w_val):
    """Compute weighted long and short confidence scores (0-100) for every row.

    Each enabled factor whose input column exists in *df* contributes a value
    in [0, 1] multiplied by its weight; the sum is divided by the total weight
    of the contributing factors and scaled to a percentage.

    Args:
        df: Frame that may contain 'RSI', 'Volatility_p', 'SMA_200' (with
            'Close') and 'Volume_Ratio'.
        use_rsi, use_volatility, use_trend, use_volume: Factor toggles. Either
            booleans or the 'On'/'Off' strings used in saved setups.
        rsi_w, vol_w, trend_w, vol_w_val: Weights for RSI, volatility, trend
            and volume respectively.

    Returns:
        ``(long_score, short_score)`` Series. When no factor contributes,
        both are a constant 100.0.
    """
    # Saved setups store toggles as 'On'/'Off'; a bare truthiness test would
    # treat 'Off' as enabled, so normalise before use.
    use_rsi = _factor_enabled(use_rsi)
    use_volatility = _factor_enabled(use_volatility)
    use_trend = _factor_enabled(use_trend)
    use_volume = _factor_enabled(use_volume)

    long_score = pd.Series(0.0, index=df.index)
    short_score = pd.Series(0.0, index=df.index)
    total_weight = 0.0

    if use_rsi and 'RSI' in df.columns:
        total_weight += rsi_w
        # Long: strength grows as RSI falls below 30; short: as it rises above 70.
        long_score += ((30 - df['RSI']) / 30).clip(0, 1) * rsi_w
        short_score += ((df['RSI'] - 70) / 30).clip(0, 1) * rsi_w

    if use_volatility and 'Volatility_p' in df.columns:
        total_weight += vol_w
        score = (df['Volatility_p'] > 0.025).astype(float) * vol_w
        long_score += score
        short_score += score

    if use_trend and 'SMA_200' in df.columns:
        total_weight += trend_w
        pct_dist = (df['Close'] - df['SMA_200']) / df['SMA_200']
        # Full score once price is 10% above (long) / below (short) the SMA.
        long_score += (pct_dist / 0.10).clip(0, 1) * trend_w
        short_score += (-pct_dist / 0.10).clip(0, 1) * trend_w

    if use_volume and 'Volume_Ratio' in df.columns:
        total_weight += vol_w_val
        score = ((df['Volume_Ratio'] - 1.75) / 2.25).clip(0, 1) * vol_w_val
        long_score += score
        short_score += score

    if total_weight > 0:
        return (long_score / total_weight) * 100, (short_score / total_weight) * 100
    return pd.Series(100.0, index=df.index), pd.Series(100.0, index=df.index)


def _apply_stop_loss(close, position, stop_loss_pct, direction):
    """Flatten *position* from a stop-loss breach until the end of that trade only.

    *direction* is 1 for long positions and -1 for short positions. The entry
    price of a trade is the close on the row where the position first equals
    *direction* after being flat.
    """
    in_trade = position == direction
    is_entry = in_trade & (position.shift(1) == 0)
    entry_price = close.where(is_entry).ffill()
    if direction == 1:
        breached = close < entry_price * (1 - stop_loss_pct)
    else:
        breached = close > entry_price * (1 + stop_loss_pct)
    breached &= in_trade
    # Each entry starts a new group, so the running maximum marks every row
    # from the first breach to the next entry without touching later trades.
    trade_id = is_entry.cumsum().to_numpy()
    stopped = breached.astype(int).groupby(trade_id).cummax().astype(bool)
    return position.mask(stopped.to_numpy(), 0)


def run_backtest(data, params, use_rsi, use_volatility, use_trend, use_volume,
                 rsi_weight, volatility_weight, trend_weight, volume_weight, veto_setup=None):
    """Backtest the Bollinger-band mean-reversion strategy on one ticker.

    Args:
        data: Frame with a 'Close' column and optionally 'Volume'.
        params: Strategy parameters ('large_ma_period', 'bband_period',
            'bband_std_dev', 'confidence_threshold' and the long_/short_
            entry threshold, exit threshold, stop loss and delay keys).
        use_rsi, use_volatility, use_trend, use_volume: Confidence factor toggles.
        rsi_weight, volatility_weight, trend_weight, volume_weight: Factor weights.
        veto_setup: Optional dict with 'RSI', 'Volatility', 'TREND', 'Volume',
            'Conf. Threshold' and optional 'Weight'; entries whose veto score
            reaches the threshold are suppressed.

    Returns:
        ``(long_pnl, short_pnl, avg_long_profit, avg_short_profit, df,
        (long_entries_log, long_exit_index, short_entries_log, short_exit_index),
        open_trades)``. When there are fewer rows than either look-back
        period, returns ``(0, 0, 0, 0, None, ([], [], [], []), [])``.
    """
    df = data.copy()
    df['Close'] = pd.to_numeric(df['Close'], errors='coerce').replace(0, np.nan)
    df.dropna(subset=['Close'], inplace=True)
    if len(df) < params.get('large_ma_period', 200) or len(df) < params.get('bband_period', 20):
        return 0, 0, 0, 0, None, ([], [], [], []), []

    df['large_ma'] = df['Close'].rolling(window=params['large_ma_period']).mean()

    # --- CORRECTED: Calculate Bollinger Bands using the 'ta' library ---
    indicator_bb = BollingerBands(close=df['Close'], window=params['bband_period'],
                                  window_dev=params['bband_std_dev'])
    df['bband_lower'] = indicator_bb.bollinger_lband()
    df['bband_upper'] = indicator_bb.bollinger_hband()

    # --- CORRECTED: Calculate RSI using the 'ta' library ---
    indicator_rsi = RSIIndicator(close=df['Close'], window=14)
    df['RSI'] = indicator_rsi.rsi()

    df['Volatility_p'] = df['Close'].pct_change().rolling(window=14).std()
    df['SMA_200'] = df['Close'].rolling(window=200, min_periods=1).mean()
    if 'Volume' in df.columns:
        df['Volume'] = pd.to_numeric(df['Volume'], errors='coerce').fillna(0)
        df['Volume_MA50'] = df['Volume'].rolling(window=50, min_periods=1).mean()
        df['Volume_Ratio'] = (df['Volume'] / df['Volume_MA50']).replace([np.inf, -np.inf], np.nan).fillna(0)

    df['long_confidence_score'], df['short_confidence_score'] = calculate_confidence_score(
        df, use_rsi, use_volatility, use_trend, use_volume,
        rsi_weight, volatility_weight, trend_weight, volume_weight)

    if veto_setup:
        veto_weight = veto_setup.get('Weight', 1.0)
        df['long_veto_score'], df['short_veto_score'] = calculate_confidence_score(
            df, veto_setup['RSI'], veto_setup['Volatility'], veto_setup['TREND'], veto_setup['Volume'],
            veto_weight, veto_weight, veto_weight, veto_weight)

    # Entries: close beyond the band (plus threshold) with enough confidence.
    base_long_trigger = df['Close'] < (df['bband_lower'] * (1 - params['long_entry_threshold_pct']))
    base_short_trigger = df['Close'] > (df['bband_upper'] * (1 + params['short_entry_threshold_pct']))
    long_entry_trigger = base_long_trigger & (df['long_confidence_score'] >= params['confidence_threshold'])
    short_entry_trigger = base_short_trigger & (df['short_confidence_score'] >= params['confidence_threshold'])

    if veto_setup:
        long_veto_trigger = df['long_veto_score'] >= veto_setup['Conf. Threshold']
        short_veto_trigger = df['short_veto_score'] >= veto_setup['Conf. Threshold']
        long_entry_trigger &= ~long_veto_trigger
        short_entry_trigger &= ~short_veto_trigger

    # Exits: close back at the large MA (plus threshold) or at the opposite band.
    long_exit_trigger = ((df['Close'] >= (df['large_ma'] * (1 + params['long_exit_ma_threshold_pct'])))
                         | (df['Close'] >= df['bband_upper']))
    short_exit_trigger = ((df['Close'] <= (df['large_ma'] * (1 - params['short_exit_ma_threshold_pct'])))
                          | (df['Close'] <= df['bband_lower']))

    # Exit is written after entry, so a row that triggers both ends up flat.
    df['long_signal'] = np.nan
    df.loc[long_entry_trigger, 'long_signal'] = 1
    df.loc[long_exit_trigger, 'long_signal'] = 0
    df['short_signal'] = np.nan
    df.loc[short_entry_trigger, 'short_signal'] = -1
    df.loc[short_exit_trigger, 'short_signal'] = 0

    df['long_position'] = df['long_signal'].ffill().fillna(0)
    df['short_position'] = df['short_signal'].ffill().fillna(0)

    if params['long_delay_days'] > 0:
        df['long_position'] = df['long_position'].shift(params['long_delay_days']).fillna(0)
    if params['short_delay_days'] > 0:
        df['short_position'] = df['short_position'].shift(params['short_delay_days']).fillna(0)

    # A stop-loss closes only the trade that breached it. Previously the
    # position was zeroed from the first breach to the end of the data,
    # which silently removed every later trade.
    if params['long_stop_loss_pct'] > 0:
        df['long_position'] = _apply_stop_loss(df['Close'], df['long_position'],
                                               params['long_stop_loss_pct'], 1)
    if params['short_stop_loss_pct'] > 0:
        df['short_position'] = _apply_stop_loss(df['Close'], df['short_position'],
                                                params['short_stop_loss_pct'], -1)

    # Yesterday's position earns today's return.
    df['daily_return'] = df['Close'].pct_change()
    df['long_strategy_return'] = df['long_position'].shift(1) * df['daily_return']
    df['short_strategy_return'] = df['short_position'].shift(1) * df['daily_return']
    final_long_pnl = (1 + df['long_strategy_return']).prod(skipna=True) - 1
    final_short_pnl = (1 + df['short_strategy_return']).prod(skipna=True) - 1

    long_entries = df[(df['long_position'] == 1) & (df['long_position'].shift(1) == 0)]
    long_exits = df[(df['long_position'] == 0) & (df['long_position'].shift(1) == 1)]
    short_entries = df[(df['short_position'] == -1) & (df['short_position'].shift(1) == 0)]
    short_exits = df[(df['short_position'] == 0) & (df['short_position'].shift(1) == -1)]

    # Per-trade profit: each entry is paired with the first exit after it;
    # entries with no later exit (still open) are left out of the average.
    long_trade_profits = []
    for idx, row in long_entries.iterrows():
        future_exits = long_exits[long_exits.index > idx]
        if not future_exits.empty:
            long_trade_profits.append((future_exits.iloc[0]['Close'] / row['Close']) - 1)
    avg_long_profit_per_trade = np.mean(long_trade_profits) if long_trade_profits else 0

    short_trade_profits = []
    for idx, row in short_entries.iterrows():
        future_exits = short_exits[short_exits.index > idx]
        if not future_exits.empty:
            short_trade_profits.append(((future_exits.iloc[0]['Close'] / row['Close']) - 1) * -1)
    avg_short_profit_per_trade = np.mean(short_trade_profits) if short_trade_profits else 0

    long_trades_log = [
        {'date': idx, 'price': row['Close'], 'confidence': row['long_confidence_score']}
        for idx, row in long_entries.iterrows()
    ]
    short_trades_log = [
        {'date': idx, 'price': row['Close'], 'confidence': row['short_confidence_score']}
        for idx, row in short_entries.iterrows()
    ]

    open_trades = []
    if not df.empty:
        last_close = df['Close'].iloc[-1]
        if df['long_position'].iloc[-1] == 1 and not long_entries.empty:
            last_entry = long_entries.iloc[-1]
            pnl = (last_close / last_entry['Close']) - 1
            open_trades.append({'Side': 'Long', 'Date Open': last_entry.name,
                                'Start Confidence': last_entry['long_confidence_score'],
                                'Current % P/L': pnl})
        if df['short_position'].iloc[-1] == -1 and not short_entries.empty:
            last_entry = short_entries.iloc[-1]
            pnl = ((last_close / last_entry['Close']) - 1) * -1
            open_trades.append({'Side': 'Short', 'Date Open': last_entry.name,
                                'Start Confidence': last_entry['short_confidence_score'],
                                'Current % P/L': pnl})

    df.sort_index(inplace=True)
    return (final_long_pnl, final_short_pnl, avg_long_profit_per_trade, avg_short_profit_per_trade, df,
            (long_trades_log, long_exits.index, short_trades_log, short_exits.index), open_trades)


# --- 3. Charting and Display Functions ---
def _build_trade_figure(df, entries_log, exits, ticker, side, entry_color, entry_symbol, exit_color):
    """Plot close, large MA and Bollinger bands with entry/exit markers for one side."""
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=df.index, y=df['Close'], mode='lines', name='Close Price',
                             line=dict(color='blue')))
    fig.add_trace(go.Scatter(x=df.index, y=df['large_ma'], mode='lines', name='Large MA',
                             line=dict(color='orange', dash='dash')))
    fig.add_trace(go.Scatter(x=df.index, y=df['bband_upper'], mode='lines', name='Upper Band',
                             line=dict(color='gray', width=0.5)))
    fig.add_trace(go.Scatter(x=df.index, y=df['bband_lower'], mode='lines', name='Lower Band',
                             line=dict(color='gray', width=0.5), fill='tonexty',
                             fillcolor='rgba(211,211,211,0.2)'))
    if entries_log:
        dates = [t['date'] for t in entries_log]
        prices = [t['price'] for t in entries_log]
        scores = [f"Confidence: {t['confidence']:.0f}%" for t in entries_log]
        fig.add_trace(go.Scatter(x=dates, y=prices, mode='markers', name=f'{side} Entry',
                                 marker=dict(color=entry_color, symbol=entry_symbol, size=12),
                                 text=scores, hoverinfo='text'))
    if not exits.empty:
        fig.add_trace(go.Scatter(x=exits, y=df.loc[exits, 'Close'], mode='markers', name=f'{side} Exit',
                                 marker=dict(color=exit_color, symbol='x', size=8)))
    fig.update_layout(title=f'{side} Trades for {ticker}', xaxis_title='Date', yaxis_title='Price',
                      legend_title="Indicator")
    return fig


def generate_long_plot(df, trades, ticker):
    """Return a Plotly figure of the price chart with long entries and exits marked.

    *trades* is the 4-tuple returned by ``run_backtest``; only its first two
    elements (long entry log, long exit index) are used.
    """
    long_entries_log, long_exits, _, _ = trades
    return _build_trade_figure(df, long_entries_log, long_exits, ticker,
                               'Long', 'green', 'triangle-up', 'darkgreen')


def generate_short_plot(df, trades, ticker):
    """Return a Plotly figure of the price chart with short entries and exits marked.

    *trades* is the 4-tuple returned by ``run_backtest``; only its last two
    elements (short entry log, short exit index) are used.
    """
    _, _, short_entries_log, short_exits = trades
    return _build_trade_figure(df, short_entries_log, short_exits, ticker,
                               'Short', 'red', 'triangle-down', 'darkred')


def display_summary_analytics(summary_df):
    """Render side-by-side Long/Short performance metrics for tickers that traded.

    *summary_df* must provide, per side, the columns 'Num {side} Trades',
    'Avg {side} Profit per Trade', 'Cumulative {side} P&L' and
    'Avg {side} Confidence'.
    """
    st.subheader("Overall Strategy Performance")
    col1, col2 = st.columns(2)
    for side in ["Long", "Short"]:
        active_trades_df = summary_df[summary_df[f'Num {side} Trades'] > 0]
        container = col1 if side == "Long" else col2
        with container:
            st.subheader(f"{side} Trades")
            if active_trades_df.empty:
                st.info("No trades found for this side with current settings.")
                continue

            total_trades = active_trades_df[f'Num {side} Trades'].sum()
            # Trade-count-weighted mean of the per-ticker average profit.
            weighted_profit = (active_trades_df[f'Avg {side} Profit per Trade']
                               * active_trades_df[f'Num {side} Trades']).sum()
            avg_trade_profit = weighted_profit / total_trades if total_trades > 0 else 0
            avg_cumulative_profit = active_trades_df[f'Cumulative {side} P&L'].mean()
            avg_confidence = active_trades_df[f'Avg {side} Confidence'].mean()
            if pd.isna(avg_confidence):
                avg_confidence = 0

            good_tickers = (active_trades_df[f'Cumulative {side} P&L'] > 0).sum()
            bad_tickers = (active_trades_df[f'Cumulative {side} P&L'] < 0).sum()
            good_bad_ratio = good_tickers / bad_tickers if bad_tickers > 0 else float('inf')
            # With no losing tickers the ratio is infinite and the score is reported as 0.
            raw_strategy_score = avg_trade_profit * good_bad_ratio if np.isfinite(good_bad_ratio) else 0.0
            display_score = normalise_strategy_score(raw_strategy_score)

            st.metric("Strategy Score", f"{display_score:.2f}%")
            st.metric("Avg Cumulative Profit (Active Tickers)", f"{avg_cumulative_profit:.2%}")
            st.metric("Avg Profit per Trade (Active Tickers)", f"{avg_trade_profit:.2%}")
            st.metric("Average Entry Confidence", f"{avg_confidence:.0f}%")
            st.text(f"Profitable Tickers: {good_tickers}")
            st.text(f"Losing Tickers: {bad_tickers}")
            st.text(f"Total Individual Trades: {int(total_trades)}")
            st.text(f"Good/Bad Ratio: {good_bad_ratio:.2f}")


# --- 4. Optimisation Functions (Parallelised) ---
def run_single_parameter_test(params, master_df, optimise_for, tickers, date_range, power, confidence_settings):
    """Score one parameter combination across *tickers* (pool worker).

    Args:
        params: Parameter dict passed to ``run_backtest``.
        master_df: Frame holding one price column per ticker.
        optimise_for: 'long' scores long trades; anything else scores short trades.
        tickers: A ticker name or list of ticker names.
        date_range: ``(start, end)`` used to slice *master_df* by index.
        power: Exponent applied to a positive average profit per trade.
        confidence_settings: Dict with 'toggles' and 'weights' 4-tuples.

    Returns:
        ``(metric, params)``. The metric is ``-inf`` when no trades occurred,
        the (non-positive) average profit per trade when that is <= 0, and
        otherwise ``avg_profit ** power * winning/losing tickers``.
    """
    total_profit_weighted_avg, total_trades, winning_tickers, losing_tickers = 0, 0, 0, 0
    use_rsi, use_vol, use_trend, use_volume = confidence_settings['toggles']
    rsi_w, vol_w, trend_w, volume_w = confidence_settings['weights']
    if not isinstance(tickers, list):
        tickers = [tickers]

    for ticker in tickers:
        ticker_data = _extract_ticker_frame(master_df, ticker, date_range)
        if ticker_data.empty:
            continue
        long_pnl, short_pnl, avg_long_trade, avg_short_trade, _, trades, _ = run_backtest(
            ticker_data, params, use_rsi, use_vol, use_trend, use_volume,
            rsi_w, vol_w, trend_w, volume_w)
        if optimise_for == 'long':
            pnl, avg_trade_profit, num_trades = long_pnl, avg_long_trade, len(trades[0])
        else:
            pnl, avg_trade_profit, num_trades = short_pnl, avg_short_trade, len(trades[2])
        if num_trades > 0:
            total_trades += num_trades
            total_profit_weighted_avg += avg_trade_profit * num_trades
        if pnl > 0:
            winning_tickers += 1
        elif pnl < 0:
            losing_tickers += 1

    current_metric = -np.inf
    if total_trades > 0:
        overall_avg_profit_per_trade = total_profit_weighted_avg / total_trades
        if losing_tickers > 0:
            good_bad_ratio = winning_tickers / losing_tickers
        elif winning_tickers > 0:
            # No losers at all: the ratio, and therefore a positive metric, is infinite.
            good_bad_ratio = np.inf
        else:
            good_bad_ratio = 0
        if overall_avg_profit_per_trade > 0:
            current_metric = (overall_avg_profit_per_trade ** power) * good_bad_ratio
        else:
            current_metric = overall_avg_profit_per_trade
    return (current_metric, params)


def generate_and_run_optimisation(main_df, main_content_placeholder, optimise_for, use_squared_weighting):
    """Grid-search the strategy parameters in parallel and store the best set.

    The search ranges come from the optimisation widgets in
    ``st.session_state``; parameters whose checkbox is off stay at their
    saved default. The same value is used for the long and short variant of
    each parameter. The winning combination is written to
    ``st.session_state.best_params``.
    """
    st.session_state.summary_df = None
    st.session_state.single_ticker_results = None
    st.session_state.confidence_results_df = None
    st.session_state.open_trades_df = None
    st.session_state.advisor_df = None

    with main_content_placeholder.container():
        state = st.session_state
        defaults = state.widget_defaults

        # The +1 / +0.001 offsets make the end value inclusive.
        ma_range = (range(state.ma_start_num, state.ma_end_num + 1, state.ma_step_num)
                    if state.opt_ma_cb else [defaults['large_ma_period']])
        bb_range = (range(state.bb_start_num, state.bb_end_num + 1, state.bb_step_num)
                    if state.opt_bb_cb else [defaults['bband_period']])
        std_range = (np.arange(state.std_start_num, state.std_end_num + 0.001, state.std_step_num)
                     if state.opt_std_cb else [defaults['bband_std_dev']])
        sl_range = (np.arange(state.sl_start_num, state.sl_end_num + 0.001, state.sl_step_num) / 100
                    if state.opt_sl_cb else [defaults['long_stop_loss_pct']])
        delay_range = (range(state.delay_start_num, state.delay_end_num + 1, state.delay_step_num)
                       if state.opt_delay_cb else [defaults['long_delay_days']])
        entry_range = (np.arange(state.entry_start_num, state.entry_end_num + 0.001, state.entry_step_num) / 100
                       if state.opt_entry_cb else [defaults['long_entry_threshold_pct']])
        exit_range = (np.arange(state.exit_start_num, state.exit_end_num + 0.001, state.exit_step_num) / 100
                      if state.opt_exit_cb else [defaults['long_exit_ma_threshold_pct']])
        conf_range = (range(state.conf_start_num, state.conf_end_num + 1, state.conf_step_num)
                      if state.opt_conf_cb else [defaults['confidence_threshold']])

        param_product = itertools.product(ma_range, bb_range, std_range, sl_range,
                                          delay_range, entry_range, exit_range, conf_range)
        param_combinations = [{
            "large_ma_period": p[0],
            "bband_period": p[1],
            "bband_std_dev": p[2],
            "long_stop_loss_pct": p[3],
            "short_stop_loss_pct": p[3],
            "long_delay_days": p[4],
            "short_delay_days": p[4],
            "long_entry_threshold_pct": p[5],
            "short_entry_threshold_pct": p[5],
            "long_exit_ma_threshold_pct": p[6],
            "short_exit_ma_threshold_pct": p[6],
            "confidence_threshold": p[7],
        } for p in param_product]
        total_combinations = len(param_combinations)
        if total_combinations <= 1:
            st.warning("No optimisation parameters selected.")
            return

        confidence_settings = {
            'toggles': (state.use_rsi, state.use_vol, state.use_trend, state.use_volume),
            'weights': (state.rsi_w, state.vol_w, state.trend_w, state.volume_w),
        }
        num_cores = cpu_count()
        st.info(f"Starting {optimise_for.upper()} optimisation on {num_cores} cores... Testing {total_combinations} combinations.")

        if state.run_mode == "Analyse Full List":
            tickers_to_run = _price_columns(main_df)
        else:
            tickers_to_run = [state.ticker_select]
        date_range = (pd.Timestamp(state.start_date), pd.Timestamp(state.end_date))
        power = 2 if use_squared_weighting else 1

        best_metric, best_params = -np.inf, {}
        status_text = st.empty()
        status_text.text("Optimisation starting...")
        progress_bar = st.progress(0)
        worker_func = partial(run_single_parameter_test, master_df=main_df, optimise_for=optimise_for,
                              tickers=tickers_to_run, date_range=date_range, power=power,
                              confidence_settings=confidence_settings)

        with Pool(processes=num_cores) as pool:
            iterator = pool.imap_unordered(worker_func, param_combinations)
            for i, (metric, params) in enumerate(iterator, 1):
                if metric > best_metric:
                    best_metric, best_params = metric, params
                    display_score = normalise_strategy_score(best_metric)
                    status_text.text(f"Testing... New Best Score: {display_score:.2f}%")
                progress_bar.progress(i / total_combinations,
                                      text=f"Optimising... {i}/{total_combinations} combinations complete.")

        status_text.empty()
        if best_params:
            display_score = normalise_strategy_score(best_metric)
            st.success(f"Optimisation Complete! Best Strategy Score: {display_score:.2f}%")
            st.subheader("Optimal Parameters Found")
            st.json(best_params)
            st.session_state.best_params = best_params
        else:
            st.warning("Optimisation finished, but no profitable combinations were found.")


def run_single_confidence_test(task, base_params, master_df, date_range, tickers_to_run, optimise_for, factor_weights):
    """Evaluate one (factor combination, confidence threshold) task (pool worker).

    Args:
        task: ``(combo, threshold, _)`` where *combo* is four factor toggles
            in the order RSI, volatility, trend, volume.
        base_params: Backtest parameters; copied and given the task's threshold.
        master_df: Frame holding one price column per ticker.
        date_range: ``(start, end)`` used to slice *master_df* by index.
        tickers_to_run: Ticker names to backtest.
        optimise_for: 'long' scores long trades; anything else scores short trades.
        factor_weights: Dict with 'rsi', 'vol', 'trend' and 'volume' weights.

    Returns:
        A result row (dict) with the toggles, threshold, trade statistics and
        the 'Good Score' / 'Bad Score' / 'Norm. Score %' rankings.
    """
    combo, threshold, _ = task
    use_rsi, use_volatility, use_trend, use_volume = combo
    test_params = base_params.copy()
    test_params["confidence_threshold"] = threshold

    total_profit_weighted_avg, total_trades, winning_tickers, losing_tickers = 0, 0, 0, 0
    all_confidences = []
    for ticker in tickers_to_run:
        ticker_data = _extract_ticker_frame(master_df, ticker, date_range)
        if ticker_data.empty:
            continue
        long_pnl, short_pnl, avg_long_trade, avg_short_trade, _, trades, _ = run_backtest(
            ticker_data, test_params, use_rsi, use_volatility, use_trend, use_volume,
            factor_weights['rsi'], factor_weights['vol'], factor_weights['trend'], factor_weights['volume'])
        if optimise_for == 'long':
            pnl, avg_trade_profit, trade_log = long_pnl, avg_long_trade, trades[0]
        else:
            pnl, avg_trade_profit, trade_log = short_pnl, avg_short_trade, trades[2]
        num_trades = len(trade_log)
        if num_trades > 0:
            total_trades += num_trades
            total_profit_weighted_avg += avg_trade_profit * num_trades
            if pnl > 0:
                winning_tickers += 1
            elif pnl < 0:
                losing_tickers += 1
            all_confidences.extend([trade['confidence'] for trade in trade_log])

    raw_score, badness_score, overall_avg_profit, good_bad_ratio = 0.0, 0.0, 0.0, 0.0
    if total_trades > 0:
        overall_avg_profit = total_profit_weighted_avg / total_trades
        if losing_tickers > 0:
            good_bad_ratio = winning_tickers / losing_tickers
            raw_score = overall_avg_profit * good_bad_ratio
        elif winning_tickers > 0:
            # No losers: use a fixed multiplier of 100 in place of the infinite ratio.
            good_bad_ratio = float('inf')
            raw_score = overall_avg_profit * 100
        # NOTE(review): a setup with zero winning tickers keeps Bad Score 0.0
        # here, so it ranks below milder losers - confirm this is intended.
        if winning_tickers > 0 and overall_avg_profit < 0:
            badness_score = (losing_tickers / winning_tickers) * abs(overall_avg_profit)

    avg_entry_confidence = np.mean(all_confidences) if all_confidences else 0
    return {
        "RSI": use_rsi,
        "Volatility": use_volatility,
        "TREND": use_trend,
        "Volume": use_volume,
        "Conf. Threshold": threshold,
        "Avg Profit/Trade": overall_avg_profit,
        "Good/Bad Ratio": good_bad_ratio,
        "Winning Tickers": winning_tickers,
        "Losing Tickers": losing_tickers,
        "Avg Entry Conf.": avg_entry_confidence,
        "Good Score": raw_score,
        "Bad Score": badness_score,
        "Norm. Score %": normalise_strategy_score(raw_score),
        "Total Trades": total_trades,
    }


def run_confidence_optimisation(optimise_for, find_mode, master_df, main_content_placeholder, veto_factors):
    """Search factor combinations and thresholds for the best or worst setups.

    Args:
        optimise_for: 'long' or 'short' trades to score.
        find_mode: 'best' tests every non-empty factor combination and ranks
            by 'Good Score'; 'worst' tests only the combination given by
            *veto_factors* and ranks by 'Bad Score'.
        master_df: Frame holding one price column per ticker.
        main_content_placeholder: Streamlit placeholder to render into.
        veto_factors: Four toggles (RSI, volatility, trend, volume); read only
            when *find_mode* is 'worst'.

    Results are stored in ``st.session_state.confidence_results_df`` and the
    top row in ``best_confidence_setup`` / ``worst_confidence_setup``; in
    'best' mode the top setups are also saved to disk.
    """
    st.session_state.summary_df = None
    st.session_state.single_ticker_results = None
    st.session_state.open_trades_df = None
    st.session_state.best_params = None
    st.session_state.advisor_df = None

    with main_content_placeholder.container():
        num_cores = cpu_count()
        st.info(f"Starting to find **{find_mode.upper()}** {optimise_for.upper()} setups on {num_cores} CPU cores...")
        factors = ['RSI', 'Volatility', 'TREND', 'Volume']

        if find_mode == 'worst':
            use_rsi, use_vol, use_trend, use_volume = veto_factors
            # Build the single combination directly; filtering the product by
            # equality left an empty list (and an IndexError) for toggles
            # that were not plain booleans.
            selected = tuple(_factor_enabled(flag) for flag in (use_rsi, use_vol, use_trend, use_volume))
            if not any(selected):
                st.warning("Please select at least one factor for the Veto search.")
                return
            on_off_combos = [selected]
        else:
            on_off_combos = [c for c in itertools.product([False, True], repeat=len(factors)) if any(c)]

        thresholds_to_test = [10, 25, 50, 85]
        tasks = list(itertools.product(on_off_combos, thresholds_to_test, [1.0]))
        total_tasks = len(tasks)

        base_params = _base_params_from_session()
        tickers_to_run = sorted(_price_columns(master_df))
        date_range = (pd.Timestamp(st.session_state.start_date), pd.Timestamp(st.session_state.end_date))
        factor_weights = _factor_weights_from_session()
        worker_func = partial(run_single_confidence_test, base_params=base_params, master_df=master_df,
                              date_range=date_range, tickers_to_run=tickers_to_run,
                              optimise_for=optimise_for, factor_weights=factor_weights)

        results_list = []
        progress_bar = st.progress(0, text="Optimisation starting...")
        with Pool(processes=num_cores) as pool:
            iterator = pool.imap_unordered(worker_func, tasks)
            for i, result in enumerate(iterator, 1):
                results_list.append(result)
                progress_bar.progress(i / total_tasks, text=f"Optimising... {i}/{total_tasks} combinations complete.")

        if results_list:
            results_df = pd.DataFrame(results_list)
            sort_col = "Good Score" if find_mode == 'best' else "Bad Score"
            results_df = results_df.sort_values(by=sort_col, ascending=False).reset_index(drop=True)
            # Factor toggles are stored and displayed as "On"/"Off" from here on.
            for factor in factors:
                results_df[factor] = results_df[factor].apply(lambda x: "On" if x else "Off")
            st.subheader(f"🏆 Top {find_mode.title()} Confidence Setup Found ({optimise_for.title()} Trades)")
            best_setup = results_df.iloc[0]
            st.dataframe(best_setup)
            if find_mode == 'best':
                st.session_state.best_confidence_setup = best_setup.to_dict()
                save_top_setups(results_df, optimise_for)
            else:
                st.session_state.worst_confidence_setup = best_setup.to_dict()
            st.session_state.confidence_results_df = results_df
        else:
            st.warning("Confidence optimisation completed but no results were generated.")
            st.session_state.confidence_results_df = None


def generate_advisor_report(main_df, main_content_placeholder):
    """Scan every ticker for trades that are currently open under the saved top setups.

    Loads the setups written by ``save_top_setups``, lets the user pick Long
    or Short, backtests each ticker over its full history with each setup and
    stores the open trades of the chosen side in
    ``st.session_state.advisor_df`` (an empty DataFrame when none are found).
    """
    st.session_state.summary_df = None
    st.session_state.single_ticker_results = None
    st.session_state.confidence_results_df = None
    st.session_state.open_trades_df = None
    st.session_state.best_params = None

    with main_content_placeholder.container():
        st.header("📈 Advanced Advisor Report")
        top_setups = load_top_setups()
        if not top_setups:
            st.warning("No saved top setups found. Please run a 'Find Best Confidence' optimisation from Section 5 first.")
            return

        side = st.radio("Generate report for which setups?", ("Long", "Short"), horizontal=True)
        setups_to_run = top_setups.get(side.lower())
        if not setups_to_run:
            st.warning(f"No saved top {side.lower()} setups found in the file.")
            return

        st.info(f"Scanning all tickers for open trades based on the top {len(setups_to_run)} saved {side} setups...")
        base_params = _base_params_from_session()
        factor_weights = _factor_weights_from_session()
        all_advisor_trades = []
        ticker_list = sorted(_price_columns(main_df))

        progress_bar = st.progress(0, text="Scanning setups...")
        for i, setup in enumerate(setups_to_run):
            progress_bar.progress((i + 1) / len(setups_to_run), text=f"Scanning with Setup #{i+1}...")
            use_rsi = setup.get('RSI') == 'On'
            use_vol = setup.get('Volatility') == 'On'
            use_trend = setup.get('TREND') == 'On'
            use_volume = setup.get('Volume') == 'On'
            params_for_run = base_params.copy()
            params_for_run['confidence_threshold'] = setup.get('Conf. Threshold')

            for ticker_symbol in ticker_list:
                data_for_backtest = _extract_ticker_frame(main_df, ticker_symbol)
                _, _, _, _, _, _, open_trades = run_backtest(
                    data_for_backtest, params_for_run, use_rsi, use_vol, use_trend, use_volume,
                    factor_weights['rsi'], factor_weights['vol'], factor_weights['trend'], factor_weights['volume'])
                for trade in open_trades:
                    if trade['Side'].lower() == side.lower():
                        trade['Ticker'] = ticker_symbol
                        trade['Setup Rank'] = i + 1
                        trade['Setup G/B Ratio'] = setup.get('Good/Bad Ratio')
                        trade['Setup Avg Profit'] = setup.get('Avg Profit/Trade')
                        all_advisor_trades.append(trade)
        progress_bar.empty()

        if all_advisor_trades:
            advisor_df = pd.DataFrame(all_advisor_trades)
            cols_order = ['Ticker', 'Setup Rank', 'Current % P/L', 'Side', 'Date Open',
                          'Start Confidence', 'Setup G/B Ratio', 'Setup Avg Profit']
            advisor_df = advisor_df[cols_order]
            st.session_state.advisor_df = advisor_df
        else:
            st.success(f"No open {side} trades found matching any of the top setups.")
            st.session_state.advisor_df = pd.DataFrame()


# --- 5. Streamlit User Interface --- def main(): st.set_page_config(page_title="Stock Backtesting Sandbox", page_icon="📈", layout="wide") if 'first_run' not in st.session_state: st.session_state.first_run = True st.session_state.widget_defaults = load_settings() st.session_state.veto_setup = load_veto_setup() st.session_state.summary_df = None st.session_state.single_ticker_results = None st.session_state.confidence_results_df = None st.session_state.open_trades_df = None st.session_state.best_params = None st.title("📈 Stock Backtesting Sandbox") st.success(f"Good morning! 
Today is {date.today().strftime('%A, %d %B %Y')}.") main_content_placeholder = st.empty() if 'master_df' not in st.session_state: with main_content_placeholder.container(): master_df, load_message = load_all_data('csv_data') if master_df is None: st.error(load_message); st.stop() else: st.info(load_message) master_df, outlier_report = clean_data_and_report_outliers(master_df) if outlier_report: report_df = pd.DataFrame(outlier_report) st.info(f"Data Cleaning: Found and removed price spikes >100% in {len(outlier_report)} tickers.") st.download_button("⬇️ Download Outlier Report", report_df.to_csv(index=False).encode('utf-8'), "outlier_report.csv", "text/csv") st.session_state.master_df = master_df st.session_state.ticker_list = sorted([col for col in master_df.columns if '_volume' not in str(col).lower()]) master_df = st.session_state.master_df ticker_list = st.session_state.ticker_list defaults = st.session_state.widget_defaults st.sidebar.header("1. Select Test Mode") st.sidebar.radio("Mode:", ("Analyse Single Ticker", "Analyse Full List"), key='run_mode', index=1) if st.session_state.get('run_mode') == "Analyse Single Ticker": st.sidebar.selectbox("Select a Ticker:", ticker_list, key='ticker_select') st.sidebar.date_input("Start Date", master_df.index.min().date(), key='start_date') st.sidebar.date_input("End Date", master_df.index.max().date(), key='end_date') st.markdown("""""", unsafe_allow_html=True) if st.sidebar.button("🚀 Run Analysis", type="primary", key="run_analysis_button"): st.session_state.confidence_results_df = None st.session_state.best_params = None st.sidebar.markdown("---") st.sidebar.header("2. 
Confidence Score Factors (for Main Signal)") st.sidebar.toggle("Use Momentum (RSI)", value=True, key='use_rsi') st.sidebar.number_input("RSI Weight", 0.1, 5.0, 1.0, 0.1, key='rsi_w', disabled=not st.session_state.get('use_rsi', True)) st.sidebar.toggle("Use Volatility", value=True, key='use_vol') st.sidebar.number_input("Volatility Weight", 0.1, 5.0, 1.0, 0.1, key='vol_w', disabled=not st.session_state.get('use_vol', True)) st.sidebar.toggle("Use Trend (200d MA)", value=True, key='use_trend') st.sidebar.number_input("Trend Weight", 0.1, 5.0, 1.0, 0.1, key='trend_w', disabled=not st.session_state.get('use_trend', True)) st.sidebar.toggle("Use Volume Spike", value=True, key='use_volume') st.sidebar.number_input("Volume Weight", 0.1, 5.0, 1.0, 0.1, key='volume_w', disabled=not st.session_state.get('use_volume', True)) st.sidebar.slider("Minimum Confidence Threshold (%)", 0, 100, defaults.get("confidence_threshold", 50), 5, key='confidence_slider') st.sidebar.markdown("---") st.sidebar.header("3. 
Strategy Parameters") st.sidebar.number_input("Large MA Period", 10, 200, defaults.get("large_ma_period", 50), 1, key='ma_period') st.sidebar.number_input("Bollinger Band Period", 10, 100, defaults.get("bband_period", 20), 1, key='bb_period') st.sidebar.number_input("Bollinger Band Std Dev", 1.0, 4.0, defaults.get("bband_std_dev", 2.0), 0.1, key='bb_std') st.sidebar.subheader("Long Trade Logic"); st.sidebar.slider("Entry Threshold (%)", 0.0, 10.0, defaults.get("long_entry_threshold_pct", 0.0) * 100, 0.1, key='long_entry'); st.sidebar.slider("Exit MA Threshold (%)", 0.0, 10.0, defaults.get("long_exit_ma_threshold_pct", 0.0) * 100, 0.1, key='long_exit'); st.sidebar.slider("Stop Loss (%)", 0.0, 30.0, defaults.get("long_stop_loss_pct", 0.0) * 100, 0.5, key='long_sl'); st.sidebar.number_input("Delay Entry (days)", 0, 10, defaults.get("long_delay_days", 0), 1, key='long_delay') st.sidebar.subheader("Short Trade Logic"); st.sidebar.slider("Entry Threshold (%)", 0.0, 10.0, defaults.get("short_entry_threshold_pct", 0.0) * 100, 0.1, key='short_entry'); st.sidebar.slider("Exit MA Threshold (%)", 0.0, 10.0, defaults.get("short_exit_ma_threshold_pct", 0.0) * 100, 0.1, key='short_exit'); st.sidebar.slider("Stop Loss (%)", 0.0, 30.0, defaults.get("short_stop_loss_pct", 0.0) * 100, 0.5, key='short_sl'); st.sidebar.number_input("Delay Entry (days)", 0, 10, defaults.get("short_delay_days", 0), 1, key='short_delay') st.sidebar.markdown("---") st.sidebar.header("4. 
Find Best Parameters") with st.sidebar.expander("Set Optimisation Ranges"): use_squared_weighting = st.toggle("Prioritise Profit per Trade (Squared Weighting)") st.markdown("---") optimise_ma = st.checkbox("Optimise MA Period", False, key="opt_ma_cb") c1,c2,c3 = st.columns(3); st.session_state.ma_start_num = c1.number_input("MA Start", 10, 200, 50, 5, disabled=not optimise_ma, key='ma_start'); st.session_state.ma_end_num = c2.number_input("MA End", 10, 200, 55, 5, disabled=not optimise_ma, key='ma_end'); st.session_state.ma_step_num = c3.number_input("MA Step", 1, 20, 5, disabled=not optimise_ma, key='ma_step') optimise_bb = st.checkbox("Optimise BB Period", False, key="opt_bb_cb") c1,c2,c3 = st.columns(3); st.session_state.bb_start_num = c1.number_input("BB Start", 10, 100, 20, 5, disabled=not optimise_bb, key='bb_start'); st.session_state.bb_end_num = c2.number_input("BB End", 10, 100, 25, 5, disabled=not optimise_bb, key='bb_end'); st.session_state.bb_step_num = c3.number_input("BB Step", 1, 10, 5, disabled=not optimise_bb, key='bb_step') optimise_std = st.checkbox("Optimise BB Std Dev", False, key="opt_std_cb") c1,c2,c3 = st.columns(3); st.session_state.std_start_num = c1.number_input("Std Start", 1.0, 4.0, 2.0, 0.1, format="%.1f", disabled=not optimise_std, key='std_start'); st.session_state.std_end_num = c2.number_input("Std End", 1.0, 4.0, 2.1, 0.1, format="%.1f", disabled=not optimise_std, key='std_end'); st.session_state.std_step_num = c3.number_input("Std Step", 0.1, 1.0, 0.1, format="%.1f", disabled=not optimise_std, key='std_step') st.markdown("---") optimise_conf = st.checkbox("Optimise Confidence Threshold", False, key="opt_conf_cb") c1,c2,c3 = st.columns(3); st.session_state.conf_start_num = c1.number_input("Conf Start", 0, 100, 50, 5, disabled=not optimise_conf, key='conf_start'); st.session_state.conf_end_num = c2.number_input("Conf End", 0, 100, 75, 5, disabled=not optimise_conf, key='conf_end'); st.session_state.conf_step_num = 
c3.number_input("Conf Step", 5, 25, 5, disabled=not optimise_conf, key='conf_step') optimise_sl = st.checkbox("Optimise Stop Loss %", False, key="opt_sl_cb") c1,c2,c3 = st.columns(3); st.session_state.sl_start_num = c1.number_input("SL Start", 0.0, 30.0, 2.0, 0.5, disabled=not optimise_sl, key='sl_start'); st.session_state.sl_end_num = c2.number_input("SL End", 0.0, 30.0, 5.0, 0.5, disabled=not optimise_sl, key='sl_end'); st.session_state.sl_step_num = c3.number_input("SL Step", 0.1, 5.0, 0.5, disabled=not optimise_sl, key='sl_step') optimise_delay = st.checkbox("Optimise Delay Days", False, key="opt_delay_cb") c1,c2,c3 = st.columns(3); st.session_state.delay_start_num = c1.number_input("Delay Start", 0, 5, 0, 1, disabled=not optimise_delay, key='delay_start'); st.session_state.delay_end_num = c2.number_input("Delay End", 0, 5, 1, 1, disabled=not optimise_delay, key='delay_end'); st.session_state.delay_step_num = c3.number_input("Delay Step", 1, 5, 1, disabled=not optimise_delay, key='delay_step') optimise_entry = st.checkbox("Optimise Entry %", False, key="opt_entry_cb") c1,c2,c3 = st.columns(3); st.session_state.entry_start_num = c1.number_input("Entry Start", 0.0, 10.0, 0.0, 0.1, disabled=not optimise_entry, key='entry_start'); st.session_state.entry_end_num = c2.number_input("Entry End", 0.0, 10.0, 1.0, 0.1, disabled=not optimise_entry, key='entry_end'); st.session_state.entry_step_num = c3.number_input("Entry Step", 0.1, 1.0, 0.1, disabled=not optimise_entry, key='entry_step') optimise_exit = st.checkbox("Optimise Exit MA %", False, key="opt_exit_cb") c1,c2,c3 = st.columns(3); st.session_state.exit_start_num = c1.number_input("Exit Start", 0.0, 10.0, 0.0, 0.1, disabled=not optimise_exit, key='exit_start'); st.session_state.exit_end_num = c2.number_input("Exit End", 0.0, 10.0, 1.0, 0.1, disabled=not optimise_exit, key='exit_end'); st.session_state.exit_step_num = c3.number_input("Exit Step", 0.1, 1.0, 0.1, disabled=not optimise_exit, key='exit_step') 
st.markdown("---") col1, col2 = st.columns(2) if col1.button("💡 Find Best Long"): generate_and_run_optimisation(master_df, main_content_placeholder, 'long', use_squared_weighting) if col2.button("💡 Find Best Short"): generate_and_run_optimisation(master_df, main_content_placeholder, 'short', use_squared_weighting) st.sidebar.markdown("---") st.sidebar.header("5. Find Best/Worst Confidence Setup") with st.sidebar.expander("Optimise Confidence Factors"): st.info("Finds good setups (using Section 2 factors) or bad setups (using the factors below).") st.write("**Find Best Setups (High Profit)**"); c1, c2 = st.columns(2) if c1.button("💡 Find Best Long Confidence"): run_confidence_optimisation('long', 'best', master_df, main_content_placeholder, None) if c2.button("💡 Find Best Short Confidence"): run_confidence_optimisation('short', 'best', master_df, main_content_placeholder, None) st.markdown("---") st.write("**Find Worst Setups (for Veto Filter)**") st.caption("Select the factors to test for the Veto signal:") c1, c2 = st.columns(2) veto_rsi = c1.toggle("Veto RSI", value=True) veto_vol = c2.toggle("Veto Volatility", value=True) veto_trend = c1.toggle("Veto Trend", value=True) veto_volume = c2.toggle("Veto Volume", value=True) veto_factors = (veto_rsi, veto_vol, veto_trend, veto_volume) c1, c2 = st.columns(2) if c1.button("❌ Find Worst Long"): run_confidence_optimisation('long', 'worst', master_df, main_content_placeholder, veto_factors) if c2.button("❌ Find Worst Short"): run_confidence_optimisation('short', 'worst', master_df, main_content_placeholder, veto_factors) st.sidebar.markdown("---") if st.session_state.get('veto_setup'): st.sidebar.header("Veto Filter") st.sidebar.success("Veto filter is ACTIVE.") st.sidebar.json(st.session_state.veto_setup) if st.sidebar.button("💾 Save Veto as Default"): save_veto_setup(st.session_state.veto_setup) if st.sidebar.button("Clear Veto Filter"): st.session_state.veto_setup = None st.rerun() st.sidebar.markdown("---") if 
st.sidebar.button("💾 Save Settings as Default"): save_settings({ "large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, "bband_std_dev": st.session_state.bb_std, "confidence_threshold": st.session_state.confidence_slider, "long_entry_threshold_pct": st.session_state.long_entry / 100, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, "long_stop_loss_pct": st.session_state.long_sl / 100, "long_delay_days": st.session_state.long_delay, "short_entry_threshold_pct": st.session_state.short_entry / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, "short_stop_loss_pct": st.session_state.short_sl / 100, "short_delay_days": st.session_state.short_delay, }) # --- Trigger actions based on session state flags --- if st.session_state.get('run_analysis_button'): st.session_state.confidence_results_df = None st.session_state.best_params = None st.session_state.advisor_df = None with main_content_placeholder.container(): veto_to_use = st.session_state.get('veto_setup') if veto_to_use: st.info("Veto filter is active for this analysis.") else: st.info("💡 Tip: You can find and apply a 'Veto Filter' from section 5 in the sidebar.") manual_params = {"large_ma_period": st.session_state.ma_period, "bband_period": st.session_state.bb_period, "bband_std_dev": st.session_state.bb_std, "confidence_threshold": st.session_state.confidence_slider, "long_entry_threshold_pct": st.session_state.long_entry / 100, "long_exit_ma_threshold_pct": st.session_state.long_exit / 100, "long_stop_loss_pct": st.session_state.long_sl / 100, "long_delay_days": st.session_state.long_delay, "short_entry_threshold_pct": st.session_state.short_entry / 100, "short_exit_ma_threshold_pct": st.session_state.short_exit / 100, "short_stop_loss_pct": st.session_state.short_sl / 100, "short_delay_days": st.session_state.short_delay, } # --- FIX: Correct indentation for this block --- if st.session_state.run_mode == "Analyse Single Ticker": 
selected_ticker = st.session_state.get('ticker_select', ticker_list[0]) cols_to_use = [selected_ticker] if f'{selected_ticker}_Volume' in master_df.columns: cols_to_use.append(f'{selected_ticker}_Volume') data_for_backtest = master_df[cols_to_use].rename(columns={selected_ticker: 'Close', f'{selected_ticker}_Volume': 'Volume'}) ticker_data_series = data_for_backtest.loc[pd.Timestamp(st.session_state.start_date):pd.Timestamp(st.session_state.end_date)] if not ticker_data_series.empty: long_pnl, short_pnl, avg_long_trade, avg_short_trade, results_df, trades, open_trades = run_backtest(ticker_data_series, manual_params, st.session_state.use_rsi, st.session_state.use_vol, st.session_state.use_trend, st.session_state.use_volume, st.session_state.rsi_w, st.session_state.vol_w, st.session_state.trend_w, st.session_state.volume_w, veto_setup=veto_to_use) st.session_state.single_ticker_results = {"long_pnl": long_pnl, "short_pnl": short_pnl, "avg_long_trade": avg_long_trade, "avg_short_trade": avg_short_trade, "results_df": results_df, "trades": trades} if open_trades: st.session_state.open_trades_df = pd.DataFrame(open_trades) else: st.session_state.open_trades_df = pd.DataFrame() else: st.warning("No data for this ticker in the selected date range.") elif st.session_state.run_mode == "Analyse Full List": summary_results, all_open_trades = [], [] progress_bar = st.progress(0, text="Starting analysis...") for i, ticker_symbol in enumerate(ticker_list): progress_bar.progress((i + 1) / len(ticker_list), text=f"Analysing {ticker_symbol}...") cols_to_use = [ticker_symbol] if f'{ticker_symbol}_Volume' in master_df.columns: cols_to_use.append(f'{ticker_symbol}_Volume') data_for_backtest = master_df[cols_to_use].rename(columns={ticker_symbol: 'Close', f'{ticker_symbol}_Volume': 'Volume'}) ticker_data_series = data_for_backtest.loc[pd.Timestamp(st.session_state.start_date):pd.Timestamp(st.session_state.end_date)] if not ticker_data_series.empty: long_pnl, short_pnl, avg_long_trade, 
avg_short_trade, _, trades, open_trades = run_backtest(ticker_data_series, manual_params, st.session_state.use_rsi, st.session_state.use_vol, st.session_state.use_trend, st.session_state.use_volume, st.session_state.rsi_w, st.session_state.vol_w, st.session_state.trend_w, st.session_state.volume_w, veto_setup=veto_to_use) long_conf = np.mean([t['confidence'] for t in trades[0]]) if trades[0] else 0 short_conf = np.mean([t['confidence'] for t in trades[2]]) if trades[2] else 0 summary_results.append({"Ticker": ticker_symbol, "Cumulative Long P&L": long_pnl, "Avg Long Profit per Trade": avg_long_trade, "Num Long Trades": len(trades[0]), "Avg Long Confidence": long_conf, "Cumulative Short P&L": short_pnl, "Avg Short Profit per Trade": avg_short_trade, "Num Short Trades": len(trades[2]), "Avg Short Confidence": short_conf}) if open_trades: for trade in open_trades: trade['Ticker'] = ticker_symbol all_open_trades.append(trade) progress_bar.empty() if summary_results: st.session_state.summary_df = pd.DataFrame(summary_results).set_index('Ticker') else: st.warning("No trades found for any ticker with the current settings.") if all_open_trades: st.session_state.open_trades_df = pd.DataFrame(all_open_trades) else: st.session_state.open_trades_df = pd.DataFrame() st.session_state.run_analysis_button = False if st.session_state.get('run_advanced_advisor'): generate_advisor_report(master_df, main_content_placeholder) st.session_state.run_advanced_advisor = False # --- Main Display Area --- with main_content_placeholder.container(): if st.session_state.get('advisor_df') is not None: st.subheader("👨‍💼 Advanced Advisor: Open Positions from Top Setups") if not st.session_state.advisor_df.empty: st.dataframe(st.session_state.advisor_df.style.format({ "Current % P/L": "{:.2%}", "Date Open": "{:%Y-%m-%d}", "Start Confidence": "{:.0f}%", "Setup G/B Ratio": "{:.2f}", "Setup Avg Profit": "{:.2%}" })) else: st.info("No open positions found matching the criteria.") elif 
st.session_state.get('confidence_results_df') is not None and not st.session_state.confidence_results_df.empty: st.subheader("📊 Confidence Setup Optimisation Results") display_df = st.session_state.confidence_results_df.head(60) st.dataframe(display_df.style.format({ "Avg Profit/Trade": "{:.2%}", "Good/Bad Ratio": "{:.2f}", "Avg Entry Conf.": "{:.1f}%", "Good Score": "{:.4f}", "Bad Score": "{:.4f}", "Norm. Score %": "{:.2f}%" })) elif st.session_state.get('single_ticker_results') is not None: res = st.session_state.single_ticker_results st.subheader(f"Results for {st.session_state.get('ticker_select')}") c1, c2, c3, c4 = st.columns(4); c1.metric("Cumulative Long P&L", f"{res['long_pnl']:.2%}"); c2.metric("Avg Long Trade P&L", f"{res['avg_long_trade']:.2%}"); c3.metric("Cumulative Short P&L", f"{res['short_pnl']:.2%}"); c4.metric("Avg Short Trade P&L", f"{res['avg_short_trade']:.2%}") if res['results_df'] is not None: st.plotly_chart(generate_long_plot(res['results_df'], res['trades'], st.session_state.get('ticker_select')), use_container_width=True) st.plotly_chart(generate_short_plot(res['results_df'], res['trades'], st.session_state.get('ticker_select')), use_container_width=True) elif st.session_state.get('summary_df') is not None and not st.session_state.summary_df.empty: display_summary_analytics(st.session_state.summary_df) st.subheader("Results per Ticker") if st.checkbox("Only show tickers with trades", value=True): display_df = st.session_state.summary_df[(st.session_state.summary_df['Num Long Trades'] > 0) | (st.session_state.summary_df['Num Short Trades'] > 0)] else: display_df = st.session_state.summary_df st.dataframe(display_df.style.format({"Cumulative Long P&L": "{:.2%}", "Avg Long Profit per Trade": "{:.2%}", "Cumulative Short P&L": "{:.2%}", "Avg Short Profit per Trade": "{:.2%}", "Avg Long Confidence": "{:.0f}%", "Avg Short Confidence": "{:.0f}%"})) if st.session_state.get('open_trades_df') is not None and not 
st.session_state.open_trades_df.empty: st.subheader("👨‍💼 Advisor: Currently Open Positions (Manual Run)") display_open_df = st.session_state.open_trades_df.copy() st.dataframe(display_open_df.style.format({"Date Open": "{:%Y-%m-%d}", "Start Confidence": "{:.0f}%", "Current % P/L": "{:.2%}"})) st.markdown("---") st.info("Want to see open trades from a wider range of top strategies?") if st.button("Run Advanced Advisor Report"): st.session_state.run_advanced_advisor = True st.rerun() def apply_best_params_to_widgets(): bp = st.session_state.get('best_params'); if not bp: return st.session_state.ma_period, st.session_state.bb_period, st.session_state.bb_std = bp.get("large_ma_period"), bp.get("bband_period"), bp.get("bband_std_dev") st.session_state.long_sl, st.session_state.short_sl = bp.get("long_stop_loss_pct") * 100, bp.get("short_stop_loss_pct") * 100 st.session_state.long_delay, st.session_state.short_delay = bp.get("long_delay_days"), bp.get("short_delay_days") st.session_state.long_entry, st.session_state.short_entry = bp.get("long_entry_threshold_pct") * 100, bp.get("short_entry_threshold_pct") * 100 st.session_state.long_exit, st.session_state.short_exit = bp.get("long_exit_ma_threshold_pct") * 100, bp.get("short_exit_ma_threshold_pct") * 100 st.session_state.confidence_slider = bp.get("confidence_threshold") st.session_state.best_params = None if st.session_state.get('best_params'): st.button("⬇️ Load Optimal Parameters into Manual Settings", on_click=apply_best_params_to_widgets) if st.session_state.get('worst_confidence_setup'): if st.button("Apply Worst Setup as Veto Filter"): st.session_state.veto_setup = st.session_state.worst_confidence_setup st.session_state.worst_confidence_setup = None st.rerun() if __name__ == "__main__": main()