Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import numpy as np | |
| import pandas as pd | |
| import yfinance as yf | |
| import plotly.graph_objects as go | |
| import datetime | |
# Configure the browser tab title and request the wide page layout.
st.set_page_config(
    layout="wide",
    page_title="Pattern Recognition with KNN and Lorentzian Distance",
)

# --- Sidebar Inputs ---
st.sidebar.title("Input Parameters")

# Data selection: which instrument and which daily date range to analyse.
data_box = st.sidebar.expander("Data Parameters", expanded=True)
ticker = data_box.text_input(
    "Ticker",
    value="ASML.AS",
    help="Enter the ticker symbol.",
)
start_date = data_box.date_input(
    "Start Date",
    value=datetime.date(2022, 1, 1),
    help="Select start date for daily data.",
)
# Default end date is tomorrow — presumably so today's bar is included in the
# download; confirm against the data provider's end-date semantics.
tomorrow = datetime.date.today() + datetime.timedelta(days=1)
end_date = data_box.date_input(
    "End Date",
    value=tomorrow,
    help="Select end date for daily data.",
)

# Model hyper-parameters: all are positive integers adjusted in steps of 1.
model_box = st.sidebar.expander("Model Parameters", expanded=True)
neighborsCount = model_box.number_input(
    "KNN Neighbors Count",
    min_value=1,
    value=100,
    step=1,
    help="Higher = smoother signals, lower = more reactive.",
)
maxBarsBack = model_box.number_input(
    "Lookback Bars",
    min_value=1,
    value=500,
    step=1,
    help="How far back to search for similar patterns. Longer = more data, shorter = more recent context.",
)
window_length = model_box.number_input(
    "Window Length",
    min_value=1,
    value=5,
    step=1,
    help="Number of bars per pattern. Longer = more structure, shorter = more sensitivity.",
)
barLookahead = model_box.number_input(
    "Bar Lookahead",
    min_value=1,
    value=4,
    step=1,
    help="How far ahead to judge outcomes. Longer = trend focus, shorter = short-term bias.",
)

# The analysis only runs on the script pass in which this button was clicked.
run_button = st.sidebar.button("Run Analysis")
# --- Title and Theory ---
st.title("Pattern Recognition via Unsupervised KNN")
st.markdown("#### **Compare recent market behavior to similar historical setups.**")
st.write("This tool leverages historical price patterns to generate trading signals via self-supervised pattern recall. Instead of training a model, it compares the current market state to similar past conditions using a custom KNN approach with Lorentzian distance.")

# Collapsible explanation of the method. Every backslash in the text below is
# written as "\\" so the literal contains no invalid escape sequences such as
# "\(" (a SyntaxWarning on recent Python versions); the rendered text is the
# same as before.
with st.expander("Methodology", expanded=False):
    st.write(
        """
**Market State Representation**
Each trading day is represented by a feature vector built from 5 technical indicators:
- **RSI:** Measures momentum.
- **Wave Trend Oscillator:** Smooths price data.
- **CCI:** Quantifies deviation from a moving average.
- **ADX:** Assesses trend strength.
- **Short-Term RSI:** Captures faster momentum.
A sliding window (default: 5 bars) forms a flattened feature vector (5 indicators × 5 bars = 25 values) that captures short-term behavior.
**Lorentzian Distance**
Similarity between market states is measured using Lorentzian distance:
$$
d(a, b) = \\sum_{i=1}^{N} \\log\\Big(1 + \\left|a_i - b_i\\right|\\Big)
$$
This function reduces the impact of extreme differences, making it robust to outliers.
**KNN-Based Signal Generation**
For each new market state, the tool:
1. Compares its feature vector to past states within a user-defined lookback window.
2. Selects the \\(k\\) nearest neighbors (default: 100) using Lorentzian distance.
3. Retrieves future price movement labels (over the next 4 bars by default):
- \\(+1\\) if the price rises.
- \\(-1\\) if the price falls.
- \\(0\\) if the price remains flat.
4. Sums these labels to create a directional score:
- A positive sum indicates a long signal.
- A negative sum indicates a short signal.
- A zero sum retains the previous signal.
**User Adjustable Variables**
You can adjust the following parameters in the sidebar. Each one controls how the pattern recognition behaves:
- **KNN Neighbors Count:**
Sets how many similar past patterns to compare against.
- Higher values smooth the signal and reduce noise.
- Lower values make it more reactive but may introduce false signals.
- **Lookback Bars:**
Defines how far back in history to search for similar patterns.
- A longer lookback gives more pattern variety but may include outdated behavior.
- A shorter lookback limits comparisons to recent market regimes.
- **Window Length:**
Determines how many consecutive bars are used to form each pattern (i.e., feature vector).
- Longer windows capture broader structure but reduce signal frequency.
- Shorter windows react faster but may miss context.
- **Bar Lookahead:**
Controls how far ahead the tool checks to define “what happened” after each past setup.
- A longer lookahead focuses on trend outcomes.
- A shorter lookahead favors short-term price moves.
"""
    )
