# quant_signal / signal_generator.py
# Page header from the hosting site: uploaded by cloudyu,
# commit message "Create signal_generator.py", commit eaaed28 (verified).
import yfinance as yf
import talib
import pandas as pd
import numpy as np
import xgboost as xgb
import argparse
import sys
import requests
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
def parse_arguments(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, which is what the
            script entry point relies on. Passing an explicit list lets the
            parser be reused programmatically.

    Returns:
        argparse.Namespace with:
            tickers: list of one or more ticker symbols (positional).
            period: historical data period string, ``'2y'`` unless overridden.
            target: target ticker, or ``None`` when ``--target`` is omitted.
    """
    parser = argparse.ArgumentParser(description='Stock trading signal generator')
    parser.add_argument('tickers', nargs='+', help='List of stock ticker symbols to predict')
    parser.add_argument('--period', default='2y', help='Historical data period (default: 2y)')
    parser.add_argument('--target', help='Target stock ticker (default: first ticker)')
    return parser.parse_args(argv)
# Words whose presence in a headline counts as one positive / negative hit.
_POSITIVE_WORDS = (
    'beat', 'exceed', 'strong', 'growth', 'upgrade', 'bullish', 'positive',
    'record', 'surge', 'rally', 'gain', 'profit', 'success', 'innovation',
)
_NEGATIVE_WORDS = (
    'miss', 'decline', 'drop', 'fall', 'downgrade', 'bearish', 'negative',
    'loss', 'scandal', 'lawsuit', 'layoff', 'bankruptcy', 'crisis', 'warning',
)

# Extra relevance keywords for tickers the script knows by name
# (keys are upper-case ticker symbols).
_COMPANY_KEYWORDS = {
    'AAPL': ('apple', 'iphone', 'mac', 'ios'),
    'MSFT': ('microsoft', 'windows', 'azure', 'office'),
    'GOOGL': ('google', 'alphabet', 'search', 'android', 'youtube'),
    'AMZN': ('amazon', 'aws', 'prime', 'ecommerce'),
    'QQQ': ('nasdaq', 'tech', 'technology', 'index'),
}


def get_news_sentiment_score(headlines, target_ticker):
    """Score headlines for sentiment and relevance to ``target_ticker``.

    A headline is relevant when it contains the lower-cased ticker or one of
    the ticker's known company keywords. Each relevant headline gets a
    sentiment of ``(pos - neg) / (pos + neg)`` from the positive/negative word
    lists, or ``0.0`` when it contains none of those words.

    Args:
        headlines: Iterable of headline strings. Entries that are not strings
            (for example ``None``) are ignored.
        target_ticker: Ticker symbol; the company-keyword lookup is
            case-insensitive, matching how the rest of the script compares
            ticker names.

    Returns:
        Tuple ``(avg_sentiment, relevance_score)``:
            avg_sentiment: mean sentiment over relevant headlines, in [-1, 1];
                ``0.0`` when no headline is relevant.
            relevance_score: fraction of usable headlines that are relevant,
                in [0, 1]; ``0.0`` when there are no usable headlines.
    """
    texts = [h.lower() for h in (headlines or ()) if isinstance(h, str)]
    if not texts:
        return 0.0, 0.0

    company_keywords = [
        target_ticker.lower(),
        *_COMPANY_KEYWORDS.get(target_ticker.upper(), ()),
    ]

    total_sentiment = 0.0
    relevant_articles = 0
    for text in texts:
        # NOTE: matching is by plain substring, so e.g. 'gain' also matches
        # 'against'; kept as-is to preserve the existing scoring behaviour.
        if not any(keyword in text for keyword in company_keywords):
            continue
        relevant_articles += 1
        pos_count = sum(1 for word in _POSITIVE_WORDS if word in text)
        neg_count = sum(1 for word in _NEGATIVE_WORDS if word in text)
        hits = pos_count + neg_count
        if hits:
            total_sentiment += (pos_count - neg_count) / hits

    relevance_score = relevant_articles / len(texts)
    avg_sentiment = total_sentiment / relevant_articles if relevant_articles else 0.0
    return avg_sentiment, relevance_score
def fetch_breaking_news(target_ticker, max_headlines=5):
    """Fetch recent news summaries for a ticker via yfinance (best effort).

    Looks at the first ``max_headlines`` items of ``yf.Ticker(...).news`` and
    collects each item's ``['content']['summary']`` text. Every collected
    summary is also printed to stdout.

    Args:
        target_ticker: Symbol passed to ``yf.Ticker``.
        max_headlines: Maximum number of news items to inspect (default 5).

    Returns:
        List of summary strings; empty when the lookup fails or nothing
        usable is returned. This function never raises for fetch errors.
    """
    headlines = []
    try:
        news_items = list(yf.Ticker(target_ticker).news or [])[:max_headlines]
    except Exception as exc:
        # News is optional input for the signal, so a failed lookup must not
        # abort the run; report it on stderr instead of hiding it.
        print(f"Warning: could not fetch news for {target_ticker}: {exc}", file=sys.stderr)
        return headlines

    for item in news_items:
        try:
            summary = item['content']['summary']
        except (KeyError, TypeError):
            # Skip a malformed item rather than dropping the remaining ones.
            continue
        if not isinstance(summary, str) or not summary.strip():
            continue
        print(summary)
        headlines.append(summary)
    return headlines
def calculate_sample_weights(df, target_col):
    """Derive per-row training weights from absolute price moves.

    Rows whose absolute day-over-day change in ``target_col`` is above the
    average get a weight between 1 and 4; all other rows get weight 1.

    Args:
        df: DataFrame containing ``target_col``.
        target_col: Name of the price column to difference.

    Returns:
        NumPy array of weights, one per row of ``df``.
    """
    abs_moves = df[target_col].diff().abs()

    # Zero moves and the leading NaN are replaced by the mean of the raw
    # moves (computed before the zeros are removed).
    fill_value = abs_moves.mean()
    abs_moves = abs_moves.replace(0, np.nan).fillna(fill_value)

    upper_q, lower_q = np.percentile(abs_moves.dropna(), [75, 25])
    spread = upper_q - lower_q
    if spread == 0:
        # Degenerate inter-quartile range: fall back to the standard deviation.
        spread = abs_moves.std()

    centered = abs_moves - abs_moves.mean()
    scaled = centered / (spread + 1e-8)
    bonus = np.clip(scaled, 0, 3)
    return (1 + bonus).fillna(1.0).values
# Contribution of each heuristic to the manipulation score, in evaluation order.
_MANIPULATION_WEIGHTS = {
    'abnormal_volatility': 0.2,
    'consecutive_green_days': 0.15,
    'price_volume_divergence': 0.2,
    'excessive_gaps': 0.15,
    'extreme_ma_deviation': 0.15,
    'persistent_overbought': 0.15,
}


def detect_price_manipulation(df, close_col, volume_col=None):
    """Score recent price action against six manipulation heuristics.

    Heuristics (each contributes its weight when triggered):
        abnormal_volatility: std of the last 5 returns is more than twice the
            20-row rolling std measured six rows back.
        consecutive_green_days: the 5 or more most recent returns are positive.
        price_volume_divergence: over the last 5 rows price rose more than 5%
            while volume fell more than 10% (needs ``volume_col``).
        excessive_gaps: 3 or more of the last 10 close-to-close changes
            exceed 3% in absolute value.
        extreme_ma_deviation: last close is more than 15% away from its
            20-row simple moving average.
        persistent_overbought: 14-period RSI above 70 on each of the last
            5 rows.

    Args:
        df: DataFrame with the price (and optionally volume) history.
        close_col: Name of the closing-price column.
        volume_col: Optional name of the volume column; the divergence check
            is skipped when it is ``None`` or absent from ``df``.

    Returns:
        Tuple ``(manipulation_score, manipulation_signals)`` where the score
        is a float in [0, 1] and the signals dict maps each heuristic name to
        a plain ``bool``. Empty input yields ``(0.0, all-False)``.
    """
    closes = df[close_col]
    if closes.empty:
        # Nothing to analyse; avoid IndexError from the positional lookups below.
        return 0.0, {name: False for name in _MANIPULATION_WEIGHTS}

    manipulation_signals = {}
    returns = closes.pct_change()

    # Recent volatility vs. a baseline window that ends just before the
    # 5-row "recent" window starts.
    current_vol = returns.tail(5).std()
    rolling_vol = returns.rolling(20).std()
    historical_vol = rolling_vol.iloc[-6] if len(rolling_vol) >= 6 else np.nan
    # A NaN or zero baseline compares False here, giving a neutral ratio of 1.0.
    vol_ratio = current_vol / historical_vol if historical_vol > 0 else 1.0
    manipulation_signals['abnormal_volatility'] = bool(vol_ratio > 2.0)

    # Length of the current streak of positive returns (looking at most 10 rows back).
    consecutive_up = 0
    for ret in reversed(returns.tail(10).tolist()):
        if ret > 0:
            consecutive_up += 1
        else:
            break
    manipulation_signals['consecutive_green_days'] = consecutive_up >= 5

    # Price rising while volume dries up.
    divergence = False
    if volume_col is not None and volume_col in df.columns:
        first_price = closes.tail(5).iloc[0]
        first_volume = df[volume_col].tail(5).iloc[0]
        # Trends are undefined for a zero/NaN baseline; treat as no divergence.
        if first_price > 0 and first_volume > 0:
            price_trend = (closes.iloc[-1] - first_price) / first_price
            volume_trend = (df[volume_col].iloc[-1] - first_volume) / first_volume
            divergence = bool(price_trend > 0.05 and volume_trend < -0.1)
    manipulation_signals['price_volume_divergence'] = divergence

    # Large close-to-close jumps.
    previous_close = closes.shift(1)
    gaps = (closes - previous_close) / previous_close
    large_gaps = (gaps.tail(10).abs() > 0.03).sum()
    manipulation_signals['excessive_gaps'] = bool(large_gaps >= 3)

    # Distance from the 20-row moving average (NaN SMA compares False).
    current_sma = closes.rolling(20).mean().iloc[-1]
    price_deviation = abs(closes.iloc[-1] - current_sma) / current_sma
    manipulation_signals['extreme_ma_deviation'] = bool(price_deviation > 0.15)

    # RSI stuck above 70 for the whole recent window.
    recent_rsi = talib.RSI(closes, 14).tail(5)
    manipulation_signals['persistent_overbought'] = bool((recent_rsi > 70).all())

    # Accumulate in table order so float rounding matches a sequential sum.
    manipulation_score = 0.0
    for name, weight in _MANIPULATION_WEIGHTS.items():
        if manipulation_signals[name]:
            manipulation_score += weight
    manipulation_score = min(manipulation_score, 1.0)
    return manipulation_score, manipulation_signals
# User-facing names for symbols that Yahoo Finance lists under a different code.
_YAHOO_SYMBOL_ALIASES = {'VIX': '^VIX', 'TNX': '^TNX', 'DXY': 'DX-Y.NYB'}


def _build_feature_frame(training_data, tickers, target_ticker):
    """Build the modelling frame: target OHLCV, peer closes, indicators and cross-market features.

    Args:
        training_data: Downloaded price frame with (field, symbol) columns.
        tickers: Mapping of user-facing ticker name -> Yahoo symbol.
        target_ticker: User-facing name of the ticker being predicted.

    Returns:
        DataFrame indexed like ``training_data`` (minus leading rows that
        still contain NaN after forward-filling).
    """
    target_symbol = tickers[target_ticker]
    df = pd.DataFrame(index=training_data.index)
    for field in ('Open', 'High', 'Low', 'Close', 'Volume'):
        # Columns are keyed by the Yahoo symbol, which differs from the
        # user-facing name for aliased tickers such as VIX.
        df[f'{target_ticker}_{field}'] = training_data[field][target_symbol]
    for ticker, yf_symbol in tickers.items():
        if ticker != target_ticker:
            df[f'{ticker}_Close'] = training_data['Close'][yf_symbol]
    df = df.ffill().dropna()

    # Technical indicators
    close_col = f'{target_ticker}_Close'
    high_col = f'{target_ticker}_High'
    low_col = f'{target_ticker}_Low'
    df['RSI'] = talib.RSI(df[close_col], 14)
    df['MACD'], df['MACD_signal'], _ = talib.MACD(df[close_col])
    df['SMA_20'] = talib.SMA(df[close_col], 20)
    df['SMA_50'] = talib.SMA(df[close_col], 50)
    df['ATR'] = talib.ATR(df[high_col], df[low_col], df[close_col], 14)
    df['Vol_10'] = df[close_col].pct_change().rolling(10).std()

    # Cross-market features
    for ticker in tickers:
        if ticker == target_ticker:
            continue
        peer_close = df[f'{ticker}_Close']
        peer_name = ticker.upper()
        if peer_name == 'VIX':
            df['VIX_Rank'] = peer_close.rolling(126).rank(pct=True) * 100
            df['VIX_Slope'] = peer_close.diff(5)
            df['VIX_Sustained_High'] = ((peer_close > 20) &
                                        (peer_close > peer_close.rolling(10).mean())).astype(int)
        elif peer_name == 'TNX':
            df['TNX_SMA_20'] = talib.SMA(peer_close, 20)
            df['TNX_Rising'] = (peer_close > df['TNX_SMA_20']).astype(int)
            df['TNX_Accel'] = peer_close.diff(5)
        elif peer_name == 'DXY':
            df['DXY_SMA_50'] = talib.SMA(peer_close, 50)
            df['USD_Strength'] = (peer_close > df['DXY_SMA_50']).astype(int)
            df['DXY_Slope'] = peer_close.diff(5)
        else:
            ratio_col = f'{target_ticker}_{ticker}_Ratio'
            df[ratio_col] = df[close_col] / peer_close
            df[f'{ratio_col}_SMA'] = talib.SMA(df[ratio_col].values, 20)
            df[f'{ticker}_Trend_Up'] = (peer_close > peer_close.rolling(50).mean()).astype(int)
    return df


def main():
    """Run the signal pipeline for the tickers given on the command line.

    Steps: parse arguments, download daily history with yfinance, score news
    sentiment, engineer features on all rows except the latest, train an
    XGBoost up/down classifier, predict from the last engineered row, apply
    the news and manipulation overrides, and print the signal with an
    expected price range around the latest close.

    Raises:
        SystemExit: with status 1 when no data could be downloaded.
        ValueError: when fewer than 50 complete training rows are available.
    """
    args = parse_arguments()
    target_ticker = args.target if args.target else args.tickers[0]
    if target_ticker not in args.tickers:
        args.tickers.append(target_ticker)

    # user-facing name -> Yahoo symbol
    tickers = {ticker: _YAHOO_SYMBOL_ALIASES.get(ticker.upper(), ticker) for ticker in args.tickers}
    target_symbol = tickers[target_ticker]

    # Download daily data
    raw_data = yf.download(list(tickers.values()), period=args.period, progress=False)
    if raw_data.empty:
        print("Error: Failed to download data")
        sys.exit(1)
    if not isinstance(raw_data.columns, pd.MultiIndex) and len(set(tickers.values())) == 1:
        # Some yfinance versions return flat field columns for a single
        # symbol; normalise to the (field, symbol) layout used below.
        raw_data.columns = pd.MultiIndex.from_product([raw_data.columns, [target_symbol]])

    # Fetch news
    news_headlines = fetch_breaking_news(target_symbol)
    news_sentiment, news_relevance = get_news_sentiment_score(news_headlines, target_ticker)

    # Prepare training data: the latest row is held out and only used as the
    # reference price/date for the printed signal.
    training_data = raw_data.iloc[:-1]
    latest_target_price = raw_data['Close'][target_symbol].iloc[-1]
    latest_date = raw_data.index[-1]
    df = _build_feature_frame(training_data, tickers, target_ticker)
    close_col = f'{target_ticker}_Close'

    # Create target
    df['Next_Return'] = df[close_col].pct_change().shift(-1)
    df['Target'] = (df['Next_Return'] > 0).astype(int)
    df_for_model = df.dropna().copy()
    # Exclude the target's own price/ratio columns by prefix. A substring
    # test would also drop unrelated columns (e.g. 'SMA_20' for ticker 'MA').
    target_prefix = f'{target_ticker}_'
    feature_cols = [col for col in df.columns
                    if not col.startswith(target_prefix) and col not in ('Next_Return', 'Target')]
    if len(df_for_model) < 50:
        raise ValueError(f"Insufficient training {len(df_for_model)} rows")

    # Train model
    sample_weights = calculate_sample_weights(df_for_model, close_col)
    model_params = {
        'n_estimators': 5, 'max_depth': 3, 'learning_rate': 0.01, 'subsample': 0.8,
        'colsample_bytree': 0.8, 'random_state': 42, 'eval_metric': 'logloss', 'use_label_encoder': False
    }
    final_model = xgb.XGBClassifier(**model_params)
    final_model.fit(df_for_model[feature_cols], df_for_model['Target'], sample_weight=sample_weights)

    # Prediction features: the last engineered row (it has no Next_Return, so
    # it was not part of training). Taking it from df keeps the values
    # forward-filled exactly like the training rows.
    pred_features = df[feature_cols].iloc[[-1]]
    base_signal = int(final_model.predict(pred_features)[0])

    # Manipulation detection (rows without a close would poison the rolling
    # statistics and RSI, so they are dropped first)
    target_stock_series = pd.DataFrame({
        'Close': raw_data['Close'][target_symbol],
        'Volume': raw_data['Volume'][target_symbol],
    }).dropna(subset=['Close'])
    manipulation_score, _ = detect_price_manipulation(target_stock_series, 'Close', 'Volume')

    # News override
    final_signal = base_signal
    if news_relevance > 0.3 and abs(news_sentiment) > 0.5:
        if news_sentiment < -0.7:
            final_signal = 0
        elif news_sentiment > 0.7:
            final_signal = 1

    # Manipulation override (keyed on the model's own signal, so a BUY that
    # comes only from the news override is not downgraded here)
    if manipulation_score >= 0.5 and base_signal == 1:
        final_signal = 0

    # Calculate price range
    vol_10 = df['Vol_10'].iloc[-1]
    expected_move = latest_target_price * vol_10 if pd.notna(vol_10) else latest_target_price * 0.02
    if news_relevance > 0.3:
        news_multiplier = 1.0 + abs(news_sentiment) * news_relevance
        expected_move *= news_multiplier
    upper_target = latest_target_price + expected_move
    lower_target = latest_target_price - expected_move

    # **SIMPLIFIED OUTPUT - ALWAYS SHOW PRICE RANGE**
    print(f"{target_ticker} | {latest_date.strftime('%Y-%m-%d')} | ${latest_target_price:.2f}")
    if manipulation_score >= 0.7:
        print(f"SIGNAL: AVOID | Range: ${lower_target:.2f} - ${upper_target:.2f} (High manipulation risk)")
    elif final_signal == 1:
        print(f"SIGNAL: BUY | Range: ${lower_target:.2f} - ${upper_target:.2f} | Target: ${upper_target:.2f}")
    else:
        print(f"SIGNAL: HOLD CASH | Range: ${lower_target:.2f} - ${upper_target:.2f}")
# Script entry point: run the pipeline, then print the disclaimer.
if __name__ == "__main__":
    main()
    print("Disclaimer: This is for informational purposes only and does not constitute investment advice.")