# Froex_trading / src/streamlit_app.py
# (Hugging Face Spaces page header removed: author "Nishur",
#  "Update src/streamlit_app.py", commit 1d6094e verified)
import streamlit as st
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta
import google.generativeai as genai
import time
import os
from scipy.signal import argrelextrema
# Set page config (must be the first Streamlit call in the script)
st.set_page_config(
    page_title="PNP Forex Trading Strategy",
    page_icon="๐Ÿ“Š",  # NOTE(review): looks like a mojibake'd emoji -- confirm intended icon
    layout="wide",
    initial_sidebar_state="expanded"
)
# Get API keys from secrets (exposed to the process as environment variables)
ALPHA_VANTAGE_API_KEY = os.environ.get('ALPHA_VANTAGE_API_KEY')
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY')
# Check if keys are available; halt the whole app early when either is missing
if not ALPHA_VANTAGE_API_KEY or not GEMINI_API_KEY:
    st.error("API keys not found. Please check your secrets configuration.")
    st.stop()  # stops script execution -- nothing below runs without keys
# Configure Gemini AI (used later to draft the risk-management plan).
# NOTE(review): despite its name, ALPHA_VANTAGE_API_KEY is sent to the
# Polygon.io REST API in fetch_forex_data -- confirm the intended provider.
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel('gemini-2.0-flash')
class PNPForexTrader:
"""
PNP Strategy Forex Trading System
Focused on detecting specific patterns like Double Tops/Bottoms and Head & Shoulders
using 200 EMA on 5-minute and 15-minute timeframes
"""
def __init__(self, currency_pairs):
self.currency_pairs = currency_pairs # Store the currency pairs
self.data = {} # Raw and processed data for each pair
self.patterns = {} # Detected patterns for each pair
self.trades = {} # Trade signals for each pair
self.risk_management = {} # Risk management parameters
self.trading_hours = (10, 22) # Trading hours: 10 AM to 10 PM
# PNP Strategy Parameters
self.min_rrr = 1.5 # Minimum risk-reward ratio
self.max_stoploss_pct = 0.30 # Maximum stop loss percentage
self.min_shoulder_candles = 7 # Minimum candles in shoulder formation
self.max_entry_candle_size_pct = 0.25 # Maximum entry candle size percentage
def fetch_forex_data(self, symbol, interval="5min", days_back=10):
    """
    Fetch historical forex candles and build both analysis timeframes.

    Downloads 5-minute aggregates for the last *days_back* days from the
    Polygon.io REST API, filters to trading hours, adds the 200 EMA and
    technical indicators, aggregates a 15-minute frame, and stores both
    under ``self.data[symbol]``.

    Args:
        symbol: currency pair, e.g. "EURUSD" (sent to Polygon as "C:EURUSD").
        interval: unused; kept for interface compatibility.
        days_back: lookback window in calendar days.

    Returns:
        True on success, False on any failure (errors reported via Streamlit).

    Fixes vs. the original:
    * a 30s timeout so a stalled HTTP request cannot hang the app, and
    * ``reset_index(drop=True)`` after the trading-hours filter, so the
      downstream pattern code -- which mixes label and positional
      indexing -- sees a gap-free RangeIndex.
    """
    try:
        # Calculate the date range for the request.
        to_date = datetime.now().strftime('%Y-%m-%d')
        from_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')
        # NOTE(review): the key named ALPHA_VANTAGE_API_KEY is sent to
        # Polygon.io here -- confirm which data provider is intended.
        ticker = f"C:{symbol}"
        url = (
            f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/5/minute/"
            f"{from_date}/{to_date}?adjusted=true&sort=asc&limit=5000&apiKey={ALPHA_VANTAGE_API_KEY}"
        )
        # Bounded timeout: without it a stalled request blocks the Streamlit run.
        response = requests.get(url, timeout=30)
        if response.status_code != 200:
            st.error(f"Error fetching data for {symbol}: {response.status_code}")
            st.error(response.text)
            return False
        data = response.json()
        if 'results' not in data or not data['results']:
            st.warning(f"No results found for {symbol}")
            return False
        # Convert to DataFrame with readable column names.
        df_5min = pd.DataFrame(data['results']).rename(columns={
            'o': 'open',
            'h': 'high',
            'l': 'low',
            'c': 'close',
            'v': 'volume',
            't': 'timestamp'
        })
        # Polygon timestamps are epoch milliseconds.
        df_5min['timestamp'] = pd.to_datetime(df_5min['timestamp'], unit='ms')
        df_5min['hour'] = df_5min['timestamp'].dt.hour
        # Keep only candles inside trading hours (10 AM to 10 PM) and reset
        # the index so later label arithmetic equals candle counts.
        df_5min = df_5min[
            (df_5min['hour'] >= self.trading_hours[0]) &
            (df_5min['hour'] < self.trading_hours[1])
        ].reset_index(drop=True)
        # Candle body size as a percentage of the open.
        df_5min['candle_size_pct'] = abs(df_5min['close'] - df_5min['open']) / df_5min['open'] * 100
        # 200 EMA is the anchor for all PNP pattern validation.
        df_5min['ema_200'] = df_5min['close'].ewm(span=200, adjust=False).mean()
        # Build the 15-minute frame for dual-timeframe confirmation.
        df_15min = self._aggregate_to_15min(df_5min)
        df_15min['ema_200'] = df_15min['close'].ewm(span=200, adjust=False).mean()
        # Add RSI/MACD/Bollinger/ATR to both frames (in place).
        self._add_technical_indicators(df_5min)
        self._add_technical_indicators(df_15min)
        # Store both timeframes for this symbol.
        self.data[symbol] = {
            '5min': df_5min,
            '15min': df_15min
        }
        st.success(f"Successfully fetched data for {symbol}")
        return True
    except Exception as e:
        st.error(f"Exception when fetching data for {symbol}: {e}")
        return False
