"""Self-Tuning SuperTrend with K-Means (Streamlit app).

Downloads daily OHLC data, computes an ATR-based SuperTrend for a range of
band multipliers, scores each variant with an EMA-smoothed performance
metric, clusters the scores with a 1-D k-means (k=3), and recomputes the
indicator using the mean multiplier of the user-selected cluster.
"""

import streamlit as st
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from datetime import datetime, timedelta
import yfinance as yf

# ---------------------------------------------------------------------------
# Page configuration and introduction
# ---------------------------------------------------------------------------
st.set_page_config(layout="wide", page_title="Self-Tuning SuperTrend and K-Means")

st.title("Self-Tuning SuperTrend and K-Means")
st.write(
    "This tool builds a self-tuning SuperTrend indicator using k-means "
    "clustering to adapt stop levels and generate buy/sell signals. It uses "
    "daily price data to compute volatility, score multiple configurations, "
    "and select the best one in real time."
)

# Methodology expander (closed by default)
with st.expander("Methodology", expanded=False):
    st.write("**Volatility Measurement**")
    st.write("An Average True Range (ATR) is computed using an Exponential Moving Average (EMA).")
    st.latex(r'\text{ATR} = \text{EMA}\left(\max\left\{High-Low,\ |High-\text{PrevClose}|,\ |Low-\text{PrevClose}|\right\}\right)')
    st.write("**Generating SuperTrend Variants**")
    st.write("Multiple SuperTrend signals are calculated. They use:")
    st.latex(r'\text{Upper Band} = hl2 + ATR \times \text{factor}')
    st.latex(r'\text{Lower Band} = hl2 - ATR \times \text{factor}')
    st.write("**Performance Scoring & Clustering**")
    st.write("Each variant is scored based on price movement. K-means (k=3) clusters these scores into Best, Average, and Worst groups.")
    st.write("**Final Signal Generation**")
    st.write("The indicator is recomputed using the average factor from the selected cluster. This gives a self-calibrated trading signal.")
    st.write("This process minimizes manual tuning and adapts with recent price action.")
    st.write("For more details, see this article [here](https://entreprenerdly.com/trading-signals-with-adaptive-supertrend-and-k-means/).")

# Sidebar inputs explanation
st.write("#### Adjustable Inputs & Implications")
st.write("""
- **Ticker:** The stock symbol to analyze. Changing this lets you switch assets.
- **Start Date & End Date:** Define the analysis window. End Date defaults to today plus one day.
- **ATR Length:** Sets the period for ATR. Lower values react faster; higher values smooth out noise.
- **Minimum/Maximum Multipliers & Step:** Define the range for SuperTrend sensitivity. Smaller steps improve resolution but increase compute time.
- **Performance Alpha:** Determines the smoothness of the performance score. Lower makes the metric more reactive; higher favors stability.
- **K-Means Options (From Cluster & Max Iterations):** Choose which cluster (Best, Average, Worst) to use and limit iterations for clustering. This controls how variants are grouped.
""")

