"""Stock trading signal generator.

Downloads daily price history with yfinance, derives technical and
cross-market features with TA-Lib, fits a small XGBoost classifier on the
direction of the next daily return, and prints a BUY / HOLD CASH / AVOID
signal together with an expected price range for the target ticker.
"""

import argparse
import sys
import warnings
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import requests
import talib
import xgboost as xgb
import yfinance as yf

warnings.filterwarnings('ignore')

# Words counted as positive / negative when scoring a headline.
_POSITIVE_WORDS = ('beat', 'exceed', 'strong', 'growth', 'upgrade', 'bullish',
                   'positive', 'record', 'surge', 'rally', 'gain', 'profit',
                   'success', 'innovation')
_NEGATIVE_WORDS = ('miss', 'decline', 'drop', 'fall', 'downgrade', 'bearish',
                   'negative', 'loss', 'scandal', 'lawsuit', 'layoff',
                   'bankruptcy', 'crisis', 'warning')

# Extra relevance keywords for a few well-known tickers (keys are upper-case).
_COMPANY_KEYWORDS = {
    'AAPL': ('apple', 'iphone', 'mac', 'ios'),
    'MSFT': ('microsoft', 'windows', 'azure', 'office'),
    'GOOGL': ('google', 'alphabet', 'search', 'android', 'youtube'),
    'AMZN': ('amazon', 'aws', 'prime', 'ecommerce'),
    'QQQ': ('nasdaq', 'tech', 'technology', 'index'),
}

# Short aliases accepted on the command line and the symbols downloaded for them.
_INDEX_SYMBOLS = {'VIX': '^VIX', 'TNX': '^TNX', 'DXY': 'DX-Y.NYB'}


def parse_arguments():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(description='Stock trading signal generator')
    parser.add_argument('tickers', nargs='+',
                        help='List of stock ticker symbols to predict')
    parser.add_argument('--period', default='2y',
                        help='Historical data period (default: 2y)')
    parser.add_argument('--target', help='Target stock ticker (default: first ticker)')
    return parser.parse_args()


def get_news_sentiment_score(headlines, target_ticker):
    """Score headlines for sentiment and relevance to ``target_ticker``.

    A headline is relevant when it contains the lower-cased ticker or one of
    the company keywords as a substring. Each relevant headline scores
    ``(pos - neg) / (pos + neg)`` over the positive/negative word lists
    (``0.0`` when it contains none of them).

    Args:
        headlines: Sequence of headline strings.
        target_ticker: Ticker symbol; company keywords are looked up
            case-insensitively.

    Returns:
        ``(avg_sentiment, relevance_score)``: the mean score over relevant
        headlines (in ``[-1, 1]``) and the fraction of headlines that were
        relevant. Both are ``0.0`` for empty input.
    """
    if not headlines:
        return 0.0, 0.0

    company_keywords = [target_ticker.lower()]
    # Upper-case the lookup so 'aapl' gets the same keywords as 'AAPL'.
    company_keywords.extend(_COMPANY_KEYWORDS.get(target_ticker.upper(), ()))

    total_sentiment = 0.0
    relevant_articles = 0
    for headline in headlines:
        headline_lower = headline.lower()
        if not any(keyword in headline_lower for keyword in company_keywords):
            continue
        relevant_articles += 1
        pos_count = sum(1 for word in _POSITIVE_WORDS if word in headline_lower)
        neg_count = sum(1 for word in _NEGATIVE_WORDS if word in headline_lower)
        if pos_count + neg_count > 0:
            total_sentiment += (pos_count - neg_count) / (pos_count + neg_count)

    relevance_score = relevant_articles / len(headlines)
    avg_sentiment = total_sentiment / relevant_articles if relevant_articles > 0 else 0.0
    return avg_sentiment, relevance_score


def fetch_breaking_news(target_ticker, max_items=5):
    """Fetch up to ``max_items`` news summaries for a ticker (best effort).

    Each collected summary is also printed to stdout. Any failure while
    querying yfinance is reported on stderr and yields an empty list instead
    of aborting the run.

    Args:
        target_ticker: Symbol passed to ``yf.Ticker``.
        max_items: Maximum number of news items to inspect.

    Returns:
        List of summary strings; items without a usable summary are skipped.
    """
    headlines = []
    try:
        news = yf.Ticker(target_ticker).news or []
    except Exception as exc:  # yfinance/network errors are not a fixed set
        print(f"Warning: could not fetch news for {target_ticker}: {exc}",
              file=sys.stderr)
        return headlines

    # Slice rather than index 0..4 so fewer than max_items entries is fine.
    for item in news[:max_items]:
        content = item.get('content') if isinstance(item, dict) else None
        summary = content.get('summary') if isinstance(content, dict) else None
        if isinstance(summary, str) and summary:
            print(summary)
            headlines.append(summary)
    return headlines


