"""Streamlit trading dashboard: technical signals, market movers, sentiment
and portfolio optimization on top of yfinance data."""

import warnings
from datetime import datetime, timedelta
from typing import Dict

import numpy as np
import pandas as pd
import pandas_ta as ta
import plotly.graph_objects as go
import requests
import streamlit as st
import tensorflow as tf
import yfinance as yf
from bs4 import BeautifulSoup
from pypfopt import EfficientFrontier, risk_models, expected_returns

warnings.filterwarnings("ignore")

# Sidebar labels -> period strings accepted by yf.download. Lower-casing the
# label is not enough: "1w", "1m" and "3m" are not valid yfinance periods.
PERIOD_BY_TIMEFRAME = {
    "1D": "1d",
    "1W": "5d",
    "1M": "1mo",
    "3M": "3mo",
    "1Y": "1y",
}

# NOTE(review): steps 3 and 5 mention a "Prediction" tab and a simulation
# feature that this file does not define - confirm whether they exist
# elsewhere or whether the text should be trimmed.
ACADEMY_MARKDOWN = """
### How to Use This App
1. **Select Asset**: Choose the asset class and specific symbol you want to analyze.
2. **Run Analysis**: Click "Run Analysis" to see historical data and technical indicators.
3. **Generate Predictions**: Use the "Prediction" tab to generate future price predictions.
4. **Optimize Portfolio**: Select multiple assets and optimize your portfolio allocation.
5. **Simulate Trades**: Use the simulation feature to test your strategies without risking real money.
"""


def _fallback_movers() -> Dict:
    """Return a fresh copy of the hard-coded market-mover lists."""
    return {
        "gainers": ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA"],
        "losers": ["META", "NFLX", "TSLA", "AMD", "INTC"],
        "active": ["SPY", "QQQ", "IWM", "DIA", "VIX"],
    }


def _flatten_price_columns(data: pd.DataFrame) -> pd.DataFrame:
    """Return *data* with single-level column names.

    Some yfinance versions return MultiIndex columns even for a single
    symbol, which turns ``data['Close'].iloc[-1]`` into a Series instead of
    a scalar. Only the first column level is kept in that case.
    """
    if isinstance(data.columns, pd.MultiIndex):
        data = data.copy()
        data.columns = data.columns.get_level_values(0)
    return data


# Enhanced Technical Analysis
class EnhancedTechnicalAnalysis:
    """Robust technical analysis with validation and advanced features"""

    def __init__(self, data: pd.DataFrame):
        # Work on a copy so the caller's frame is never mutated.
        self.data = _flatten_price_columns(data).copy()
        self.model = self._load_deep_learning_model()

    def _load_deep_learning_model(self):
        """Load ``lstm_model.h5`` from the working directory, or return None."""
        try:
            return tf.keras.models.load_model('lstm_model.h5')
        except Exception:
            # The model is optional; any load failure disables predictions.
            return None

    def calculate_all_indicators(self) -> pd.DataFrame:
        """Add Bollinger Bands, RSI, SMA20/50 and volatility-cluster columns.

        Returns:
            The enriched frame with all-NaN columns dropped. On failure the
            error is shown via ``st.error`` and the frame is returned with
            whatever columns were added before the failure.
        """
        try:
            close = self.data['Close']
            n_rows = len(self.data)

            bb = ta.bbands(close, length=20, std=2)
            if bb is not None:
                self.data = pd.concat([self.data, bb], axis=1)

            self.data['RSI'] = ta.rsi(close, length=14) if n_rows >= 14 else np.nan
            if n_rows >= 20:
                self.data['SMA20'] = ta.sma(close, length=20)
            if n_rows >= 50:
                self.data['SMA50'] = ta.sma(close, length=50)

            self._add_model_prediction()
            self.data['Volatility_Cluster'] = self._detect_volatility_clusters()
            return self.data.dropna(axis=1, how='all')
        except Exception as e:
            st.error(f"Technical analysis failed: {str(e)}")
            return self.data

    def _add_model_prediction(self) -> None:
        """Attach ``DL_Prediction`` when a model is loaded and its output fits.

        A failing or mis-shaped prediction is reported and skipped so that it
        cannot abort the indicators computed after it.
        """
        if self.model is None:
            return
        try:
            # NOTE(review): the model's expected input shape is not visible
            # here; the raw (n, 1) Close array is passed as in the original.
            raw = self.model.predict(self.data[['Close']].values)
            preds = np.asarray(raw).flatten()
        except Exception as e:
            st.warning(f"Deep learning prediction skipped: {str(e)}")
            return
        if len(preds) == len(self.data):
            self.data['DL_Prediction'] = preds
        else:
            st.warning("Deep learning prediction skipped: output length mismatch")

    def _detect_volatility_clusters(self) -> pd.Series:
        """Flag rows (1/0) whose 20-row rolling return std exceeds the overall std."""
        returns = self.data['Close'].pct_change()
        return (returns.rolling(20).std() > returns.std()).astype(int)