def _aggregate_to_15min(self, df_5min):
"""Aggregate 5-minute data to 15-minute timeframe"""
# Create timestamp for 15-minute grouping
df_5min['timestamp_15min'] = df_5min['timestamp'].dt.floor('15min')
# Group by 15-minute intervals and aggregate
df_15min = df_5min.groupby('timestamp_15min').agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum',
'hour': 'first'
}).reset_index()
# Rename the timestamp column back
df_15min = df_15min.rename(columns={'timestamp_15min': 'timestamp'})
# Calculate candle size percentage
df_15min['candle_size_pct'] = abs(df_15min['close'] - df_15min['open']) / df_15min['open'] * 100
return df_15min
def _add_technical_indicators(self, df):
"""Add technical indicators to the dataframe"""
# RSI (Relative Strength Index)
delta = df['close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.rolling(window=14).mean()
avg_loss = loss.rolling(window=14).mean()
rs = avg_gain / avg_loss
df['rsi'] = 100 - (100 / (1 + rs))
# MACD
df['ema_12'] = df['close'].ewm(span=12, adjust=False).mean()
df['ema_26'] = df['close'].ewm(span=26, adjust=False).mean()
df['macd'] = df['ema_12'] - df['ema_26']
df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
df['macd_hist'] = df['macd'] - df['macd_signal']
# Bollinger Bands
df['bb_middle'] = df['close'].rolling(window=20).mean()
df['bb_std'] = df['close'].rolling(window=20).std()
df['bb_upper'] = df['bb_middle'] + (df['bb_std'] * 2)
df['bb_lower'] = df['bb_middle'] - (df['bb_std'] * 2)
# Average True Range (ATR) for volatility measurement
high_low = df['high'] - df['low']
high_close_prev = abs(df['high'] - df['close'].shift(1))
low_close_prev = abs(df['low'] - df['close'].shift(1))
true_range = pd.concat([high_low, high_close_prev, low_close_prev], axis=1).max(axis=1)
df['atr'] = true_range.rolling(window=14).mean()
return df
def detect_all_patterns(self):
    """Run pattern detection for every symbol that has fetched data."""
    for pair in self.data:
        st.info(f"Analyzing patterns for {pair}...")
        self.detect_patterns(pair)
def detect_patterns(self, symbol):
    """Detect and validate PNP patterns for one pair, then build trade signals.

    Returns:
        False when no data has been fetched for *symbol*, True otherwise.
    """
    if symbol not in self.data:
        st.warning(f"No data available for {symbol}")
        return False

    # Fresh pattern buckets for this run (insertion order preserved).
    self.patterns[symbol] = {key: [] for key in (
        'double_tops', 'double_bottoms',
        'head_and_shoulders', 'inv_head_and_shoulders')}

    five = self.data[symbol]['5min']
    fifteen = self.data[symbol]['15min']

    # Scan the 5-minute frame for the raw pattern candidates.
    st.info(f"Detecting Double Tops/Bottoms for {symbol}...")
    self._detect_double_tops_bottoms(symbol, five, '5min')
    st.info(f"Detecting Head and Shoulders for {symbol}...")
    self._detect_head_and_shoulders(symbol, five, '5min')

    # Keep only patterns anchored to the 200 EMA, then turn them into signals.
    self._validate_patterns_with_ema(symbol, five, fifteen)
    self._generate_trade_signals(symbol)
    return True
def _find_swing_points(self, df, window=5):
"""Find swing highs and lows in the data"""
# Find local maxima and minima
df['swing_high'] = df.iloc[argrelextrema(df['high'].values, np.greater_equal, order=window)[0]]['high']
df['swing_low'] = df.iloc[argrelextrema(df['low'].values, np.less_equal, order=window)[0]]['low']
return df
def _detect_double_tops_bottoms(self, symbol, df, timeframe):
"""
Detect Double Tops and Double Bottoms patterns
Must have proper V shape in the center
"""
# Find swing points first
df = self._find_swing_points(df.copy())
# Look for double tops (two similar highs with a lower valley in between)
for i in range(len(df) - 20): # Look within a reasonable window
# Skip if we don't have enough data ahead
if i + 20 >= len(df):
continue
window_df = df.iloc[i:i+20]
# Find swing highs in this window
swing_highs = window_df.loc[~window_df['swing_high'].isna()]
# Need at least 2 swing highs for double top
if len(swing_highs) >= 2:
for j in range(len(swing_highs) - 1):
# Get two consecutive swing highs
first_high = swing_highs.iloc[j]
second_high = swing_highs.iloc[j+1]
# Calculate the difference between the two highs
diff_pct = abs(second_high['high'] - first_high['high']) / first_high['high'] * 100
# Check if they're similar (within 0.1% of each other)
if diff_pct < 0.1:
# Find the low point between these highs
idx1 = first_high.name
idx2 = second_high.name
between_df = df.loc[idx1:idx2]
if len(between_df) < 3: # Need at least 3 candles between
continue
# Find the lowest point between the two highs
valley = between_df['low'].min()
valley_idx = between_df['low'].idxmin()
# Check if it's a proper V shape (valley at least 0.1% lower than peaks)
valley_diff_pct = (first_high['high'] - valley) / valley * 100
if valley_diff_pct > 0.1:
# This is a potential double top
pattern_data = {
'type': 'double_top',
'first_high_idx': idx1,
'valley_idx': valley_idx,
'second_high_idx': idx2,
'first_high': first_high['high'],
'valley': valley,
'second_high': second_high['high'],
'timeframe': timeframe
}
self.patterns[symbol]['double_tops'].append(pattern_data)
# Look for double bottoms (two similar lows with a higher peak in between)
for i in range(len(df) - 20):
# Skip if we don't have enough data ahead
if i + 20 >= len(df):
continue
window_df = df.iloc[i:i+20]
# Find swing lows in this window
swing_lows = window_df.loc[~window_df['swing_low'].isna()]
# Need at least 2 swing lows for double bottom
if len(swing_lows) >= 2:
for j in range(len(swing_lows) - 1):
# Get two consecutive swing lows
first_low = swing_lows.iloc[j]
second_low = swing_lows.iloc[j+1]
# Calculate the difference between the two lows
diff_pct = abs(second_low['low'] - first_low['low']) / first_low['low'] * 100
# Check if they're similar (within 0.1% of each other)
if diff_pct < 0.1:
# Find the high point between these lows
idx1 = first_low.name
idx2 = second_low.name
between_df = df.loc[idx1:idx2]
if len(between_df) < 3: # Need at least 3 candles between
continue
# Find the highest point between the two lows
peak = between_df['high'].max()
peak_idx = between_df['high'].idxmax()
# Check if it's a proper V shape (peak at least 0.1% higher than valleys)
peak_diff_pct = (peak - first_low['low']) / first_low['low'] * 100
if peak_diff_pct > 0.1:
# This is a potential double bottom
pattern_data = {
'type': 'double_bottom',
'first_low_idx': idx1,
'peak_idx': peak_idx,
'second_low_idx': idx2,
'first_low': first_low['low'],
'peak': peak,
'second_low': second_low['low'],
'timeframe': timeframe
}
self.patterns[symbol]['double_bottoms'].append(pattern_data)
def _detect_head_and_shoulders(self, symbol, df, timeframe):
    """
    Detect Head and Shoulders and Inverted Head and Shoulders patterns.
    Must have at least 7-8 candles in both shoulders.

    Scans 40-candle windows for three consecutive swing highs (or swing
    lows for the inverted form) where the middle one is the extreme, the
    shoulders agree within 0.2%, each shoulder spans at least
    ``self.min_shoulder_candles`` candles, and the two neckline anchors
    agree within 0.15% (near-flat neckline). Candidates are appended to
    ``self.patterns[symbol]``.
    """
    # Find swing points first (on a copy, so the caller's frame is untouched)
    df = self._find_swing_points(df.copy())
    # Look for head and shoulders (three peaks with the middle one higher)
    for i in range(len(df) - 40):  # Look within a reasonable window
        # Skip if we don't have enough data ahead
        # NOTE(review): dead check -- range() already guarantees i+40 < len(df)
        if i + 40 >= len(df):
            continue
        window_df = df.iloc[i:i+40]
        # Find swing highs in this window
        swing_highs = window_df.loc[~window_df['swing_high'].isna()]
        # Need at least 3 swing highs for head and shoulders
        if len(swing_highs) >= 3:
            for j in range(len(swing_highs) - 2):
                # Get three consecutive swing highs
                left_shoulder = swing_highs.iloc[j]
                head = swing_highs.iloc[j+1]
                right_shoulder = swing_highs.iloc[j+2]
                # Check if head is higher than both shoulders
                if head['high'] > left_shoulder['high'] and head['high'] > right_shoulder['high']:
                    # Check if shoulders are similar in height (within 0.2%)
                    shoulder_diff_pct = abs(right_shoulder['high'] - left_shoulder['high']) / left_shoulder['high'] * 100
                    if shoulder_diff_pct < 0.2:
                        # Get indices (DataFrame index labels of the swing rows)
                        ls_idx = left_shoulder.name
                        head_idx = head.name
                        rs_idx = right_shoulder.name
                        # Check if we have at least 7 candles in both shoulders
                        # NOTE(review): label arithmetic -- assumes a gap-free
                        # RangeIndex so label differences equal candle counts.
                        left_candle_count = head_idx - ls_idx
                        right_candle_count = rs_idx - head_idx
                        if left_candle_count >= self.min_shoulder_candles and right_candle_count >= self.min_shoulder_candles:
                            # Find neckline (connecting the lows between shoulders and head)
                            left_trough_idx = df.loc[ls_idx:head_idx]['low'].idxmin()
                            left_trough = df.loc[left_trough_idx]['low']
                            right_trough_idx = df.loc[head_idx:rs_idx]['low'].idxmin()
                            right_trough = df.loc[right_trough_idx]['low']
                            # Check if troughs are similar (flat neckline preferred)
                            trough_diff_pct = abs(right_trough - left_trough) / left_trough * 100
                            if trough_diff_pct < 0.15:
                                # This is a potential head and shoulders pattern
                                pattern_data = {
                                    'type': 'head_and_shoulders',
                                    'left_shoulder_idx': ls_idx,
                                    'head_idx': head_idx,
                                    'right_shoulder_idx': rs_idx,
                                    'left_trough_idx': left_trough_idx,
                                    'right_trough_idx': right_trough_idx,
                                    'left_shoulder': left_shoulder['high'],
                                    'head': head['high'],
                                    'right_shoulder': right_shoulder['high'],
                                    'left_trough': left_trough,
                                    'right_trough': right_trough,
                                    # neckline approximated as the trough midpoint
                                    'neckline': (left_trough + right_trough) / 2,
                                    'timeframe': timeframe
                                }
                                self.patterns[symbol]['head_and_shoulders'].append(pattern_data)
    # Look for inverted head and shoulders (three troughs with the middle one lower)
    for i in range(len(df) - 40):
        # Skip if we don't have enough data ahead
        # NOTE(review): dead check, as above
        if i + 40 >= len(df):
            continue
        window_df = df.iloc[i:i+40]
        # Find swing lows in this window
        swing_lows = window_df.loc[~window_df['swing_low'].isna()]
        # Need at least 3 swing lows for inverted head and shoulders
        if len(swing_lows) >= 3:
            for j in range(len(swing_lows) - 2):
                # Get three consecutive swing lows
                left_shoulder = swing_lows.iloc[j]
                head = swing_lows.iloc[j+1]
                right_shoulder = swing_lows.iloc[j+2]
                # Check if head is lower than both shoulders
                if head['low'] < left_shoulder['low'] and head['low'] < right_shoulder['low']:
                    # Check if shoulders are similar in height (within 0.2%)
                    shoulder_diff_pct = abs(right_shoulder['low'] - left_shoulder['low']) / left_shoulder['low'] * 100
                    if shoulder_diff_pct < 0.2:
                        # Get indices
                        ls_idx = left_shoulder.name
                        head_idx = head.name
                        rs_idx = right_shoulder.name
                        # Check if we have at least 7 candles in both shoulders
                        left_candle_count = head_idx - ls_idx
                        right_candle_count = rs_idx - head_idx
                        if left_candle_count >= self.min_shoulder_candles and right_candle_count >= self.min_shoulder_candles:
                            # Find neckline (connecting the highs between shoulders and head)
                            left_peak_idx = df.loc[ls_idx:head_idx]['high'].idxmax()
                            left_peak = df.loc[left_peak_idx]['high']
                            right_peak_idx = df.loc[head_idx:rs_idx]['high'].idxmax()
                            right_peak = df.loc[right_peak_idx]['high']
                            # Check if peaks are similar (flat neckline preferred)
                            peak_diff_pct = abs(right_peak - left_peak) / left_peak * 100
                            if peak_diff_pct < 0.15:
                                # This is a potential inverted head and shoulders pattern
                                pattern_data = {
                                    'type': 'inv_head_and_shoulders',
                                    'left_shoulder_idx': ls_idx,
                                    'head_idx': head_idx,
                                    'right_shoulder_idx': rs_idx,
                                    'left_peak_idx': left_peak_idx,
                                    'right_peak_idx': right_peak_idx,
                                    'left_shoulder': left_shoulder['low'],
                                    'head': head['low'],
                                    'right_shoulder': right_shoulder['low'],
                                    'left_peak': left_peak,
                                    'right_peak': right_peak,
                                    # neckline approximated as the peak midpoint
                                    'neckline': (left_peak + right_peak) / 2,
                                    'timeframe': timeframe
                                }
                                self.patterns[symbol]['inv_head_and_shoulders'].append(pattern_data)
def _validate_patterns_with_ema(self, symbol, df_5min, df_15min):
    """
    Validate patterns against both 5min and 15min EMAs.
    Double Tops/Bottoms & Head & Shoulders should only form on the EMA.

    Replaces ``self.patterns[symbol]`` with only the patterns whose key
    price points sit close to the 200 EMA on at least one timeframe:
    within 0.15% for double tops/bottoms, within 0.2% (neckline vs.
    average EMA over the pattern span) for head-and-shoulders variants.
    """
    valid_patterns = {
        'double_tops': [],
        'double_bottoms': [],
        'head_and_shoulders': [],
        'inv_head_and_shoulders': []
    }
    # Validate Double Tops
    for pattern in self.patterns[symbol]['double_tops']:
        # Check if pattern forms near 5min EMA
        first_high_idx = pattern['first_high_idx']
        second_high_idx = pattern['second_high_idx']
        # Skip if indices are invalid
        # NOTE(review): compares index *labels* against the frame length --
        # only meaningful when df_5min has a gap-free RangeIndex.
        if first_high_idx >= len(df_5min) or second_high_idx >= len(df_5min):
            continue
        first_high = pattern['first_high']
        second_high = pattern['second_high']
        first_ema = df_5min.loc[first_high_idx]['ema_200']
        second_ema = df_5min.loc[second_high_idx]['ema_200']
        # Calculate how close highs are to EMA (percent distance)
        first_diff_pct = abs(first_high - first_ema) / first_ema * 100
        second_diff_pct = abs(second_high - second_ema) / second_ema * 100
        # Also check with 15min EMA
        # Find closest 15min candle to our 5min points
        first_time = df_5min.loc[first_high_idx]['timestamp']
        second_time = df_5min.loc[second_high_idx]['timestamp']
        closest_15min_first = df_15min.loc[(df_15min['timestamp'] - first_time).abs().idxmin()]
        closest_15min_second = df_15min.loc[(df_15min['timestamp'] - second_time).abs().idxmin()]
        first_15min_ema = closest_15min_first['ema_200']
        second_15min_ema = closest_15min_second['ema_200']
        first_15min_diff_pct = abs(first_high - first_15min_ema) / first_15min_ema * 100
        second_15min_diff_pct = abs(second_high - second_15min_ema) / second_15min_ema * 100
        # Pattern must be near at least one of the EMAs (5min or 15min)
        if (first_diff_pct < 0.15 and second_diff_pct < 0.15) or \
           (first_15min_diff_pct < 0.15 and second_15min_diff_pct < 0.15):
            valid_patterns['double_tops'].append(pattern)
    # Validate Double Bottoms (same logic as double tops)
    for pattern in self.patterns[symbol]['double_bottoms']:
        first_low_idx = pattern['first_low_idx']
        second_low_idx = pattern['second_low_idx']
        # Skip if indices are invalid (same label-vs-length caveat as above)
        if first_low_idx >= len(df_5min) or second_low_idx >= len(df_5min):
            continue
        first_low = pattern['first_low']
        second_low = pattern['second_low']
        first_ema = df_5min.loc[first_low_idx]['ema_200']
        second_ema = df_5min.loc[second_low_idx]['ema_200']
        first_diff_pct = abs(first_low - first_ema) / first_ema * 100
        second_diff_pct = abs(second_low - second_ema) / second_ema * 100
        # Check with 15min EMA
        first_time = df_5min.loc[first_low_idx]['timestamp']
        second_time = df_5min.loc[second_low_idx]['timestamp']
        closest_15min_first = df_15min.loc[(df_15min['timestamp'] - first_time).abs().idxmin()]
        closest_15min_second = df_15min.loc[(df_15min['timestamp'] - second_time).abs().idxmin()]
        first_15min_ema = closest_15min_first['ema_200']
        second_15min_ema = closest_15min_second['ema_200']
        first_15min_diff_pct = abs(first_low - first_15min_ema) / first_15min_ema * 100
        second_15min_diff_pct = abs(second_low - second_15min_ema) / second_15min_ema * 100
        if (first_diff_pct < 0.15 and second_diff_pct < 0.15) or \
           (first_15min_diff_pct < 0.15 and second_15min_diff_pct < 0.15):
            valid_patterns['double_bottoms'].append(pattern)
    # Validate Head and Shoulders
    for pattern in self.patterns[symbol]['head_and_shoulders']:
        # For H&S, focus on neckline proximity to EMA
        neckline = pattern['neckline']
        ls_idx = pattern['left_shoulder_idx']
        rs_idx = pattern['right_shoulder_idx']
        # Skip if indices are invalid
        if ls_idx >= len(df_5min) or rs_idx >= len(df_5min):
            continue
        # Get average EMA in the pattern region
        # NOTE(review): .loc over a range of labels raises if any label is
        # missing -- relies on a gap-free RangeIndex.
        pattern_indices = range(ls_idx, rs_idx + 1)
        avg_ema_5min = df_5min.loc[pattern_indices]['ema_200'].mean()
        # Calculate how close neckline is to average EMA
        neckline_diff_pct = abs(neckline - avg_ema_5min) / avg_ema_5min * 100
        # Also check with 15min EMA (use start and end times to find relevant 15min candles)
        start_time = df_5min.loc[ls_idx]['timestamp']
        end_time = df_5min.loc[rs_idx]['timestamp']
        relevant_15min = df_15min[(df_15min['timestamp'] >= start_time) & (df_15min['timestamp'] <= end_time)]
        if not relevant_15min.empty:
            # NOTE(review): patterns with no overlapping 15min candles are
            # never validated, even when the 5min EMA check passes -- confirm
            # this is intended.
            avg_ema_15min = relevant_15min['ema_200'].mean()
            neckline_15min_diff_pct = abs(neckline - avg_ema_15min) / avg_ema_15min * 100
            if neckline_diff_pct < 0.2 or neckline_15min_diff_pct < 0.2:
                valid_patterns['head_and_shoulders'].append(pattern)
    # Validate Inverted Head and Shoulders (same logic as H&S)
    for pattern in self.patterns[symbol]['inv_head_and_shoulders']:
        neckline = pattern['neckline']
        ls_idx = pattern['left_shoulder_idx']
        rs_idx = pattern['right_shoulder_idx']
        # Skip if indices are invalid
        if ls_idx >= len(df_5min) or rs_idx >= len(df_5min):
            continue
        # Get average EMA in the pattern region
        pattern_indices = range(ls_idx, rs_idx + 1)
        avg_ema_5min = df_5min.loc[pattern_indices]['ema_200'].mean()
        # Calculate how close neckline is to average EMA
        neckline_diff_pct = abs(neckline - avg_ema_5min) / avg_ema_5min * 100
        # Also check with 15min EMA (use start and end times to find relevant 15min candles)
        start_time = df_5min.loc[ls_idx]['timestamp']
        end_time = df_5min.loc[rs_idx]['timestamp']
        relevant_15min = df_15min[(df_15min['timestamp'] >= start_time) & (df_15min['timestamp'] <= end_time)]
        if not relevant_15min.empty:
            avg_ema_15min = relevant_15min['ema_200'].mean()
            neckline_15min_diff_pct = abs(neckline - avg_ema_15min) / avg_ema_15min * 100
            if neckline_diff_pct < 0.2 or neckline_15min_diff_pct < 0.2:
                valid_patterns['inv_head_and_shoulders'].append(pattern)
    # Update the patterns dictionary with validated patterns only
    self.patterns[symbol] = valid_patterns
    # Print validation results
    pattern_counts = {
        'double_tops': len(valid_patterns['double_tops']),
        'double_bottoms': len(valid_patterns['double_bottoms']),
        'head_and_shoulders': len(valid_patterns['head_and_shoulders']),
        'inv_head_and_shoulders': len(valid_patterns['inv_head_and_shoulders'])
    }
    st.info(f"Validated patterns for {symbol}: {pattern_counts}")
def _check_dcc_confirmation(self, df, pattern_end_idx):
"""
Check for DCC (Directional Candle Confirmation) or 1 big candle compared to previous 10 candles
For Head & Shoulders patterns
"""
# Ensure we have enough data
if pattern_end_idx + 1 >= len(df) or pattern_end_idx < 10:
return False
confirmation_candle = df.iloc[pattern_end_idx + 1]
# Get previous 10 candles
start_idx = max(0, pattern_end_idx - 10)
previous_candles = df.iloc[start_idx:pattern_end_idx]
if len(previous_candles) < 5: # Need at least 5 candles for comparison
return False
# Check if confirmation candle is significantly larger than previous candles
avg_prev_size = previous_candles['candle_size_pct'].mean()
confirmation_size = confirmation_candle['candle_size_pct']
# Candle must be at least 1.5x the average size of previous candles
return confirmation_size > (1.5 * avg_prev_size)
def _check_entry_candle_size(self, df, pattern_end_idx):
"""Check if the entry candle is smaller than the maximum allowed size (0.25%)"""
if pattern_end_idx + 1 >= len(df) or pattern_end_idx < 0:
return False
entry_candle = df.iloc[pattern_end_idx + 1]
return entry_candle['candle_size_pct'] < self.max_entry_candle_size_pct
def _calculate_risk_reward(self, entry, stop_loss, take_profit):
"""Calculate risk-reward ratio"""
risk = abs(entry - stop_loss)
reward = abs(take_profit - entry)
if risk == 0: # Avoid division by zero
return 0
return reward / risk
def _check_recent_high_low(self, df, pattern_end_idx, target_price, is_buy):
"""
Check if the recent high/low is within 50% of our overall target
Avoid entry if the recent high or low is within 50% of our overall target
"""
# Get last 20 candles before pattern end
start_idx = max(0, pattern_end_idx - 20)
recent_candles = df.iloc[start_idx:pattern_end_idx]
if is_buy:
# For buy signals, check recent highs
recent_high = recent_candles['high'].max()
target_distance = abs(target_price - df.iloc[pattern_end_idx]['close'])
high_distance = abs(recent_high - df.iloc[pattern_end_idx]['close'])
# If recent high is within 50% of target distance, avoid entry
return high_distance < (0.5 * target_distance)
else:
# For sell signals, check recent lows
recent_low = recent_candles['low'].min()
target_distance = abs(df.iloc[pattern_end_idx]['close'] - target_price)
low_distance = abs(df.iloc[pattern_end_idx]['close'] - recent_low)
# If recent low is within 50% of target distance, avoid entry
return low_distance < (0.5 * target_distance)
def _generate_trade_signals(self, symbol):
    """Generate trade signals based on validated patterns.

    For each validated pattern, derives entry / stop-loss / take-profit
    prices from the pattern geometry, then applies the PNP filters:
    minimum risk-reward ratio, entry-candle size cap, recent-high/low
    proximity rule, maximum stop-loss percentage, and (for H&S variants)
    a DCC/big-candle confirmation. Valid signals are appended to
    ``self.trades[symbol]``.

    NOTE(review): duplicate patterns in ``self.patterns`` produce
    duplicate trade signals here; 'entry_time' is the timestamp of the
    pattern-completion candle, not of an actual fill.
    """
    if symbol not in self.patterns:
        return
    df_5min = self.data[symbol]['5min']
    # Initialize trade signals
    self.trades[symbol] = []
    # Process Double Tops (bearish reversal -> SELL signals)
    for pattern in self.patterns[symbol]['double_tops']:
        # Get pattern details
        second_high_idx = pattern['second_high_idx']
        # Skip if index is invalid (label compared against frame length)
        if second_high_idx >= len(df_5min):
            continue
        valley = pattern['valley']
        second_high = pattern['second_high']
        # Entry point is just below the neckline (valley)
        entry_price = valley * 0.999  # Slightly below neckline
        # Stop loss just above the second high
        stop_loss = second_high * 1.001  # Slightly above second high
        # Target is typically the height of the pattern projected downward from entry
        pattern_height = second_high - valley
        take_profit = entry_price - pattern_height
        # Calculate risk-reward ratio
        rrr = self._calculate_risk_reward(entry_price, stop_loss, take_profit)
        # Check entry candle size (must be under the 0.25% cap)
        valid_entry_size = self._check_entry_candle_size(df_5min, second_high_idx)
        # Check if recent high is too close to target
        avoid_recent_high = self._check_recent_high_low(df_5min, second_high_idx, take_profit, False)
        # Validate trade according to PNP rules
        if rrr >= self.min_rrr and valid_entry_size and not avoid_recent_high:
            # Calculate stop loss percentage relative to entry
            stop_loss_pct = (stop_loss - entry_price) / entry_price * 100
            if abs(stop_loss_pct) <= self.max_stoploss_pct:
                # This is a valid trade signal
                trade = {
                    'symbol': symbol,
                    'pattern_type': 'double_top',
                    'action': 'SELL',
                    'entry_price': entry_price,
                    'stop_loss': stop_loss,
                    'take_profit': take_profit,
                    'risk_reward_ratio': rrr,
                    'entry_time': df_5min.iloc[second_high_idx]['timestamp'],
                    'pattern_end_idx': second_high_idx,
                    'stop_loss_pct': abs(stop_loss_pct)
                }
                self.trades[symbol].append(trade)
    # Process Double Bottoms (bullish reversal -> BUY signals)
    for pattern in self.patterns[symbol]['double_bottoms']:
        # Get pattern details
        second_low_idx = pattern['second_low_idx']
        # Skip if index is invalid
        if second_low_idx >= len(df_5min):
            continue
        peak = pattern['peak']
        second_low = pattern['second_low']
        # Entry point is just above the neckline (peak)
        entry_price = peak * 1.001  # Slightly above neckline
        # Stop loss just below the second low
        stop_loss = second_low * 0.999  # Slightly below second low
        # Target is typically the height of the pattern projected upward from entry
        pattern_height = peak - second_low
        take_profit = entry_price + pattern_height
        # Calculate risk-reward ratio
        rrr = self._calculate_risk_reward(entry_price, stop_loss, take_profit)
        # Check entry candle size
        valid_entry_size = self._check_entry_candle_size(df_5min, second_low_idx)
        # Check if recent low is too close to target
        # NOTE(review): passes is_buy=True, so this inspects recent *highs*
        # despite the variable name -- confirm intended direction.
        avoid_recent_low = self._check_recent_high_low(df_5min, second_low_idx, take_profit, True)
        # Validate trade according to PNP rules
        if rrr >= self.min_rrr and valid_entry_size and not avoid_recent_low:
            # Calculate stop loss percentage
            stop_loss_pct = (entry_price - stop_loss) / entry_price * 100
            if abs(stop_loss_pct) <= self.max_stoploss_pct:
                # This is a valid trade signal
                trade = {
                    'symbol': symbol,
                    'pattern_type': 'double_bottom',
                    'action': 'BUY',
                    'entry_price': entry_price,
                    'stop_loss': stop_loss,
                    'take_profit': take_profit,
                    'risk_reward_ratio': rrr,
                    'entry_time': df_5min.iloc[second_low_idx]['timestamp'],
                    'pattern_end_idx': second_low_idx,
                    'stop_loss_pct': abs(stop_loss_pct)
                }
                self.trades[symbol].append(trade)
    # Process Head and Shoulders (bearish reversal -> SELL signals)
    for pattern in self.patterns[symbol]['head_and_shoulders']:
        # Get pattern details
        right_shoulder_idx = pattern['right_shoulder_idx']
        # Skip if index is invalid
        if right_shoulder_idx >= len(df_5min):
            continue
        neckline = pattern['neckline']
        head = pattern['head']
        # Check for DCC confirmation or big candle (H&S-only requirement)
        has_confirmation = self._check_dcc_confirmation(df_5min, right_shoulder_idx)
        if has_confirmation:
            # Entry point is just below the neckline
            entry_price = neckline * 0.999  # Slightly below neckline
            # Stop loss just above the right shoulder
            stop_loss = pattern['right_shoulder'] * 1.001
            # Target is typically the height of the pattern projected downward from entry
            pattern_height = head - neckline
            take_profit = entry_price - pattern_height
            # Calculate risk-reward ratio
            rrr = self._calculate_risk_reward(entry_price, stop_loss, take_profit)
            # Check entry candle size
            valid_entry_size = self._check_entry_candle_size(df_5min, right_shoulder_idx)
            # Check if recent high is too close to target
            avoid_recent_high = self._check_recent_high_low(df_5min, right_shoulder_idx, take_profit, False)
            # Validate trade according to PNP rules
            if rrr >= self.min_rrr and valid_entry_size and not avoid_recent_high:
                # Calculate stop loss percentage
                stop_loss_pct = (stop_loss - entry_price) / entry_price * 100
                if abs(stop_loss_pct) <= self.max_stoploss_pct:
                    # This is a valid trade signal
                    trade = {
                        'symbol': symbol,
                        'pattern_type': 'head_and_shoulders',
                        'action': 'SELL',
                        'entry_price': entry_price,
                        'stop_loss': stop_loss,
                        'take_profit': take_profit,
                        'risk_reward_ratio': rrr,
                        'entry_time': df_5min.iloc[right_shoulder_idx]['timestamp'],
                        'pattern_end_idx': right_shoulder_idx,
                        'stop_loss_pct': abs(stop_loss_pct)
                    }
                    self.trades[symbol].append(trade)
    # Process Inverted Head and Shoulders (bullish reversal -> BUY signals)
    for pattern in self.patterns[symbol]['inv_head_and_shoulders']:
        # Get pattern details
        right_shoulder_idx = pattern['right_shoulder_idx']
        # Skip if index is invalid
        if right_shoulder_idx >= len(df_5min):
            continue
        neckline = pattern['neckline']
        head = pattern['head']
        # Check for DCC confirmation or big candle
        has_confirmation = self._check_dcc_confirmation(df_5min, right_shoulder_idx)
        if has_confirmation:
            # Entry point is just above the neckline
            entry_price = neckline * 1.001  # Slightly above neckline
            # Stop loss just below the right shoulder
            stop_loss = pattern['right_shoulder'] * 0.999
            # Target is typically the height of the pattern projected upward from entry
            pattern_height = neckline - head
            take_profit = entry_price + pattern_height
            # Calculate risk-reward ratio
            rrr = self._calculate_risk_reward(entry_price, stop_loss, take_profit)
            # Check entry candle size
            valid_entry_size = self._check_entry_candle_size(df_5min, right_shoulder_idx)
            # Check if recent low is too close to target (see note above on direction)
            avoid_recent_low = self._check_recent_high_low(df_5min, right_shoulder_idx, take_profit, True)
            # Validate trade according to PNP rules
            if rrr >= self.min_rrr and valid_entry_size and not avoid_recent_low:
                # Calculate stop loss percentage
                stop_loss_pct = (entry_price - stop_loss) / entry_price * 100
                if abs(stop_loss_pct) <= self.max_stoploss_pct:
                    # This is a valid trade signal
                    trade = {
                        'symbol': symbol,
                        'pattern_type': 'inv_head_and_shoulders',
                        'action': 'BUY',
                        'entry_price': entry_price,
                        'stop_loss': stop_loss,
                        'take_profit': take_profit,
                        'risk_reward_ratio': rrr,
                        'entry_time': df_5min.iloc[right_shoulder_idx]['timestamp'],
                        'pattern_end_idx': right_shoulder_idx,
                        'stop_loss_pct': abs(stop_loss_pct)
                    }
                    self.trades[symbol].append(trade)
    st.info(f"Generated {len(self.trades[symbol])} trade signals for {symbol}")
def generate_risk_management_plan(self):
    """Generate overall risk management plan using Gemini AI.

    Summarizes all current trade signals into a prompt, asks the Gemini
    model for a structured plan, and stores it under
    ``self.risk_management['overall_plan']``. Falls back to a static
    plan (``_create_basic_risk_plan``) when the AI call fails.

    Returns:
        The plan text, or None when no trade signals exist at all.
    """
    if not any(self.trades.values()):
        st.warning("No trade signals available for risk management plan")
        return None
    # Prepare a summary of trade signals, one line per signal
    trades_summary = ""
    for symbol, trades in self.trades.items():
        for trade in trades:
            trades_summary += f"{symbol}: {trade['action']} - Entry: {trade['entry_price']:.5f}, SL: {trade['stop_loss']:.5f}, TP: {trade['take_profit']:.5f}, RRR: {trade['risk_reward_ratio']:.2f}\n"
    # If no trades, return a simple message
    if not trades_summary:
        self.risk_management['overall_plan'] = "No valid trade signals detected based on the PNP strategy criteria."
        return self.risk_management['overall_plan']
    # Prepare a prompt for Gemini AI describing the signals and the strategy rules
    prompt = f"""
    Based on the following forex trading signals from the PNP strategy, create a comprehensive risk management plan.
    The plan should detail how to manage positions according to the PNP strategy guidelines.
    Current Trading Signals:
    {trades_summary}
    PNP Strategy Guidelines:
    1. Maximum stop loss: 0.30%
    2. Minimum risk-reward ratio: 1:1.5
    3. Aim for 1:2 if no price action level nearby
    4. Trade only between 10 AM to 10 PM
    5. Patterns must form on the 200 EMA
    6. Entry candle must be smaller than 0.25%
    7. Don't take trades if recent high/low is within 50% of target
    Please provide:
    1. Maximum portfolio risk percentage (total capital at risk)
    2. Position sizing rules with examples (assume a $10,000 account)
    3. Criteria for adjusting stop losses
    4. Rules for partial profit-taking
    5. Criteria for exiting all positions (market conditions)
    6. Daily/weekly loss limits that would trigger trading pause
    Format your response as a structured plan with clear rules and guidelines.
    """
    try:
        # Query Gemini AI (network call; spinner keeps the UI responsive)
        with st.spinner("Generating risk management plan with AI..."):
            response = model.generate_content(prompt)
            risk_plan = response.text
        # Store the risk management plan
        self.risk_management['overall_plan'] = risk_plan
        return risk_plan
    except Exception as e:
        st.error(f"Error generating risk management plan with Gemini AI: {e}")
        # Create a basic risk management plan without AI as a fallback
        self.risk_management['overall_plan'] = self._create_basic_risk_plan()
        return self.risk_management['overall_plan']
def _create_basic_risk_plan(self):
"""Create a basic risk management plan without using AI"""
plan = """
# PNP Strategy Risk Management Plan
## Position Sizing
- Risk no more than 1% of account on any single trade
- Calculate position size based on entry price and stop loss
- For a $10,000 account, maximum risk per trade = $100
## Multiple Positions
- Maximum correlation exposure: 15% of account
- Avoid having more than 2 pairs with same base or quote currency
## Stop Loss Management
- Initial stop loss based on pattern structure (max 0.30%)
- Move to break-even after price moves in favor by at least 1x the risk
## Profit Taking
- Take partial profits (50%) at 1x risk
- Move stop loss to break-even at this point
- Let remainder run to full target (min 1.5x risk)
## Risk Limits
- Daily loss limit: 2% of account
- Weekly loss limit: 5% of account
- Monthly loss limit: 10% of account
- If any limit is hit, stop trading for the period
## Trade Management
- Only trade during 10 AM - 10 PM
- Avoid trading during major news events
- Always stick to minimum 1.5 risk-reward ratio
"""
return plan
def plot_chart_with_patterns(self, symbol):
    """Plot a chart with detected patterns and trade signals.

    Renders a three-panel matplotlib figure into Streamlit: price with
    200 EMA, Bollinger Bands and trade annotations; an RSI panel; and a
    MACD panel.

    Args:
        symbol: Currency pair key into ``self.data`` / ``self.trades``.
    """
    # Bail out early if analysis never produced data/trades for this pair.
    if symbol not in self.data or symbol not in self.trades:
        st.warning(f"No data or trades available for {symbol}")
        return
    # Work on a copy of the 5-minute frame so plotting cannot mutate state.
    df = self.data[symbol]['5min'].copy()
    # Create a figure with 3 subplots; price panel is 3x taller than indicators.
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(14, 12), gridspec_kw={'height_ratios': [3, 1, 1]})
    # Main chart with price and moving averages
    ax1.plot(df['timestamp'], df['close'], label='Close Price', linewidth=2)
    ax1.plot(df['timestamp'], df['ema_200'], label='200 EMA', linestyle='--', alpha=0.8)
    # Add Bollinger Bands for reference
    ax1.plot(df['timestamp'], df['bb_upper'], 'k--', alpha=0.3)
    ax1.plot(df['timestamp'], df['bb_lower'], 'k--', alpha=0.3)
    ax1.fill_between(df['timestamp'], df['bb_upper'], df['bb_lower'], alpha=0.1, color='gray')
    # Highlight the trading hours (10 AM to 10 PM)
    trading_hours_df = df[(df['hour'] >= self.trading_hours[0]) & (df['hour'] < self.trading_hours[1])]
    if not trading_hours_df.empty:
        # NOTE(review): min_price/max_price are computed but never used below.
        min_price = df['low'].min() * 0.999
        max_price = df['high'].max() * 1.001
        # Group consecutive timestamps into ranges: one shaded span per calendar day.
        dates = trading_hours_df['timestamp'].dt.date.unique()
        for date in dates:
            day_df = trading_hours_df[trading_hours_df['timestamp'].dt.date == date]
            if not day_df.empty:
                start = day_df['timestamp'].min()
                end = day_df['timestamp'].max()
                # Label only the first span so the legend shows a single entry.
                ax1.axvspan(start, end, alpha=0.2, color='green', label='Trading Hours' if date == dates[0] else "")
    # Add trade signals: marker + annotation + SL/TP lines per trade.
    for trade in self.trades[symbol]:
        # NOTE(review): pattern_end_idx is unpacked but unused in this plot.
        pattern_end_idx = trade['pattern_end_idx']
        entry_time = trade['entry_time']
        entry_price = trade['entry_price']
        stop_loss = trade['stop_loss']
        take_profit = trade['take_profit']
        action = trade['action']
        pattern_type = trade['pattern_type']
        rrr = trade['risk_reward_ratio']
        # Set color based on action
        color = 'green' if action == 'BUY' else 'red'
        # Add entry point marker: up-triangle for BUY, down-triangle for SELL.
        ax1.scatter(entry_time, entry_price, s=100, color=color, marker='^' if action == 'BUY' else 'v', zorder=5)
        # Add annotation with pattern type and RRR
        ax1.annotate(f"{pattern_type}\n{action} @ {entry_price:.5f}\nRRR: {rrr:.2f}",
                     xy=(entry_time, entry_price),
                     xytext=(10, 10 if action == 'BUY' else -10),
                     textcoords="offset points",
                     arrowprops=dict(arrowstyle="->", color=color),
                     color=color,
                     fontsize=9,
                     bbox=dict(boxstyle="round,pad=0.3", fc="white", ec=color, lw=1))
        # Add stop loss line (labelled at the right edge of the chart)
        ax1.axhline(y=stop_loss, color='red', linestyle='--', alpha=0.5, linewidth=1)
        ax1.annotate(f"SL: {stop_loss:.5f}",
                     xy=(df['timestamp'].iloc[-1], stop_loss),
                     xytext=(-50, -5),
                     textcoords="offset points",
                     color='red',
                     fontsize=8)
        # Add take profit line (labelled at the right edge of the chart)
        ax1.axhline(y=take_profit, color='green', linestyle='--', alpha=0.5, linewidth=1)
        ax1.annotate(f"TP: {take_profit:.5f}",
                     xy=(df['timestamp'].iloc[-1], take_profit),
                     xytext=(-50, 5),
                     textcoords="offset points",
                     color='green',
                     fontsize=8)
    # Format the timestamp on x-axis
    ax1.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M'))
    plt.setp(ax1.xaxis.get_majorticklabels(), rotation=45)
    ax1.set_title(f'{symbol} with PNP Strategy Trading Signals', fontsize=14)
    ax1.set_ylabel('Price', fontsize=10)
    ax1.grid(True, alpha=0.3)
    # Ensure we don't get duplicate labels in the legend (dict keeps last
    # handle per label while preserving insertion order).
    handles, labels = ax1.get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    ax1.legend(by_label.values(), by_label.keys(), loc='upper left', fontsize=9)
    # RSI subplot with the conventional 70/30 guide lines.
    ax2.plot(df['timestamp'], df['rsi'], label='RSI', color='purple', linewidth=1)
    ax2.axhline(y=70, color='red', linestyle='--', alpha=0.3)
    ax2.axhline(y=30, color='green', linestyle='--', alpha=0.3)
    ax2.fill_between(df['timestamp'], 70, 30, alpha=0.1, color='gray')
    ax2.set_ylabel('RSI', fontsize=10)
    ax2.grid(True, alpha=0.3)
    ax2.legend(loc='upper left', fontsize=9)
    # MACD subplot; histogram bars colored green/red by sign.
    ax3.plot(df['timestamp'], df['macd'], label='MACD', color='blue', linewidth=1)
    ax3.plot(df['timestamp'], df['macd_signal'], label='Signal', color='orange', linewidth=1)
    ax3.bar(df['timestamp'], df['macd_hist'], label='Histogram', alpha=0.5, color=np.where(df['macd_hist'] >= 0, 'green', 'red'))
    ax3.axhline(y=0, color='black', linestyle='-', alpha=0.3)
    ax3.set_ylabel('MACD', fontsize=10)
    ax3.set_xlabel('Date', fontsize=10)
    ax3.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M'))
    plt.setp(ax3.xaxis.get_majorticklabels(), rotation=45)
    ax3.grid(True, alpha=0.3)
    ax3.legend(loc='upper left', fontsize=9)
    plt.tight_layout()
    # Display the chart in Streamlit, then close the current figure so
    # repeated calls don't accumulate open matplotlib figures.
    st.pyplot(fig)
    plt.close()