def calculate_sample_weights(df, target_col):
    """Weight training rows by the size of their absolute price change.

    Absolute day-over-day changes are centred on their mean, scaled by the
    inter-quartile range (standard deviation when the IQR is zero), clipped
    to ``[0, 3]`` and shifted by one, so every weight lies in ``[1, 4]``.

    Args:
        df: Frame containing ``target_col``.
        target_col: Name of the price column.

    Returns:
        ``numpy.ndarray`` of weights, one per row of ``df``.
    """
    raw_changes = df[target_col].diff().abs()
    # Zero and missing changes are replaced by the mean of the raw changes.
    price_changes = raw_changes.replace(0, np.nan).fillna(raw_changes.mean())

    valid_changes = price_changes.dropna()
    if valid_changes.empty:
        # Nothing to rank (e.g. a single row): fall back to uniform weights.
        return np.ones(len(df))

    q75, q25 = np.percentile(valid_changes, [75, 25])
    iqr = q75 - q25
    if iqr == 0:
        iqr = price_changes.std()

    normalized_vol = (price_changes - price_changes.mean()) / (iqr + 1e-8)
    weights = 1 + np.clip(normalized_vol, 0, 3)
    return weights.fillna(1.0).values


def detect_price_manipulation(df, close_col, volume_col=None):
    """Heuristically score how "manipulated" recent price action looks.

    Six boolean checks contribute fixed amounts to the score:
    abnormal volatility (0.2), five or more consecutive up days (0.15),
    price up / volume down divergence (0.2), three or more >3% moves in the
    last ten rows (0.15), >15% deviation from the 20-row SMA (0.15) and RSI
    above 70 on each of the last five rows (0.15).

    Args:
        df: Frame with a close column and optionally a volume column.
        close_col: Name of the close price column.
        volume_col: Name of the volume column, or ``None`` to skip the
            divergence check.

    Returns:
        ``(score, signals)`` where ``score`` is capped at ``1.0`` and
        ``signals`` maps each check name to a bool.
    """
    signals = {
        'abnormal_volatility': False,
        'consecutive_green_days': False,
        'price_volume_divergence': False,
        'excessive_gaps': False,
        'extreme_ma_deviation': False,
        'persistent_overbought': False,
    }
    if len(df) == 0:
        return 0.0, signals

    score = 0.0
    closes = df[close_col]
    returns = closes.pct_change()

    # Last-5-row volatility versus the 20-row volatility just before that window.
    current_vol = returns.tail(5).std()
    rolling_vol = returns.rolling(20).std()
    historical_vol = rolling_vol.iloc[-6] if len(rolling_vol) >= 6 else np.nan
    vol_ratio = current_vol / historical_vol if historical_vol > 0 else 1.0
    signals['abnormal_volatility'] = bool(vol_ratio > 2.0)
    if signals['abnormal_volatility']:
        score += 0.2

    # Count the unbroken run of positive returns ending at the latest row.
    consecutive_up = 0
    for ret in returns.tail(10)[::-1]:
        if ret > 0:
            consecutive_up += 1
        else:
            break
    signals['consecutive_green_days'] = consecutive_up >= 5
    if signals['consecutive_green_days']:
        score += 0.15

    if volume_col is not None and volume_col in df.columns:
        recent_prices = closes.tail(5)
        recent_volumes = df[volume_col].tail(5)
        price_trend = (recent_prices.iloc[-1] - recent_prices.iloc[0]) / recent_prices.iloc[0]
        volume_trend = (recent_volumes.iloc[-1] - recent_volumes.iloc[0]) / recent_volumes.iloc[0]
        signals['price_volume_divergence'] = bool(price_trend > 0.05 and volume_trend < -0.1)
        if signals['price_volume_divergence']:
            score += 0.2

    gaps = (closes - closes.shift(1)) / closes.shift(1)
    large_gaps = (gaps.tail(10).abs() > 0.03).sum()
    signals['excessive_gaps'] = bool(large_gaps >= 3)
    if signals['excessive_gaps']:
        score += 0.15

    current_price = closes.iloc[-1]
    current_sma = closes.rolling(20).mean().iloc[-1]
    price_deviation = abs(current_price - current_sma) / current_sma
    signals['extreme_ma_deviation'] = bool(price_deviation > 0.15)
    if signals['extreme_ma_deviation']:
        score += 0.15

    recent_rsi = talib.RSI(closes, 14).tail(5)
    signals['persistent_overbought'] = bool((recent_rsi > 70).all())
    if signals['persistent_overbought']:
        score += 0.15

    return min(score, 1.0), signals