# Market Data Integrator
class MarketDataIntegrator:
    """Enhanced market data integration with fixed market movers"""

    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

    def get_market_movers(self) -> Dict:
        """Return ``{"gainers", "losers", "active"}`` lists of up to 5 entries.

        Falls back to hard-coded symbols when the lookup yields nothing or
        raises (the latter also shows a warning).
        """
        try:
            # Fetch once; the original issued three identical requests.
            # NOTE(review): 'most_actively_traded' does not look like a
            # documented Ticker.info key, so the fallback is likely the
            # usual path - confirm. All three lists are slices of one list.
            most_active = yf.Ticker("^GSPC").info.get('most_actively_traded', [])
            if not most_active:
                return _fallback_movers()
            return {
                "gainers": most_active[:5],
                "losers": most_active[-5:],
                "active": most_active[:5],
            }
        except Exception as e:
            st.warning(f"Using backup market data due to: {str(e)}")
            return _fallback_movers()

    def get_market_sentiment(self, symbol: str) -> float:
        """Return a 0..1 keyword sentiment score for *symbol*'s Finviz page.

        Returns the neutral value 0.5 on any network or parsing failure.
        """
        try:
            url = f"https://finviz.com/quote.ashx?t={symbol}"
            # A timeout keeps a stalled request from hanging the app.
            response = requests.get(url, headers=self.headers, timeout=10)
            # Do not score the text of an HTTP error page.
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            return self._analyze_sentiment(soup.get_text())
        except Exception:
            return 0.5

    def _analyze_sentiment(self, text: str) -> float:
        """Return the share of positive keyword hits among all keyword hits.

        Matching is case-insensitive substring counting; 0.5 is returned
        when no keyword occurs.
        """
        positive_words = ['buy', 'strong', 'growth', 'bullish', 'positive']
        negative_words = ['sell', 'weak', 'decline', 'bearish', 'negative']
        lowered = text.lower()  # lower-case once, not once per keyword
        positive_count = sum(lowered.count(word) for word in positive_words)
        negative_count = sum(lowered.count(word) for word in negative_words)
        total = positive_count + negative_count
        return 0.5 if total == 0 else positive_count / total


# Portfolio Optimization
class PortfolioEngine:
    """Fixed portfolio optimization with robust error handling"""

    def __init__(self):
        self.risk_free_rate = 0.02

    @staticmethod
    def _zero_metrics() -> Dict:
        """Return the placeholder metrics used when a computation fails."""
        return {"expected_return": 0.0, "volatility": 0.0, "sharpe_ratio": 0.0}

    def optimize_allocation(self, returns: pd.DataFrame) -> Dict:
        """Compute max-Sharpe weights for a frame of periodic asset returns.

        Args:
            returns: One column per asset, one row per period, holding
                returns (not prices).

        Returns:
            ``{"weights": {asset: weight}, "metrics": {...}}``. On failure
            the error is shown and equal weights with zero metrics are
            returned.
        """
        try:
            # The input already holds returns, so tell PyPortfolioOpt not to
            # differentiate it again (its default assumes price data).
            mu = expected_returns.mean_historical_return(returns, returns_data=True)
            S = risk_models.CovarianceShrinkage(returns, returns_data=True).ledoit_wolf()
            ef = EfficientFrontier(mu, S)
            ef.add_constraint(lambda w: w >= 0)  # Long-only constraint
            ef.add_constraint(lambda w: sum(w) == 1)  # Fully invested constraint
            ef.max_sharpe(risk_free_rate=self.risk_free_rate)
            cleaned_weights = ef.clean_weights()
            performance = self.calculate_portfolio_metrics(returns, cleaned_weights)
            return {"weights": cleaned_weights, "metrics": performance}
        except Exception as e:
            st.error(f"Portfolio optimization failed: {str(e)}")
            n_assets = len(returns.columns)
            equal_weights = {col: 1.0 / n_assets for col in returns.columns}
            return {"weights": equal_weights, "metrics": self._zero_metrics()}

    def calculate_portfolio_metrics(self, returns: pd.DataFrame, weights: Dict) -> Dict:
        """Return annualized (x252) return, volatility and Sharpe ratio.

        Args:
            returns: Periodic asset returns, one column per asset.
            weights: Mapping with a weight for every column of *returns*.

        Returns:
            Dict with ``expected_return``, ``volatility`` and
            ``sharpe_ratio``; all zeros if the computation raises.
        """
        try:
            w = np.array([weights[col] for col in returns.columns])
            portfolio_return = float(np.sum(returns.mean() * w) * 252)
            portfolio_vol = float(np.sqrt(np.dot(w.T, np.dot(returns.cov() * 252, w))))
            # Zero or NaN volatility would otherwise yield inf/NaN.
            if portfolio_vol > 0:
                sharpe = (portfolio_return - self.risk_free_rate) / portfolio_vol
            else:
                sharpe = 0.0
            return {
                "expected_return": portfolio_return,
                "volatility": portfolio_vol,
                "sharpe_ratio": sharpe,
            }
        except Exception:
            return self._zero_metrics()


