import streamlit as st import requests import pandas as pd import numpy as np import matplotlib.pyplot as plt import matplotlib.dates as mdates from datetime import datetime, timedelta import google.generativeai as genai import time import os from scipy.signal import argrelextrema # Set page config st.set_page_config( page_title="PNP Forex Trading Strategy", page_icon="📊", layout="wide", initial_sidebar_state="expanded" ) # Get API keys from secrets ALPHA_VANTAGE_API_KEY = os.environ.get('ALPHA_VANTAGE_API_KEY') GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY') # Check if keys are available if not ALPHA_VANTAGE_API_KEY or not GEMINI_API_KEY: st.error("API keys not found. Please check your secrets configuration.") st.stop() # Configure Gemini AI genai.configure(api_key=GEMINI_API_KEY) model = genai.GenerativeModel('gemini-2.0-flash') class PNPForexTrader: """ PNP Strategy Forex Trading System Focused on detecting specific patterns like Double Tops/Bottoms and Head & Shoulders using 200 EMA on 5-minute and 15-minute timeframes """ def __init__(self, currency_pairs): self.currency_pairs = currency_pairs # Store the currency pairs self.data = {} # Raw and processed data for each pair self.patterns = {} # Detected patterns for each pair self.trades = {} # Trade signals for each pair self.risk_management = {} # Risk management parameters self.trading_hours = (10, 22) # Trading hours: 10 AM to 10 PM # PNP Strategy Parameters self.min_rrr = 1.5 # Minimum risk-reward ratio self.max_stoploss_pct = 0.30 # Maximum stop loss percentage self.min_shoulder_candles = 7 # Minimum candles in shoulder formation self.max_entry_candle_size_pct = 0.25 # Maximum entry candle size percentage def fetch_forex_data(self, symbol, interval="5min", days_back=10): """ Fetch historical forex data in 5-minute intervals We fetch both 5-minute data and aggregate to 15-minute for dual timeframe analysis """ try: # Calculate date ranges to_date = datetime.now().strftime('%Y-%m-%d') from_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d') # Fetch 5-minute data ticker = f"C:{symbol}" url = f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/5/minute/{from_date}/{to_date}?adjusted=true&sort=asc&limit=5000&apiKey={ALPHA_VANTAGE_API_KEY}" response = requests.get(url) if response.status_code == 200: data = response.json() if 'results' in data and data['results']: # Convert to DataFrame df_5min = pd.DataFrame(data['results']) # Rename columns to more readable format df_5min = df_5min.rename(columns={ 'o': 'open', 'h': 'high', 'l': 'low', 'c': 'close', 'v': 'volume', 't': 'timestamp' }) # Convert timestamp to datetime df_5min['timestamp'] = pd.to_datetime(df_5min['timestamp'], unit='ms') df_5min['hour'] = df_5min['timestamp'].dt.hour # Keep only data during trading hours (10 AM to 10 PM) df_5min = df_5min[(df_5min['hour'] >= self.trading_hours[0]) & (df_5min['hour'] < self.trading_hours[1])] # Calculate candle size as percentage df_5min['candle_size_pct'] = abs(df_5min['close'] - df_5min['open']) / df_5min['open'] * 100 # Add 200 EMA df_5min['ema_200'] = df_5min['close'].ewm(span=200, adjust=False).mean() # Create 15-minute aggregated data df_15min = self._aggregate_to_15min(df_5min) df_15min['ema_200'] = df_15min['close'].ewm(span=200, adjust=False).mean() # Add technical indicators self._add_technical_indicators(df_5min) self._add_technical_indicators(df_15min) # Store both timeframes self.data[symbol] = { '5min': df_5min, '15min': df_15min } st.success(f"Successfully fetched data for {symbol}") return True else: st.warning(f"No results found for {symbol}") return False else: st.error(f"Error fetching data for {symbol}: {response.status_code}") st.error(response.text) return False except Exception as e: st.error(f"Exception when fetching data for {symbol}: {e}") return False def _aggregate_to_15min(self, df_5min): """Aggregate 5-minute data to 15-minute timeframe""" # Create timestamp for 15-minute grouping df_5min['timestamp_15min'] = df_5min['timestamp'].dt.floor('15min') # Group by 15-minute intervals and aggregate df_15min = df_5min.groupby('timestamp_15min').agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum', 'hour': 'first' }).reset_index() # Rename the timestamp column back df_15min = df_15min.rename(columns={'timestamp_15min': 'timestamp'}) # Calculate candle size percentage df_15min['candle_size_pct'] = abs(df_15min['close'] - df_15min['open']) / df_15min['open'] * 100 return df_15min def _add_technical_indicators(self, df): """Add technical indicators to the dataframe""" # RSI (Relative Strength Index) delta = df['close'].diff() gain = delta.where(delta > 0, 0) loss = -delta.where(delta < 0, 0) avg_gain = gain.rolling(window=14).mean() avg_loss = loss.rolling(window=14).mean() rs = avg_gain / avg_loss df['rsi'] = 100 - (100 / (1 + rs)) # MACD df['ema_12'] = df['close'].ewm(span=12, adjust=False).mean() df['ema_26'] = df['close'].ewm(span=26, adjust=False).mean() df['macd'] = df['ema_12'] - df['ema_26'] df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean() df['macd_hist'] = df['macd'] - df['macd_signal'] # Bollinger Bands df['bb_middle'] = df['close'].rolling(window=20).mean() df['bb_std'] = df['close'].rolling(window=20).std() df['bb_upper'] = df['bb_middle'] + (df['bb_std'] * 2) df['bb_lower'] = df['bb_middle'] - (df['bb_std'] * 2) # Average True Range (ATR) for volatility measurement high_low = df['high'] - df['low'] high_close_prev = abs(df['high'] - df['close'].shift(1)) low_close_prev = abs(df['low'] - df['close'].shift(1)) true_range = pd.concat([high_low, high_close_prev, low_close_prev], axis=1).max(axis=1) df['atr'] = true_range.rolling(window=14).mean() return df def detect_all_patterns(self): """Detect all patterns for all currency pairs""" for symbol in self.data: st.info(f"Analyzing patterns for {symbol}...") self.detect_patterns(symbol) def detect_patterns(self, symbol): """Detect patterns for a specific currency pair according to PNP strategy""" if symbol not in self.data: st.warning(f"No data available for {symbol}") return False # Initialize pattern storage for this symbol self.patterns[symbol] = { 'double_tops': [], 'double_bottoms': [], 'head_and_shoulders': [], 'inv_head_and_shoulders': [] } # Get 5-minute and 15-minute dataframes df_5min = self.data[symbol]['5min'] df_15min = self.data[symbol]['15min'] # Detect patterns in 5-minute data st.info(f"Detecting Double Tops/Bottoms for {symbol}...") self._detect_double_tops_bottoms(symbol, df_5min, '5min') st.info(f"Detecting Head and Shoulders for {symbol}...") self._detect_head_and_shoulders(symbol, df_5min, '5min') # Validate patterns against 15-minute EMA self._validate_patterns_with_ema(symbol, df_5min, df_15min) # Generate trade signals based on validated patterns self._generate_trade_signals(symbol) return True def _find_swing_points(self, df, window=5): """Find swing highs and lows in the data""" # Find local maxima and minima df['swing_high'] = df.iloc[argrelextrema(df['high'].values, np.greater_equal, order=window)[0]]['high'] df['swing_low'] = df.iloc[argrelextrema(df['low'].values, np.less_equal, order=window)[0]]['low'] return df def _detect_double_tops_bottoms(self, symbol, df, timeframe): """ Detect Double Tops and Double Bottoms patterns Must have proper V shape in the center """ # Find swing points first df = self._find_swing_points(df.copy()) # Look for double tops (two similar highs with a lower valley in between) for i in range(len(df) - 20): # Look within a reasonable window # Skip if we don't have enough data ahead if i + 20 >= len(df): continue window_df = df.iloc[i:i+20] # Find swing highs in this window swing_highs = window_df.loc[~window_df['swing_high'].isna()] # Need at least 2 swing highs for double top if len(swing_highs) >= 2: for j in range(len(swing_highs) - 1): # Get two consecutive swing highs first_high = swing_highs.iloc[j] second_high = swing_highs.iloc[j+1] # Calculate the difference between the two highs diff_pct = abs(second_high['high'] - first_high['high']) / first_high['high'] * 100 # Check if they're similar (within 0.1% of each other) if diff_pct < 0.1: # Find the low point between these highs idx1 = first_high.name idx2 = second_high.name between_df = df.loc[idx1:idx2] if len(between_df) < 3: # Need at least 3 candles between continue # Find the lowest point between the two highs valley = between_df['low'].min() valley_idx = between_df['low'].idxmin() # Check if it's a proper V shape (valley at least 0.1% lower than peaks) valley_diff_pct = (first_high['high'] - valley) / valley * 100 if valley_diff_pct > 0.1: # This is a potential double top pattern_data = { 'type': 'double_top', 'first_high_idx': idx1, 'valley_idx': valley_idx, 'second_high_idx': idx2, 'first_high': first_high['high'], 'valley': valley, 'second_high': second_high['high'], 'timeframe': timeframe } self.patterns[symbol]['double_tops'].append(pattern_data) # Look for double bottoms (two similar lows with a higher peak in between) for i in range(len(df) - 20): # Skip if we don't have enough data ahead if i + 20 >= len(df): continue window_df = df.iloc[i:i+20] # Find swing lows in this window swing_lows = window_df.loc[~window_df['swing_low'].isna()] # Need at least 2 swing lows for double bottom if len(swing_lows) >= 2: for j in range(len(swing_lows) - 1): # Get two consecutive swing lows first_low = swing_lows.iloc[j] second_low = swing_lows.iloc[j+1] # Calculate the difference between the two lows diff_pct = abs(second_low['low'] - first_low['low']) / first_low['low'] * 100 # Check if they're similar (within 0.1% of each other) if diff_pct < 0.1: # Find the high point between these lows idx1 = first_low.name idx2 = second_low.name between_df = df.loc[idx1:idx2] if len(between_df) < 3: # Need at least 3 candles between continue # Find the highest point between the two lows peak = between_df['high'].max() peak_idx = between_df['high'].idxmax() # Check if it's a proper V shape (peak at least 0.1% higher than valleys) peak_diff_pct = (peak - first_low['low']) / first_low['low'] * 100 if peak_diff_pct > 0.1: # This is a potential double bottom pattern_data = { 'type': 'double_bottom', 'first_low_idx': idx1, 'peak_idx': peak_idx, 'second_low_idx': idx2, 'first_low': first_low['low'], 'peak': peak, 'second_low': second_low['low'], 'timeframe': timeframe } self.patterns[symbol]['double_bottoms'].append(pattern_data) def _detect_head_and_shoulders(self, symbol, df, timeframe): """ Detect Head and Shoulders and Inverted Head and Shoulders patterns Must have at least 7-8 candles in both shoulders """ # Find swing points first df = self._find_swing_points(df.copy()) # Look for head and shoulders (three peaks with the middle one higher) for i in range(len(df) - 40): # Look within a reasonable window # Skip if we don't have enough data ahead if i + 40 >= len(df): continue window_df = df.iloc[i:i+40] # Find swing highs in this window swing_highs = window_df.loc[~window_df['swing_high'].isna()] # Need at least 3 swing highs for head and shoulders if len(swing_highs) >= 3: for j in range(len(swing_highs) - 2): # Get three consecutive swing highs left_shoulder = swing_highs.iloc[j] head = swing_highs.iloc[j+1] right_shoulder = swing_highs.iloc[j+2] # Check if head is higher than both shoulders if head['high'] > left_shoulder['high'] and head['high'] > right_shoulder['high']: # Check if shoulders are similar in height (within 0.2%) shoulder_diff_pct = abs(right_shoulder['high'] - left_shoulder['high']) / left_shoulder['high'] * 100 if shoulder_diff_pct < 0.2: # Get indices ls_idx = left_shoulder.name head_idx = head.name rs_idx = right_shoulder.name # Check if we have at least 7 candles in both shoulders left_candle_count = head_idx - ls_idx right_candle_count = rs_idx - head_idx if left_candle_count >= self.min_shoulder_candles and right_candle_count >= self.min_shoulder_candles: # Find neckline (connecting the lows between shoulders and head) left_trough_idx = df.loc[ls_idx:head_idx]['low'].idxmin() left_trough = df.loc[left_trough_idx]['low'] right_trough_idx = df.loc[head_idx:rs_idx]['low'].idxmin() right_trough = df.loc[right_trough_idx]['low'] # Check if troughs are similar (flat neckline preferred) trough_diff_pct = abs(right_trough - left_trough) / left_trough * 100 if trough_diff_pct < 0.15: # This is a potential head and shoulders pattern pattern_data = { 'type': 'head_and_shoulders', 'left_shoulder_idx': ls_idx, 'head_idx': head_idx, 'right_shoulder_idx': rs_idx, 'left_trough_idx': left_trough_idx, 'right_trough_idx': right_trough_idx, 'left_shoulder': left_shoulder['high'], 'head': head['high'], 'right_shoulder': right_shoulder['high'], 'left_trough': left_trough, 'right_trough': right_trough, 'neckline': (left_trough + right_trough) / 2, 'timeframe': timeframe } self.patterns[symbol]['head_and_shoulders'].append(pattern_data) # Look for inverted head and shoulders (three troughs with the middle one lower) for i in range(len(df) - 40): # Skip if we don't have enough data ahead if i + 40 >= len(df): continue window_df = df.iloc[i:i+40] # Find swing lows in this window swing_lows = window_df.loc[~window_df['swing_low'].isna()] # Need at least 3 swing lows for inverted head and shoulders if len(swing_lows) >= 3: for j in range(len(swing_lows) - 2): # Get three consecutive swing lows left_shoulder = swing_lows.iloc[j] head = swing_lows.iloc[j+1] right_shoulder = swing_lows.iloc[j+2] # Check if head is lower than both shoulders if head['low'] < left_shoulder['low'] and head['low'] < right_shoulder['low']: # Check if shoulders are similar in height (within 0.2%) shoulder_diff_pct = abs(right_shoulder['low'] - left_shoulder['low']) / left_shoulder['low'] * 100 if shoulder_diff_pct < 0.2: # Get indices ls_idx = left_shoulder.name head_idx = head.name rs_idx = right_shoulder.name # Check if we have at least 7 candles in both shoulders left_candle_count = head_idx - ls_idx right_candle_count = rs_idx - head_idx if left_candle_count >= self.min_shoulder_candles and right_candle_count >= self.min_shoulder_candles: # Find neckline (connecting the highs between shoulders and head) left_peak_idx = df.loc[ls_idx:head_idx]['high'].idxmax() left_peak = df.loc[left_peak_idx]['high'] right_peak_idx = df.loc[head_idx:rs_idx]['high'].idxmax() right_peak = df.loc[right_peak_idx]['high'] # Check if peaks are similar (flat neckline preferred) peak_diff_pct = abs(right_peak - left_peak) / left_peak * 100 if peak_diff_pct < 0.15: # This is a potential inverted head and shoulders pattern pattern_data = { 'type': 'inv_head_and_shoulders', 'left_shoulder_idx': ls_idx, 'head_idx': head_idx, 'right_shoulder_idx': rs_idx, 'left_peak_idx': left_peak_idx, 'right_peak_idx': right_peak_idx, 'left_shoulder': left_shoulder['low'], 'head': head['low'], 'right_shoulder': right_shoulder['low'], 'left_peak': left_peak, 'right_peak': right_peak, 'neckline': (left_peak + right_peak) / 2, 'timeframe': timeframe } self.patterns[symbol]['inv_head_and_shoulders'].append(pattern_data) def _validate_patterns_with_ema(self, symbol, df_5min, df_15min): """ Validate patterns against both 5min and 15min EMAs Double Tops/Bottoms & Head & Shoulders should only form on the EMA """ valid_patterns = { 'double_tops': [], 'double_bottoms': [], 'head_and_shoulders': [], 'inv_head_and_shoulders': [] } # Validate Double Tops for pattern in self.patterns[symbol]['double_tops']: # Check if pattern forms near 5min EMA first_high_idx = pattern['first_high_idx'] second_high_idx = pattern['second_high_idx'] # Skip if indices are invalid if first_high_idx >= len(df_5min) or second_high_idx >= len(df_5min): continue first_high = pattern['first_high'] second_high = pattern['second_high'] first_ema = df_5min.loc[first_high_idx]['ema_200'] second_ema = df_5min.loc[second_high_idx]['ema_200'] # Calculate how close highs are to EMA (within 0.1% of EMA) first_diff_pct = abs(first_high - first_ema) / first_ema * 100 second_diff_pct = abs(second_high - second_ema) / second_ema * 100 # Also check with 15min EMA # Find closest 15min candle to our 5min points first_time = df_5min.loc[first_high_idx]['timestamp'] second_time = df_5min.loc[second_high_idx]['timestamp'] closest_15min_first = df_15min.loc[(df_15min['timestamp'] - first_time).abs().idxmin()] closest_15min_second = df_15min.loc[(df_15min['timestamp'] - second_time).abs().idxmin()] first_15min_ema = closest_15min_first['ema_200'] second_15min_ema = closest_15min_second['ema_200'] first_15min_diff_pct = abs(first_high - first_15min_ema) / first_15min_ema * 100 second_15min_diff_pct = abs(second_high - second_15min_ema) / second_15min_ema * 100 # Pattern must be near at least one of the EMAs (5min or 15min) if (first_diff_pct < 0.15 and second_diff_pct < 0.15) or \ (first_15min_diff_pct < 0.15 and second_15min_diff_pct < 0.15): valid_patterns['double_tops'].append(pattern) # Validate Double Bottoms (same logic as double tops) for pattern in self.patterns[symbol]['double_bottoms']: first_low_idx = pattern['first_low_idx'] second_low_idx = pattern['second_low_idx'] # Skip if indices are invalid if first_low_idx >= len(df_5min) or second_low_idx >= len(df_5min): continue first_low = pattern['first_low'] second_low = pattern['second_low'] first_ema = df_5min.loc[first_low_idx]['ema_200'] second_ema = df_5min.loc[second_low_idx]['ema_200'] first_diff_pct = abs(first_low - first_ema) / first_ema * 100 second_diff_pct = abs(second_low - second_ema) / second_ema * 100 # Check with 15min EMA first_time = df_5min.loc[first_low_idx]['timestamp'] second_time = df_5min.loc[second_low_idx]['timestamp'] closest_15min_first = df_15min.loc[(df_15min['timestamp'] - first_time).abs().idxmin()] closest_15min_second = df_15min.loc[(df_15min['timestamp'] - second_time).abs().idxmin()] first_15min_ema = closest_15min_first['ema_200'] second_15min_ema = closest_15min_second['ema_200'] first_15min_diff_pct = abs(first_low - first_15min_ema) / first_15min_ema * 100 second_15min_diff_pct = abs(second_low - second_15min_ema) / second_15min_ema * 100 if (first_diff_pct < 0.15 and second_diff_pct < 0.15) or \ (first_15min_diff_pct < 0.15 and second_15min_diff_pct < 0.15): valid_patterns['double_bottoms'].append(pattern) # Validate Head and Shoulders for pattern in self.patterns[symbol]['head_and_shoulders']: # For H&S, focus on neckline proximity to EMA neckline = pattern['neckline'] ls_idx = pattern['left_shoulder_idx'] rs_idx = pattern['right_shoulder_idx'] # Skip if indices are invalid if ls_idx >= len(df_5min) or rs_idx >= len(df_5min): continue # Get average EMA in the pattern region pattern_indices = range(ls_idx, rs_idx + 1) avg_ema_5min = df_5min.loc[pattern_indices]['ema_200'].mean() # Calculate how close neckline is to average EMA neckline_diff_pct = abs(neckline - avg_ema_5min) / avg_ema_5min * 100 # Also check with 15min EMA (use start and end times to find relevant 15min candles) start_time = df_5min.loc[ls_idx]['timestamp'] end_time = df_5min.loc[rs_idx]['timestamp'] relevant_15min = df_15min[(df_15min['timestamp'] >= start_time) & (df_15min['timestamp'] <= end_time)] if not relevant_15min.empty: avg_ema_15min = relevant_15min['ema_200'].mean() neckline_15min_diff_pct = abs(neckline - avg_ema_15min) / avg_ema_15min * 100 if neckline_diff_pct < 0.2 or neckline_15min_diff_pct < 0.2: valid_patterns['head_and_shoulders'].append(pattern) # Validate Inverted Head and Shoulders (same logic as H&S) for pattern in self.patterns[symbol]['inv_head_and_shoulders']: neckline = pattern['neckline'] ls_idx = pattern['left_shoulder_idx'] rs_idx = pattern['right_shoulder_idx'] # Skip if indices are invalid if ls_idx >= len(df_5min) or rs_idx >= len(df_5min): continue # Get average EMA in the pattern region pattern_indices = range(ls_idx, rs_idx + 1) avg_ema_5min = df_5min.loc[pattern_indices]['ema_200'].mean() # Calculate how close neckline is to average EMA neckline_diff_pct = abs(neckline - avg_ema_5min) / avg_ema_5min * 100 # Also check with 15min EMA (use start and end times to find relevant 15min candles) start_time = df_5min.loc[ls_idx]['timestamp'] end_time = df_5min.loc[rs_idx]['timestamp'] relevant_15min = df_15min[(df_15min['timestamp'] >= start_time) & (df_15min['timestamp'] <= end_time)] if not relevant_15min.empty: avg_ema_15min = relevant_15min['ema_200'].mean() neckline_15min_diff_pct = abs(neckline - avg_ema_15min) / avg_ema_15min * 100 if neckline_diff_pct < 0.2 or neckline_15min_diff_pct < 0.2: valid_patterns['inv_head_and_shoulders'].append(pattern) # Update the patterns dictionary with validated patterns only self.patterns[symbol] = valid_patterns # Print validation results pattern_counts = { 'double_tops': len(valid_patterns['double_tops']), 'double_bottoms': len(valid_patterns['double_bottoms']), 'head_and_shoulders': len(valid_patterns['head_and_shoulders']), 'inv_head_and_shoulders': len(valid_patterns['inv_head_and_shoulders']) } st.info(f"Validated patterns for {symbol}: {pattern_counts}") def _check_dcc_confirmation(self, df, pattern_end_idx): """ Check for DCC (Directional Candle Confirmation) or 1 big candle compared to previous 10 candles For Head & Shoulders patterns """ # Ensure we have enough data if pattern_end_idx + 1 >= len(df) or pattern_end_idx < 10: return False confirmation_candle = df.iloc[pattern_end_idx + 1] # Get previous 10 candles start_idx = max(0, pattern_end_idx - 10) previous_candles = df.iloc[start_idx:pattern_end_idx] if len(previous_candles) < 5: # Need at least 5 candles for comparison return False # Check if confirmation candle is significantly larger than previous candles avg_prev_size = previous_candles['candle_size_pct'].mean() confirmation_size = confirmation_candle['candle_size_pct'] # Candle must be at least 1.5x the average size of previous candles return confirmation_size > (1.5 * avg_prev_size) def _check_entry_candle_size(self, df, pattern_end_idx): """Check if the entry candle is smaller than the maximum allowed size (0.25%)""" if pattern_end_idx + 1 >= len(df) or pattern_end_idx < 0: return False entry_candle = df.iloc[pattern_end_idx + 1] return entry_candle['candle_size_pct'] < self.max_entry_candle_size_pct def _calculate_risk_reward(self, entry, stop_loss, take_profit): """Calculate risk-reward ratio""" risk = abs(entry - stop_loss) reward = abs(take_profit - entry) if risk == 0: # Avoid division by zero return 0 return reward / risk def _check_recent_high_low(self, df, pattern_end_idx, target_price, is_buy): """ Check if the recent high/low is within 50% of our overall target Avoid entry if the recent high or low is within 50% of our overall target """ # Get last 20 candles before pattern end start_idx = max(0, pattern_end_idx - 20) recent_candles = df.iloc[start_idx:pattern_end_idx] if is_buy: # For buy signals, check recent highs recent_high = recent_candles['high'].max() target_distance = abs(target_price - df.iloc[pattern_end_idx]['close']) high_distance = abs(recent_high - df.iloc[pattern_end_idx]['close']) # If recent high is within 50% of target distance, avoid entry return high_distance < (0.5 * target_distance) else: # For sell signals, check recent lows recent_low = recent_candles['low'].min() target_distance = abs(df.iloc[pattern_end_idx]['close'] - target_price) low_distance = abs(df.iloc[pattern_end_idx]['close'] - recent_low) # If recent low is within 50% of target distance, avoid entry return low_distance < (0.5 * target_distance) def _generate_trade_signals(self, symbol): """Generate trade signals based on validated patterns""" if symbol not in self.patterns: return df_5min = self.data[symbol]['5min'] # Initialize trade signals self.trades[symbol] = [] # Process Double Tops for pattern in self.patterns[symbol]['double_tops']: # Get pattern details second_high_idx = pattern['second_high_idx'] # Skip if index is invalid if second_high_idx >= len(df_5min): continue valley = pattern['valley'] second_high = pattern['second_high'] # Entry point is just below the neckline (valley) entry_price = valley * 0.999 # Slightly below neckline # Stop loss just above the second high stop_loss = second_high * 1.001 # Slightly above second high # Target is typically the height of the pattern projected downward from entry pattern_height = second_high - valley take_profit = entry_price - pattern_height # Calculate risk-reward ratio rrr = self._calculate_risk_reward(entry_price, stop_loss, take_profit) # Check entry candle size valid_entry_size = self._check_entry_candle_size(df_5min, second_high_idx) # Check if recent high is too close to target avoid_recent_high = self._check_recent_high_low(df_5min, second_high_idx, take_profit, False) # Validate trade according to PNP rules if rrr >= self.min_rrr and valid_entry_size and not avoid_recent_high: # Calculate stop loss percentage stop_loss_pct = (stop_loss - entry_price) / entry_price * 100 if abs(stop_loss_pct) <= self.max_stoploss_pct: # This is a valid trade signal trade = { 'symbol': symbol, 'pattern_type': 'double_top', 'action': 'SELL', 'entry_price': entry_price, 'stop_loss': stop_loss, 'take_profit': take_profit, 'risk_reward_ratio': rrr, 'entry_time': df_5min.iloc[second_high_idx]['timestamp'], 'pattern_end_idx': second_high_idx, 'stop_loss_pct': abs(stop_loss_pct) } self.trades[symbol].append(trade) # Process Double Bottoms for pattern in self.patterns[symbol]['double_bottoms']: # Get pattern details second_low_idx = pattern['second_low_idx'] # Skip if index is invalid if second_low_idx >= len(df_5min): continue peak = pattern['peak'] second_low = pattern['second_low'] # Entry point is just above the neckline (peak) entry_price = peak * 1.001 # Slightly above neckline # Stop loss just below the second low stop_loss = second_low * 0.999 # Slightly below second low # Target is typically the height of the pattern projected upward from entry pattern_height = peak - second_low take_profit = entry_price + pattern_height # Calculate risk-reward ratio rrr = self._calculate_risk_reward(entry_price, stop_loss, take_profit) # Check entry candle size valid_entry_size = self._check_entry_candle_size(df_5min, second_low_idx) # Check if recent low is too close to target avoid_recent_low = self._check_recent_high_low(df_5min, second_low_idx, take_profit, True) # Validate trade according to PNP rules if rrr >= self.min_rrr and valid_entry_size and not avoid_recent_low: # Calculate stop loss percentage stop_loss_pct = (entry_price - stop_loss) / entry_price * 100 if abs(stop_loss_pct) <= self.max_stoploss_pct: # This is a valid trade signal trade = { 'symbol': symbol, 'pattern_type': 'double_bottom', 'action': 'BUY', 'entry_price': entry_price, 'stop_loss': stop_loss, 'take_profit': take_profit, 'risk_reward_ratio': rrr, 'entry_time': df_5min.iloc[second_low_idx]['timestamp'], 'pattern_end_idx': second_low_idx, 'stop_loss_pct': abs(stop_loss_pct) } self.trades[symbol].append(trade) # Process Head and Shoulders for pattern in self.patterns[symbol]['head_and_shoulders']: # Get pattern details right_shoulder_idx = pattern['right_shoulder_idx'] # Skip if index is invalid if right_shoulder_idx >= len(df_5min): continue neckline = pattern['neckline'] head = pattern['head'] # Check for DCC confirmation or big candle has_confirmation = self._check_dcc_confirmation(df_5min, right_shoulder_idx) if has_confirmation: # Entry point is just below the neckline entry_price = neckline * 0.999 # Slightly below neckline # Stop loss just above the right shoulder stop_loss = pattern['right_shoulder'] * 1.001 # Target is typically the height of the pattern projected downward from entry pattern_height = head - neckline take_profit = entry_price - pattern_height # Calculate risk-reward ratio rrr = self._calculate_risk_reward(entry_price, stop_loss, take_profit) # Check entry candle size valid_entry_size = self._check_entry_candle_size(df_5min, right_shoulder_idx) # Check if recent high is too close to target avoid_recent_high = self._check_recent_high_low(df_5min, right_shoulder_idx, take_profit, False) # Validate trade according to PNP rules if rrr >= self.min_rrr and valid_entry_size and not avoid_recent_high: # Calculate stop loss percentage stop_loss_pct = (stop_loss - entry_price) / entry_price * 100 if abs(stop_loss_pct) <= self.max_stoploss_pct: # This is a valid trade signal trade = { 'symbol': symbol, 'pattern_type': 'head_and_shoulders', 'action': 'SELL', 'entry_price': entry_price, 'stop_loss': stop_loss, 'take_profit': take_profit, 'risk_reward_ratio': rrr, 'entry_time': df_5min.iloc[right_shoulder_idx]['timestamp'], 'pattern_end_idx': right_shoulder_idx, 'stop_loss_pct': abs(stop_loss_pct) } self.trades[symbol].append(trade) # Process Inverted Head and Shoulders for pattern in self.patterns[symbol]['inv_head_and_shoulders']: # Get pattern details right_shoulder_idx = pattern['right_shoulder_idx'] # Skip if index is invalid if right_shoulder_idx >= len(df_5min): continue neckline = pattern['neckline'] head = pattern['head'] # Check for DCC confirmation or big candle has_confirmation = self._check_dcc_confirmation(df_5min, right_shoulder_idx) if has_confirmation: # Entry point is just above the neckline entry_price = neckline * 1.001 # Slightly above neckline # Stop loss just below the right shoulder stop_loss = pattern['right_shoulder'] * 0.999 # Target is typically the height of the pattern projected upward from entry pattern_height = neckline - head take_profit = entry_price + pattern_height # Calculate risk-reward ratio rrr = self._calculate_risk_reward(entry_price, stop_loss, take_profit) # Check entry candle size valid_entry_size = self._check_entry_candle_size(df_5min, right_shoulder_idx) # Check if recent low is too close to target avoid_recent_low = self._check_recent_high_low(df_5min, right_shoulder_idx, take_profit, True) # Validate trade according to PNP rules if rrr >= self.min_rrr and valid_entry_size and not avoid_recent_low: # Calculate stop loss percentage stop_loss_pct = (entry_price - stop_loss) / entry_price * 100 if abs(stop_loss_pct) <= self.max_stoploss_pct: # This is a valid trade signal trade = { 'symbol': symbol, 'pattern_type': 'inv_head_and_shoulders', 'action': 'BUY', 'entry_price': entry_price, 'stop_loss': stop_loss, 'take_profit': take_profit, 'risk_reward_ratio': rrr, 'entry_time': df_5min.iloc[right_shoulder_idx]['timestamp'], 'pattern_end_idx': right_shoulder_idx, 'stop_loss_pct': abs(stop_loss_pct) } self.trades[symbol].append(trade) st.info(f"Generated {len(self.trades[symbol])} trade signals for {symbol}") def generate_risk_management_plan(self): """Generate overall risk management plan using Gemini AI""" if not any(self.trades.values()): st.warning("No trade signals available for risk management plan") return None # Prepare a summary of trade signals trades_summary = "" for symbol, trades in self.trades.items(): for trade in trades: trades_summary += f"{symbol}: {trade['action']} - Entry: {trade['entry_price']:.5f}, SL: {trade['stop_loss']:.5f}, TP: {trade['take_profit']:.5f}, RRR: {trade['risk_reward_ratio']:.2f}\n" # If no trades, return a simple message if not trades_summary: self.risk_management['overall_plan'] = "No valid trade signals detected based on the PNP strategy criteria." return self.risk_management['overall_plan'] # Prepare a prompt for Gemini AI prompt = f""" Based on the following forex trading signals from the PNP strategy, create a comprehensive risk management plan. The plan should detail how to manage positions according to the PNP strategy guidelines. Current Trading Signals: {trades_summary} PNP Strategy Guidelines: 1. Maximum stop loss: 0.30% 2. Minimum risk-reward ratio: 1:1.5 3. Aim for 1:2 if no price action level nearby 4. Trade only between 10 AM to 10 PM 5. Patterns must form on the 200 EMA 6. Entry candle must be smaller than 0.25% 7. Don't take trades if recent high/low is within 50% of target Please provide: 1. Maximum portfolio risk percentage (total capital at risk) 2. Position sizing rules with examples (assume a $10,000 account) 3. Criteria for adjusting stop losses 4. Rules for partial profit-taking 5. Criteria for exiting all positions (market conditions) 6. Daily/weekly loss limits that would trigger trading pause Format your response as a structured plan with clear rules and guidelines. """ try: # Query Gemini AI with st.spinner("Generating risk management plan with AI..."): response = model.generate_content(prompt) risk_plan = response.text # Store the risk management plan self.risk_management['overall_plan'] = risk_plan return risk_plan except Exception as e: st.error(f"Error generating risk management plan with Gemini AI: {e}") # Create a basic risk management plan without AI self.risk_management['overall_plan'] = self._create_basic_risk_plan() return self.risk_management['overall_plan'] def _create_basic_risk_plan(self): """Create a basic risk management plan without using AI""" plan = """ # PNP Strategy Risk Management Plan ## Position Sizing - Risk no more than 1% of account on any single trade - Calculate position size based on entry price and stop loss - For a $10,000 account, maximum risk per trade = $100 ## Multiple Positions - Maximum correlation exposure: 15% of account - Avoid having more than 2 pairs with same base or quote currency ## Stop Loss Management - Initial stop loss based on pattern structure (max 0.30%) - Move to break-even after price moves in favor by at least 1x the risk ## Profit Taking - Take partial profits (50%) at 1x risk - Move stop loss to break-even at this point - Let remainder run to full target (min 1.5x risk) ## Risk Limits - Daily loss limit: 2% of account - Weekly loss limit: 5% of account - Monthly loss limit: 10% of account - If any limit is hit, stop trading for the period ## Trade Management - Only trade during 10 AM - 10 PM - Avoid trading during major news events - Always stick to minimum 1.5 risk-reward ratio """ return plan def plot_chart_with_patterns(self, symbol): """Plot a chart with detected patterns and trade signals""" if symbol not in self.data or symbol not in self.trades: st.warning(f"No data or trades available for {symbol}") return df = self.data[symbol]['5min'].copy() # Create a figure with 3 subplots fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(14, 12), gridspec_kw={'height_ratios': [3, 1, 1]}) # Main chart with price and moving averages ax1.plot(df['timestamp'], df['close'], label='Close Price', linewidth=2) ax1.plot(df['timestamp'], df['ema_200'], label='200 EMA', linestyle='--', alpha=0.8) # Add Bollinger Bands for reference ax1.plot(df['timestamp'], df['bb_upper'], 'k--', alpha=0.3) ax1.plot(df['timestamp'], df['bb_lower'], 'k--', alpha=0.3) ax1.fill_between(df['timestamp'], df['bb_upper'], df['bb_lower'], alpha=0.1, color='gray') # Highlight the trading hours (10 AM to 10 PM) trading_hours_df = df[(df['hour'] >= self.trading_hours[0]) & (df['hour'] < self.trading_hours[1])] if not trading_hours_df.empty: min_price = df['low'].min() * 0.999 max_price = df['high'].max() * 1.001 # Group consecutive timestamps into ranges dates = trading_hours_df['timestamp'].dt.date.unique() for date in dates: day_df = trading_hours_df[trading_hours_df['timestamp'].dt.date == date] if not day_df.empty: start = day_df['timestamp'].min() end = day_df['timestamp'].max() ax1.axvspan(start, end, alpha=0.2, color='green', label='Trading Hours' if date == dates[0] else "") # Add trade signals for trade in self.trades[symbol]: pattern_end_idx = trade['pattern_end_idx'] entry_time = trade['entry_time'] entry_price = trade['entry_price'] stop_loss = trade['stop_loss'] take_profit = trade['take_profit'] action = trade['action'] pattern_type = trade['pattern_type'] rrr = trade['risk_reward_ratio'] # Set color based on action color = 'green' if action == 'BUY' else 'red' # Add entry point marker ax1.scatter(entry_time, entry_price, s=100, color=color, marker='^' if action == 'BUY' else 'v', zorder=5) # Add annotation with pattern type and RRR ax1.annotate(f"{pattern_type}\n{action} @ {entry_price:.5f}\nRRR: {rrr:.2f}", xy=(entry_time, entry_price), xytext=(10, 10 if action == 'BUY' else -10), textcoords="offset points", arrowprops=dict(arrowstyle="->", color=color), color=color, fontsize=9, bbox=dict(boxstyle="round,pad=0.3", fc="white", ec=color, lw=1)) # Add stop loss line ax1.axhline(y=stop_loss, color='red', linestyle='--', alpha=0.5, linewidth=1) ax1.annotate(f"SL: {stop_loss:.5f}", xy=(df['timestamp'].iloc[-1], stop_loss), xytext=(-50, -5), textcoords="offset points", color='red', fontsize=8) # Add take profit line ax1.axhline(y=take_profit, color='green', linestyle='--', alpha=0.5, linewidth=1) ax1.annotate(f"TP: {take_profit:.5f}", xy=(df['timestamp'].iloc[-1], take_profit), xytext=(-50, 5), textcoords="offset points", color='green', fontsize=8) # Format the timestamp on x-axis ax1.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M')) plt.setp(ax1.xaxis.get_majorticklabels(), rotation=45) ax1.set_title(f'{symbol} with PNP Strategy Trading Signals', fontsize=14) ax1.set_ylabel('Price', fontsize=10) ax1.grid(True, alpha=0.3) # Ensure we don't get duplicate labels in the legend handles, labels = ax1.get_legend_handles_labels() by_label = dict(zip(labels, handles)) ax1.legend(by_label.values(), by_label.keys(), loc='upper left', fontsize=9) # RSI subplot ax2.plot(df['timestamp'], df['rsi'], label='RSI', color='purple', linewidth=1) ax2.axhline(y=70, color='red', linestyle='--', alpha=0.3) ax2.axhline(y=30, color='green', linestyle='--', alpha=0.3) ax2.fill_between(df['timestamp'], 70, 30, alpha=0.1, color='gray') ax2.set_ylabel('RSI', fontsize=10) ax2.grid(True, alpha=0.3) ax2.legend(loc='upper left', fontsize=9) # MACD subplot ax3.plot(df['timestamp'], df['macd'], label='MACD', color='blue', linewidth=1) ax3.plot(df['timestamp'], df['macd_signal'], label='Signal', color='orange', linewidth=1) ax3.bar(df['timestamp'], df['macd_hist'], label='Histogram', alpha=0.5, color=np.where(df['macd_hist'] >= 0, 'green', 'red')) ax3.axhline(y=0, color='black', linestyle='-', alpha=0.3) ax3.set_ylabel('MACD', fontsize=10) ax3.set_xlabel('Date', fontsize=10) ax3.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M')) plt.setp(ax3.xaxis.get_majorticklabels(), rotation=45) ax3.grid(True, alpha=0.3) ax3.legend(loc='upper left', fontsize=9) plt.tight_layout() # Display the chart in Streamlit st.pyplot(fig) plt.close() def run_full_analysis(self, days_back=10): """Run a full analysis on all currency pairs""" st.info("Starting PNP Strategy Forex Analysis...") # Fetch data for all currency pairs for pair in self.currency_pairs: # Use self.currency_pairs st.info(f"\nFetching data for {pair}...") success = self.fetch_forex_data(pair, days_back=days_back) if not success: st.warning(f"Failed to fetch data for {pair}. Skipping analysis.") time.sleep(1) # Avoid API rate limits # Detect patterns for all pairs st.info("\nDetecting patterns...") self.detect_all_patterns() # Generate risk management plan st.info("\nGenerating risk management plan...") risk_plan = self.generate_risk_management_plan() # Generate charts st.info("\nGenerating charts...") for pair in self.trades: if self.trades[pair]: # Only generate charts for pairs with trades self.plot_chart_with_patterns(pair) # Print summary self.print_summary() def print_summary(self): """Print a summary of all analysis and recommendations""" st.markdown("---") st.markdown("## PNP FOREX TRADING STRATEGY SUMMARY") st.markdown("---") # Print trade signals total_trades = sum(len(trades) for trades in self.trades.values()) if total_trades == 0: st.warning("\nNo valid trade signals detected based on the PNP strategy criteria.") else: st.success(f"\nTotal valid trade signals: {total_trades}") for symbol in self.trades: if not self.trades[symbol]: continue st.markdown(f"### {symbol} TRADING SIGNALS ({len(self.trades[symbol])}):") for i, trade in enumerate(self.trades[symbol], 1): st.markdown(f"#### Signal #{i}:") st.write(f"**Pattern Type:** {trade['pattern_type']}") st.write(f"**Action:** {trade['action']}") st.write(f"**Entry Price:** {trade['entry_price']:.5f}") st.write(f"**Stop Loss:** {trade['stop_loss']:.5f} ({trade['stop_loss_pct']:.2f}%)") st.write(f"**Take Profit:** {trade['take_profit']:.5f}") st.write(f"**Risk-Reward Ratio:** {trade['risk_reward_ratio']:.2f}") st.write(f"**Entry Time:** {trade['entry_time']}") st.markdown("---") # Print risk management plan st.markdown("---") st.markdown("## RISK MANAGEMENT PLAN") st.markdown("---") if 'overall_plan' in self.risk_management: st.markdown(self.risk_management['overall_plan']) else: st.warning("No risk management plan available.") st.markdown("---") st.markdown("## CONCLUSION") st.markdown("---") st.success("The PNP Strategy analysis has been completed.") st.info("Review the recommendations and risk management plan before placing any trades.") st.warning("Remember that all trading involves risk, and past performance is not indicative of future results.") # Streamlit UI def main(): st.title("📊 PNP Forex Trading Strategy Analyzer") st.markdown(""" This app analyzes forex currency pairs using the PNP (Price and Pattern) Trading Strategy. It detects Double Tops/Bottoms and Head & Shoulders patterns on 5-minute and 15-minute charts, validates them against the 200 EMA, and generates trade signals with risk management parameters. """) # Define currency pairs to analyze CURRENCY_PAIRS = ["USDJPY", "EURUSD", "GBPUSD", "AUDUSD"] # Sidebar controls st.sidebar.header("Settings") days_back = st.sidebar.slider("Days of historical data to analyze", 1, 30, 10) min_rrr = st.sidebar.slider("Minimum Risk-Reward Ratio", 1.0, 3.0, 1.5, 0.1) max_stoploss_pct = st.sidebar.slider("Maximum Stop Loss (%)", 0.1, 1.0, 0.3, 0.05) # Initialize trader with settings and currency pairs trader = PNPForexTrader(currency_pairs=CURRENCY_PAIRS) trader.min_rrr = min_rrr trader.max_stoploss_pct = max_stoploss_pct # Run analysis button if st.sidebar.button("Run Full Analysis"): with st.spinner("Running analysis..."): trader.run_full_analysis(days_back=days_back) # Display currency pairs being analyzed st.sidebar.markdown("---") st.sidebar.markdown("### Currency Pairs Being Analyzed") for pair in CURRENCY_PAIRS: st.sidebar.write(f"- {pair}") # About section st.sidebar.markdown("---") st.sidebar.markdown("### About PNP Strategy") st.sidebar.markdown(""" The PNP (Price and Pattern) Strategy focuses on: - Double Tops/Bottoms - Head & Shoulders patterns - Validated against 200 EMA - With strict risk management rules """) if __name__ == "__main__": main()