import os import torch import torch.nn as nn from torch.optim.lr_scheduler import ReduceLROnPlateau import numpy as np import pandas as pd import yfinance as yf import plotly.graph_objects as go from plotly.subplots import make_subplots import gradio as gr from sklearn.preprocessing import MinMaxScaler from datetime import datetime, timedelta from typing import Tuple, Dict import joblib import warnings import ta from tqdm import tqdm warnings.filterwarnings('ignore') class PriceScaler: def __init__(self): self.scaler = MinMaxScaler() def fit_transform(self, data): # Ensure data is 2D for fitting data_2d = np.array(data).reshape(-1, 1) # Transform and return 1D array return self.scaler.fit_transform(data_2d).flatten() def inverse_transform(self, data): # Ensure data is 2D for inverse transform data_2d = np.array(data).reshape(-1, 1) # Transform and return 1D array return self.scaler.inverse_transform(data_2d).flatten() class CryptoPredictor(nn.Module): def __init__(self, input_dim: int, hidden_dim: int = 128, num_layers: int = 2, dropout: float = 0.2): super().__init__() self.hidden_dim = hidden_dim self.num_layers = num_layers self.lstm = nn.LSTM( input_dim, hidden_dim, num_layers=num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0, bidirectional=True ) self.bn = nn.BatchNorm1d(hidden_dim * 2) self.fc = nn.Sequential( nn.Linear(hidden_dim * 2, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, 1) ) self.confidence_fc = nn.Sequential( nn.Linear(hidden_dim * 2, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, 1), nn.Sigmoid() ) def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: batch_size = x.size(0) h0 = torch.zeros(self.num_layers * 2, batch_size, self.hidden_dim).to(x.device) c0 = torch.zeros(self.num_layers * 2, batch_size, self.hidden_dim).to(x.device) lstm_out, _ = self.lstm(x, (h0, c0)) last_hidden = lstm_out[:, -1, :] normalized_hidden = self.bn(last_hidden) prediction = self.fc(normalized_hidden) confidence = self.confidence_fc(normalized_hidden) return prediction, confidence class CryptoAnalyzer: def __init__(self, model_dir: str = "models", cache_dir: str = "cache"): self.scaler = MinMaxScaler() self.price_scaler = PriceScaler() self.model_dir = model_dir self.cache_dir = cache_dir os.makedirs(model_dir, exist_ok=True) os.makedirs(cache_dir, exist_ok=True) self.feature_columns = [ 'Open', 'High', 'Low', 'Close', 'Volume', 'Returns', 'Volatility', 'MA5', 'MA20', 'RSI', 'Price_Momentum', 'Volume_Momentum', 'MACD', 'BB_upper', 'BB_lower', 'Stoch_K', 'Stoch_D', 'ADX', 'ATR' ] self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') def get_data(self, symbol: str, days: int) -> pd.DataFrame: end_date = datetime.now() start_date = end_date - timedelta(days=days + 30) # Extra 30 days for indicators df = yf.download(f"{symbol}-USD", start=start_date, end=end_date, progress=False) if df.empty: raise ValueError(f"No data available for {symbol}") # Calculate basic features df['Returns'] = df['Close'].pct_change() df['Volatility'] = df['Returns'].rolling(window=20).std() # Moving averages df['MA5'] = df['Close'].rolling(window=5).mean() df['MA20'] = df['Close'].rolling(window=20).mean() # Technical indicators df['RSI'] = ta.momentum.rsi(df['Close']) df['Price_Momentum'] = ta.momentum.roc(df['Close']) df['Volume_Momentum'] = ta.momentum.roc(df['Volume']) macd = ta.trend.macd(df['Close']) df['MACD'] = macd.iloc[:, 0] bollinger = ta.volatility.BollingerBands(df['Close']) df['BB_upper'] = bollinger.bollinger_hband() df['BB_lower'] = bollinger.bollinger_lband() stoch = ta.momentum.StochasticOscillator(df['High'], df['Low'], df['Close']) df['Stoch_K'] = stoch.stoch() df['Stoch_D'] = stoch.stoch_signal() df['ADX'] = ta.trend.adx(df['High'], df['Low'], df['Close']) df['ATR'] = ta.volatility.average_true_range(df['High'], df['Low'], df['Close']) df = df.dropna() return df.iloc[-days:] def prepare_data(self, df: pd.DataFrame, lookback: int) -> Tuple[torch.Tensor, torch.Tensor]: # Scale features features = df[self.feature_columns].values scaled_features = self.scaler.fit_transform(features) # Scale close prices - ensure 1D output close_prices = df['Close'].values scaled_close = self.price_scaler.fit_transform(close_prices) X, y = [], [] for i in range(len(df) - lookback): X.append(scaled_features[i:(i + lookback)]) y.append(scaled_close[i + lookback]) X = torch.FloatTensor(np.array(X)).to(self.device) y = torch.FloatTensor(np.array(y)).reshape(-1).to(self.device) return X, y def get_model_path(self, symbol: str) -> str: return os.path.join(self.model_dir, f"{symbol.lower()}_model.pth") def get_scaler_path(self, symbol: str) -> str: return os.path.join(self.model_dir, f"{symbol.lower()}_scaler.pkl") def train_model(self, X: torch.Tensor, y: torch.Tensor, symbol: str) -> CryptoPredictor: model = CryptoPredictor(X.shape[2]).to(self.device) criterion = nn.HuberLoss() optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01) scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True) batch_size = min(32, len(X) // 4) dataset = torch.utils.data.TensorDataset(X, y) train_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True) best_loss = float('inf') patience = 10 patience_counter = 0 model.train() with tqdm(range(50), desc=f"Training {symbol} model") as pbar: for epoch in pbar: total_loss = 0 for batch_X, batch_y in train_loader: optimizer.zero_grad() predictions, _ = model(batch_X) loss = criterion(predictions, batch_y) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) optimizer.step() total_loss += loss.item() avg_loss = total_loss / len(train_loader) scheduler.step(avg_loss) pbar.set_postfix({'loss': f'{avg_loss:.6f}'}) if avg_loss < best_loss: best_loss = avg_loss patience_counter = 0 torch.save(model.state_dict(), self.get_model_path(symbol)) else: patience_counter += 1 if patience_counter >= patience: break return model def get_predictions(self, symbol: str, days: int, lookback: int) -> Dict: try: df = self.get_data(symbol, days) X, y = self.prepare_data(df, lookback) model_path = self.get_model_path(symbol) if os.path.exists(model_path): model = CryptoPredictor(X.shape[2]).to(self.device) model.load_state_dict(torch.load(model_path)) else: model = self.train_model(X, y, symbol) joblib.dump(self.scaler, self.get_scaler_path(symbol)) model.eval() with torch.no_grad(): predictions, confidence = model(X) predictions = predictions.cpu().numpy().flatten() # Ensure 1D confidence = confidence.cpu().numpy().flatten() # Ensure 1D # Inverse transform predictions predictions = self.price_scaler.inverse_transform(predictions) # Inverse transform actual values y_np = y.cpu().numpy().flatten() # Ensure 1D actual_prices = self.price_scaler.inverse_transform(y_np) rmse = float(np.sqrt(np.mean((actual_prices - predictions) ** 2))) mape = float(np.mean(np.abs((actual_prices - predictions) / actual_prices)) * 100) r2 = float(1 - np.sum((actual_prices - predictions) ** 2) / np.sum((actual_prices - actual_prices.mean()) ** 2)) dates = df.index[lookback:].strftime('%Y-%m-%d').tolist() return { 'dates': dates, 'actual': actual_prices.tolist(), 'predicted': predictions.tolist(), 'confidence': confidence.flatten().tolist(), 'rmse': rmse, 'mape': mape, 'r2': r2, 'volatility': float(df['Volatility'].mean() * 100), 'current_price': float(df['Close'].iloc[-1]), 'volume': float(df['Volume'].iloc[-1]), 'rsi': float(df['RSI'].iloc[-1]), 'macd': float(df['MACD'].iloc[-1]) } except Exception as e: raise ValueError(f"Prediction failed: {str(e)}") def create_analysis_plots(symbol: str, days: int = 180, lookback: int = 30) -> Tuple[go.Figure, str]: try: analyzer = CryptoAnalyzer() predictions = analyzer.get_predictions(symbol, days, lookback) fig = make_subplots( rows=3, cols=1, subplot_titles=( f"{symbol} Price Prediction with Confidence Bands", "Technical Indicators", "Model Performance Metrics" ), vertical_spacing=0.1, specs=[[{"secondary_y": True}], [{"secondary_y": True}], [{"secondary_y": True}]] ) confidence_upper = np.array(predictions['predicted']) * (1 + np.array(predictions['confidence'])) confidence_lower = np.array(predictions['predicted']) * (1 - np.array(predictions['confidence'])) fig.add_trace( go.Scatter( x=predictions['dates'], y=predictions['actual'], name='Actual Price', line=dict(color='blue', width=2) ), row=1, col=1 ) fig.add_trace( go.Scatter( x=predictions['dates'], y=predictions['predicted'], name='Predicted Price', line=dict(color='red', width=2) ), row=1, col=1 ) fig.add_trace( go.Scatter( x=predictions['dates'] + predictions['dates'][::-1], y=list(confidence_upper) + list(confidence_lower)[::-1], fill='toself', fillcolor='rgba(255,0,0,0.1)', line=dict(color='rgba(255,0,0,0)'), name='Confidence Band' ), row=1, col=1 ) fig.add_trace( go.Scatter( x=predictions['dates'], y=predictions['confidence'], name='Model Confidence', line=dict(color='green', width=2) ), row=2, col=1 ) fig.add_trace( go.Scatter( x=predictions['dates'], y=[70] * len(predictions['dates']), line=dict(color='red', dash='dash'), name='RSI Overbought', showlegend=False ), row=2, col=1, secondary_y=True ) fig.add_trace( go.Scatter( x=predictions['dates'], y=[30] * len(predictions['dates']), line=dict(color='green', dash='dash'), name='RSI Oversold', showlegend=False ), row=2, col=1, secondary_y=True ) error = np.array(predictions['actual']) - np.array(predictions['predicted']) fig.add_trace( go.Scatter( x=predictions['dates'], y=error.tolist(), name='Prediction Error', line=dict(color='orange', width=2) ), row=3, col=1 ) fig.update_layout( height=1200, title_text=f"📈 {symbol} Price Analysis Dashboard", showlegend=True, template="plotly_dark", paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(0,0,0,0)', font=dict(size=12) ) fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') summary = f""" ### 📊 Analysis Summary for {symbol} #### Current Market Status - **Current Price:** ${predictions['current_price']:,.2f} - **Predicted Next Price:** ${predictions['predicted'][-1]:,.2f} - **Expected Change:** {((predictions['predicted'][-1] - predictions['current_price']) / predictions['current_price'] * 100):,.2f}% - **24h Volume:** {predictions['volume']:,.0f} #### Technical Indicators - **RSI:** {predictions['rsi']:,.2f} - **MACD:** {predictions['macd']:,.2f} - **Volatility:** {predictions['volatility']:,.2f}% # Continuing from previous summary string #### Model Performance Metrics - **R² Score:** {predictions['r2']:,.4f} - **RMSE:** ${predictions['rmse']:,.2f} - **MAPE:** {predictions['mape']:,.2f}% #### Prediction Confidence - **Average Confidence:** {np.mean(predictions['confidence']) * 100:,.2f}% - **Trend Direction:** {'🔺 Upward' if predictions['predicted'][-1] > predictions['actual'][-1] else '🔻 Downward'} > *Note: Past performance does not guarantee future results. This analysis is for informational purposes only.* """ return fig, summary except Exception as e: fig = go.Figure() fig.add_annotation( text=str(e), xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False ) return fig, f"⚠️ Error: {str(e)}" def create_interface(): with gr.Blocks(theme=gr.themes.Soft()) as iface: gr.Markdown(""" # 🚀 Advanced Cryptocurrency Price Prediction This app uses deep learning to predict cryptocurrency prices and provide comprehensive market analysis. ### Features: - Real-time price predictions - Technical indicators analysis - Confidence metrics - Performance visualization """) with gr.Row(): with gr.Column(scale=1): crypto_input = gr.Dropdown( choices=['BTC', 'ETH', 'BNB', 'XRP', 'ADA', 'SOL', 'DOT', 'DOGE'], label="Select Cryptocurrency", value="BTC" ) custom_crypto = gr.Textbox( label="Or enter custom symbol", placeholder="e.g., MATIC" ) with gr.Row(): days_slider = gr.Slider( minimum=30, maximum=365, value=180, step=30, label="Historical Days" ) lookback_slider = gr.Slider( minimum=7, maximum=60, value=30, step=1, label="Lookback Period (Days)" ) submit_btn = gr.Button("📊 Generate Analysis", variant="primary") with gr.Column(scale=2): plot_output = gr.Plot(label="Analysis Plots") with gr.Row(): analysis_output = gr.Markdown(label="Analysis Summary") error_output = gr.Markdown(visible=False) gr.Markdown(""" ### 📈 Tips for best results: - Use longer historical periods for stable coins - Shorter lookback periods work better for volatile markets - Consider market conditions when interpreting predictions """) def handle_analysis(symbol, custom_symbol, days, lookback): try: final_symbol = custom_symbol if custom_symbol else symbol figure, summary = create_analysis_plots(final_symbol, days, lookback) return figure, summary, gr.update(visible=False, value="") except Exception as e: empty_fig = go.Figure() error_msg = f"⚠️ Error during analysis: {str(e)}" return empty_fig, "", gr.update(visible=True, value=error_msg) submit_btn.click( fn=handle_analysis, inputs=[crypto_input, custom_crypto, days_slider, lookback_slider], outputs=[plot_output, analysis_output, error_output] ) return iface if __name__ == "__main__": import logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('crypto_predictor.log'), logging.StreamHandler() ] ) try: os.makedirs("models", exist_ok=True) os.makedirs("cache", exist_ok=True) iface = create_interface() iface.launch( share=False, server_name="0.0.0.0", server_port=7860, debug=True ) except Exception as e: logging.error(f"Application failed to start: {str(e)}") raise