# Trading Assistant
class TradingAssistant:
    """Main trading analysis engine with improved signal generation"""

    def __init__(self):
        self.asset_options = {
            "Stocks": ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "META"],
            "Crypto": ["BTC-USD", "ETH-USD", "BNB-USD", "XRP-USD"],
            "Forex": ["EURUSD=X", "GBPUSD=X", "USDJPY=X"],
        }

    def generate_signal(self, symbol: str, data: pd.DataFrame) -> Dict:
        """Derive a BUY/SELL/HOLD signal from RSI and the 20-period SMA.

        BUY when RSI < 30 and the last close is below SMA20; SELL when
        RSI > 70 and the last close is above SMA20; HOLD otherwise. Missing
        indicators default to RSI 50 and SMA equal to the last close.

        Returns:
            ``{"symbol", "signal", "details"}``; on failure the error is
            shown and a HOLD result with placeholder details is returned.
        """
        try:
            analysis = EnhancedTechnicalAnalysis(data).calculate_all_indicators()
            last_close = analysis['Close'].iloc[-1]
            rsi = analysis.get('RSI', pd.Series([50])).iloc[-1]
            sma20 = analysis.get('SMA20', pd.Series([last_close])).iloc[-1]
            sma50 = analysis.get('SMA50', pd.Series([last_close])).iloc[-1]
            cluster = analysis.get('Volatility_Cluster', pd.Series([0])).iloc[-1]

            signal = "HOLD"
            if rsi < 30 and last_close < sma20:
                signal = "BUY"
            elif rsi > 70 and last_close > sma20:
                signal = "SELL"

            return {
                "symbol": symbol,
                "signal": signal,
                "details": {
                    "RSI": rsi,
                    "SMA20": sma20,
                    "SMA50": sma50,
                    "Last_Close": last_close,
                    "Volatility_Cluster": cluster,
                },
            }
        except Exception as e:
            st.error(f"Signal generation failed: {str(e)}")
            return {
                "symbol": symbol,
                "signal": "HOLD",
                "details": {
                    "RSI": 50,
                    "SMA20": None,
                    "SMA50": None,
                    "Last_Close": None,
                    "Volatility_Cluster": 0,
                },
            }


def _render_analysis_tab(symbol, timeframe, trading_engine, market_data) -> None:
    """Render the Analysis tab: metrics, candlestick chart and market movers."""
    if not st.button("Run Analysis"):
        return
    with st.spinner("Processing market data..."):
        try:
            data = _flatten_price_columns(
                yf.download(symbol, period=PERIOD_BY_TIMEFRAME[timeframe])
            )
            if data.empty:
                st.error("No data available for the selected symbol and timeframe.")
                # Returns from this helper only, so the other tabs still render.
                return

            analysis = trading_engine.generate_signal(symbol, data)
            movers = market_data.get_market_movers()
            sentiment = market_data.get_market_sentiment(symbol)

            col1, col2, col3, col4 = st.columns(4)
            col1.metric("Price", f"${data['Close'].iloc[-1]:.2f}")
            col2.metric("Signal", analysis['signal'])
            col3.metric("RSI", f"{analysis['details']['RSI']:.1f}")
            col4.metric("Sentiment", f"{sentiment:.2f}")

            fig = go.Figure()
            fig.add_trace(go.Candlestick(
                x=data.index,
                open=data['Open'],
                high=data['High'],
                low=data['Low'],
                close=data['Close'],
                name="Price",
            ))
            fig.update_layout(height=600, xaxis_rangeslider_visible=False)
            st.plotly_chart(fig, use_container_width=True)

            with st.expander("📈 Market Movers"):
                cols = st.columns(3)
                cols[0].subheader("Top Gainers")
                cols[1].subheader("Top Losers")
                cols[2].subheader("Most Active")
                for col, key in zip(cols, ("gainers", "losers", "active")):
                    for rank, name in enumerate(movers[key][:5], start=1):
                        col.write(f"{rank}. {name}")
        except Exception as e:
            st.error(f"Analysis failed: {str(e)}")


