Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import numpy as np | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| from datetime import datetime, timedelta | |
| import yfinance as yf | |
# Page configuration: wide layout and the browser-tab title.
st.set_page_config(
    layout="wide",
    page_title="Self-Tuning SuperTrend and K-Means",
)

# Headline plus a one-paragraph summary of what the app does.
st.title("Self-Tuning SuperTrend and K-Means")
st.write("This tool builds a self-tuning SuperTrend indicator using k-means clustering to adapt stop levels and generate buy/sell signals. It uses daily price data to compute volatility, score multiple configurations, and select the best one in real time.")

# Markdown bullet list describing each sidebar control.
input_notes = """
- **Ticker:** The stock symbol to analyze. Changing this lets you switch assets.
- **Start Date & End Date:** Define the analysis window. End Date defaults to today plus one day.
- **ATR Length:** Sets the period for ATR. Lower values react faster; higher values smooth out noise.
- **Minimum/Maximum Multipliers & Step:** Define the range for SuperTrend sensitivity. Smaller steps improve resolution but increase compute time.
- **Performance Alpha:** Determines the smoothness of the performance score. Lower makes the metric more reactive; higher favors stability.
- **K-Means Options (From Cluster & Max Iterations):** Choose which cluster (Best, Average, Worst) to use and limit iterations for clustering. This controls how variants are grouped.
"""

# Collapsed-by-default panel that documents the method and the inputs.
# NOTE(review): this code was recovered from a source whose indentation was
# stripped, so the extent of the `with` body is an assumption -- confirm that
# the "Adjustable Inputs & Implications" notes belong inside the expander.
methodology_panel = st.expander("Methodology", expanded=False)
with methodology_panel:
    # Volatility: ATR formula.
    st.write("**Volatility Measurement**")
    st.write("An Average True Range (ATR) is computed using an Exponential Moving Average (EMA).")
    st.latex(r'\text{ATR} = \text{EMA}\left(\max\left\{High-Low,\ |High-\text{PrevClose}|,\ |Low-\text{PrevClose}|\right\}\right)')

    # Band construction for every tested multiplier.
    st.write("**Generating SuperTrend Variants**")
    st.write("Multiple SuperTrend signals are calculated. They use:")
    st.latex(r'\text{Upper Band} = hl2 + ATR \times \text{factor}')
    st.latex(r'\text{Lower Band} = hl2 - ATR \times \text{factor}')

    # Scoring and clustering of the variants.
    st.write("**Performance Scoring & Clustering**")
    st.write("Each variant is scored based on price movement. K-means (k=3) clusters these scores into Best, Average, and Worst groups.")

    # How the final signal is derived from the chosen cluster.
    st.write("**Final Signal Generation**")
    st.write("The indicator is recomputed using the average factor from the selected cluster. This gives a self-calibrated trading signal.")
    st.write("This process minimizes manual tuning and adapts with recent price action.")
    st.write("For more details, see the this article [here](https://entreprenerdly.com/trading-signals-with-adaptive-supertrend-and-k-means/).")

    # Sidebar inputs explanation
    st.write("#### Adjustable Inputs & Implications")
    st.write(input_notes)