| if run_button: | |
| try: | |
| with st.spinner("Running analysis..."): | |
| # --- Download Data --- | |
| df = yf.download(ticker, start=start_date, end=end_date, interval="1d") | |
| if df.empty: | |
| st.error("No data returned. Please check your inputs.") | |
| st.stop() | |
| if isinstance(df.columns, pd.MultiIndex): | |
| df.columns = df.columns.get_level_values(0) | |
| df.rename(columns={"Open": "Open", "High": "High", "Low": "Low", "Close": "Close", "Volume": "Volume"}, inplace=True) | |
| df.dropna(subset=["Close", "High", "Low"], inplace=True) | |
| df["Date"] = df.index | |
| df.reset_index(drop=True, inplace=True) | |
| n = len(df) | |
| # --- Indicator Functions --- | |
def rsi(series, length=14):
    """Relative Strength Index of ``series``.

    Gains and losses (positive and negative parts of the one-bar change) are
    each smoothed with an exponential moving average using
    ``alpha = 1 / length`` and ``adjust=False``; the result is
    ``100 - 100 / (1 + avg_gain / avg_loss)``.

    Args:
        series: pandas Series of prices.
        length: smoothing length used to derive ``alpha``.

    Returns:
        pandas Series aligned with ``series``; the first value is NaN because
        the one-bar change is undefined there.
    """
    change = series.diff()
    smoothing = dict(alpha=1 / length, adjust=False)

    ups = change.clip(lower=0)
    downs = -change.clip(upper=0)

    mean_up = ups.ewm(**smoothing).mean()
    mean_down = downs.ewm(**smoothing).mean()

    relative_strength = mean_up / mean_down
    return 100 - (100 / (1 + relative_strength))
def wave_trend(hlc3, n1=10, n2=11):
    """Wave Trend oscillator of the price series ``hlc3``.

    The price's offset from its own EMA (span ``n1``) is divided by 0.015
    times the EMA (span ``n1``) of the absolute offset, and that ratio is
    smoothed once more with an EMA of span ``n2``. All EMAs use
    ``adjust=False``.

    Args:
        hlc3: pandas Series of prices (the caller passes the high/low/close
            average).
        n1: span for the price EMA and the absolute-offset EMA.
        n2: span for the final smoothing EMA.

    Returns:
        pandas Series aligned with ``hlc3``.
    """
    price_ema = hlc3.ewm(span=n1, adjust=False).mean()
    offset = hlc3 - price_ema
    mean_abs_offset = offset.abs().ewm(span=n1, adjust=False).mean()
    channel_index = offset / (0.015 * mean_abs_offset)
    return channel_index.ewm(span=n2, adjust=False).mean()
