Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import Dataset, DataLoader | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.model_selection import train_test_split | |
| import gradio as gr | |
| import os | |
| # Define the Dataset class | |
| class BankNiftyDataset(Dataset): | |
| def __init__(self, data, seq_len, target_cols=['close']): | |
| self.data = data | |
| self.seq_len = seq_len | |
| self.target_cols = target_cols | |
| def __len__(self): | |
| return max(0, len(self.data) - self.seq_len + 1) | |
| def __getitem__(self, idx): | |
| seq_data = self.data.iloc[idx:idx+self.seq_len] | |
| features = torch.tensor(seq_data[['open', 'high', 'low', 'close', 'volume', 'oi']].values, dtype=torch.float32) | |
| label = torch.tensor(seq_data[self.target_cols].iloc[-1].values, dtype=torch.float32) | |
| return features, label | |
| # Define the LSTM model | |
| class LSTMModel(nn.Module): | |
| def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2, dropout=0.1): | |
| super(LSTMModel, self).__init__() | |
| self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers, batch_first=True, dropout=dropout) | |
| self.fc = nn.Sequential( | |
| nn.Linear(hidden_dim, hidden_dim // 2), | |
| nn.ReLU(), | |
| nn.Dropout(dropout), | |
| nn.Linear(hidden_dim // 2, output_dim) | |
| ) | |
| def forward(self, x): | |
| lstm_out, _ = self.lstm(x) | |
| out = self.fc(lstm_out[:, -1, :]) | |
| return out | |
| # Function to train the model | |
| def train_model(model, train_loader, val_loader, num_epochs=10): | |
| criterion = nn.MSELoss() | |
| optimizer = optim.Adam(model.parameters(), lr=0.001) | |
| best_val_loss = float('inf') | |
| best_model = None | |
| for epoch in range(num_epochs): | |
| model.train() | |
| for features, labels in train_loader: | |
| optimizer.zero_grad() | |
| outputs = model(features) | |
| loss = criterion(outputs, labels) | |
| loss.backward() | |
| optimizer.step() | |
| model.eval() | |
| val_loss = 0 | |
| with torch.no_grad(): | |
| for features, labels in val_loader: | |
| outputs = model(features) | |
| val_loss += criterion(outputs, labels).item() | |
| val_loss /= len(val_loader) | |
| print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}") | |
| if val_loss < best_val_loss: | |
| best_val_loss = val_loss | |
| best_model = model.state_dict().copy() | |
| model.load_state_dict(best_model) | |
| return model, best_val_loss | |
| # Function to generate trading signals | |
| def generate_signals(predictions, actual_values, stop_loss_threshold=0.05): | |
| signals = [] | |
| for pred, actual in zip(predictions, actual_values): | |
| if pred > actual * (1 + stop_loss_threshold): | |
| signals.append("Buy CE") | |
| elif pred < actual * (1 - stop_loss_threshold): | |
| signals.append("Buy PE") | |
| else: | |
| signals.append("Hold") | |
| return signals | |
| # Function to generate a report | |
| def generate_report(predictions, actual_values, signals, val_loss): | |
| report = [] | |
| cumulative_profit = 0 | |
| for i in range(len(signals)): | |
| signal = signals[i] | |
| profit = actual_values[i] - predictions[i] | |
| if signal == "Buy CE": | |
| cumulative_profit += profit | |
| elif signal == "Buy PE": | |
| cumulative_profit -= profit | |
| report.append(f"Signal: {signal}, Actual: {actual_values[i]:.2f}, Predicted: {predictions[i]:.2f}, Profit: {profit:.2f}") | |
| total_profit = cumulative_profit | |
| report.append(f"Total Profit: {total_profit:.2f}") | |
| report.append(f"Model Validation Loss: {val_loss:.4f}") | |
| return "\n".join(report) | |
| # Global variables to store the model and scaler | |
| global_model = None | |
| global_scaler = None | |
| # Function to process data and make predictions | |
| def predict(): | |
| global global_model, global_scaler | |
| # Load the pre-existing CSV file | |
| csv_path = 'BANKNIFTY_OPTION_CHAIN_data.csv' | |
| if not os.path.exists(csv_path): | |
| return "Error: CSV file not found in the expected location." | |
| # Load and preprocess data | |
| data = pd.read_csv(csv_path) | |
| if global_scaler is None: | |
| global_scaler = StandardScaler() | |
| scaled_data = global_scaler.fit_transform(data[['open', 'high', 'low', 'close', 'volume', 'oi']]) | |
| else: | |
| scaled_data = global_scaler.transform(data[['open', 'high', 'low', 'close', 'volume', 'oi']]) | |
| data[['open', 'high', 'low', 'close', 'volume', 'oi']] = scaled_data | |
| # Split data | |
| train_data, val_data = train_test_split(data, test_size=0.2, random_state=42) | |
| # Create datasets and dataloaders | |
| seq_len = 20 | |
| target_cols = ['close'] | |
| train_dataset = BankNiftyDataset(train_data, seq_len, target_cols) | |
| val_dataset = BankNiftyDataset(val_data, seq_len, target_cols) | |
| train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True) | |
| val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False) | |
| # Initialize and train the model | |
| input_dim = 6 | |
| hidden_dim = 64 | |
| output_dim = len(target_cols) | |
| if global_model is None: | |
| global_model = LSTMModel(input_dim, hidden_dim, output_dim) | |
| global_model, val_loss = train_model(global_model, train_loader, val_loader) | |
| # Make predictions | |
| global_model.eval() | |
| predictions = [] | |
| actual_values = val_data['close'].values[seq_len-1:] | |
| with torch.no_grad(): | |
| for i in range(len(val_dataset)): | |
| features, _ = val_dataset[i] | |
| pred = global_model(features.unsqueeze(0)).item() | |
| predictions.append(pred) | |
| # Generate signals and report | |
| signals = generate_signals(predictions, actual_values) | |
| report = generate_report(predictions, actual_values, signals, val_loss) | |
| return report | |
| # Set up the Gradio interface | |
| iface = gr.Interface( | |
| fn=predict, | |
| inputs=None, | |
| outputs=gr.Textbox(label="Prediction Report"), | |
| title="BankNifty Option Chain Predictor", | |
| description="Click 'Submit' to generate predictions and trading signals based on the latest BankNifty option chain data. The model is automatically trained and improved with each run." | |
| ) | |
| # Launch the app | |
| iface.launch() |