def run_full_analysis(self, days_back=10):
    """Run the complete PNP pipeline over every configured currency pair.

    Fetches data, detects patterns, builds the risk management plan,
    renders charts for pairs that produced signals, and prints the summary.

    Args:
        days_back (int): How many days of history to request per pair.
    """
    st.info("Starting PNP Strategy Forex Analysis...")

    # Stage 1: pull candles for each configured pair.
    for symbol in self.currency_pairs:
        st.info(f"\nFetching data for {symbol}...")
        if not self.fetch_forex_data(symbol, days_back=days_back):
            st.warning(f"Failed to fetch data for {symbol}. Skipping analysis.")
        time.sleep(1)  # pause between requests to avoid API rate limits

    # Stage 2: pattern detection across all fetched pairs.
    st.info("\nDetecting patterns...")
    self.detect_all_patterns()

    # Stage 3: risk management plan (AI-generated with a local fallback).
    st.info("\nGenerating risk management plan...")
    self.generate_risk_management_plan()

    # Stage 4: charts, only for pairs that actually produced trade signals.
    st.info("\nGenerating charts...")
    for symbol, signals in self.trades.items():
        if signals:
            self.plot_chart_with_patterns(symbol)

    # Stage 5: consolidated on-screen summary.
    self.print_summary()
