# Spaces: Sleeping  (hosting-page status banner captured with the file; not part of the program)
# crypto_price_prediction.py
| import os | |
| import torch | |
| import torch.nn as nn | |
| from torch.optim.lr_scheduler import ReduceLROnPlateau | |
| import numpy as np | |
| import pandas as pd | |
| import yfinance as yf | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| import gradio as gr | |
| from sklearn.preprocessing import MinMaxScaler | |
| from datetime import datetime, timedelta | |
| import joblib | |
| import warnings | |
| import ta | |
| from tqdm import tqdm | |
| warnings.filterwarnings('ignore') | |
class PriceScaler:
    """Min-max scaler for a flat price series.

    ``MinMaxScaler`` operates on 2-D arrays, so every call reshapes the input
    into a single column and flattens the result back to 1-D.
    """

    def __init__(self):
        # Underlying scikit-learn scaler; fitted by ``fit_transform``.
        self.scaler = MinMaxScaler()

    @staticmethod
    def _as_column(values):
        """Return *values* as an ``(n, 1)`` array."""
        return np.array(values).reshape(-1, 1)

    def fit_transform(self, data):
        """Fit the scaler on *data* and return the scaled values as a 1-D array."""
        scaled_column = self.scaler.fit_transform(self._as_column(data))
        return scaled_column.flatten()

    def inverse_transform(self, data):
        """Map scaled values in *data* back to the fitted range, as a 1-D array."""
        restored_column = self.scaler.inverse_transform(self._as_column(data))
        return restored_column.flatten()