# ---------------------------------------------------------------------------
# Sidebar inputs
# ---------------------------------------------------------------------------
with st.sidebar:
    st.header("Input Parameters")

    with st.expander("Data Inputs", expanded=True):
        ticker = st.text_input(
            "Ticker",
            value="ASML",
            help="Enter the stock symbol to analyze. Example: AAPL, MSFT, NVDA. This determines which asset's data will be used."
        )
        start_date = st.date_input(
            "Start Date",
            value=datetime(2022, 1, 1),
            help="Start of the historical data window. Affects the amount of price history used to compute signals."
        )
        # Default the end to tomorrow so the most recent daily bar is included.
        default_end_date = datetime.today() + timedelta(days=1)
        end_date = st.date_input(
            "End Date",
            value=default_end_date,
            help="End of the data window. Automatically set to today + 1 to include the most recent bar."
        )

    with st.expander("Methodology Parameters", expanded=True):
        atr_length = st.number_input(
            "ATR Length",
            min_value=1, value=7, step=1,
            help="ATR period controls how volatility is measured. Lower values make the stop more sensitive to short-term moves. Higher values smooth noise but may react slower."
        )
        min_mult = st.number_input(
            "Minimum Multiplier",
            value=1.0, step=0.1,
            help="Defines the tightest SuperTrend stop. Lower values mean tighter stops, which react quickly but may whipsaw in choppy conditions."
        )
        max_mult = st.number_input(
            "Maximum Multiplier",
            value=5.0, step=0.1,
            help="Defines the widest SuperTrend stop. Higher values give more breathing room but may delay trend changes."
        )
        step_mult = st.number_input(
            "Step",
            value=0.5, step=0.1,
            help="Step size between multipliers. Smaller values give finer resolution but increase compute time."
        )
        perf_alpha = st.number_input(
            "Performance Alpha",
            min_value=1, value=8, step=1,
            help="Controls how quickly the performance metric responds to new price behavior. Lower = more reactive, higher = more stable but slower to adapt."
        )
        from_cluster = st.selectbox(
            "From Cluster",
            options=["Best", "Average", "Worst"],
            help="Selects which k-means cluster to use for final signal generation. 'Best' is typical for trend-following. 'Average' or 'Worst' can simulate conservative or contrarian behavior."
        )
        max_iter = st.number_input(
            "Max Iterations",
            min_value=1, value=1000, step=1,
            help="Upper limit on how long k-means clustering can run. Higher values allow more precise convergence but slow down the run time."
        )


# ---------------------------------------------------------------------------
# Computation helpers
# ---------------------------------------------------------------------------
def _compute_supertrend(df, factor, perf_alpha):
    """Compute one SuperTrend variant for a given band multiplier.

    Parameters
    ----------
    df : pd.DataFrame with "Close", "hl2" and "atr" columns.
    factor : float band multiplier applied to the ATR.
    perf_alpha : int smoothing length for the performance score EMA.

    Returns
    -------
    dict with per-bar arrays: "trend" (1 up / 0 down), "upper"/"lower"
    bands, "output" (the trailing stop line), "perf" (EMA-smoothed
    directional performance score), plus the "factor" used.
    """
    close = df["Close"].to_numpy()
    hl2 = df["hl2"].to_numpy()
    atr = df["atr"].to_numpy()
    size = len(close)

    trend = np.zeros(size, dtype=int)
    upper = np.zeros(size, dtype=float)
    lower = np.zeros(size, dtype=float)
    output = np.zeros(size, dtype=float)
    perf = np.zeros(size, dtype=float)

    # Seed the first bar from hl2; trend starts up only if close > hl2.
    trend[0] = 1 if close[0] > hl2[0] else 0
    upper[0] = lower[0] = output[0] = hl2[0]
    perf[0] = 0.0

    alpha = 2.0 / (perf_alpha + 1)  # EMA smoothing constant (loop invariant)
    for i in range(1, size):
        up = hl2[i] + atr[i] * factor
        dn = hl2[i] - atr[i] * factor

        # The trend flips only when the close breaks the previous band.
        if close[i] > upper[i - 1]:
            trend[i] = 1
        elif close[i] < lower[i - 1]:
            trend[i] = 0
        else:
            trend[i] = trend[i - 1]

        # Ratchet logic: a band may only tighten while the prior close
        # remained inside it; otherwise it resets to the fresh band.
        upper[i] = min(up, upper[i - 1]) if close[i - 1] < upper[i - 1] else up
        lower[i] = max(dn, lower[i - 1]) if close[i - 1] > lower[i - 1] else dn

        # Performance: EMA of price change aligned with the stop's side.
        if close[i - 1] > output[i - 1]:
            diff_sign = 1
        elif close[i - 1] < output[i - 1]:
            diff_sign = -1
        else:
            diff_sign = 0
        perf[i] = perf[i - 1] + alpha * ((close[i] - close[i - 1]) * diff_sign - perf[i - 1])

        # In an uptrend the lower band is the trailing stop, else the upper.
        output[i] = lower[i] if trend[i] == 1 else upper[i]

    return {
        "trend": trend,
        "upper": upper,
        "lower": lower,
        "output": output,
        "perf": perf,
        "factor": factor,
    }