# Sidebar inputs
# NOTE(review): this code was recovered from a source whose indentation was
# stripped; placing the "Run Analysis" button inside the sidebar is an
# assumption -- confirm against the deployed layout.
with st.sidebar:
    st.header("Input Parameters")

    # Data inputs expander: which asset and which date window to download.
    with st.expander("Data Inputs", expanded=True):
        ticker = st.text_input(
            "Ticker",
            value="ASML",
            help="Enter the stock symbol to analyze. Example: AAPL, MSFT, NVDA. This determines which asset's data will be used."
        )
        start_date = st.date_input(
            "Start Date",
            value=datetime(2022, 1, 1),
            help="Start of the historical data window. Affects the amount of price history used to compute signals."
        )
        # Default the end of the window to tomorrow (today + 1 day).
        default_end_date = datetime.today() + timedelta(days=1)
        end_date = st.date_input(
            "End Date",
            value=default_end_date,
            help="End of the data window. Automatically set to today + 1 to include the most recent bar."
        )

    # Methodology parameters expander: ATR, multiplier grid, scoring, k-means.
    with st.expander("Methodology Parameters", expanded=True):
        atr_length = st.number_input(
            "ATR Length",
            min_value=1,
            value=7,
            step=1,
            help="ATR period controls how volatility is measured. Lower values make the stop more sensitive to short-term moves. Higher values smooth noise but may react slower."
        )
        min_mult = st.number_input(
            "Minimum Multiplier",
            value=1.0,
            step=0.1,
            help="Defines the tightest SuperTrend stop. Lower values mean tighter stops, which react quickly but may whipsaw in choppy conditions."
        )
        max_mult = st.number_input(
            "Maximum Multiplier",
            value=5.0,
            step=0.1,
            help="Defines the widest SuperTrend stop. Higher values give more breathing room but may delay trend changes."
        )
        # The step feeds np.arange(min_mult, max_mult + eps, step_mult) further
        # down; a zero or negative step cannot produce a usable multiplier
        # grid, so the widget enforces a strictly positive lower bound.
        step_mult = st.number_input(
            "Step",
            min_value=0.01,
            value=0.5,
            step=0.1,
            help="Step size between multipliers. Smaller values give finer resolution but increase compute time."
        )
        perf_alpha = st.number_input(
            "Performance Alpha",
            min_value=1,
            value=8,
            step=1,
            help="Controls how quickly the performance metric responds to new price behavior. Lower = more reactive, higher = more stable but slower to adapt."
        )
        from_cluster = st.selectbox(
            "From Cluster",
            options=["Best", "Average", "Worst"],
            help="Selects which k-means cluster to use for final signal generation. 'Best' is typical for trend-following. 'Average' or 'Worst' can simulate conservative or contrarian behavior."
        )
        max_iter = st.number_input(
            "Max Iterations",
            min_value=1,
            value=1000,
            step=1,
            help="Upper limit on how long k-means clustering can run. Higher values allow more precise convergence but slow down the run time."
        )

    # Action button: its return value gates the analysis below.
    run_analysis = st.button("Run Analysis")
# Run the analysis only when the "Run Analysis" button value is truthy.
if run_analysis:
    # Validate date input
    if start_date >= end_date:
        st.error("Start Date must be before End Date.")
    else:
        # Show a spinner while the analysis steps below execute.
        with st.spinner("Running analysis..."):
            # NOTE(review): nesting reconstructed from a source with stripped
            # indentation. This `try` is closed by the `except Exception`
            # handler that follows the chart rendering.
            try:
# Convert dates to the ISO strings handed to the downloader.
start_date_str = start_date.strftime("%Y-%m-%d")
end_date_str = end_date.strftime("%Y-%m-%d")

# 1) Download data
df = yf.download(ticker, start=start_date_str, end=end_date_str, interval="1d", auto_adjust=False)
if df.empty:
    st.error("No data returned from the data provider.")
    st.stop()

# Keep only the first level of a MultiIndex header so the price columns can
# be addressed by their plain names ("High", "Low", "Close", ...).
if isinstance(df.columns, pd.MultiIndex):
    df.columns = df.columns.get_level_values(0)

# Bars without a complete High/Low/Close cannot contribute to any formula.
df = df.dropna(subset=["High", "Low", "Close"]).copy()
df["hl2"] = (df["High"] + df["Low"]) / 2.0

# 2) Compute ATR
# True range = max(High-Low, |High-PrevClose|, |Low-PrevClose|).
df["prev_close"] = df["Close"].shift(1)
df["tr1"] = df["High"] - df["Low"]
df["tr2"] = (df["High"] - df["prev_close"]).abs()
df["tr3"] = (df["Low"] - df["prev_close"]).abs()
df["tr"] = df[["tr1", "tr2", "tr3"]].max(axis=1)
# span=atr_length is the same smoothing as alpha = 2 / (atr_length + 1).
df["atr"] = df["tr"].ewm(span=atr_length, adjust=False).mean()

# Drop only the warm-up rows (the first bar has no previous close). A blanket
# dropna() would also discard valid bars that merely miss an unrelated column.
df = df.dropna(subset=["prev_close", "atr"])
if df.empty:
    st.error("Not enough price history to compute the indicator. Widen the date range.")
    st.stop()