def _download_returns(symbols) -> pd.DataFrame:
    """Download one year of prices for *symbols* and return periodic returns.

    Uses 'Adj Close' when the download provides it and 'Close' otherwise
    (yfinance omits 'Adj Close' when prices are auto-adjusted). Always
    returns a DataFrame, even for a single symbol.
    """
    prices = yf.download(symbols, period="1y")
    if prices.empty:
        return pd.DataFrame()
    fields = prices.columns.get_level_values(0)
    closes = prices['Adj Close' if 'Adj Close' in fields else 'Close']
    if isinstance(closes, pd.Series):
        closes = closes.to_frame(name=symbols[0])
    return closes.pct_change().dropna()


def _render_portfolio_tab(asset_choices, portfolio_optimizer) -> None:
    """Render the Portfolio tab: asset picker, weights and metrics."""
    st.subheader("⚖️ Portfolio Optimization")
    selected_assets = st.multiselect(
        "Select Assets",
        asset_choices,
        default=asset_choices[:3],
    )
    if not st.button("Optimize Portfolio"):
        return
    if not selected_assets:
        st.error("Select at least one asset.")
        return
    try:
        returns = _download_returns(selected_assets)
        if returns.empty:
            st.error("No data available for selected assets.")
            return

        result = portfolio_optimizer.optimize_allocation(returns)
        col1, col2 = st.columns(2)
        with col1:
            st.write("### Optimal Weights")
            for asset, weight in result['weights'].items():
                st.metric(asset, f"{weight:.1%}")
        with col2:
            metrics = result['metrics']
            st.write("### Portfolio Metrics")
            st.metric("Expected Return", f"{metrics['expected_return']:.1%}")
            st.metric("Volatility", f"{metrics['volatility']:.1%}")
            st.metric("Sharpe Ratio", f"{metrics['sharpe_ratio']:.2f}")
    except Exception as e:
        st.error(f"Optimization failed: {str(e)}")


# Main Application
def main():
    """Build the Streamlit page: sidebar configuration plus three tabs."""
    st.set_page_config(page_title="AI Trading Pro", layout="wide", page_icon="📈")
    st.title("🚀 AI Trading Pro - Enhanced Trading Solution")

    market_data = MarketDataIntegrator()
    trading_engine = TradingAssistant()
    portfolio_optimizer = PortfolioEngine()

    with st.sidebar:
        st.header("⚙️ Configuration")
        asset_type = st.selectbox("Asset Class", list(trading_engine.asset_options.keys()))
        symbol = st.selectbox("Symbol", trading_engine.asset_options[asset_type])
        timeframe = st.select_slider(
            "Analysis Period",
            options=["1D", "1W", "1M", "3M", "1Y"],
            value="1M",
        )
        with st.expander("🔧 Advanced Tools"):
            enable_portfolio = st.checkbox("Portfolio Optimization")
            # These two widgets are rendered but their values are not read
            # anywhere in this file.
            show_education = st.checkbox("Tutorial Mode")
            risk_level = st.select_slider("Risk Profile", ["Low", "Medium", "High"])

    tab_analysis, tab_portfolio, tab_learn = st.tabs(["Analysis", "Portfolio", "Academy"])

    with tab_analysis:
        _render_analysis_tab(symbol, timeframe, trading_engine, market_data)

    with tab_portfolio:
        if enable_portfolio:
            _render_portfolio_tab(
                trading_engine.asset_options[asset_type], portfolio_optimizer
            )

    with tab_learn:
        st.subheader("📚 Trading Education")
        st.markdown(ACADEMY_MARKDOWN)


if __name__ == "__main__":
    main()