class CryptoPredictor(nn.Module):
    """Bidirectional LSTM with two heads: a price regressor and a confidence score.

    The LSTM is built with ``batch_first=True``, so inputs are
    ``(batch, seq_len, input_dim)``. Only the output at the last time step is
    used; it is batch-normalised and fed to both heads.
    """

    def __init__(self, input_dim, hidden_dim=128, num_layers=2, dropout=0.2):
        """Build the LSTM, the batch norm, and the two fully connected heads.

        Args:
            input_dim: Number of features per time step.
            hidden_dim: Hidden size of each LSTM direction.
            num_layers: Number of stacked LSTM layers.
            dropout: Dropout between stacked LSTM layers (forced to 0 when
                there is only one layer).
        """
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Forward and backward outputs are concatenated, doubling the width.
        lstm_output_dim = hidden_dim * 2
        inter_layer_dropout = dropout if num_layers > 1 else 0

        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
            bidirectional=True,
        )
        self.bn = nn.BatchNorm1d(lstm_output_dim)
        self.fc = nn.Sequential(
            nn.Linear(lstm_output_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )
        # Same shape as the price head, squashed into (0, 1) by the sigmoid.
        self.confidence_fc = nn.Sequential(
            nn.Linear(lstm_output_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return ``(prediction, confidence)``, each of shape ``(batch, 1)``."""
        n_sequences = x.size(0)
        # One state slice per layer and per direction.
        state_shape = (self.num_layers * 2, n_sequences, self.hidden_dim)
        initial_hidden = torch.zeros(*state_shape, device=x.device)
        initial_cell = torch.zeros(*state_shape, device=x.device)

        sequence_out, _ = self.lstm(x, (initial_hidden, initial_cell))
        final_step = self.bn(sequence_out[:, -1, :])
        return self.fc(final_step), self.confidence_fc(final_step)
class CryptoAnalyzer:
    """Fetch market data, build indicator features, and train or reuse a price model.

    Model weights and the fitted scalers are stored per symbol under
    ``model_dir`` so that later calls can reuse them instead of retraining.
    """

    def __init__(self, model_dir="models", cache_dir="cache"):
        """Create the scalers, remember (and create) the storage directories, pick a device."""
        self.scaler = MinMaxScaler()  # scales the full feature matrix
        self.price_scaler = PriceScaler()  # scales the Close target only
        self.model_dir = model_dir
        self.cache_dir = cache_dir
        os.makedirs(model_dir, exist_ok=True)
        os.makedirs(cache_dir, exist_ok=True)
        self.feature_columns = [
            'Open', 'High', 'Low', 'Close', 'Volume', 'Returns', 'Volatility',
            'MA5', 'MA20', 'RSI', 'Price_Momentum', 'Volume_Momentum', 'MACD',
            'BB_upper', 'BB_lower', 'Stoch_K', 'Stoch_D', 'ADX', 'ATR',
        ]
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def get_data(self, symbol, days):
        """Download ``{symbol}-USD`` price history and append the indicator columns.

        Args:
            symbol: Ticker without the ``-USD`` suffix, e.g. ``"BTC"``.
            days: Number of most recent rows to return.

        Returns:
            DataFrame with at most ``days`` rows and every column listed in
            ``self.feature_columns``; rows with missing or infinite values
            are dropped.

        Raises:
            ValueError: If the download returns no rows.
        """
        days = int(days)  # UI sliders may hand over floats; iloc needs an int
        end_date = datetime.now()
        # 30 extra days give the rolling indicators room to warm up.
        start_date = end_date - timedelta(days=days + 30)
        df = yf.download(f"{symbol}-USD", start=start_date, end=end_date, progress=False)
        if df.empty:
            raise ValueError(f"No data available for {symbol}")

        # yfinance may return (field, ticker) MultiIndex columns even for a
        # single ticker; the indicator helpers below need plain Series.
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)

        close, high, low = df['Close'], df['High'], df['Low']
        df['Returns'] = close.pct_change()
        df['Volatility'] = df['Returns'].rolling(window=20).std()
        df['MA5'] = close.rolling(window=5).mean()
        df['MA20'] = close.rolling(window=20).mean()
        df['RSI'] = ta.momentum.rsi(close)
        df['Price_Momentum'] = ta.momentum.roc(close)
        df['Volume_Momentum'] = ta.momentum.roc(df['Volume'])
        # ta.trend.macd returns the MACD line as a Series, so it is assigned
        # directly (2-D ``iloc[:, 0]`` indexing fails on a Series).
        df['MACD'] = ta.trend.macd(close)
        bollinger = ta.volatility.BollingerBands(close)
        df['BB_upper'] = bollinger.bollinger_hband()
        df['BB_lower'] = bollinger.bollinger_lband()
        stoch = ta.momentum.StochasticOscillator(high, low, close)
        df['Stoch_K'] = stoch.stoch()
        df['Stoch_D'] = stoch.stoch_signal()
        df['ADX'] = ta.trend.adx(high, low, close)
        df['ATR'] = ta.volatility.average_true_range(high, low, close)

        # Rate-of-change over a zero base yields +/-inf, which dropna alone
        # would keep and which would poison the min-max scaling.
        df = df.replace([np.inf, -np.inf], np.nan).dropna()
        return df.iloc[-days:]

    def prepare_data(self, df, lookback, fit=True):
        """Turn *df* into sliding-window tensors for the model.

        Args:
            df: Frame containing ``self.feature_columns``.
            lookback: Number of consecutive rows per input window.
            fit: When true (default) the scalers are refit on *df*; when
                false the already-fitted scalers are applied unchanged.

        Returns:
            ``(X, y)`` on ``self.device``: ``X`` has shape
            ``(len(df) - lookback, lookback, n_features)`` and ``y`` holds the
            scaled Close of the row that follows each window.

        Raises:
            ValueError: If *df* is too short to form a single window.
        """
        lookback = int(lookback)
        if lookback < 1:
            raise ValueError(f"lookback must be at least 1, got {lookback}")
        if len(df) <= lookback:
            raise ValueError(
                f"Need more than {lookback} rows to build a window, got {len(df)}"
            )

        features = df[self.feature_columns].values
        close_prices = df['Close'].values
        if fit:
            scaled_features = self.scaler.fit_transform(features)
            scaled_close = self.price_scaler.fit_transform(close_prices)
        else:
            scaled_features = self.scaler.transform(features)
            close_column = np.asarray(close_prices).reshape(-1, 1)
            scaled_close = self.price_scaler.scaler.transform(close_column).flatten()

        n_windows = len(df) - lookback
        windows = np.stack([scaled_features[i:i + lookback] for i in range(n_windows)])
        targets = scaled_close[lookback:]

        X = torch.as_tensor(windows, dtype=torch.float32, device=self.device)
        y = torch.as_tensor(targets, dtype=torch.float32, device=self.device).reshape(-1)
        return X, y

    @staticmethod
    def _safe_name(symbol):
        """Lower-case *symbol* and keep only characters safe for a file name."""
        # The symbol can come from a free-text UI field; dropping separators
        # keeps the resulting path inside ``model_dir``.
        return "".join(ch for ch in str(symbol).lower() if ch.isalnum() or ch in "-_")

    def get_model_path(self, symbol):
        """Return the path of the saved model weights for *symbol*."""
        return os.path.join(self.model_dir, f"{self._safe_name(symbol)}_model.pth")

    def get_scaler_path(self, symbol):
        """Return the path of the saved scalers for *symbol*."""
        return os.path.join(self.model_dir, f"{self._safe_name(symbol)}_scaler.pkl")

    def _load_scalers(self, symbol):
        """Return the saved ``(feature_scaler, price_scaler)`` pair, or ``None``.

        ``None`` is returned when no file exists or when the file is in the
        older format that stored only the feature scaler.
        """
        scaler_path = self.get_scaler_path(symbol)
        if not os.path.exists(scaler_path):
            return None
        # NOTE: joblib.load unpickles; only files written by this app belong here.
        saved = joblib.load(scaler_path)
        if isinstance(saved, dict) and 'features' in saved and 'price' in saved:
            return saved['features'], saved['price']
        return None

    def train_model(self, X, y, symbol):
        """Train a fresh ``CryptoPredictor`` on ``(X, y)`` and return the best weights.

        Runs up to 50 epochs with Huber loss, AdamW, gradient clipping,
        plateau-based LR decay, and early stopping after 10 epochs without
        improvement. The best state is saved to ``get_model_path(symbol)``
        and restored into the returned model.

        Raises:
            ValueError: If fewer than two samples are supplied.
        """
        n_samples = len(X)
        if n_samples < 2:
            raise ValueError(f"Need at least 2 training windows, got {n_samples}")

        model = CryptoPredictor(X.shape[2]).to(self.device)
        criterion = nn.HuberLoss()
        optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
        # ``verbose`` is deprecated on LR schedulers, so it is not passed.
        scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

        # BatchNorm1d cannot train on a single sample: keep batches >= 2 and
        # drop a trailing batch that would contain exactly one sample.
        batch_size = max(2, min(32, n_samples // 4))
        dataset = torch.utils.data.TensorDataset(X, y)
        train_loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=True,
            drop_last=(n_samples % batch_size == 1),
        )

        best_loss = float('inf')
        best_state = None
        patience = 10
        patience_counter = 0
        model.train()
        with tqdm(range(50), desc=f"Training {symbol} model") as pbar:
            for _ in pbar:
                total_loss = 0.0
                for batch_X, batch_y in train_loader:
                    optimizer.zero_grad()
                    # Only the price head is optimised; the confidence
                    # output is not part of the loss.
                    predictions, _ = model(batch_X)
                    # (batch, 1) -> (batch,) so the loss is element-wise
                    # instead of broadcasting to (batch, batch).
                    loss = criterion(predictions.squeeze(-1), batch_y)
                    loss.backward()
                    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                    optimizer.step()
                    total_loss += loss.item()
                avg_loss = total_loss / len(train_loader)
                scheduler.step(avg_loss)
                pbar.set_postfix({'loss': f'{avg_loss:.6f}'})
                if avg_loss < best_loss:
                    best_loss = avg_loss
                    patience_counter = 0
                    best_state = {
                        key: value.detach().clone()
                        for key, value in model.state_dict().items()
                    }
                    torch.save(model.state_dict(), self.get_model_path(symbol))
                else:
                    patience_counter += 1
                    if patience_counter >= patience:
                        break

        # Hand back the same weights that were checkpointed, not whatever the
        # last (possibly worse) epoch left behind.
        if best_state is not None:
            model.load_state_dict(best_state)
        return model

    def get_predictions(self, symbol, days, lookback):
        """Run the full pipeline for *symbol* and return plot-ready results.

        A saved model is reused only when its scalers were saved with it;
        otherwise a new model is trained and both are persisted.

        Returns:
            Dict with ``dates``, ``actual``, ``predicted``, ``confidence``
            (lists of equal length), the scalar metrics ``rmse``, ``mape``,
            ``r2``, the latest ``volatility``/``current_price``/``volume``/
            ``rsi``/``macd`` values, and ``next_prediction`` (forecast for the
            step after the last row).

        Raises:
            ValueError: On any failure, chained to the original exception.
        """
        # The module imports ``logging`` only under its ``__main__`` guard;
        # importing here keeps the method usable when the file is imported.
        import logging

        try:
            days = int(days)
            lookback = int(lookback)

            logging.info("Fetching data...")
            df = self.get_data(symbol, days)
            logging.info(f"Data fetched: {len(df)} rows.")

            model_path = self.get_model_path(symbol)
            saved_scalers = self._load_scalers(symbol) if os.path.exists(model_path) else None
            reuse_model = saved_scalers is not None

            logging.info("Preparing data...")
            if reuse_model:
                # A saved model must see inputs scaled exactly as in training.
                self.scaler, self.price_scaler.scaler = saved_scalers
                X, y = self.prepare_data(df, lookback, fit=False)
            else:
                X, y = self.prepare_data(df, lookback)
            logging.info(f"Data prepared. Features shape: {X.shape}, Targets shape: {y.shape}")

            if reuse_model:
                logging.info("Loading existing model...")
                model = CryptoPredictor(X.shape[2]).to(self.device)
                # map_location lets a CUDA-trained checkpoint load on CPU.
                model.load_state_dict(torch.load(model_path, map_location=self.device))
            else:
                logging.info("Training new model...")
                model = self.train_model(X, y, symbol)
                joblib.dump(
                    {'features': self.scaler, 'price': self.price_scaler.scaler},
                    self.get_scaler_path(symbol),
                )

            model.eval()
            with torch.no_grad():
                logging.info("Generating predictions...")
                predictions, confidence = model(X)
                logging.info(f"Raw predictions shape: {predictions.shape}")
                # The most recent window has no target yet: its output is
                # the forecast for the step after the last row.
                last_window = self.scaler.transform(
                    df[self.feature_columns].values[-lookback:]
                )
                next_input = torch.as_tensor(
                    last_window, dtype=torch.float32, device=self.device
                ).unsqueeze(0)
                next_scaled, _ = model(next_input)

            # PriceScaler reshapes to a column and flattens internally.
            predictions = self.price_scaler.inverse_transform(predictions.cpu().numpy())
            actual_prices = self.price_scaler.inverse_transform(y.cpu().numpy())
            next_price = float(
                self.price_scaler.inverse_transform(next_scaled.cpu().numpy())[0]
            )

            errors = actual_prices - predictions
            rmse = float(np.sqrt(np.mean(errors ** 2)))
            mape = float(np.mean(np.abs(errors / actual_prices)) * 100)
            total_variance = float(np.sum((actual_prices - actual_prices.mean()) ** 2))
            # R^2 is undefined for a constant series; report NaN instead of dividing by zero.
            r2 = float(1 - np.sum(errors ** 2) / total_variance) if total_variance > 0 else float('nan')
            logging.info("Metrics calculated.")

            dates = df.index[lookback:].strftime('%Y-%m-%d').tolist()
            return {
                'dates': dates,
                'actual': actual_prices.tolist(),
                'predicted': predictions.tolist(),
                'confidence': confidence.cpu().numpy().flatten().tolist(),
                'rmse': rmse,
                'mape': mape,
                'r2': r2,
                'volatility': float(df['Volatility'].mean() * 100),
                'current_price': float(df['Close'].iloc[-1]),
                'volume': float(df['Volume'].iloc[-1]),
                'rsi': float(df['RSI'].iloc[-1]),
                'macd': float(df['MACD'].iloc[-1]),
                'next_prediction': next_price,
            }
        except Exception as e:
            logging.exception(f"Error during predictions: {str(e)}")
            raise ValueError(f"Prediction failed: {str(e)}") from e
def create_analysis_plots(symbol, days=180, lookback=30):
    """Build the dashboard figure and Markdown summary for *symbol*.

    Args:
        symbol: Ticker without the ``-USD`` suffix; whitespace is stripped
            and the value is upper-cased.
        days: Number of history rows to analyse.
        lookback: Window length fed to the model.

    Returns:
        ``(figure, summary)``. Failures do not raise: the figure then carries
        the error text as an annotation and the summary is an error line.
    """
    try:
        symbol = str(symbol).strip().upper()
        # UI sliders may deliver floats; slicing and range() need ints.
        days, lookback = int(days), int(lookback)

        analyzer = CryptoAnalyzer()
        predictions = analyzer.get_predictions(symbol, days, lookback)

        # NOTE: rows 2 and 3 are titled but receive no traces below.
        fig = make_subplots(
            rows=3, cols=1,
            subplot_titles=(
                f"{symbol} Price Prediction with Confidence Bands",
                "Technical Indicators",
                "Model Performance Metrics",
            ),
            vertical_spacing=0.1,
            specs=[[{"secondary_y": True}],
                   [{"secondary_y": True}],
                   [{"secondary_y": True}]],
        )

        dates = predictions['dates']
        predicted = np.array(predictions['predicted'])
        confidence = np.array(predictions['confidence'])
        confidence_upper = predicted * (1 + confidence)
        confidence_lower = predicted * (1 - confidence)

        fig.add_trace(
            go.Scatter(
                x=dates,
                y=predictions['actual'],
                name='Actual Price',
                line=dict(color='blue', width=2),
            ),
            row=1, col=1,
        )
        fig.add_trace(
            go.Scatter(
                x=dates,
                y=predictions['predicted'],
                name='Predicted Price',
                line=dict(color='red', width=2),
            ),
            row=1, col=1,
        )
        # Closed polygon: upper edge left-to-right, then lower edge back.
        fig.add_trace(
            go.Scatter(
                x=dates + dates[::-1],
                y=list(confidence_upper) + list(confidence_lower)[::-1],
                fill='toself',
                fillcolor='rgba(255,0,0,0.1)',
                line=dict(color='rgba(255,0,0,0)'),
                name='Confidence Band',
            ),
            row=1, col=1,
        )
        fig.update_layout(
            height=1200,
            title_text=f"📊 {symbol} Price Analysis Dashboard",
            showlegend=True,
            template="plotly_dark",
            paper_bgcolor='rgba(0,0,0,0)',
            plot_bgcolor='rgba(0,0,0,0)',
            font=dict(size=12),
        )

        current_price = predictions['current_price']
        # Prefer a genuine next-step forecast when the analyzer provides one;
        # otherwise fall back to the model output for the last observed day.
        next_price = predictions.get('next_prediction', predictions['predicted'][-1])
        expected_change = (next_price - current_price) / current_price * 100
        trend = '🔺 Upward' if next_price > current_price else '🔻 Downward'

        # Joined from unindented lines so Markdown never sees them as a code block.
        summary = "\n".join([
            "",
            f"### 📊 Analysis Summary for {symbol}",
            "#### Current Market Status",
            f"- **Current Price:** ${current_price:,.2f}",
            f"- **Predicted Next Price:** ${next_price:,.2f}",
            f"- **Expected Change:** {expected_change:,.2f}%",
            f"- **24h Volume:** {predictions['volume']:,.0f}",
            "#### Technical Indicators",
            f"- **RSI:** {predictions['rsi']:,.2f}",
            f"- **MACD:** {predictions['macd']:,.2f}",
            f"- **Volatility:** {predictions['volatility']:,.2f}%",
            "#### Model Performance Metrics",
            f"- **R² Score:** {predictions['r2']:,.4f}",
            f"- **RMSE:** ${predictions['rmse']:,.2f}",
            f"- **MAPE:** {predictions['mape']:,.2f}%",
            "#### Prediction Confidence",
            f"- **Average Confidence:** {np.mean(predictions['confidence']) * 100:,.2f}%",
            f"- **Trend Direction:** {trend}",
            "> *Note: Past performance does not guarantee future results. "
            "This analysis is for informational purposes only.*",
            "",
        ])
        return fig, summary
    except Exception as e:
        # Show the failure inside the plot area instead of propagating it.
        fig = go.Figure()
        fig.add_annotation(
            text=str(e),
            xref="paper",
            yref="paper",
            x=0.5,
            y=0.5,
            showarrow=False,
        )
        return fig, f"⚠️ Error: {str(e)}"
def create_interface():
    """Assemble and return the Gradio Blocks UI (not launched here).

    The left column holds the inputs (symbol dropdown, free-text symbol,
    history and lookback sliders, submit button); the right column shows the
    plot. The Markdown summary and a hidden error panel sit in a row below.
    """
    intro_text = "\n".join([
        "",
        "# 🚀 Advanced Cryptocurrency Price Prediction",
        "This app uses deep learning to predict cryptocurrency prices and provide comprehensive market analysis.",
        "### Features:",
        "- Real-time price predictions",
        "- Technical indicators analysis",
        "- Confidence metrics",
        "- Performance visualization",
        "",
    ])

    with gr.Blocks(theme=gr.themes.Soft()) as iface:
        gr.Markdown(intro_text)
        with gr.Row():
            with gr.Column(scale=1):
                crypto_input = gr.Dropdown(
                    choices=['BTC', 'ETH', 'BNB', 'XRP', 'ADA', 'SOL', 'DOT', 'DOGE'],
                    label="Select Cryptocurrency",
                    value="BTC",
                )
                custom_crypto = gr.Textbox(
                    label="Or enter custom symbol",
                    placeholder="e.g., MATIC",
                )
                with gr.Row():
                    days_slider = gr.Slider(
                        minimum=30, maximum=365, value=180, step=30,
                        label="Historical Days",
                    )
                    lookback_slider = gr.Slider(
                        minimum=7, maximum=60, value=30, step=1,
                        label="Lookback Period (Days)",
                    )
                submit_btn = gr.Button("📊 Generate Analysis", variant="primary")
            with gr.Column(scale=2):
                plot_output = gr.Plot(label="Analysis Plots")
        with gr.Row():
            analysis_output = gr.Markdown(label="Analysis Summary")
        error_output = gr.Markdown(visible=False)

        def handle_analysis(symbol, custom_symbol, days, lookback):
            """Run the analysis for the custom symbol if given, else the dropdown one."""
            try:
                # A blank or whitespace-only custom entry must not override
                # the dropdown choice.
                typed_symbol = (custom_symbol or "").strip().upper()
                final_symbol = typed_symbol if typed_symbol else symbol
                figure, summary = create_analysis_plots(final_symbol, days, lookback)
                return figure, summary, gr.update(visible=False, value="")
            except Exception as e:
                empty_fig = go.Figure()
                error_msg = f"⚠️ Error during analysis: {str(e)}"
                return empty_fig, "", gr.update(visible=True, value=error_msg)

        submit_btn.click(
            fn=handle_analysis,
            inputs=[crypto_input, custom_crypto, days_slider, lookback_slider],
            outputs=[plot_output, analysis_output, error_output],
        )
    return iface
if __name__ == "__main__":
    # Script entry point: configure logging, make sure the working
    # directories exist, then build and serve the Gradio app.
    # ``logging`` is bound at module scope here; other code in this file
    # refers to it by that global name.
    import logging

    log_format = '%(asctime)s - %(levelname)s - %(message)s'
    log_handlers = [
        logging.FileHandler('crypto_predictor.log'),
        logging.StreamHandler(),
    ]
    logging.basicConfig(level=logging.INFO, format=log_format, handlers=log_handlers)

    try:
        for directory in ("models", "cache"):
            os.makedirs(directory, exist_ok=True)
        iface = create_interface()
        iface.launch(
            share=False,
            server_name="0.0.0.0",
            server_port=7860,
            debug=True,
        )
    except Exception as e:
        # Record the startup failure, then let it propagate unchanged.
        logging.error(f"Application failed to start: {str(e)}")
        raise