|
|
import yfinance as yf |
|
|
import talib |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import xgboost as xgb |
|
|
import argparse |
|
|
import sys |
|
|
import requests |
|
|
from datetime import datetime, timedelta |
|
|
import warnings |
|
|
warnings.filterwarnings('ignore') |
|
|
|
|
|
def parse_arguments(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read the process command line, which is
            what every existing caller relies on.

    Returns:
        argparse.Namespace with:
            tickers: list of one or more ticker strings (positional).
            period: historical data period string, ``'2y'`` by default.
            target: target ticker string, or ``None`` when ``--target`` was
                not supplied.
    """
    parser = argparse.ArgumentParser(description='Stock trading signal generator')
    parser.add_argument('tickers', nargs='+', help='List of stock ticker symbols to predict')
    parser.add_argument('--period', default='2y', help='Historical data period (default: 2y)')
    parser.add_argument('--target', help='Target stock ticker (default: first ticker)')
    return parser.parse_args(argv)
|
|
|
|
|
def get_news_sentiment_score(headlines, target_ticker):
    """Score headlines for sentiment and for relevance to ``target_ticker``.

    A headline is "relevant" when it contains (as a lowercase substring) the
    ticker itself or one of the company keywords registered for that ticker.
    Each relevant headline gets a sentiment in [-1, 1] computed as
    ``(positives - negatives) / (positives + negatives)`` from substring
    matches against two fixed word lists, or 0.0 when no word matches.

    Args:
        headlines: Sequence of headline/summary strings. Entries that are not
            strings are counted in the total but never treated as relevant.
        target_ticker: Ticker symbol; matched case-insensitively against the
            company keyword table.

    Returns:
        Tuple ``(avg_sentiment, relevance_score)``: the mean sentiment over
        relevant headlines (0.0 if none are relevant) and the fraction of
        all headlines that were relevant. ``(0.0, 0.0)`` for empty input.
    """
    if not headlines:
        return 0.0, 0.0

    positive_words = ('beat', 'exceed', 'strong', 'growth', 'upgrade', 'bullish', 'positive',
                      'record', 'surge', 'rally', 'gain', 'profit', 'success', 'innovation')
    negative_words = ('miss', 'decline', 'drop', 'fall', 'downgrade', 'bearish', 'negative',
                      'loss', 'scandal', 'lawsuit', 'layoff', 'bankruptcy', 'crisis', 'warning')

    extra_keywords = {
        'AAPL': ('apple', 'iphone', 'mac', 'ios'),
        'MSFT': ('microsoft', 'windows', 'azure', 'office'),
        'GOOGL': ('google', 'alphabet', 'search', 'android', 'youtube'),
        'AMZN': ('amazon', 'aws', 'prime', 'ecommerce'),
        'QQQ': ('nasdaq', 'tech', 'technology', 'index'),
    }
    # Look the ticker up in upper case so 'aapl' and 'AAPL' get the same
    # keywords; the ticker itself is always matched in lower case.
    company_keywords = [target_ticker.lower()]
    company_keywords.extend(extra_keywords.get(target_ticker.upper(), ()))

    total_sentiment = 0.0
    relevant_articles = 0

    for headline in headlines:
        if not isinstance(headline, str):
            # e.g. a missing summary; it still counts toward the total below.
            continue
        text = headline.lower()
        if not any(keyword in text for keyword in company_keywords):
            continue

        relevant_articles += 1
        pos_count = sum(1 for word in positive_words if word in text)
        neg_count = sum(1 for word in negative_words if word in text)
        matched = pos_count + neg_count
        if matched > 0:
            total_sentiment += (pos_count - neg_count) / matched

    # headlines is non-empty here, so the division is safe.
    relevance_score = relevant_articles / len(headlines)
    avg_sentiment = total_sentiment / relevant_articles if relevant_articles > 0 else 0.0

    return avg_sentiment, relevance_score
|
|
|
|
|
def fetch_breaking_news(target_ticker, max_items=5):
    """Fetch recent news summaries for a ticker via yfinance (best effort).

    Reads ``yf.Ticker(target_ticker).news`` and collects the
    ``item['content']['summary']`` string of the first ``max_items`` entries.
    Each collected summary is also printed to stdout. Entries without a
    non-empty string summary are skipped individually instead of aborting
    the whole fetch.

    Args:
        target_ticker: Symbol passed to ``yf.Ticker``.
        max_items: Maximum number of news entries to inspect (default 5).

    Returns:
        List of summary strings; empty when nothing could be fetched.
    """
    headlines = []
    try:
        news = list(yf.Ticker(target_ticker).news or [])
    except Exception as exc:
        # News is optional input for the signal: report the problem on stderr
        # and carry on without headlines rather than failing the whole run.
        print(f"Warning: could not fetch news for {target_ticker}: {exc}", file=sys.stderr)
        return headlines

    for item in news[:max_items]:
        try:
            summary = item['content']['summary']
        except (KeyError, TypeError, IndexError):
            continue
        if isinstance(summary, str) and summary:
            print(summary)
            headlines.append(summary)

    return headlines
|
|
|
|
|
def calculate_sample_weights(df, target_col):
    """Compute per-row training weights that emphasise large price moves.

    The absolute row-to-row change of ``df[target_col]`` is centred on its
    mean, scaled by the inter-quartile range (falling back to the standard
    deviation when the IQR is zero), clipped to [0, 3] and added to 1, so
    every weight lies in [1, 4]. Zero and missing changes are replaced by the
    mean absolute change before scaling.

    Args:
        df: DataFrame containing ``target_col``.
        target_col: Name of the price column to derive weights from.

    Returns:
        numpy array of float weights, one per row of ``df``.
    """
    price_changes = df[target_col].diff().abs()
    # The mean is taken before zeros are masked, so it includes them.
    price_changes = price_changes.replace(0, np.nan).fillna(price_changes.mean())

    valid_changes = price_changes.dropna()
    if valid_changes.empty:
        # Zero or one row (or no usable prices): there is no change to
        # measure, and np.percentile must not be fed an empty array.
        return np.ones(len(price_changes), dtype=float)

    q75, q25 = np.percentile(valid_changes, [75, 25])
    iqr = q75 - q25
    if iqr == 0:
        iqr = price_changes.std()

    # The epsilon keeps the division finite when the scale is exactly zero.
    normalized_vol = (price_changes - price_changes.mean()) / (iqr + 1e-8)
    weights = 1 + normalized_vol.clip(lower=0, upper=3)
    weights = weights.fillna(1.0)

    return weights.values
|
|
|
|
|
def detect_price_manipulation(df, close_col, volume_col=None):
    """Heuristically score how suspicious the most recent price action looks.

    Six independent checks are evaluated on the tail of the series; each one
    that fires adds a fixed weight to the score, which is capped at 1.0:

        abnormal_volatility      0.20  5-row return std > 2x the 20-row
                                       rolling std taken six rows back
        consecutive_green_days   0.15  last 5+ returns all positive
        price_volume_divergence  0.20  price up > 5% while volume is down
                                       > 10% across the last 5 rows
        excessive_gaps           0.15  3+ moves larger than 3% in the last
                                       10 rows
        extreme_ma_deviation     0.15  last close > 15% away from its
                                       20-row mean
        persistent_overbought    0.15  14-period RSI above 70 on each of
                                       the last 5 rows

    Args:
        df: DataFrame with the price (and optionally volume) history. Rows
            are assumed to be in chronological order, since the checks treat
            the last rows as the most recent.
        close_col: Name of the closing-price column.
        volume_col: Optional volume column name. When ``None`` or absent from
            ``df`` the divergence check is reported as False.

    Returns:
        Tuple ``(manipulation_score, manipulation_signals)``: a float in
        [0, 1] and a dict mapping each check name above to a bool. Empty
        input yields ``0.0`` and all-False signals.
    """
    signal_weights = (
        ('abnormal_volatility', 0.2),
        ('consecutive_green_days', 0.15),
        ('price_volume_divergence', 0.2),
        ('excessive_gaps', 0.15),
        ('extreme_ma_deviation', 0.15),
        ('persistent_overbought', 0.15),
    )

    if len(df) == 0:
        # Nothing to analyse; the positional lookups below would raise.
        return 0.0, {name: False for name, _ in signal_weights}

    manipulation_signals = {}
    closes = df[close_col]
    returns = closes.pct_change()

    # Recent volatility versus the 20-row volatility from just before the
    # recent 5-row window.
    current_vol = returns.tail(5).std()
    rolling_vol = returns.rolling(20).std()
    # Short histories have no row six from the end; NaN makes the ratio
    # fall back to 1.0 instead of raising IndexError.
    historical_vol = rolling_vol.iloc[-6] if len(rolling_vol) >= 6 else np.nan
    vol_ratio = current_vol / historical_vol if historical_vol > 0 else 1.0
    manipulation_signals['abnormal_volatility'] = bool(vol_ratio > 2.0)

    # Length of the current run of positive returns, counted backwards from
    # the latest row within the last 10 rows.
    consecutive_up = 0
    for ret in returns.tail(10).iloc[::-1]:
        if ret > 0:
            consecutive_up += 1
        else:
            break
    manipulation_signals['consecutive_green_days'] = consecutive_up >= 5

    # Price rising while volume dries up.
    if volume_col is not None and volume_col in df.columns:
        recent_prices = closes.tail(5)
        recent_volumes = df[volume_col].tail(5)
        price_trend = (recent_prices.iloc[-1] - recent_prices.iloc[0]) / recent_prices.iloc[0]
        volume_trend = (recent_volumes.iloc[-1] - recent_volumes.iloc[0]) / recent_volumes.iloc[0]
        manipulation_signals['price_volume_divergence'] = bool(
            price_trend > 0.05 and volume_trend < -0.1
        )
    else:
        manipulation_signals['price_volume_divergence'] = False

    # Count of large close-to-close moves in the last 10 rows.
    previous_close = closes.shift(1)
    gaps = (closes - previous_close) / previous_close
    large_gaps = (gaps.tail(10).abs() > 0.03).sum()
    manipulation_signals['excessive_gaps'] = bool(large_gaps >= 3)

    # Distance of the latest close from its 20-row moving average.
    current_price = closes.iloc[-1]
    current_sma = closes.rolling(20).mean().iloc[-1]
    price_deviation = abs(current_price - current_sma) / current_sma
    manipulation_signals['extreme_ma_deviation'] = bool(price_deviation > 0.15)

    # RSI above 70 on every one of the last 5 rows.
    rsi = talib.RSI(closes, 14)
    manipulation_signals['persistent_overbought'] = bool((rsi.tail(5) > 70).all())

    manipulation_score = 0.0
    for name, weight in signal_weights:
        if manipulation_signals[name]:
            manipulation_score += weight

    return min(manipulation_score, 1.0), manipulation_signals
|
|
|
|
|
def _to_yf_symbol(ticker):
    """Map a command-line ticker to the symbol used for the Yahoo download."""
    aliases = {'VIX': '^VIX', 'TNX': '^TNX', 'DXY': 'DX-Y.NYB'}
    return aliases.get(ticker.upper(), ticker)


def _price_field(raw_data, field, symbol):
    """Return one symbol's series for an OHLCV ``field`` of a download frame."""
    data = raw_data[field]
    # With per-ticker columns raw_data[field] is a DataFrame keyed by symbol.
    # NOTE(review): some yfinance versions appear to return flat columns for
    # a single ticker, in which case raw_data[field] is already the series.
    return data[symbol] if isinstance(data, pd.DataFrame) else data


def _add_cross_asset_features(df, ticker, target_ticker, close_col):
    """Add the derived feature columns for one non-target ticker to ``df`` in place."""
    other_close = df[f'{ticker}_Close']
    kind = ticker.upper()

    if kind == 'VIX':
        df['VIX_Rank'] = other_close.rolling(126).rank(pct=True) * 100
        df['VIX_Slope'] = other_close.diff(5)
        df['VIX_Sustained_High'] = (
            (other_close > 20) & (other_close > other_close.rolling(10).mean())
        ).astype(int)
    elif kind == 'TNX':
        df['TNX_SMA_20'] = talib.SMA(other_close, 20)
        df['TNX_Rising'] = (other_close > df['TNX_SMA_20']).astype(int)
        df['TNX_Accel'] = other_close.diff(5)
    elif kind == 'DXY':
        df['DXY_SMA_50'] = talib.SMA(other_close, 50)
        df['USD_Strength'] = (other_close > df['DXY_SMA_50']).astype(int)
        df['DXY_Slope'] = other_close.diff(5)
    else:
        ratio_col = f'{target_ticker}_{ticker}_Ratio'
        df[ratio_col] = df[close_col] / other_close
        df[f'{ratio_col}_SMA'] = talib.SMA(df[ratio_col].values, 20)
        df[f'{ticker}_Trend_Up'] = (other_close > other_close.rolling(50).mean()).astype(int)


def main():
    """Download data, train a classifier and print a trading signal.

    Steps, in order:
        1. Parse the command line and map tickers to download symbols.
        2. Download price history; exit with status 1 if nothing came back.
        3. Fetch news summaries for the target and score their sentiment.
        4. Build a feature frame from all rows except the latest one
           (target indicators plus per-ticker cross-asset features).
        5. Train an XGBoost classifier on "next return > 0", weighting rows
           by the size of the price move.
        6. Predict from the last row of the feature frame.
        7. Override the model signal on strong news sentiment or a high
           manipulation score, then print the signal and an expected range.

    Raises:
        ValueError: when fewer than 50 complete rows are available to train.
        SystemExit: when the download returns no data.
    """
    args = parse_arguments()
    target_ticker = args.target if args.target else args.tickers[0]

    if target_ticker not in args.tickers:
        args.tickers.append(target_ticker)

    # User-facing ticker -> download symbol (e.g. 'VIX' -> '^VIX').
    tickers = {ticker: _to_yf_symbol(ticker) for ticker in args.tickers}
    # Downloaded columns are keyed by the mapped symbol, so the target must
    # be looked up through the mapping as well.
    target_symbol = tickers[target_ticker]

    raw_data = yf.download(list(tickers.values()), period=args.period, progress=False)
    if raw_data.empty:
        print("Error: Failed to download data")
        sys.exit(1)

    news_headlines = fetch_breaking_news(target_symbol)
    news_sentiment, news_relevance = get_news_sentiment_score(news_headlines, target_ticker)

    # The latest row is held out of the feature frame; it only supplies the
    # price and date that are reported.
    training_data = raw_data.iloc[:-1]
    latest_target_price = _price_field(raw_data, 'Close', target_symbol).iloc[-1]
    latest_date = raw_data.index[-1]

    df = pd.DataFrame(index=training_data.index)
    for field in ('Open', 'High', 'Low', 'Close', 'Volume'):
        df[f'{target_ticker}_{field}'] = _price_field(training_data, field, target_symbol)

    for ticker, yf_symbol in tickers.items():
        if ticker != target_ticker:
            df[f'{ticker}_Close'] = _price_field(training_data, 'Close', yf_symbol)

    df = df.ffill().dropna()

    close_col = f'{target_ticker}_Close'
    high_col = f'{target_ticker}_High'
    low_col = f'{target_ticker}_Low'

    # Technical indicators on the target itself.
    df['RSI'] = talib.RSI(df[close_col], 14)
    df['MACD'], df['MACD_signal'], _ = talib.MACD(df[close_col])
    df['SMA_20'] = talib.SMA(df[close_col], 20)
    df['SMA_50'] = talib.SMA(df[close_col], 50)
    df['ATR'] = talib.ATR(df[high_col], df[low_col], df[close_col], 14)
    df['Vol_10'] = df[close_col].pct_change().rolling(10).std()

    for ticker in tickers:
        if ticker != target_ticker:
            _add_cross_asset_features(df, ticker, target_ticker, close_col)

    # Label: did the target close higher on the following row?
    df['Next_Return'] = df[close_col].pct_change().shift(-1)
    df['Target'] = (df['Next_Return'] > 0).astype(int)
    df_for_model = df.dropna().copy()

    # Exclude the target's own raw columns and the labels. A prefix test is
    # used so that another ticker merely ending in the target's letters
    # (e.g. 'MSFT_Close' with target 'T') is not dropped by accident.
    # NOTE(review): the '<target>_<ticker>_Ratio' columns share the target
    # prefix and are therefore excluded too; confirm that this is intended.
    target_prefix = f'{target_ticker}_'
    feature_cols = [
        col for col in df.columns
        if not col.startswith(target_prefix) and col not in ('Next_Return', 'Target')
    ]

    if len(df_for_model) < 50:
        raise ValueError(f"Insufficient training {len(df_for_model)} rows")

    sample_weights = calculate_sample_weights(df_for_model, close_col)
    model_params = {
        'n_estimators': 5, 'max_depth': 3, 'learning_rate': 0.01, 'subsample': 0.8,
        'colsample_bytree': 0.8, 'random_state': 42, 'eval_metric': 'logloss', 'use_label_encoder': False
    }

    final_model = xgb.XGBClassifier(**model_params)
    final_model.fit(df_for_model[feature_cols], df_for_model['Target'], sample_weight=sample_weights)

    # The last row of the feature frame has no Next_Return, so it was never
    # trained on; it already carries every feature for the most recent
    # training date and is used directly as the prediction input.
    pred_features = df[feature_cols].iloc[[-1]]
    base_signal = int(final_model.predict(pred_features)[0])

    target_stock_series = pd.DataFrame(index=raw_data.index)
    target_stock_series['Close'] = _price_field(raw_data, 'Close', target_symbol)
    target_stock_series['Volume'] = _price_field(raw_data, 'Volume', target_symbol)
    manipulation_score, _ = detect_price_manipulation(target_stock_series, 'Close', 'Volume')

    # Strong, relevant news overrides the model in either direction.
    final_signal = base_signal
    if news_relevance > 0.3 and abs(news_sentiment) > 0.5:
        if news_sentiment < -0.7:
            final_signal = 0
        elif news_sentiment > 0.7:
            final_signal = 1

    # A buy from the model is vetoed when manipulation looks likely.
    if manipulation_score >= 0.5 and base_signal == 1:
        final_signal = 0

    # Expected move: recent 10-row return volatility, or 2% when unavailable.
    vol_10 = df['Vol_10'].iloc[-1]
    expected_move = latest_target_price * vol_10 if pd.notna(vol_10) else latest_target_price * 0.02

    if news_relevance > 0.3:
        news_multiplier = 1.0 + abs(news_sentiment) * news_relevance
        expected_move *= news_multiplier

    upper_target = latest_target_price + expected_move
    lower_target = latest_target_price - expected_move

    print(f"{target_ticker} | {latest_date.strftime('%Y-%m-%d')} | ${latest_target_price:.2f}")

    if manipulation_score >= 0.7:
        print(f"SIGNAL: AVOID | Range: ${lower_target:.2f} - ${upper_target:.2f} (High manipulation risk)")
    elif final_signal == 1:
        print(f"SIGNAL: BUY | Range: ${lower_target:.2f} - ${upper_target:.2f} | Target: ${upper_target:.2f}")
    else:
        print(f"SIGNAL: HOLD CASH | Range: ${lower_target:.2f} - ${upper_target:.2f}")
|
|
|
|
|
# Script entry point: run the signal generator, then always close with the
# legal disclaimer on stdout.
if __name__ == "__main__":
    main()
    print(
        "Disclaimer: This is for informational purposes only and does not constitute investment advice."
    )
|
|
|