| | import pandas as pd |
| | import numpy as np |
| | import yfinance as yf |
| | import streamlit as st |
| | from datetime import datetime, timedelta |
| | import pytz |
| |
|
| |
|
| |
|
| |
|
| | |
| | def fetch_stock_data(ticker, start_datetime, end_datetime): |
| | stock_data = yf.download(ticker, start=start_datetime, end=end_datetime) |
| | return stock_data |
| |
|
| | def fetch_stock_data(ticker, start_datetime, end_datetime): |
| | |
| | stock_data = yf.download(ticker, start=start_datetime, end=end_datetime) |
| |
|
| | |
| | df.reset_index(inplace=True) |
| |
|
| | return stock_data |
| |
|
| | |
| | def detect_head_shoulder(df, window=3): |
| | roll_window = window |
| | df['high_roll_max'] = df['High'].rolling(window=roll_window).max() |
| | df['low_roll_min'] = df['Low'].rolling(window=roll_window).min() |
| | mask_head_shoulder = ( |
| | (df['high_roll_max'] > df['High'].shift(1)) & |
| | (df['high_roll_max'] > df['High'].shift(-1)) & |
| | (df['High'] < df['High'].shift(1)) & |
| | (df['High'] < df['High'].shift(-1)) |
| | ) |
| | mask_inv_head_shoulder = ( |
| | (df['low_roll_min'] < df['Low'].shift(1)) & |
| | (df['low_roll_min'] < df['Low'].shift(-1)) & |
| | (df['Low'] > df['Low'].shift(1)) & |
| | (df['Low'] > df['Low'].shift(-1)) |
| | ) |
| | df['head_shoulder_pattern'] = np.nan |
| | df.loc[mask_head_shoulder, 'head_shoulder_pattern'] = 'Head and Shoulder' |
| | df.loc[mask_inv_head_shoulder, 'head_shoulder_pattern'] = 'Inverse Head and Shoulder' |
| | return df |
| |
|
| | |
| | def detect_multiple_tops_bottoms(df, window=3): |
| | roll_window = window |
| | df['high_roll_max'] = df['High'].rolling(window=roll_window).max() |
| | df['low_roll_min'] = df['Low'].rolling(window=roll_window).min() |
| | df['close_roll_max'] = df['Close'].rolling(window=roll_window).max() |
| | df['close_roll_min'] = df['Close'].rolling(window=roll_window).min() |
| | mask_top = (df['high_roll_max'] >= df['High'].shift(1)) & (df['close_roll_max'] < df['Close'].shift(1)) |
| | mask_bottom = (df['low_roll_min'] <= df['Low'].shift(1)) & (df['close_roll_min'] > df['Close'].shift(1)) |
| | df['multiple_top_bottom_pattern'] = np.nan |
| | df.loc[mask_top, 'multiple_top_bottom_pattern'] = 'Multiple Top' |
| | df.loc[mask_bottom, 'multiple_top_bottom_pattern'] = 'Multiple Bottom' |
| | return df |
| |
|
| | |
| | def calculate_support_resistance(df, window=3): |
| | roll_window = window |
| | std_dev = 2 |
| | df['high_roll_max'] = df['High'].rolling(window=roll_window).max() |
| | df['low_roll_min'] = df['Low'].rolling(window=roll_window).min() |
| | mean_high = df['High'].rolling(window=roll_window).mean() |
| | std_high = df['High'].rolling(window=roll_window).std() |
| | mean_low = df['Low'].rolling(window=roll_window).mean() |
| | std_low = df['Low'].rolling(window=roll_window).std() |
| | df['support'] = mean_low - std_dev * std_low |
| | df['resistance'] = mean_high + std_dev * std_high |
| | return df |
| |
|
| | |
| | def detect_triangle_pattern(df, window=3): |
| | roll_window = window |
| | df['high_roll_max'] = df['High'].rolling(window=roll_window).max() |
| | df['low_roll_min'] = df['Low'].rolling(window=roll_window).min() |
| | mask_asc = ( |
| | (df['high_roll_max'] >= df['High'].shift(1)) & |
| | (df['low_roll_min'] <= df['Low'].shift(1)) & |
| | (df['Close'] > df['Close'].shift(1)) |
| | ) |
| | mask_desc = ( |
| | (df['high_roll_max'] <= df['High'].shift(1)) & |
| | (df['low_roll_min'] >= df['Low'].shift(1)) & |
| | (df['Close'] < df['Close'].shift(1)) |
| | ) |
| | df['triangle_pattern'] = np.nan |
| | df.loc[mask_asc, 'triangle_pattern'] = 'Ascending Triangle' |
| | df.loc[mask_desc, 'triangle_pattern'] = 'Descending Triangle' |
| | return df |
| |
|
| | |
| | def detect_wedge(df, window=3): |
| | roll_window = window |
| | df['high_roll_max'] = df['High'].rolling(window=roll_window).max() |
| | df['low_roll_min'] = df['Low'].rolling(window=roll_window).min() |
| | df['trend_high'] = df['High'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0) |
| | df['trend_low'] = df['Low'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0) |
| | mask_wedge_up = ( |
| | (df['high_roll_max'] >= df['High'].shift(1)) & |
| | (df['low_roll_min'] <= df['Low'].shift(1)) & |
| | (df['trend_high'] == 1) & |
| | (df['trend_low'] == 1) |
| | ) |
| | mask_wedge_down = ( |
| | (df['high_roll_max'] <= df['High'].shift(1)) & |
| | (df['low_roll_min'] >= df['Low'].shift(1)) & |
| | (df['trend_high'] == -1) & |
| | (df['trend_low'] == -1) |
| | ) |
| | df['wedge_pattern'] = np.nan |
| | df.loc[mask_wedge_up, 'wedge_pattern'] = 'Wedge Up' |
| | df.loc[mask_wedge_down, 'wedge_pattern'] = 'Wedge Down' |
| | return df |
| |
|
| | |
| | def detect_channel(df, window=3): |
| | roll_window = window |
| | channel_range = 0.1 |
| | df['high_roll_max'] = df['High'].rolling(window=roll_window).max() |
| | df['low_roll_min'] = df['Low'].rolling(window=roll_window).min() |
| | df['trend_high'] = df['High'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0) |
| | df['trend_low'] = df['Low'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) <0 else 0) |
| | mask_channel_up = ( |
| | (df['high_roll_max'] >= df['High'].shift(1)) & |
| | (df['low_roll_min'] <= df['Low'].shift(1)) & |
| | (df['high_roll_max'] - df['low_roll_min'] <= channel_range * (df['high_roll_max'] + df['low_roll_min'])/2) & |
| | (df['trend_high'] == 1) & |
| | (df['trend_low'] == 1) |
| | ) |
| | mask_channel_down = ( |
| | (df['high_roll_max'] <= df['High'].shift(1)) & |
| | (df['low_roll_min'] >= df['Low'].shift(1)) & |
| | (df['high_roll_max'] - df['low_roll_min'] <= channel_range * (df['high_roll_max'] + df['low_roll_min'])/2) & |
| | (df['trend_high'] == -1) & |
| | (df['trend_low'] == -1) |
| | ) |
| | df['channel_pattern'] = np.nan |
| | df.loc[mask_channel_up, 'channel_pattern'] = 'Channel Up' |
| | df.loc[mask_channel_down, 'channel_pattern'] = 'Channel Down' |
| | return df |
| |
|
| | |
| | def detect_double_top_bottom(df, window=3, threshold=0.05): |
| | roll_window = window |
| | range_threshold = threshold |
| | df['high_roll_max'] = df['High'].rolling(window=roll_window).max() |
| | df['low_roll_min'] = df['Low'].rolling(window=roll_window).min() |
| | mask_double_top = ( |
| | (df['high_roll_max'] >= df['High'].shift(1)) & |
| | (df['high_roll_max'] >= df['High'].shift(-1)) & |
| | (df['High'] < df['High'].shift(1)) & |
| | (df['High'] < df['High'].shift(-1)) & |
| | ((df['High'].shift(1) - df['Low'].shift(1)) <= range_threshold * (df['High'].shift(1) + df['Low'].shift(1))/2) & |
| | ((df['High'].shift(-1) - df['Low'].shift(-1)) <= range_threshold * (df['High'].shift(-1) + df['Low'].shift(-1))/2) |
| | ) |
| | mask_double_bottom = ( |
| | (df['low_roll_min'] <= df['Low'].shift(1)) & |
| | (df['low_roll_min'] <= df['Low'].shift(-1)) & |
| | (df['Low'] > df['Low'].shift(1)) & |
| | (df['Low'] > df['Low'].shift(-1)) & |
| | ((df['High'].shift(1) - df['Low'].shift(1)) <= range_threshold * (df['High'].shift(1) + df['Low'].shift(1))/2) & |
| | ((df['High'].shift(-1) - df['Low'].shift(-1)) <= range_threshold * (df['High'].shift(-1) + df['Low'].shift(-1))/2) |
| | ) |
| | df['double_pattern'] = np.nan |
| | df.loc[mask_double_top, 'double_pattern'] = 'Double Top' |
| | df.loc[mask_double_bottom, 'double_pattern'] = 'Double Bottom' |
| | return df |
| |
|
| | |
| | def detect_trendline(df, window=2): |
| | roll_window = window |
| | df['slope'] = np.nan |
| | df['intercept'] = np.nan |
| |
|
| | for i in range(window, len(df)): |
| | x = np.array(range(i-window, i)) |
| | y = df['Close'][i-window:i] |
| | A = np.vstack([x, np.ones(len(x))]).T |
| | m, c = np.linalg.lstsq(A, y, rcond=None)[0] |
| | df.at[df.index[i], 'slope'] = m |
| | df.at[df.index[i], 'intercept'] = c |
| |
|
| | mask_support = df['slope'] > 0 |
| | mask_resistance = df['slope'] < 0 |
| | df['support'] = np.nan |
| | df['resistance'] = np.nan |
| | df.loc[mask_support, 'support'] = df['Close'] * df['slope'] + df['intercept'] |
| | df.loc[mask_resistance, 'resistance'] = df['Close'] * df['slope'] + df['intercept'] |
| |
|
| | return df |
| |
|
| | |
| | def find_pivots(df): |
| | high_diffs = df['High'].diff() |
| | low_diffs = df['Low'].diff() |
| | higher_high_mask = (high_diffs > 0) & (high_diffs.shift(-1) < 0) |
| | lower_low_mask = (low_diffs < 0) & (low_diffs.shift(-1) > 0) |
| | lower_high_mask = (high_diffs < 0) & (high_diffs.shift(-1) > 0) |
| | higher_low_mask = (low_diffs > 0) & (low_diffs.shift(-1) < 0) |
| | df['signal'] = '' |
| | df.loc[higher_high_mask, 'signal'] = 'HH' |
| | df.loc[lower_low_mask, 'signal'] = 'LL' |
| | df.loc[lower_high_mask, 'signal'] = 'LH' |
| | df.loc[higher_low_mask, 'signal'] = 'HL' |
| | return df |
| |
|
| | |
| | def main(): |
| | st.title('Live Stock Pattern Detection App') |
| | ticker = st.text_input('Enter Stock Ticker:', 'AAPL') |
| | |
| | |
| | start_date_placeholder = st.empty() |
| | end_date_placeholder = st.empty() |
| |
|
| | start_date = start_date_placeholder.date_input('Start Date', pd.to_datetime('2022-01-01')) |
| | end_date = end_date_placeholder.date_input('End Date', pd.to_datetime('2022-02-01')) |
| |
|
| | st.info("Select Preferred Timezone:") |
| | preferred_timezone = st.selectbox('Timezone', list(pytz.all_timezones)) |
| |
|
| | |
| | start_hour, start_minute, start_second = st.slider('Select Start Time', 0, 23, 0), st.slider('', 0, 59, 0), st.slider('', 0, 59, 0) |
| | end_hour, end_minute, end_second = st.slider('Select End Time', 0, 23, 23), st.slider('', 0, 59, 59), st.slider('', 0, 59, 59) |
| |
|
| | start_datetime = datetime(start_date.year, start_date.month, start_date.day, start_hour, start_minute, start_second) |
| | end_datetime = datetime(end_date.year, end_date.month, end_date.day, end_hour, end_minute, end_second) |
| |
|
| | start_datetime = pytz.timezone(preferred_timezone).localize(start_datetime) |
| | end_datetime = pytz.timezone(preferred_timezone).localize(end_datetime) |
| |
|
| | start_date_placeholder.info(f"Start Date (UTC): {start_datetime.strftime('%Y-%m-%d %H:%M:%S %Z')}") |
| | end_date_placeholder.info(f"End Date (UTC): {end_datetime.strftime('%Y-%m-%d %H:%M:%S %Z')}") |
| |
|
| | if st.button('Detect Patterns'): |
| | stock_data = fetch_stock_data(ticker, start_datetime, end_datetime) |
| | stock_data = detect_head_shoulder(stock_data) |
| | stock_data = detect_multiple_tops_bottoms(stock_data) |
| | stock_data = calculate_support_resistance(stock_data) |
| | stock_data = detect_triangle_pattern(stock_data) |
| | stock_data = detect_wedge(stock_data) |
| | stock_data = detect_channel(stock_data) |
| | stock_data = detect_double_top_bottom(stock_data) |
| | stock_data = detect_trendline(stock_data) |
| | stock_data = find_pivots(stock_data) |
| |
|
| | st.write(stock_data) |
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|
| |
|