# Move the date index into a regular column and record the number of bars.
df = df.reset_index(drop=False)
n = len(df)
# Helper function: sign
def sign(x):
    """Return the sign of ``x`` as -1, 0 or 1 (element-wise for arrays).

    NaN compares false against zero in both directions, so it maps to 0.
    """
    return np.where(x > 0, 1, np.where(x < 0, -1, 0))


# 3) Compute supertrend for each factor
def compute_supertrend(df, factor, perf_alpha):
    """Compute one SuperTrend variant and its smoothed performance score.

    Args:
        df: DataFrame providing the columns ``"Close"``, ``"hl2"`` and
            ``"atr"``; rows are processed in order.
        factor: ATR multiplier that sets the band width around ``hl2``.
        perf_alpha: Smoothing length of the performance score; the smoothing
            constant is ``2 / (perf_alpha + 1)``.

    Returns:
        dict with per-bar arrays ``"trend"`` (1 = up, 0 = down), ``"upper"``,
        ``"lower"``, ``"output"`` (the active band: the lower band while the
        trend is 1, otherwise the upper band) and ``"perf"``, plus the
        ``"factor"`` that was used.

    Raises:
        ValueError: If ``df`` has no rows.
    """
    arr_close = df["Close"].values
    arr_hl2 = df["hl2"].values
    arr_atr = df["atr"].values

    # Size every buffer from the frame that was passed in rather than from a
    # variable of an enclosing scope, so the function works for any frame.
    n = len(arr_close)
    if n == 0:
        raise ValueError("Cannot compute SuperTrend on an empty price series.")

    trend = np.zeros(n, dtype=int)
    upper = np.zeros(n, dtype=float)
    lower = np.zeros(n, dtype=float)
    output = np.zeros(n, dtype=float)
    perf = np.zeros(n, dtype=float)

    # Seed the first bar: both bands and the output start at hl2.
    trend[0] = 1 if arr_close[0] > arr_hl2[0] else 0
    upper[0] = arr_hl2[0]
    lower[0] = arr_hl2[0]
    output[0] = arr_hl2[0]
    perf[0] = 0.0

    smoothing = 2 / (perf_alpha + 1)  # loop-invariant
    for i in range(1, n):
        up = arr_hl2[i] + arr_atr[i] * factor
        dn = arr_hl2[i] - arr_atr[i] * factor

        # The trend flips only when the close breaks the previous bar's band.
        if arr_close[i] > upper[i - 1]:
            trend[i] = 1
        elif arr_close[i] < lower[i - 1]:
            trend[i] = 0
        else:
            trend[i] = trend[i - 1]

        # Bands ratchet: the upper band may only fall (and the lower band only
        # rise) while the previous close stayed on the inner side of it.
        if arr_close[i - 1] < upper[i - 1]:
            upper[i] = min(up, upper[i - 1])
        else:
            upper[i] = up
        if arr_close[i - 1] > lower[i - 1]:
            lower[i] = max(dn, lower[i - 1])
        else:
            lower[i] = dn

        # Score: the price change counts positively when the previous close
        # was above the previous output, negatively when it was below.
        diff_sign = sign(arr_close[i - 1] - output[i - 1])
        perf[i] = perf[i - 1] + smoothing * (
            (arr_close[i] - arr_close[i - 1]) * diff_sign - perf[i - 1]
        )

        output[i] = lower[i] if trend[i] == 1 else upper[i]

    return {
        "trend": trend,
        "upper": upper,
        "lower": lower,
        "output": output,
        "perf": perf,
        "factor": factor
    }
# Reject multiplier grids that np.arange cannot sweep: a non-positive step or
# an inverted range would otherwise fail with an unrelated error further down.
if step_mult <= 0:
    raise ValueError("Step must be greater than zero.")
if max_mult < min_mult:
    raise ValueError("Maximum Multiplier must be greater than or equal to Minimum Multiplier.")

# np.arange excludes its stop value; the small epsilon keeps max_mult itself
# in the grid when it falls exactly on a step.
factors = np.arange(min_mult, max_mult + 0.0001, step_mult)

