Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| def load_data(ticker): | |
| try: | |
| # Load data from CSV | |
| data = pd.read_csv(f'tickers/{ticker}.csv', index_col="Date", parse_dates=['Date']) | |
| data.sort_index(inplace=True) # Ensure data is sorted by date | |
| return data | |
| except FileNotFoundError: | |
| print(f"Data for {ticker} not found.") | |
| return None | |
| def sma(data, period): | |
| return data['Close'].rolling(window=period).mean() | |
| def ema(data, period): | |
| return data['Close'].ewm(span=period, adjust=False).mean() | |
| def score_downward_trend(data, window=4): | |
| """ | |
| Score the downward trend based on price action and volume. | |
| """ | |
| if len(data) < window: | |
| return 0 # Not enough data | |
| score = 0 | |
| for j in range(1, window): | |
| if data['High'].iloc[j] < data['High'].iloc[j-1]: | |
| score += 1 # Increment score for each lower high | |
| if data['Close'].iloc[j] < data['Close'].iloc[j-1]: | |
| score += 1 # Increment score for each lower close | |
| if data['Volume'].iloc[j] > data['Volume'].iloc[j-1]: | |
| score += 0.5 # Increment score for increasing volume during downtrend | |
| return score | |
| def score_candle(candle, data, i): | |
| """ | |
| Score the candle based on its pattern, volume, and position relative to moving averages. | |
| """ | |
| open_price, close_price, low_price, high_price = candle['Open'], candle['Close'], candle['Low'], candle['High'] | |
| prev_candle = data.iloc[i-1] | |
| score = 0 | |
| body = abs(close_price - open_price) | |
| bottom_wick_length = min(open_price, close_price) - low_price | |
| top_wick_length = high_price - max(open_price, close_price) | |
| # Add 16 points if candle is green and there's a significant gap | |
| if close_price > open_price and low_price > prev_candle['High']: | |
| score += 16 | |
| # Rest of the existing scoring logic | |
| if bottom_wick_length > 2 * body: | |
| score += 10 | |
| if abs(open_price - close_price) < (0.1 * (high_price - low_price)): | |
| score += 15 | |
| if bottom_wick_length > top_wick_length: | |
| score += 8 | |
| # Volume analysis | |
| if candle['Volume'] > data['Volume'].rolling(window=20).mean().iloc[i]: | |
| score += 5 | |
| # Moving average analysis | |
| ema_20 = ema(data.iloc[:i+1], 20).iloc[-1] | |
| sma_50 = sma(data.iloc[:i+1], 50).iloc[-1] | |
| sma_200 = sma(data.iloc[:i+1], 200).iloc[-1] | |
| if close_price > ema_20 and open_price < ema_20: | |
| score += 5 | |
| if close_price > sma_50: | |
| score += 3 | |
| if close_price > sma_200: | |
| score += 2 | |
| # Momentum indicator | |
| rsi = calculate_rsi(data.iloc[:i+1], period=14).iloc[-1] | |
| if rsi < 30: | |
| score += 5 | |
| penalty = 0 | |
| conditions_met = 0 | |
| if candle['High'] > prev_candle['High']: | |
| conditions_met += 1 | |
| if candle['Low'] > prev_candle['High']: | |
| conditions_met += 1 | |
| if candle['Close'] > max(prev_candle['Close'], prev_candle['Open']): | |
| conditions_met += 1 | |
| if candle['Open'] > max(prev_candle['Open'], prev_candle['Close']): | |
| conditions_met += 1 | |
| current_avg = (candle['Open'] + candle['Close'] + candle['High'] + candle['Low']) / 4 | |
| prev_avg = (prev_candle['Open'] + prev_candle['Close'] + prev_candle['High'] + prev_candle['Low']) / 4 | |
| if current_avg > prev_avg: | |
| conditions_met += 1 | |
| if conditions_met == 3: | |
| penalty = -10 | |
| elif conditions_met == 4: | |
| penalty = -12 | |
| elif conditions_met == 5: | |
| penalty = -17 | |
| score += penalty | |
| return score | |
| def calculate_rsi(data, period=14): | |
| delta = data['Close'].diff() | |
| gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() | |
| loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() | |
| rs = gain / loss | |
| return 100 - (100 / (1 + rs)) | |
| def calculate_risk_reward(data, entry_index, stop_loss_percent=0.02, target_percent=0.06): | |
| entry_price = data['Close'].iloc[entry_index] | |
| stop_loss = entry_price * (1 - stop_loss_percent) | |
| target = entry_price * (1 + target_percent) | |
| risk = entry_price - stop_loss | |
| reward = target - entry_price | |
| return reward / risk | |
| def find_reversal_patterns(data, window=4, candle_score_threshold=20, trend_score_threshold=5, risk_reward_threshold=2): | |
| patterns = [] | |
| for i in range(window, len(data)): | |
| trend_score = score_downward_trend(data.iloc[i-window:i], window=window) | |
| if trend_score >= trend_score_threshold: | |
| candle_score = score_candle(data.iloc[i], data, i) | |
| if candle_score >= candle_score_threshold: | |
| risk_reward = calculate_risk_reward(data, i) | |
| if risk_reward >= risk_reward_threshold: | |
| format_date = data.index[i].strftime('%Y-%m-%d') | |
| patterns.append((format_date, trend_score, candle_score, risk_reward)) | |
| return patterns | |
| def back_reversal_finder(data, window=4, candle_score_threshold=20, trend_score_threshold=4.5, risk_reward_threshold=1.5): | |
| patterns = [] | |
| for i in range(window, len(data)): | |
| trend_score = score_downward_trend(data.iloc[i-window:i], window=window) | |
| if trend_score >= trend_score_threshold: | |
| candle_score = score_candle(data.iloc[i], data, i) | |
| if candle_score >= candle_score_threshold: | |
| risk_reward = calculate_risk_reward(data, i) | |
| if risk_reward >= risk_reward_threshold: | |
| format_date = data.index[i].strftime('%Y-%m-%d') | |
| patterns.append(format_date) | |
| return patterns | |
| def check_for_reversal_patterns(ticker, window=4, candle_score_threshold=20, trend_score_threshold=5, risk_reward_threshold=2): | |
| data = load_data(ticker) | |
| if data is None: | |
| return | |
| patterns = find_reversal_patterns(data, window=window, candle_score_threshold=candle_score_threshold, | |
| trend_score_threshold=trend_score_threshold, risk_reward_threshold=risk_reward_threshold) | |
| if patterns: | |
| print(f"{ticker}: Potential reversal patterns found:") | |
| for date, trend_score, candle_score, risk_reward in patterns: | |
| print(f"Date: {date}, Trend Score: {trend_score:.2f}, Candle Score: {candle_score:.2f}, Risk-Reward: {risk_reward:.2f}") | |
| else: | |
| print(f"{ticker}: No clear reversal patterns detected.") | |
| def main(): | |
| ticker = input("Enter Ticker: ").upper() | |
| check_for_reversal_patterns(ticker) | |
| if __name__ == '__main__': | |
| main() |