def print_summary(self):
    """Print a summary of all analysis and recommendations.

    Renders three sections into Streamlit: trade signals per pair, the
    risk management plan, and a closing conclusion.
    """
    st.markdown("---")
    st.markdown("## PNP FOREX TRADING STRATEGY SUMMARY")
    st.markdown("---")

    # Signals section: headline count first, then per-pair details.
    total_trades = sum(len(trades) for trades in self.trades.values())
    if total_trades == 0:
        st.warning("\nNo valid trade signals detected based on the PNP strategy criteria.")
    else:
        st.success(f"\nTotal valid trade signals: {total_trades}")
        for symbol, signals in self.trades.items():
            if not signals:
                continue  # nothing to report for this pair
            st.markdown(f"### {symbol} TRADING SIGNALS ({len(signals)}):")
            for i, trade in enumerate(signals, 1):
                st.markdown(f"#### Signal #{i}:")
                st.write(f"**Pattern Type:** {trade['pattern_type']}")
                st.write(f"**Action:** {trade['action']}")
                st.write(f"**Entry Price:** {trade['entry_price']:.5f}")
                st.write(f"**Stop Loss:** {trade['stop_loss']:.5f} ({trade['stop_loss_pct']:.2f}%)")
                st.write(f"**Take Profit:** {trade['take_profit']:.5f}")
                st.write(f"**Risk-Reward Ratio:** {trade['risk_reward_ratio']:.2f}")
                st.write(f"**Entry Time:** {trade['entry_time']}")
                st.markdown("---")

    # Risk management section.
    st.markdown("---")
    st.markdown("## RISK MANAGEMENT PLAN")
    st.markdown("---")
    plan = self.risk_management.get('overall_plan')
    if plan is not None:
        st.markdown(plan)
    else:
        st.warning("No risk management plan available.")

    # Closing notes.
    st.markdown("---")
    st.markdown("## CONCLUSION")
    st.markdown("---")
    st.success("The PNP Strategy analysis has been completed.")
    st.info("Review the recommendations and risk management plan before placing any trades.")
    st.warning("Remember that all trading involves risk, and past performance is not indicative of future results.")