# One SuperTrend run per candidate multiplier.
st_results = [compute_supertrend(df, f, perf_alpha) for f in factors]

# Final performance score and multiplier of every variant, in matching order.
perf_vals = np.array([res["perf"][-1] for res in st_results])
fact_vals = np.array([res["factor"] for res in st_results])
# 4) K-means clustering (k=3)
def k_means(data, factors, k=3, max_iter=1000):
    """Cluster 1-D scores with k-means, carrying a paired value per score.

    Initial centroids are evenly spaced percentiles of ``data`` (for ``k=3``
    these are the 25th, 50th and 75th percentiles). Each score is assigned to
    its nearest centroid, centroids move to the mean of their members, and
    the loop stops once the centroids no longer change (``np.allclose``) or
    after ``max_iter`` updates. A cluster that loses all members keeps its
    previous centroid.

    Args:
        data: 1-D sequence of scores to cluster.
        factors: Values paired one-to-one with ``data``; each is grouped with
            the cluster of its score.
        k: Number of clusters (at least 1).
        max_iter: Maximum number of centroid updates.

    Returns:
        ``(clusters, cluster_factors, centroids)`` where ``clusters`` and
        ``cluster_factors`` are dicts keyed ``0 .. k-1`` holding lists of the
        member scores / paired values, and ``centroids`` is an array of
        length ``k``. Clusters are not sorted by centroid.

    Raises:
        ValueError: If ``k < 1``, ``data`` is empty, or the two inputs differ
            in length.
    """
    if k < 1:
        raise ValueError("k must be at least 1.")
    values = np.asarray(data, dtype=float).ravel()
    paired = np.asarray(factors).ravel()
    if values.size == 0:
        raise ValueError("k_means requires at least one data point.")
    if values.size != paired.size:
        raise ValueError("data and factors must have the same length.")

    def assign(current):
        # Index of the nearest centroid for every score (first one on ties).
        return np.abs(values[:, None] - current[None, :]).argmin(axis=1)

    # k interior percentiles, evenly spaced between 0 and 100.
    centroids = np.percentile(values, np.linspace(0.0, 100.0, k + 2)[1:-1])
    membership = assign(centroids)

    for _ in range(max_iter):
        new_centroids = np.array([
            values[membership == i].mean() if np.any(membership == i) else centroids[i]
            for i in range(k)
        ])
        if np.allclose(new_centroids, centroids):
            break
        centroids = new_centroids
        # Re-assign right after every move so the returned membership always
        # corresponds to the returned centroids, even when max_iter is hit.
        membership = assign(centroids)

    clusters = {i: values[membership == i].tolist() for i in range(k)}
    cluster_factors = {i: paired[membership == i].tolist() for i in range(k)}
    return clusters, cluster_factors, centroids
clusters, cluster_factors, centroids = k_means(perf_vals, fact_vals, k=3, max_iter=max_iter)

# Rank the clusters by centroid (ascending): rank 0 holds the lowest
# performance scores, rank 2 the highest.
order = np.argsort(centroids)
sorted_clusters = {rank: clusters[src] for rank, src in enumerate(order)}
sorted_cluster_factors = {rank: cluster_factors[src] for rank, src in enumerate(order)}

# Map the sidebar choice to a rank; anything unrecognised falls back to rank 0.
cluster_rank = {"Best": 2, "Average": 1, "Worst": 0}
chosen_index = cluster_rank.get(from_cluster, 0)

# The selected cluster's mean multiplier and mean score; an empty cluster
# falls back to the widest multiplier and a zero score.
chosen_factors = sorted_cluster_factors[chosen_index]
chosen_perfs = sorted_clusters[chosen_index]
target_factor = np.mean(chosen_factors) if len(chosen_factors) > 0 else factors[-1]
target_perf = np.mean(chosen_perfs) if len(chosen_perfs) > 0 else 0.0

# 5) Recompute final supertrend with target_factor
st_final = compute_supertrend(df, target_factor, perf_alpha)
ts = st_final["output"]