def _k_means(data, factors, k=3, max_iter=1000):
    """1-D k-means over performance scores, carrying factors alongside.

    ``k`` is part of the signature for clarity but the implementation is
    fixed at 3 clusters, seeded at the 25th/50th/75th percentiles.
    Iterates until centroids converge or ``max_iter`` passes elapse.

    Returns (clusters, cluster_factors, centroids) where the dicts map
    cluster index 0..2 to lists of scores / factors.
    """
    centroids = np.percentile(data, [25, 50, 75])
    for _ in range(max_iter):
        clusters = {0: [], 1: [], 2: []}
        cluster_factors = {0: [], 1: [], 2: []}
        for score, factor in zip(data, factors):
            idx = int(np.abs(score - centroids).argmin())
            clusters[idx].append(score)
            cluster_factors[idx].append(factor)
        # Empty clusters keep their previous centroid.
        new_centroids = np.array([
            np.mean(clusters[i]) if clusters[i] else centroids[i]
            for i in range(3)
        ])
        if np.allclose(new_centroids, centroids):
            break
        centroids = new_centroids
    return clusters, cluster_factors, centroids


# ---------------------------------------------------------------------------
# Main analysis
# ---------------------------------------------------------------------------
run_analysis = st.button("Run Analysis")

if run_analysis:
    # Validate date input before doing any work.
    if start_date >= end_date:
        st.error("Start Date must be before End Date.")
    else:
        with st.spinner("Running analysis..."):
            try:
                start_date_str = start_date.strftime("%Y-%m-%d")
                end_date_str = end_date.strftime("%Y-%m-%d")

                # 1) Download daily OHLC data.
                df = yf.download(
                    ticker,
                    start=start_date_str,
                    end=end_date_str,
                    interval="1d",
                    auto_adjust=False,
                )
                if df.empty:
                    st.error("No data returned from the data provider.")
                    st.stop()
                # yfinance can return MultiIndex columns (field, ticker);
                # flatten to the field level.
                if isinstance(df.columns, pd.MultiIndex):
                    df.columns = df.columns.get_level_values(0)
                df.dropna(subset=["High", "Low", "Close"], inplace=True)
                df["hl2"] = (df["High"] + df["Low"]) / 2.0

                # 2) ATR as an EMA of the true range.
                df["prev_close"] = df["Close"].shift(1)
                true_range = pd.concat(
                    [
                        df["High"] - df["Low"],
                        (df["High"] - df["prev_close"]).abs(),
                        (df["Low"] - df["prev_close"]).abs(),
                    ],
                    axis=1,
                ).max(axis=1)
                df["atr"] = true_range.ewm(alpha=2 / (atr_length + 1), adjust=False).mean()
                # Drop the first row (NaN prev_close) and expose Date column.
                df.dropna(inplace=True)
                df.reset_index(drop=False, inplace=True)
                n = len(df)

                # 3) Score a SuperTrend variant for each multiplier.
                factors = np.arange(min_mult, max_mult + 0.0001, step_mult)
                st_results = [_compute_supertrend(df, f, perf_alpha) for f in factors]
                perf_vals = np.array([res["perf"][-1] for res in st_results])
                fact_vals = np.array([res["factor"] for res in st_results])

                # 4) Cluster final performance scores (k=3) and pick one.
                clusters, cluster_factors, centroids = _k_means(
                    perf_vals, fact_vals, k=3, max_iter=max_iter
                )
                order = np.argsort(centroids)  # ascending: worst -> best
                sorted_clusters = {i: clusters[int(j)] for i, j in enumerate(order)}
                sorted_cluster_factors = {i: cluster_factors[int(j)] for i, j in enumerate(order)}

                chosen_index = {"Best": 2, "Average": 1, "Worst": 0}[from_cluster]
                chosen_factors = sorted_cluster_factors[chosen_index]
                # Fall back to the widest factor / zero perf if the chosen
                # cluster came back empty.
                target_factor = np.mean(chosen_factors) if chosen_factors else factors[-1]
                chosen_perfs = sorted_clusters[chosen_index]
                target_perf = np.mean(chosen_perfs) if chosen_perfs else 0.0

                # 5) Recompute the final SuperTrend with the target factor.
                st_final = _compute_supertrend(df, target_factor, perf_alpha)
                ts = st_final["output"]

                # Long/flat state: 1 after a break above the upper band,
                # 0 after a break below the lower band, else carry forward.
                close_vals = df["Close"].to_numpy()
                os_arr = np.zeros(n, dtype=int)
                os_arr[0] = 1 if close_vals[0] > st_final["upper"][0] else 0
                for i in range(1, n):
                    if close_vals[i] > st_final["upper"][i]:
                        os_arr[i] = 1
                    elif close_vals[i] < st_final["lower"][i]:
                        os_arr[i] = 0
                    else:
                        os_arr[i] = os_arr[i - 1]

                # Adaptive MA of the trailing stop, paced by the performance
                # index (target perf normalized by average absolute move).
                den = (
                    (df["Close"] - df["Close"].shift(1))
                    .abs()
                    .ewm(alpha=2 / (perf_alpha + 1), adjust=False)
                    .mean()
                )
                den_val = den.iloc[-1] if den.iloc[-1] != 0 else 1e-9
                perf_idx = max(target_perf, 0) / den_val

                perf_ama = np.zeros(n, dtype=float)
                perf_ama[0] = ts[0]
                for i in range(1, n):
                    perf_ama[i] = perf_ama[i - 1] + perf_idx * (ts[i] - perf_ama[i - 1])

                # 6) Build the Plotly chart.
                fig = go.Figure()
                fig.add_trace(go.Scatter(
                    x=df["Date"], y=df["Close"], mode="lines",
                    line=dict(color="silver", width=1.2),
                    name="Close Price",
                ))
                # Mask the stop line so each regime plots in its own color.
                ts_bull = np.where(os_arr == 1, ts, np.nan)
                ts_bear = np.where(os_arr == 0, ts, np.nan)
                fig.add_trace(go.Scatter(
                    x=df["Date"], y=ts_bull, mode="lines",
                    line=dict(color="teal", width=1.2),
                    name="Bullish Stop",
                ))
                fig.add_trace(go.Scatter(
                    x=df["Date"], y=ts_bear, mode="lines",
                    line=dict(color="red", width=1.2),
                    name="Bearish Stop",
                ))
                fig.add_trace(go.Scatter(
                    x=df["Date"], y=perf_ama, mode="lines",
                    line=dict(color="orange", width=1.0), opacity=0.7,
                    name="Trailing Stop AMA",
                ))
                # Mark each regime change with a triangle at the stop level.
                for i in range(1, n):
                    if os_arr[i] != os_arr[i - 1]:
                        if os_arr[i] == 1:
                            symbol, color, label = "triangle-up", "teal", "Bullish Signal"
                        else:
                            symbol, color, label = "triangle-down", "red", "Bearish Signal"
                        fig.add_trace(go.Scatter(
                            x=[df["Date"].iloc[i]], y=[ts[i]], mode="markers",
                            marker=dict(symbol=symbol, size=10, color=color,
                                        line=dict(color="white", width=1)),
                            name=label, showlegend=False,
                        ))
                fig.update_layout(
                    title=f"SuperTrend (Clustering) - {ticker} [Factor ~ {target_factor:.2f}]",
                    xaxis_title="Date",
                    yaxis_title="Price",
                    template="plotly_dark",
                    width=1600,
                    height=800,
                )
                fig.update_xaxes(tickformat='%Y-%m-%d')

                # Display results.
                st.markdown("### Analysis Results")
                st.write("The plot below shows the closing price with the SuperTrend indicators and signals.")
                st.plotly_chart(fig, use_container_width=True)
            except Exception as e:
                # Top-level boundary: surface the failure in the UI instead
                # of crashing the Streamlit session.
                st.error("An error occurred during the analysis.")
                st.error(str(e))

# Hide default Streamlit style.
# NOTE(review): the CSS payload appears to have been lost — the markdown body
# is blank, so this call currently has no visual effect; restore the
# intended <style> block or remove the call.
st.markdown(
    """ """,
    unsafe_allow_html=True,
)