def _resolve_symbols(user_tickers):
    """Map each user-supplied ticker to the symbol requested from yfinance."""
    # Non-alias tickers are upper-cased to match the column labels of the
    # downloaded frame (assumes yfinance upper-cases symbols - TODO confirm
    # for the installed version).
    return {ticker: _INDEX_SYMBOLS.get(ticker.upper(), ticker.upper())
            for ticker in user_tickers}


def _price_field(raw_data, field, symbol):
    """Return one symbol's series for ``field`` ('Close', 'Volume', ...)."""
    values = raw_data[field]
    # With per-ticker (MultiIndex) columns ``raw_data[field]`` is a frame keyed
    # by symbol; with flat single-ticker columns it is already the series.
    if isinstance(values, pd.DataFrame):
        return values[symbol]
    return values


def _build_feature_frame(training_data, tickers, target_ticker):
    """Build target prices, indicators, cross-market features and labels."""
    target_symbol = tickers[target_ticker]
    df = pd.DataFrame(index=training_data.index)
    for field in ('Open', 'High', 'Low', 'Close', 'Volume'):
        df[f'{target_ticker}_{field}'] = _price_field(training_data, field, target_symbol)
    for ticker, yf_symbol in tickers.items():
        if ticker != target_ticker:
            df[f'{ticker}_Close'] = _price_field(training_data, 'Close', yf_symbol)
    df = df.ffill().dropna()
    if df.empty:
        raise ValueError("Insufficient training 0 rows")

    # Technical indicators
    close_col = f'{target_ticker}_Close'
    high_col = f'{target_ticker}_High'
    low_col = f'{target_ticker}_Low'
    df['RSI'] = talib.RSI(df[close_col], 14)
    df['MACD'], df['MACD_signal'], _ = talib.MACD(df[close_col])
    df['SMA_20'] = talib.SMA(df[close_col], 20)
    df['SMA_50'] = talib.SMA(df[close_col], 50)
    df['ATR'] = talib.ATR(df[high_col], df[low_col], df[close_col], 14)
    df['Vol_10'] = df[close_col].pct_change().rolling(10).std()

    # Cross-market features
    for ticker in tickers:
        if ticker == target_ticker:
            continue
        other_close = df[f'{ticker}_Close']
        alias = ticker.upper()
        if alias == 'VIX':
            df['VIX_Rank'] = other_close.rolling(126).rank(pct=True) * 100
            df['VIX_Slope'] = other_close.diff(5)
            df['VIX_Sustained_High'] = (
                (other_close > 20) & (other_close > other_close.rolling(10).mean())
            ).astype(int)
        elif alias == 'TNX':
            df['TNX_SMA_20'] = talib.SMA(other_close, 20)
            df['TNX_Rising'] = (other_close > df['TNX_SMA_20']).astype(int)
            df['TNX_Accel'] = other_close.diff(5)
        elif alias == 'DXY':
            df['DXY_SMA_50'] = talib.SMA(other_close, 50)
            df['USD_Strength'] = (other_close > df['DXY_SMA_50']).astype(int)
            df['DXY_Slope'] = other_close.diff(5)
        else:
            ratio_col = f'{target_ticker}_{ticker}_Ratio'
            df[ratio_col] = df[close_col] / other_close
            df[f'{ratio_col}_SMA'] = talib.SMA(df[ratio_col].values, 20)
            df[f'{ticker}_Trend_Up'] = (
                other_close > other_close.rolling(50).mean()
            ).astype(int)

    # Create target: 1 when the following row's return is positive.
    df['Next_Return'] = df[close_col].pct_change().shift(-1)
    df['Target'] = (df['Next_Return'] > 0).astype(int)
    return df


