Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import yfinance as yf | |
| from datetime import datetime, timedelta | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward | |
| import urllib3 | |
| from datetime import datetime, timedelta | |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
| def load_sp500_tickers(): | |
| """Load S&P 500 tickers from Wikipedia.""" | |
| url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies" | |
| response = requests.get(url, verify=False) | |
| soup = BeautifulSoup(response.content, 'html.parser') | |
| table = soup.find('table', {'id': 'constituents'}) | |
| tickers = [] | |
| if table: | |
| for row in table.find_all('tr')[1:]: | |
| cells = row.find_all('td') | |
| if cells: | |
| ticker = cells[0].text.strip() | |
| tickers.append(ticker) | |
| return tickers | |
| def load_data(ticker, interval="1d"): | |
| """Load stock data using yfinance with a specified interval.""" | |
| end_date = datetime.today() | |
| start_date = end_date - timedelta(days=365) # Get 1 year of data | |
| data = yf.download(ticker, start=start_date, end=end_date, interval=interval) | |
| return data | |
| def calculate_sma(data, window): | |
| """Calculate the Simple Moving Average (SMA) for a given window.""" | |
| return data['Close'].rolling(window=window).mean() | |
| def calculate_ema(data, window): | |
| """Calculate the Exponential Moving Average (EMA) for a given window.""" | |
| return data['Close'].ewm(span=window, adjust=False).mean() | |
| def average_downtrend(data, method, window=4): | |
| """Calculate the average difference between consecutive prices for the last 'window' candles.""" | |
| if len(data) < window: | |
| return 0.0 | |
| price_diffs = data[method].diff().iloc[-window:] | |
| avg_diff = price_diffs.mean() | |
| return avg_diff if avg_diff < 0 else 0.0 | |
| def score_candle(candle, prev_candle, trend_strength): | |
| """Score a single candle based on its characteristics and previous candle.""" | |
| open_price = candle['Open'] | |
| close_price = candle['Close'] | |
| low_price = candle['Low'] | |
| high_price = candle['High'] | |
| prev_close = prev_candle['Close'] | |
| # Bottom and top wick lengths | |
| bottom_wick_length = min(open_price, close_price) - low_price | |
| top_wick_length = high_price - max(open_price, close_price) | |
| # Initial score based on trend strength | |
| score = trend_strength * 2 | |
| # Doji: Open and Close are almost the same (small body) | |
| if abs(open_price - close_price) <= 0.1 * (high_price - low_price): # Adjust tolerance if needed | |
| score += 5 # Bonus points for doji candles | |
| # Hammer: Small body at the top, long bottom wick (typical reversal candle) | |
| if close_price < open_price and bottom_wick_length > 2 * (open_price - close_price): | |
| score += 7 # Extra points for hammer-like candles | |
| # Bottom Tailing Wick: Long bottom wick compared to the overall range | |
| if bottom_wick_length > 0.5 * (high_price - low_price): | |
| score += 6 # Extra points for bottom tailing wick | |
| # Additional logic: Boost red candles with long bottom wicks following a downtrend | |
| if close_price < open_price and bottom_wick_length > 0.5 * (open_price - close_price): | |
| score += 3 # Boost for red candle with long bottom wick | |
| # Penalize if the current close is higher than the previous close | |
| if close_price > prev_close: | |
| score -= ((close_price - prev_close) / prev_close) * 100 | |
| return score | |
| def score_today_candle(data, window=4): | |
| """Score today's candle based on the downtrend from the past 'window' days.""" | |
| if len(data) < window + 1: | |
| return 0 # Not enough data | |
| today_candle = data.iloc[-1] | |
| prev_candle = data.iloc[-2] | |
| close_price = today_candle['Close'] | |
| previous_data = data.iloc[-(window+1):-1] | |
| down_High = average_downtrend(previous_data, method="High",window=window) + average_downtrend(previous_data, method="High",window=7) / 2 | |
| down_Close = average_downtrend(previous_data, method="Close",window=window) + average_downtrend(previous_data, method="Close",window=7) / 2 | |
| avg_downtrend = (down_High + down_Close) / 2 | |
| if avg_downtrend == 0.0: | |
| return -1 | |
| # Calculate SMAs for the last row | |
| sma_50 = calculate_sma(data, window=50).iloc[-1] | |
| sma_200 = calculate_sma(data, window=200).iloc[-1] | |
| sma_20 = calculate_sma(data, window=20).iloc[-1] | |
| ema_10 = calculate_ema(data, window=10).iloc[-1] | |
| if (close_price < ema_10) or (close_price < sma_20) or (close_price < sma_50) or (close_price < sma_200): | |
| return -1 | |
| return score_candle(today_candle, prev_candle, abs(avg_downtrend)) | |
| def scan_sp500(top_n=25, interval="1d", progress=gr.Progress()): | |
| tickers = load_sp500_tickers() | |
| scores = [] | |
| tickers.append("QQQ") | |
| for i, ticker in enumerate(progress.tqdm(tickers)): | |
| data = load_data(ticker, interval) | |
| if not data.empty: | |
| score = score_today_candle(data) | |
| if score > 0: | |
| scores.append((ticker, score)) | |
| scores = sorted(scores, key=lambda x: x[1], reverse=True) | |
| return scores[:top_n] | |
| def next_business_day(date): | |
| next_day = date + timedelta(days=1) | |
| while next_day.weekday() >= 5: # 5 = Saturday, 6 = Sunday | |
| next_day += timedelta(days=1) | |
| return next_day | |
| def gradio_scan_sp500(top_n, interval, progress=gr.Progress()): | |
| progress(0, desc="Downloading Data") | |
| tickers = load_sp500_tickers() | |
| tickers.append("QQQ") | |
| progress(0.3, desc="Running Scanner") | |
| results = scan_sp500(top_n, interval, progress) | |
| # Get the last date of the data and find the next business day | |
| last_data = load_data(results[0][0], interval) # Load data for the first ticker in results | |
| last_date = last_data.index[-1].date() | |
| next_market_day = next_business_day(last_date) | |
| date_created = next_market_day.strftime("%Y-%m-%d") | |
| output = f"Scan Results for Market Open on: {date_created}\n\n" | |
| output += "Top {} stocks based on pattern finder score:\n\n".format(top_n) | |
| for ticker, score in results: | |
| output += "{}: Total Score = {:.2f}\n".format(ticker, score) | |
| return output | |
| iface = gr.Interface( | |
| fn=gradio_scan_sp500, | |
| inputs=[ | |
| gr.Slider(minimum=1, maximum=100, step=1, label="Number of top stocks to display", value=25), | |
| gr.Dropdown(choices=["1m", "1d", "1wk", "1mo"], label="Data Interval", value="1d"), | |
| ], | |
| outputs="text", | |
| title="S&P 500 Stock Scanner", | |
| description="Scan S&P 500 stocks and display top N stocks based on today's candle score.", | |
| allow_flagging="never", | |
| ) | |
| if __name__ == "__main__": | |
| iface.launch(server_name="0.0.0.0", server_port=7860) |