def cci(series, length=20):
    """Commodity Channel Index of ``series``.

    ``CCI = (price - SMA) / (0.015 * MD)`` where ``SMA`` is the rolling mean
    over ``length`` bars and ``MD`` is the mean absolute deviation of the
    values *in that same window* from the window's mean.

    The previous implementation averaged each bar's deviation from its own
    historical rolling mean, which is not the CCI mean deviation and needed
    ``2 * length - 1`` bars before producing a value.

    Args:
        series: pandas Series of prices.
        length: rolling window size in bars.

    Returns:
        pandas Series aligned with ``series``. The first ``length - 1`` values
        are NaN (incomplete window); a window whose values are all equal
        gives 0/0 and therefore NaN as well.
    """
    window = series.rolling(length)
    ma = window.mean()
    # Mean absolute deviation around the mean of the current window.
    md = window.apply(lambda values: np.abs(values - values.mean()).mean(), raw=True)
    return (series - ma) / (0.015 * md)
def adx(df, length=14):
    """Average Directional Index computed from ``df``'s High/Low/Close.

    Directional movement, true range and DX are all smoothed with an
    exponential moving average using ``alpha = 1 / length`` and
    ``adjust=False``.

    Fixes relative to the previous version:
    * When the up-move equals the down-move, both +DM and -DM are now zero.
      Previously +DM was kept, because -DM was compared against an already
      modified +DM series.
    * When +DI and -DI are both zero, DX is 0 instead of 0/0 = NaN.

    Args:
        df: DataFrame with "High", "Low" and "Close" columns.
        length: smoothing length used to derive ``alpha``.

    Returns:
        pandas Series aligned with ``df``; the first value is NaN because the
        previous bar is undefined there.
    """
    high, low, close = df["High"], df["Low"], df["Close"]
    prev_close = close.shift(1)

    def wilder(values):
        # Shared smoothing used for every component of the indicator.
        return values.ewm(alpha=1 / length, adjust=False).mean()

    # Raw moves, floored at zero. NaN on the first bar is preserved because
    # comparisons against NaN are False and mask() then leaves the value alone.
    up_move = (high - high.shift(1)).clip(lower=0)
    down_move = (low.shift(1) - low).clip(lower=0)
    # Both masks are built from the unmodified moves so a tie zeroes both sides.
    plus_dm = up_move.mask(up_move <= down_move, 0.0)
    minus_dm = down_move.mask(down_move <= up_move, 0.0)

    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)
    atr = wilder(true_range)

    plus_di = 100 * (wilder(plus_dm) / atr)
    minus_di = 100 * (wilder(minus_dm) / atr)

    # A zero denominator implies a zero numerator (both DI values are zero),
    # so substituting 1 yields DX = 0 rather than NaN.
    di_sum = (plus_di + minus_di).replace(0, 1)
    dx = 100 * (plus_di - minus_di).abs() / di_sum
    return wilder(dx)
# --- Build Features ---
# Average of high, low and close; the input to the Wave Trend oscillator.
typical_price = (df["High"] + df["Low"] + df["Close"]) / 3.0
df["hlc3"] = typical_price

# The five per-bar indicators, stored as columns feat1..feat5.
close_prices = df["Close"]
indicator_columns = {
    "feat1": rsi(close_prices, 14),
    "feat2": wave_trend(typical_price, 10, 11),
    "feat3": cci(close_prices, 20),
    "feat4": adx(df, 14),
    "feat5": rsi(close_prices, 9),
}
for column_name, column_values in indicator_columns.items():
    df[column_name] = column_values
features = df[list(indicator_columns)].to_numpy()