def main():
    """Run the pipeline: download, featurise, train, predict, print signal.

    Raises:
        ValueError: If fewer than 50 complete training rows are available.
        SystemExit: If the price download returns no usable data.
    """
    args = parse_arguments()
    target_ticker = args.target if args.target else args.tickers[0]
    if target_ticker not in args.tickers:
        args.tickers.append(target_ticker)

    tickers = _resolve_symbols(args.tickers)
    target_symbol = tickers[target_ticker]

    # Download daily data
    raw_data = yf.download(list(tickers.values()), period=args.period, progress=False)
    if raw_data.empty:
        print("Error: Failed to download data")
        sys.exit(1)

    # Fetch news (query with the downloaded symbol so aliases such as VIX work)
    news_headlines = fetch_breaking_news(target_symbol)
    news_sentiment, news_relevance = get_news_sentiment_score(news_headlines, target_ticker)

    # Latest quote: use the last row that actually has a target close, so a
    # trailing row contributed only by another market does not print "$nan".
    target_close = _price_field(raw_data, 'Close', target_symbol).dropna()
    if target_close.empty:
        print("Error: Failed to download data")
        sys.exit(1)
    latest_target_price = target_close.iloc[-1]
    latest_date = target_close.index[-1]

    # Prepare training data (the most recent downloaded row is held out)
    training_data = raw_data.iloc[:-1]
    df = _build_feature_frame(training_data, tickers, target_ticker)
    close_col = f'{target_ticker}_Close'

    df_for_model = df.dropna().copy()
    # Prefix test, not substring: a substring test on e.g. target "A" would
    # also drop "SMA_20". Target prices and target ratios stay excluded.
    target_prefix = f'{target_ticker}_'
    feature_cols = [col for col in df.columns
                    if not col.startswith(target_prefix)
                    and col not in ('Next_Return', 'Target')]

    if len(df_for_model) < 50:
        raise ValueError(f"Insufficient training {len(df_for_model)} rows")

    # Train model
    sample_weights = calculate_sample_weights(df_for_model, close_col)
    model_params = {
        'n_estimators': 5,
        'max_depth': 3,
        'learning_rate': 0.01,
        'subsample': 0.8,
        'colsample_bytree': 0.8,
        'random_state': 42,
        'eval_metric': 'logloss',
        'use_label_encoder': False,
    }
    final_model = xgb.XGBClassifier(**model_params)
    final_model.fit(df_for_model[feature_cols], df_for_model['Target'],
                    sample_weight=sample_weights)

    # Prediction features: the newest feature row. It has no label yet, so it
    # was never trained on, and it carries the same forward-filled values the
    # model saw during training.
    pred_features = df[feature_cols].iloc[[-1]]
    base_signal = int(final_model.predict(pred_features)[0])

    # Manipulation detection
    target_stock_series = pd.DataFrame(index=raw_data.index)
    target_stock_series['Close'] = _price_field(raw_data, 'Close', target_symbol)
    target_stock_series['Volume'] = _price_field(raw_data, 'Volume', target_symbol)
    manipulation_score, _ = detect_price_manipulation(
        target_stock_series, 'Close', 'Volume')

    # News override
    final_signal = base_signal
    if news_relevance > 0.3 and abs(news_sentiment) > 0.5:
        if news_sentiment < -0.7:
            final_signal = 0
        elif news_sentiment > 0.7:
            final_signal = 1

    # Manipulation override: veto any buy, including one produced by the
    # news override above.
    if manipulation_score >= 0.5:
        final_signal = 0

    # Calculate price range
    vol_10 = df['Vol_10'].iloc[-1]
    if pd.notna(vol_10):
        expected_move = latest_target_price * vol_10
    else:
        expected_move = latest_target_price * 0.02
    if news_relevance > 0.3:
        news_multiplier = 1.0 + abs(news_sentiment) * news_relevance
        expected_move *= news_multiplier

    upper_target = latest_target_price + expected_move
    lower_target = latest_target_price - expected_move

    # Simplified output - always show the price range
    print(f"{target_ticker} | {latest_date.strftime('%Y-%m-%d')} | ${latest_target_price:.2f}")
    if manipulation_score >= 0.7:
        print(f"SIGNAL: AVOID | Range: ${lower_target:.2f} - ${upper_target:.2f} (High manipulation risk)")
    elif final_signal == 1:
        print(f"SIGNAL: BUY | Range: ${lower_target:.2f} - ${upper_target:.2f} | Target: ${upper_target:.2f}")
    else:
        print(f"SIGNAL: HOLD CASH | Range: ${lower_target:.2f} - ${upper_target:.2f}")


if __name__ == "__main__":
    main()
    print("Disclaimer: This is for informational purposes only and does not constitute investment advice.")