# Trend state against the current bar's bands: 1 once the close is above the
# upper band, 0 once it is below the lower band, otherwise carried forward.
# Plain arrays avoid a pandas .iloc lookup on every iteration.
close_arr = df["Close"].to_numpy()
upper_arr = st_final["upper"]
lower_arr = st_final["lower"]
os_arr = np.zeros(n, dtype=int)
os_arr[0] = 1 if close_arr[0] > upper_arr[0] else 0
for i in range(1, n):
    if close_arr[i] > upper_arr[i]:
        os_arr[i] = 1
    elif close_arr[i] < lower_arr[i]:
        os_arr[i] = 0
    else:
        os_arr[i] = os_arr[i - 1]

# Build an adaptive MA for the trailing stop. The smoothing weight is the
# chosen cluster's score (floored at 0) divided by the latest EMA of absolute
# close-to-close changes; span=perf_alpha equals alpha = 2 / (perf_alpha + 1).
den_close_diff = df["Close"].diff().abs()
den = den_close_diff.ewm(span=perf_alpha, adjust=False).mean()
last_den = den.iloc[-1]
den_val = last_den if last_den != 0 else 1e-9  # avoid dividing by zero
perf_idx = max(target_perf, 0) / den_val
perf_ama = np.zeros(n, dtype=float)
perf_ama[0] = ts[0]
for i in range(1, n):
    perf_ama[i] = perf_ama[i - 1] + perf_idx * (ts[i] - perf_ama[i - 1])

# 6) Build Plotly chart
dates = df["Date"]
fig = go.Figure()
fig.add_trace(go.Scatter(
    x=dates,
    y=df["Close"],
    mode="lines",
    line=dict(color="silver", width=1.2),
    name="Close Price"
))

# Split the stop line by trend state; NaN leaves a gap where the other
# state is active.
ts_bull = np.where(os_arr == 1, ts, np.nan)
ts_bear = np.where(os_arr == 0, ts, np.nan)
fig.add_trace(go.Scatter(
    x=dates,
    y=ts_bull,
    mode="lines",
    line=dict(color="teal", width=1.2),
    name="Bullish Stop"
))
fig.add_trace(go.Scatter(
    x=dates,
    y=ts_bear,
    mode="lines",
    line=dict(color="red", width=1.2),
    name="Bearish Stop"
))
fig.add_trace(go.Scatter(
    x=dates,
    y=perf_ama,
    mode="lines",
    line=dict(color="orange", width=1.0),
    opacity=0.7,
    name="Trailing Stop AMA"
))

# Signal markers: every bar where the trend state differs from the previous
# bar. All flips of one direction share a single trace instead of adding one
# trace per flip, which keeps the figure small on long histories.
flip_idx = np.flatnonzero(os_arr[1:] != os_arr[:-1]) + 1
signal_groups = (
    (flip_idx[os_arr[flip_idx] == 1], "triangle-up", "teal", "Bullish Signal"),
    (flip_idx[os_arr[flip_idx] != 1], "triangle-down", "red", "Bearish Signal"),
)
for idx, symbol, color, label in signal_groups:
    if idx.size:
        fig.add_trace(go.Scatter(
            x=dates.iloc[idx],
            y=ts[idx],
            mode="markers",
            marker=dict(symbol=symbol, size=10, color=color,
                        line=dict(color="white", width=1)),
            name=label,
            showlegend=False
        ))

fig.update_layout(
    title=f"SuperTrend (Clustering) - {ticker} [Factor ~ {target_factor:.2f}]",
    xaxis_title="Date",
    yaxis_title="Price",
    template="plotly_dark",
    width=1600,
    height=800
)
fig.update_xaxes(tickformat='%Y-%m-%d')

# Display results
st.markdown("### Analysis Results")
st.write("The plot below shows the closing price with the SuperTrend indicators and signals.")
st.plotly_chart(fig, use_container_width=True)
# Handler for the analysis `try` block: report any Exception raised by the
# steps above in the UI, first with a generic notice and then with the
# exception's own message.
except Exception as e:
    st.error("An error occurred during the analysis.")
    st.error(str(e))
# Hide default Streamlit style: CSS that sets the element with id "MainMenu"
# and the <footer> element to visibility: hidden.
hide_chrome_css = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""

# unsafe_allow_html is passed so the <style> tag is emitted as raw HTML.
st.markdown(hide_chrome_css, unsafe_allow_html=True)