# One flattened vector per bar that has a full window behind it: window k
# covers bars k .. k + window_length - 1, so row k belongs to bar
# k + window_length - 1.
features_windowed = np.array([
    features[window_end - window_length:window_end].ravel()
    for window_end in range(window_length, n + 1)
])
n_window = features_windowed.shape[0]
| # --- Lorentzian Distance & KNN --- | |
def lorentzian_distance(a, b):
    """Return the Lorentzian distance ``sum(log(1 + |a_i - b_i|))``.

    Args:
        a: first feature vector (array-like supporting ``a - b``).
        b: second feature vector, same shape as ``a``.

    Returns:
        The summed log-scaled absolute differences as a NumPy scalar.
    """
    gaps = np.abs(a - b)
    log_gaps = np.log1p(gaps)
    return np.sum(log_gaps)
# Outcome label per bar: the sign of Close[i + barLookahead] - Close[i]
# (+1 rise, -1 fall, 0 flat). The last barLookahead bars have no known
# outcome yet and keep the label 0.
close_values = df["Close"].to_numpy()
y_train = np.zeros(n, dtype=int)
if n > barLookahead:
    future_change = close_values[barLookahead:] - close_values[:-barLookahead]
    y_train[: n - barLookahead] = np.sign(future_change).astype(int)

# Prediction per window = sum of the labels of its nearest historical windows.
prediction_arr = np.zeros(n_window, dtype=float)
for idx in range(n_window):
    global_idx = idx + window_length - 1
    # Warm-up: no prediction until maxBarsBack bars of history exist.
    if global_idx < maxBarsBack:
        continue

    start_idx = max(0, idx - maxBarsBack)
    # Only windows whose outcome was already observable at the current bar may
    # vote. The label of window j reads the close barLookahead bars after it,
    # so j must be <= idx - barLookahead; later windows would leak future
    # prices into the prediction.
    stop_idx = idx - barLookahead + 1
    if stop_idx <= start_idx:
        continue

    # Lorentzian distance from the current window to every candidate at once.
    candidates = features_windowed[start_idx:stop_idx]
    distances = np.log1p(np.abs(candidates - features_windowed[idx])).sum(axis=1)

    # Windows that still contain indicator warm-up NaNs have NaN distance and
    # cannot be ranked, so they are excluded.
    usable = np.flatnonzero(np.isfinite(distances))
    if usable.size == 0:
        continue

    k = min(neighborsCount, usable.size)
    if k < usable.size:
        # argpartition requires kth < size; kth = k - 1 places the k smallest
        # distances in the first k slots.
        nearest = usable[np.argpartition(distances[usable], k - 1)[:k]]
    else:
        # Fewer candidates than requested neighbours: use them all.
        nearest = usable

    # Map candidate positions back to bar indices to read their labels.
    neighbor_labels = y_train[start_idx + nearest + window_length - 1]
    prediction_arr[idx] = neighbor_labels.sum()
# --- Signal Logic ---
# Direction of each prediction: +1, -1, or 0 when the neighbour votes cancel.
vote_direction = np.sign(prediction_arr).astype(int)

# The signal follows the vote direction and holds its previous value on a
# zero vote. Position 0 is always left at 0.
signal = np.zeros(n_window, dtype=int)
for pos in range(1, n_window):
    direction = vote_direction[pos]
    signal[pos] = direction if direction != 0 else signal[pos - 1]

# An entry is flagged on the bar where the signal switches into that side.
startLong = np.zeros(n_window, dtype=bool)
startShort = np.zeros(n_window, dtype=bool)
current, previous = signal[1:], signal[:-1]
startLong[1:] = (current == 1) & (previous != 1)
startShort[1:] = (current == -1) & (previous != -1)

n_long_signals = int(startLong.sum())
n_short_signals = int(startShort.sum())
# --- Build Plotly Chart ---
fig = go.Figure()
fig.add_trace(go.Scatter(
    x=df["Date"],
    y=df["Close"],
    mode="lines",
    line=dict(color="silver", width=1.2),
    name="Close Price",
))