# Streamlit UI
def main():
    """Entry point for the Streamlit UI: intro text, sidebar controls, and
    the button that triggers a full PNP analysis run."""
    st.title("📊 PNP Forex Trading Strategy Analyzer")
    st.markdown("""
This app analyzes forex currency pairs using the PNP (Price and Pattern) Trading Strategy.
It detects Double Tops/Bottoms and Head & Shoulders patterns on 5-minute and 15-minute charts,
validates them against the 200 EMA, and generates trade signals with risk management parameters.
""")

    # Pairs the analyzer always scans.
    watchlist = ["USDJPY", "EURUSD", "GBPUSD", "AUDUSD"]

    # Sidebar controls
    st.sidebar.header("Settings")
    days_back = st.sidebar.slider("Days of historical data to analyze", 1, 30, 10)
    min_rrr = st.sidebar.slider("Minimum Risk-Reward Ratio", 1.0, 3.0, 1.5, 0.1)
    max_stoploss_pct = st.sidebar.slider("Maximum Stop Loss (%)", 0.1, 1.0, 0.3, 0.05)

    # Build a trader for this rerun and apply the user-tuned thresholds.
    trader = PNPForexTrader(currency_pairs=watchlist)
    trader.min_rrr = min_rrr
    trader.max_stoploss_pct = max_stoploss_pct

    # Kick off the full pipeline on demand.
    if st.sidebar.button("Run Full Analysis"):
        with st.spinner("Running analysis..."):
            trader.run_full_analysis(days_back=days_back)

    # Show which pairs are in scope.
    st.sidebar.markdown("---")
    st.sidebar.markdown("### Currency Pairs Being Analyzed")
    for symbol in watchlist:
        st.sidebar.write(f"- {symbol}")

    # About section
    st.sidebar.markdown("---")
    st.sidebar.markdown("### About PNP Strategy")
    st.sidebar.markdown("""
The PNP (Price and Pattern) Strategy focuses on:
- Double Tops/Bottoms
- Head & Shoulders patterns
- Validated against 200 EMA
- With strict risk management rules
""")
if __name__ == "__main__":
    # Script entry point: launch the Streamlit UI.
    main()