# One vertical low-to-high segment per windowed bar, bucketed by the sign of
# its prediction. Each segment is followed by None so consecutive bars are not
# joined into one line.
segments = {"pos": ([], []), "neg": ([], []), "neu": ([], [])}
for idx in range(n_window):
    global_idx = idx + window_length - 1
    score = prediction_arr[idx]
    if score > 0:
        bucket = "pos"
    elif score < 0:
        bucket = "neg"
    else:
        bucket = "neu"
    seg_x, seg_y = segments[bucket]
    x_date = df["Date"].iloc[global_idx]
    seg_x.extend([x_date, x_date, None])
    seg_y.extend([df["Low"].iloc[global_idx], df["High"].iloc[global_idx], None])

# Trace order (positive, negative, neutral) fixes the legend order; empty
# buckets get no trace.
segment_styles = (
    ("pos", "rgba(0,204,0,0.5)", "Positive predictions"),
    ("neg", "rgba(204,0,0,0.5)", "Negative predictions"),
    ("neu", "rgba(179,179,179,0.3)", "Neutral predictions"),
)
for bucket, color, label in segment_styles:
    seg_x, seg_y = segments[bucket]
    if seg_x:
        fig.add_trace(go.Scatter(
            x=seg_x,
            y=seg_y,
            mode="lines",
            line=dict(color=color, width=1.0),
            name=label,
        ))

# Entry markers: long entries sit 1% below the bar's low, short entries 1%
# above the bar's high.
long_x, long_y = [], []
short_x, short_y = [], []
for idx in range(1, n_window):
    global_idx = idx + window_length - 1
    if startLong[idx]:
        long_x.append(df["Date"].iloc[global_idx])
        long_y.append(df["Low"].iloc[global_idx] * 0.99)
    elif startShort[idx]:
        short_x.append(df["Date"].iloc[global_idx])
        short_y.append(df["High"].iloc[global_idx] * 1.01)
if long_x:
    fig.add_trace(go.Scatter(
        x=long_x,
        y=long_y,
        mode="markers",
        marker=dict(symbol="triangle-up", size=10, color="lime", line=dict(color="white", width=1)),
        name="Long Entry",
    ))
if short_x:
    fig.add_trace(go.Scatter(
        x=short_x,
        y=short_y,
        mode="markers",
        marker=dict(symbol="triangle-down", size=10, color="red", line=dict(color="white", width=1)),
        name="Short Entry",
    ))

# Axis titles are styled through title=dict(text=..., font=...). The former
# `titlefont` axis attribute is deprecated and rejected by newer Plotly
# releases, which made the whole analysis fail.
white_font = dict(color="white")
fig.update_layout(
    template="plotly_dark",
    title=dict(
        text=f"{ticker} — KNN Signals via Lorentzian Distance ({start_date} to {end_date})",
        font=white_font,
    ),
    xaxis=dict(
        title=dict(text="Date", font=white_font),
        tickformat="%Y-%m-%d",
        tickfont=white_font,
    ),
    yaxis=dict(
        title=dict(text="Price", font=white_font),
        tickfont=white_font,
    ),
    legend=dict(font=white_font),
)
fig.update_xaxes(showgrid=True, gridcolor="grey")
fig.update_yaxes(showgrid=True, gridcolor="grey")
# --- Output Only the Chart ---
# Heading, a short caption naming the ticker, the entry counts, then the chart.
st.markdown("### Price and Signal Annotations")
chart_caption = f"""
The chart below shows **{ticker}** close price along with signals derived from historical pattern similarity. **Gray bars** mark the initial lookback window with no predictions.
"""
st.markdown(chart_caption)
signal_summary = f"Long signals: {n_long_signals}, Short signals: {n_short_signals}."
st.write(signal_summary)
st.plotly_chart(fig, use_container_width=True)
| except Exception: | |
| st.error("An error occurred during the analysis.") | |
# Hide default Streamlit style
# CSS that hides the elements matched by #MainMenu and footer; it is injected
# as raw HTML, hence unsafe_allow_html=True.
hide_default_chrome_css = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""
st.markdown(hide_default_chrome_css, unsafe_allow_html=True)