Spaces:
Build error
Build error
| import gradio as gr | |
| import numpy as np | |
| import json | |
| import re | |
| import random | |
| from typing import List, Dict, Tuple, Optional | |
| import os | |
| import time | |
| import matplotlib.pyplot as plt | |
| from io import BytesIO | |
| import base64 | |
| from datetime import datetime | |
| # The supporting classes (ActivationFunctions, LossFunctions, Layer, DenseLayer, | |
| # DropoutLayer, NeuralNetwork, TextProcessor, Chatbot) are defined in full below, | |
| # followed by the module-level helper functions that operate on a shared Chatbot. | |
class ActivationFunctions:
    """Namespace of activation functions and their derivatives.

    All members are static methods, so they can be called on the class
    (``ActivationFunctions.relu(z)``) as well as on an instance.
    """

    @staticmethod
    def sigmoid(z: np.ndarray) -> np.ndarray:
        """Return the logistic sigmoid ``1 / (1 + exp(-z))`` element-wise."""
        # Clip the pre-activation so exp(-z) cannot overflow float64.
        z = np.clip(z, -500, 500)
        return 1 / (1 + np.exp(-z))

    @staticmethod
    def sigmoid_derivative(z: np.ndarray) -> np.ndarray:
        """Return the derivative of the sigmoid evaluated at ``z``."""
        s = ActivationFunctions.sigmoid(z)
        return s * (1 - s)

    @staticmethod
    def relu(z: np.ndarray) -> np.ndarray:
        """Return ``max(0, z)`` element-wise."""
        return np.maximum(0, z)

    @staticmethod
    def relu_derivative(z: np.ndarray) -> np.ndarray:
        """Return 1 where ``z > 0`` and 0 elsewhere."""
        return np.where(z > 0, 1, 0)

    @staticmethod
    def softmax(z: np.ndarray) -> np.ndarray:
        """Return the softmax of ``z`` taken along axis 0.

        The maximum is subtracted per column (axis 0) rather than globally.
        With a global maximum, a column whose entries are all far below that
        maximum underflows to zeros and the normalisation becomes 0/0 (NaN).
        """
        shifted = z - np.max(z, axis=0, keepdims=True)
        exp_z = np.exp(shifted)
        return exp_z / exp_z.sum(axis=0, keepdims=True)
class LossFunctions:
    """Namespace of loss functions and their derivatives.

    All members are static methods, so they work when called on the class
    or on an instance.
    """

    @staticmethod
    def mse(output: np.ndarray, target: np.ndarray) -> float:
        """Return the mean of the squared differences between output and target."""
        return np.mean((output - target) ** 2)

    @staticmethod
    def mse_derivative(output: np.ndarray, target: np.ndarray) -> np.ndarray:
        """Return the gradient of the MSE loss with respect to ``output``."""
        return 2 * (output - target) / output.size

    @staticmethod
    def cross_entropy(output: np.ndarray, target: np.ndarray) -> float:
        """Return the cross-entropy loss divided by ``output.shape[1]``.

        ``output`` is clipped to ``[1e-15, 1 - 1e-15]`` so ``log`` never sees 0.
        """
        epsilon = 1e-15
        output = np.clip(output, epsilon, 1 - epsilon)
        return -np.sum(target * np.log(output)) / output.shape[1]

    @staticmethod
    def cross_entropy_derivative(output: np.ndarray, target: np.ndarray) -> np.ndarray:
        """Return the gradient of the cross-entropy loss with respect to ``output``.

        This is dL/d(output), i.e. ``-target / output / output.shape[1]``; it is
        not the combined softmax + cross-entropy gradient with respect to logits.
        """
        epsilon = 1e-15
        output = np.clip(output, epsilon, 1 - epsilon)
        return -target / output / output.shape[1]
class Layer:
    """Common interface implemented by every layer of the network."""

    def forward(self, inputs: np.ndarray) -> np.ndarray:
        """Compute the layer output; concrete layers must override this."""
        raise NotImplementedError()

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Propagate a gradient backwards; concrete layers must override this."""
        raise NotImplementedError()

    def update(self, learning_rate: float) -> None:
        """Apply a parameter update. The base implementation does nothing."""
        return None

    def get_parameters(self) -> List:
        """Return the layer's parameters; the base layer has none."""
        no_parameters: List = []
        return no_parameters
class DenseLayer(Layer):
    """Fully connected layer computing ``activation(weights @ inputs + biases)``.

    Inputs are multiplied on the left by a weight matrix of shape
    ``(output_size, input_size)`` and a bias of shape ``(output_size, 1)``
    is added.
    """

    def __init__(self, input_size: int, output_size: int, activation: str = "sigmoid"):
        """Create the layer.

        Args:
            input_size: Number of input features.
            output_size: Number of units in the layer.
            activation: One of ``"sigmoid"``, ``"relu"`` or ``"softmax"``.

        Raises:
            ValueError: If ``activation`` is not one of the supported names.
        """
        self.input_size = input_size
        self.output_size = output_size
        # Zero-mean Gaussian weights scaled by sqrt(1 / (fan_in + fan_out))
        # to keep the initial activations and gradients small.
        self.weights = np.random.randn(output_size, input_size) * np.sqrt(
            1 / (input_size + output_size)
        )
        self.biases = np.zeros((output_size, 1))
        # Select the activation function and, where one exists, its derivative.
        if activation == "sigmoid":
            self.activation_fn = ActivationFunctions.sigmoid
            self.activation_derivative = ActivationFunctions.sigmoid_derivative
        elif activation == "relu":
            self.activation_fn = ActivationFunctions.relu
            self.activation_derivative = ActivationFunctions.relu_derivative
        elif activation == "softmax":
            self.activation_fn = ActivationFunctions.softmax
            # Softmax has no element-wise derivative; backward() handles it.
            self.activation_derivative = None
        else:
            raise ValueError(f"Unsupported activation function: {activation}")
        self.activation_name = activation
        # Values cached by forward() for use in backward().
        self.inputs = None
        self.z = None
        self.output = None
        # Gradients computed by backward() and consumed by update().
        self.dW = None
        self.db = None

    def forward(self, inputs: np.ndarray) -> np.ndarray:
        """Run the forward pass and cache the values needed by ``backward``."""
        self.inputs = inputs
        self.z = np.dot(self.weights, inputs) + self.biases
        if self.activation_name == "sigmoid":
            # Keep the pre-activation in a range where the sigmoid is well behaved.
            self.z = np.clip(self.z, -15, 15)
        self.output = self.activation_fn(self.z)
        if self.activation_name == "softmax":
            # Keep probabilities strictly inside (0, 1) so later logs/divisions are safe.
            epsilon = 1e-10
            self.output = np.clip(self.output, epsilon, 1.0 - epsilon)
        return self.output

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Back-propagate ``grad`` (dL/d(output)) and store ``dW`` / ``db``.

        Args:
            grad: Gradient of the loss with respect to this layer's output.

        Returns:
            Gradient of the loss with respect to this layer's input.
        """
        if self.activation_name == "softmax":
            # Softmax Jacobian-vector product, applied per column:
            #   delta_i = s_i * (g_i - sum_j g_j * s_j)
            # The incoming gradient is dL/d(output) (e.g. -t / s / m for
            # cross-entropy), so it must go through the softmax Jacobian; for
            # cross-entropy with one-hot targets this reduces to (s - t) / m.
            s = self.output
            delta = s * (grad - np.sum(grad * s, axis=0, keepdims=True))
        else:
            delta = grad * self.activation_derivative(self.z)
        # Parameter gradients.
        self.dW = np.dot(delta, self.inputs.T)
        self.db = np.sum(delta, axis=1, keepdims=True)
        # Element-wise clipping to limit the size of any single update.
        max_grad_norm = 5.0
        self.dW = np.clip(self.dW, -max_grad_norm, max_grad_norm)
        self.db = np.clip(self.db, -max_grad_norm, max_grad_norm)
        # Gradient handed to the previous layer.
        return np.dot(self.weights.T, delta)

    def update(self, learning_rate: float) -> None:
        """Apply one gradient-descent step; weights also get a small L2 decay."""
        weight_decay = 1e-4
        weight_decay_term = weight_decay * self.weights
        self.weights -= learning_rate * (self.dW + weight_decay_term)
        self.biases -= learning_rate * self.db
class DropoutLayer(Layer):
    """Inverted-dropout layer used for regularisation during training."""

    def __init__(self, dropout_rate: float = 0.5):
        """Create the layer.

        Args:
            dropout_rate: Probability of zeroing an activation; must satisfy
                ``0 <= dropout_rate < 1``.

        Raises:
            ValueError: If ``dropout_rate`` is outside ``[0, 1)``; a rate of 1
                would divide by zero when rescaling the kept activations.
        """
        if not 0.0 <= dropout_rate < 1.0:
            raise ValueError(
                f"dropout_rate must be in the range [0, 1), got {dropout_rate}"
            )
        self.dropout_rate = dropout_rate
        self.mask = None

    def forward(self, inputs: np.ndarray, training: bool = True) -> np.ndarray:
        """Apply dropout when ``training`` is true; otherwise pass inputs through."""
        if not training:
            # Drop any mask left over from training so that a later backward()
            # cannot apply a stale mask to an inference pass.
            self.mask = None
            return inputs
        keep_probability = 1 - self.dropout_rate
        # Kept units are scaled by 1 / keep_probability so the expected
        # activation is unchanged.
        self.mask = (
            np.random.binomial(1, keep_probability, size=inputs.shape)
            / keep_probability
        )
        return inputs * self.mask

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Route the gradient through the units kept by the last forward pass."""
        if self.mask is None:
            # No training-mode forward pass has run: the layer was an identity.
            return grad
        return grad * self.mask
class NeuralNetwork:
    """Sequential stack of layers with a configurable loss function."""

    def __init__(self):
        """Create an empty network with no loss configured."""
        self.layers = []
        self.loss_fn = None
        self.loss_derivative = None
        # Name passed to set_loss(); persisted by save() and read by load().
        self.loss_type = None

    def add(self, layer: Layer) -> None:
        """Append ``layer`` to the end of the network."""
        self.layers.append(layer)

    def set_loss(self, loss_type: str) -> None:
        """Select the loss function by name (``"mse"`` or ``"cross_entropy"``).

        Raises:
            ValueError: If ``loss_type`` is not a supported name.
        """
        if loss_type == "mse":
            self.loss_fn = LossFunctions.mse
            self.loss_derivative = LossFunctions.mse_derivative
        elif loss_type == "cross_entropy":
            self.loss_fn = LossFunctions.cross_entropy
            self.loss_derivative = LossFunctions.cross_entropy_derivative
        else:
            raise ValueError(f"Unsupported loss function: {loss_type}")
        self.loss_type = loss_type

    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        """Run ``x`` through every layer in order and return the final output.

        Only dropout layers receive the ``training`` flag.
        """
        output = x
        for layer in self.layers:
            if isinstance(layer, DropoutLayer):
                output = layer.forward(output, training)
            else:
                output = layer.forward(output)
        return output

    def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Evaluate the configured loss on predictions and targets."""
        return self.loss_fn(y_pred, y_true)

    def backward(self, y_pred: np.ndarray, y_true: np.ndarray) -> None:
        """Back-propagate the loss gradient through the layers in reverse order."""
        grad = self.loss_derivative(y_pred, y_true)
        for layer in reversed(self.layers):
            grad = layer.backward(grad)

    def update(self, learning_rate: float) -> None:
        """Ask every layer to apply its parameter update."""
        for layer in self.layers:
            layer.update(learning_rate)

    def predict(self, x: np.ndarray) -> np.ndarray:
        """Run a forward pass with ``training=False`` (dropout disabled)."""
        return self.forward(x, training=False)

    @classmethod
    def load(cls, filename: str) -> "NeuralNetwork":
        """Build a network from the JSON file written by ``save``.

        Declared as a classmethod so that ``NeuralNetwork.load(path)`` works;
        without the decorator the path would be bound to ``cls``.
        Files that carry no ``"loss_type"`` key default to cross-entropy.
        """
        with open(filename, "r") as f:
            model_data = json.load(f)
        network = cls()
        network.set_loss(model_data.get("loss_type", "cross_entropy"))
        for layer_data in model_data["layers"]:
            if layer_data["type"] == "dense":
                layer = DenseLayer(
                    layer_data["input_size"],
                    layer_data["output_size"],
                    layer_data["activation"],
                )
                # Replace the random initial parameters with the stored ones.
                layer.weights = np.array(layer_data["weights"])
                layer.biases = np.array(layer_data["biases"])
                network.add(layer)
            elif layer_data["type"] == "dropout":
                layer = DropoutLayer(layer_data["dropout_rate"])
                network.add(layer)
        return network

    def save(self, filename: str) -> None:
        """Write the architecture, parameters and loss name to a JSON file."""
        model_data = {"layers": []}
        # Persist the loss name so load() restores the same loss instead of
        # always falling back to its default.
        loss_type = getattr(self, "loss_type", None)
        if loss_type is not None:
            model_data["loss_type"] = loss_type
        for layer in self.layers:
            if isinstance(layer, DenseLayer):
                layer_data = {
                    "type": "dense",
                    "input_size": layer.input_size,
                    "output_size": layer.output_size,
                    "activation": layer.activation_name,
                    "weights": layer.weights.tolist(),
                    "biases": layer.biases.tolist(),
                }
                model_data["layers"].append(layer_data)
            elif isinstance(layer, DropoutLayer):
                layer_data = {"type": "dropout", "dropout_rate": layer.dropout_rate}
                model_data["layers"].append(layer_data)
        with open(filename, "w") as f:
            json.dump(model_data, f)
class TextProcessor:
    """Tokenizer and bag-of-words encoder over a fixed vocabulary."""

    def __init__(self):
        """Create a processor with an empty vocabulary."""
        self.vocabulary = []
        self.vocabulary_size = 0

    def tokenize(self, sentence: str) -> List[str]:
        """Lower-case ``sentence`` and return its ``\\w+`` runs as tokens."""
        return re.findall(r"\w+", sentence.lower())

    def build_vocabulary(self, sentences: List[str]) -> None:
        """Replace the vocabulary with the sorted unique tokens of ``sentences``."""
        vocabulary = set()
        for sentence in sentences:
            vocabulary.update(self.tokenize(sentence))
        self.vocabulary = sorted(vocabulary)
        self.vocabulary_size = len(self.vocabulary)

    def sentence_to_bow(self, sentence: str) -> np.ndarray:
        """Encode ``sentence`` as a binary column vector of shape ``(vocabulary_size, 1)``.

        Entry ``i`` is 1 when vocabulary word ``i`` occurs in the sentence;
        tokens that are not in the vocabulary are ignored.
        """
        vector = np.zeros((self.vocabulary_size, 1))
        # Build the word -> row lookup once per call instead of scanning the
        # vocabulary list for every token. setdefault keeps the first index
        # of a repeated word, matching list.index().
        index_of = {}
        for position, word in enumerate(self.vocabulary):
            index_of.setdefault(word, position)
        for token in self.tokenize(sentence):
            position = index_of.get(token)
            if position is not None:
                vector[position, 0] = 1
        return vector

    def save(self, filename: str) -> None:
        """Write the vocabulary and its size to a JSON file."""
        processor_data = {
            "vocabulary": self.vocabulary,
            "vocabulary_size": self.vocabulary_size,
        }
        with open(filename, "w") as f:
            json.dump(processor_data, f)

    @classmethod
    def load(cls, filename: str) -> "TextProcessor":
        """Build a processor from the JSON file written by ``save``.

        Declared as a classmethod so that ``TextProcessor.load(path)`` works;
        without the decorator the path would be bound to ``cls``.
        """
        with open(filename, "r") as f:
            processor_data = json.load(f)
        processor = cls()
        processor.vocabulary = processor_data["vocabulary"]
        processor.vocabulary_size = processor_data["vocabulary_size"]
        return processor
class Chatbot:
    """Intent-classification chatbot backed by a small feed-forward network.

    Intents are a mapping ``{name: {"patterns": [...], "responses": [...]}}``.
    Patterns are encoded as bag-of-words vectors and classified by the model;
    a reply is then picked at random from the predicted intent's responses.
    """

    def __init__(self):
        """Create a chatbot with no intents and an empty model."""
        self.intents = {}
        self.text_processor = TextProcessor()
        self.model = NeuralNetwork()
        self.intent_names = []
        # Predictions below this confidence produce the default response.
        self.confidence_threshold = 0.5
        self.default_response = "I'm not sure I understand. Could you rephrase that?"
        self.training_history = None

    def load_intents(self, intents_data: Dict) -> None:
        """Adopt ``intents_data`` and rebuild the vocabulary from its patterns."""
        self.intents = intents_data
        self.intent_names = list(self.intents.keys())
        all_patterns = []
        for intent in self.intents.values():
            all_patterns.extend(intent["patterns"])
        self.text_processor.build_vocabulary(all_patterns)

    def load_intents_from_file(self, filename: str) -> None:
        """Read intents from a JSON file and load them."""
        with open(filename, "r") as f:
            intents_data = json.load(f)
        self.load_intents(intents_data)

    def save_intents(self, filename: str) -> None:
        """Write the current intents to a JSON file."""
        with open(filename, "w") as f:
            json.dump(self.intents, f, indent=4)

    def load_model(self, filename: str) -> None:
        """Replace the model with one loaded from ``filename``.

        Only the network is loaded; the text processor and intent names are
        left untouched.
        """
        self.model = NeuralNetwork.load(filename)

    def save_model(self, filename: str) -> None:
        """Save the model plus its companion processor and settings files.

        The companions are written next to ``filename`` as
        ``<base>_processor.json`` and ``<base>_intents.json``.
        """
        self.model.save(filename)
        # Derive companion names from the base name rather than with
        # str.replace(".json", ...): replace() is a no-op when the name has no
        # ".json" (the companions would then overwrite the model file) and
        # rewrites every ".json" occurrence elsewhere in the path.
        base_name, _extension = os.path.splitext(filename)
        self.text_processor.save(f"{base_name}_processor.json")
        with open(f"{base_name}_intents.json", "w") as f:
            json.dump(
                {
                    "intent_names": self.intent_names,
                    "confidence_threshold": self.confidence_threshold,
                    "default_response": self.default_response,
                },
                f,
            )

    def build_model(
        self, hidden_layers: Optional[List[int]] = None, dropout_rate: float = 0.0
    ) -> None:
        """Build a fresh classifier sized for the current vocabulary and intents.

        Args:
            hidden_layers: Sizes of the hidden ReLU layers; defaults to ``[8]``.
            dropout_rate: When greater than 0, a dropout layer with this rate
                follows every hidden layer.

        Raises:
            ValueError: If no intents are loaded or ``hidden_layers`` is empty.
        """
        # A None sentinel avoids sharing one mutable default list across calls.
        if hidden_layers is None:
            hidden_layers = [8]
        input_size = self.text_processor.vocabulary_size
        output_size = len(self.intent_names)
        if output_size == 0:
            raise ValueError("No intents loaded. Please load intents first.")
        if len(hidden_layers) == 0:
            raise ValueError("At least one hidden layer size is required.")
        self.model = NeuralNetwork()
        previous_size = input_size
        for layer_size in hidden_layers:
            self.model.add(DenseLayer(previous_size, layer_size, "relu"))
            if dropout_rate > 0:
                self.model.add(DropoutLayer(dropout_rate))
            previous_size = layer_size
        # Softmax output paired with cross-entropy loss for classification.
        self.model.add(DenseLayer(previous_size, output_size, "softmax"))
        self.model.set_loss("cross_entropy")

    def train(
        self,
        epochs: int = 1000,
        learning_rate: float = 0.01,
        batch_size: int = None,
        verbose: bool = True,
    ) -> Dict:
        """Train the model with full-batch gradient descent on all patterns.

        Args:
            epochs: Number of passes over the training set.
            learning_rate: Step size passed to the layers' update.
            batch_size: Accepted for interface compatibility; currently unused,
                every epoch uses the whole training set.
            verbose: Print progress every 100 epochs and on NaN loss.

        Returns:
            ``{"loss": [...], "accuracy": [...]}`` with one entry per completed
            epoch. If the very first epoch already yields a NaN loss,
            ``{"loss": [0], "accuracy": [0]}`` is returned and the stored
            training history is left unchanged.

        Raises:
            ValueError: If there are no training patterns.
        """
        class_count = len(self.intent_names)
        features = []
        labels = []
        for class_idx, intent in enumerate(self.intent_names):
            for pattern in self.intents[intent]["patterns"]:
                features.append(self.text_processor.sentence_to_bow(pattern))
                one_hot = np.zeros((class_count, 1))
                one_hot[class_idx, 0] = 1
                labels.append(one_hot)
        if not features:
            # np.hstack([]) would raise a ValueError with an unhelpful message.
            raise ValueError("No training patterns available. Please load intents first.")
        # One column per training example.
        X_train = np.hstack(features)
        y_train = np.hstack(labels)
        history = {"loss": [], "accuracy": []}
        # Second, tighter element-wise clip applied on top of the layers' own.
        max_grad_norm = 1.0
        targets = np.argmax(y_train, axis=0)
        for epoch in range(epochs):
            outputs = self.model.forward(X_train)
            # Keep probabilities away from 0 and 1 before the loss takes logs.
            epsilon = 1e-10
            outputs = np.clip(outputs, epsilon, 1.0 - epsilon)
            loss = self.model.compute_loss(outputs, y_train)
            if np.isnan(loss):
                if verbose:
                    print(f"NaN loss detected at epoch {epoch+1}. Stopping training.")
                if epoch > 0:
                    # Keep the history gathered so far.
                    break
                return {"loss": [0], "accuracy": [0]}
            self.model.backward(outputs, y_train)
            for layer in self.model.layers:
                if hasattr(layer, "dW") and layer.dW is not None:
                    layer.dW = np.clip(layer.dW, -max_grad_norm, max_grad_norm)
                if hasattr(layer, "db") and layer.db is not None:
                    layer.db = np.clip(layer.db, -max_grad_norm, max_grad_norm)
            self.model.update(learning_rate)
            predictions = np.argmax(outputs, axis=0)
            accuracy = np.mean(predictions == targets)
            # Store plain Python floats so the history is JSON-serialisable.
            history["loss"].append(float(loss))
            history["accuracy"].append(float(accuracy))
            if verbose and (epoch + 1) % 100 == 0:
                print(
                    f"Epoch {epoch + 1}/{epochs}, Loss: {loss:.4f}, Accuracy: {accuracy:.4f}"
                )
        self.training_history = history
        return history

    def predict(self, sentence: str) -> Tuple[str, float]:
        """Return ``(intent_name, confidence)`` for ``sentence``."""
        bow = self.text_processor.sentence_to_bow(sentence)
        prediction = self.model.predict(bow)
        # Convert NumPy scalars to builtin types so callers can format or
        # serialise them directly.
        intent_idx = int(np.argmax(prediction))
        confidence = float(prediction[intent_idx, 0])
        return self.intent_names[intent_idx], confidence

    def get_response(self, sentence: str) -> Tuple[str, str, float]:
        """Return ``(intent, response, confidence)`` for a user message.

        When the confidence is below ``confidence_threshold`` the intent is
        reported as ``"unknown"`` and the default response is returned.
        """
        intent, confidence = self.predict(sentence)
        if confidence < self.confidence_threshold:
            return "unknown", self.default_response, confidence
        responses = self.intents[intent]["responses"]
        response = random.choice(responses)
        return intent, response, confidence

    def plot_training_history(self, history: Dict = None) -> None:
        """Show loss and accuracy curves for ``history`` (or the stored history)."""
        if history is None:
            history = self.training_history
        if history is None:
            print("No training history available.")
            return
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.plot(history["loss"])
        plt.title("Model Loss")
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.subplot(1, 2, 2)
        plt.plot(history["accuracy"])
        plt.title("Model Accuracy")
        plt.xlabel("Epoch")
        plt.ylabel("Accuracy")
        plt.tight_layout()
        plt.show()

    def get_training_plot_as_base64(self, history: Dict = None) -> Optional[str]:
        """Render the training curves to ``training_plot.png`` and return that path.

        Despite its name, this method returns the path of the written PNG
        file, not a base64 string. ``None`` is returned when there is no
        usable history or when plotting fails.
        """
        if history is None:
            history = self.training_history
        if history is None or "loss" not in history or len(history["loss"]) == 0:
            return None
        fig = None
        try:
            # Filter NaNs before creating a figure so early exits leak nothing.
            loss_values = [x for x in history["loss"] if not np.isnan(x)]
            acc_values = [x for x in history["accuracy"] if not np.isnan(x)]
            if len(loss_values) == 0 or len(acc_values) == 0:
                return None
            fig = plt.figure(figsize=(12, 5))
            plt.subplot(1, 2, 1)
            plt.plot(loss_values)
            plt.title("Model Loss")
            plt.xlabel("Epoch")
            plt.ylabel("Loss")
            plt.subplot(1, 2, 2)
            plt.plot(acc_values)
            plt.title("Model Accuracy")
            plt.xlabel("Epoch")
            plt.ylabel("Accuracy")
            plt.tight_layout()
            # Write the PNG straight to disk; encoding it to base64 only to
            # decode it again added nothing.
            img_path = "training_plot.png"
            fig.savefig(img_path, format="png")
            return img_path
        except Exception as e:
            print(f"Error generating training plot: {str(e)}")
            return None
        finally:
            # Always release the figure, including on failure.
            if fig is not None:
                plt.close(fig)

    def chat(self):
        """Run an interactive console chat until the user types quit/exit/bye."""
        print("Chatbot: Hello! Type 'quit' to exit.")
        while True:
            user_input = input("You: ")
            if user_input.lower() in ["quit", "exit", "bye"]:
                print("Chatbot: Goodbye!")
                break
            intent, response, confidence = self.get_response(user_input)
            print(f"Chatbot ({intent}, {confidence:.2f}): {response}")
# Single shared chatbot instance used by every helper function below.
chatbot = Chatbot()

# Built-in intents, used when no intents file can be loaded.
default_intents = dict(
    greeting=dict(
        patterns=["Hi", "Hello", "Hey", "Good morning", "What's up"],
        responses=["Hello!", "Hi there!", "Greetings!", "Hey! How can I help you?"],
    ),
    farewell=dict(
        patterns=["Bye", "See you", "Goodbye", "Later", "I'm leaving"],
        responses=["Goodbye!", "See you later!", "Farewell!", "Take care!"],
    ),
    thanks=dict(
        patterns=["Thanks", "Thank you", "Much appreciated", "Appreciate it"],
        responses=["You're welcome!", "No problem!", "Anytime!", "Glad to help!"],
    ),
    help=dict(
        patterns=["Help", "I need help", "Can you help me", "Support"],
        responses=[
            "How can I help you?",
            "I'm here to assist you.",
            "What do you need help with?",
        ],
    ),
)
# Function to initialize the chatbot
def initialize_chatbot():
    """Load intents and, when available, a previously saved model.

    Intents come from ``intents.json`` when it exists and parses; otherwise
    the built-in defaults are used (and written to ``intents.json`` when the
    file was missing). A saved model is adopted only when all three model
    files exist and load successfully; otherwise a new, untrained model with
    hidden layers ``[32, 16]`` is built.
    """
    model_path = "chatbot_model.json"
    processor_path = "chatbot_model_processor.json"
    intents_names_path = "chatbot_model_intents.json"
    intents_path = "intents.json"

    # --- Intents -----------------------------------------------------------
    if os.path.exists(intents_path):
        try:
            chatbot.load_intents_from_file(intents_path)
            print(f"Loaded intents from {intents_path}")
        except Exception as e:
            print(f"Error loading intents: {e}")
            print("Loading default intents")
            chatbot.load_intents(default_intents)
    else:
        print("No intents file found. Loading default intents")
        chatbot.load_intents(default_intents)
        # Persist the defaults so they can be edited later.
        chatbot.save_intents(intents_path)

    # --- Model ---------------------------------------------------------------
    model_files = (model_path, processor_path, intents_names_path)
    if not all(os.path.exists(path) for path in model_files):
        print(
            "No model found or incomplete model files. A new model will be built and trained"
        )
        chatbot.build_model(hidden_layers=[32, 16])
        return

    try:
        # Load everything into locals first. If any file is unreadable the
        # chatbot keeps its intents-derived vocabulary and intent names
        # instead of ending up with a mix of loaded and default state.
        loaded_model = NeuralNetwork.load(model_path)
        loaded_processor = TextProcessor.load(processor_path)
        with open(intents_names_path, "r") as f:
            intents_data = json.load(f)
        loaded_intent_names = intents_data["intent_names"]
    except Exception as e:
        print(f"Error loading model: {e}")
        print("A new model will be built and trained")
        chatbot.build_model(hidden_layers=[32, 16])
        return

    # Every file loaded cleanly: commit the new state in one go.
    chatbot.model = loaded_model
    chatbot.text_processor = loaded_processor
    chatbot.intent_names = loaded_intent_names
    chatbot.confidence_threshold = intents_data.get("confidence_threshold", 0.5)
    chatbot.default_response = intents_data.get(
        "default_response",
        "I'm not sure I understand. Could you rephrase that?",
    )
    print(f"Loaded existing model from {model_path}")
# Prepare the shared chatbot (intents + model) as soon as the module is loaded.
initialize_chatbot()

# Conversation log for the interface; starts out empty.
chat_history = list()
# Function to respond to user messages
def respond(message, history):
    """Return the chatbot's reply to ``message``.

    ``history`` is accepted but not used. An empty message yields a prompt
    asking the user to type something.
    """
    if message:
        _intent, reply, _confidence = chatbot.get_response(message)
        # Short pause before answering to simulate processing.
        time.sleep(0.5)
        return reply
    return "Please type a message."
# Function to get intent and confidence
def get_intent_info(message):
    """Return ``(intent, confidence)`` for ``message``, or ``("N/A", 0.0)`` when empty."""
    if message:
        predicted_intent, score = chatbot.predict(message)
        return predicted_intent, float(score)
    return "N/A", 0.0
# Function to add a new intent
def add_intent(intent_name, patterns, responses):
    """Create an intent, or extend an existing one, then save the intents.

    ``patterns`` and ``responses`` are newline-separated strings; blank lines
    are ignored. Returns a status message.
    """
    if not (intent_name and patterns and responses):
        return "Please fill all fields"

    # One entry per non-blank line, with surrounding whitespace removed.
    pattern_list = [line for line in map(str.strip, patterns.split("\n")) if line]
    response_list = [line for line in map(str.strip, responses.split("\n")) if line]
    if not (pattern_list and response_list):
        return "Please provide at least one pattern and one response"

    if intent_name not in chatbot.intents:
        # Brand-new intent: register both its data and its name.
        chatbot.intents[intent_name] = {
            "patterns": pattern_list,
            "responses": response_list,
        }
        chatbot.intent_names.append(intent_name)
    else:
        # Known intent: append to what is already there.
        entry = chatbot.intents[intent_name]
        entry["patterns"].extend(pattern_list)
        entry["responses"].extend(response_list)

    chatbot.save_intents("intents.json")
    return f"Intent '{intent_name}' added/updated successfully"
# Rebuild and train the model from the form inputs
def train_model(epochs, learning_rate, hidden_layers_str, dropout_rate):
    """Rebuild the model, train it, save it, and report the outcome.

    Args:
        epochs: Number of training epochs (converted with ``int``).
        learning_rate: Requested learning rate; capped at 0.005 for stability.
        hidden_layers_str: Comma-separated hidden layer sizes, e.g. ``"32,16"``.
        dropout_rate: Dropout rate, which must satisfy ``0 <= rate < 1``.

    Returns:
        ``(message, plot_path)``. ``plot_path`` is ``None`` whenever training
        did not run or failed; ``message`` then describes the problem.
    """
    try:
        # Parse and validate every input before touching the chatbot, so a
        # bad value cannot leave it half rebuilt.
        hidden_layers = [
            int(part) for part in map(str.strip, hidden_layers_str.split(",")) if part
        ]
        if not hidden_layers:
            return (
                "Error: Invalid hidden layer format. Use comma-separated numbers, e.g. '32,16'",
                None,
            )
        if any(size <= 0 for size in hidden_layers):
            return ("Error: Hidden layer sizes must be positive integers.", None)
        epochs = int(epochs)
        if epochs <= 0:
            return ("Error: Epochs must be a positive integer.", None)
        requested_rate = float(learning_rate)
        if requested_rate <= 0:
            return ("Error: Learning rate must be positive.", None)
        # Cap the learning rate for stability.
        learning_rate = min(requested_rate, 0.005)
        dropout_rate = float(dropout_rate)
        if not 0.0 <= dropout_rate < 1.0:
            # A rate of 1 drops every unit and divides by zero when rescaling.
            return ("Error: Dropout rate must be at least 0 and less than 1.", None)

        # Validate against the current intents, which may have been added,
        # edited or deleted since the vocabulary was last built.
        if len(chatbot.intents) < 2:
            return (
                "Error: Need at least 2 intents for training. Please add more intents.",
                None,
            )
        has_tokens = any(
            chatbot.text_processor.tokenize(pattern)
            for intent in chatbot.intents.values()
            for pattern in intent["patterns"]
        )
        if not has_tokens:
            return (
                "Error: No vocabulary built. Please add more patterns to your intents.",
                None,
            )

        # Re-sync intent names and vocabulary with the current intents so
        # the new model sees every pattern word and every intent.
        chatbot.load_intents(chatbot.intents)
        chatbot.build_model(hidden_layers=hidden_layers, dropout_rate=dropout_rate)
        history = chatbot.train(
            epochs=epochs, learning_rate=learning_rate, verbose=True
        )
        if not history or "loss" not in history or not history["loss"]:
            return "Training failed - no history data returned", None

        final_loss = history["loss"][-1] if history["loss"] else 0
        final_accuracy = history["accuracy"][-1] if history["accuracy"] else 0
        final_loss_str = "NaN" if np.isnan(final_loss) else f"{final_loss:.4f}"
        final_accuracy_str = (
            "NaN" if np.isnan(final_accuracy) else f"{final_accuracy:.4f}"
        )

        chatbot.save_model("chatbot_model.json")
        # Despite the helper's name, this is the path of the written PNG.
        img_str = chatbot.get_training_plot_as_base64(history)
        return (
            f"Model trained successfully with:\n"
            f"- Epochs: {epochs}\n"
            f"- Learning Rate: {learning_rate}\n"
            f"- Hidden Layers: {hidden_layers}\n"
            f"- Dropout Rate: {dropout_rate}\n"
            f"- Final Loss: {final_loss_str}\n"
            f"- Final Accuracy: {final_accuracy_str}"
        ), img_str
    except Exception as e:
        import traceback

        # Top-level boundary: report the failure in the message instead of raising.
        error_details = traceback.format_exc()
        return f"Error training model: {str(e)}\n\nDetails:\n{error_details}", None
# Function to load an existing model
def load_model_from_file(file_obj):
    """Load a model, plus any companion files, from an uploaded JSON file.

    Args:
        file_obj: Either a filesystem path (``str`` or ``os.PathLike``) or an
            object exposing the path as its ``name`` attribute.

    Returns:
        A status message describing success or the failure reason.

    The companion files ``<base>_processor.json`` and ``<base>_intents.json``
    are loaded when they exist next to the model file.
    """
    if not file_obj:
        return "No file uploaded"
    try:
        # Accept plain paths as well as file wrappers carrying ``.name``.
        if isinstance(file_obj, (str, os.PathLike)):
            file_path = os.fspath(file_obj)
        else:
            file_path = file_obj.name
        if not file_path.endswith(".json"):
            return "Please upload a JSON model file"

        # Load everything into locals first so a broken companion file
        # cannot leave the chatbot with a new model but stale settings.
        loaded_model = NeuralNetwork.load(file_path)
        base_name = os.path.splitext(file_path)[0]
        processor_path = f"{base_name}_processor.json"
        intents_names_path = f"{base_name}_intents.json"

        loaded_processor = None
        if os.path.exists(processor_path):
            loaded_processor = TextProcessor.load(processor_path)

        settings = None
        if os.path.exists(intents_names_path):
            with open(intents_names_path, "r") as f:
                settings = json.load(f)
            # Fail now, before anything is committed, if the key is missing.
            settings["intent_names"]

        # Commit.
        chatbot.model = loaded_model
        if loaded_processor is not None:
            chatbot.text_processor = loaded_processor
        if settings is not None:
            chatbot.intent_names = settings["intent_names"]
            chatbot.confidence_threshold = settings.get("confidence_threshold", 0.5)
            chatbot.default_response = settings.get(
                "default_response",
                "I'm not sure I understand. Could you rephrase that?",
            )
        return f"Model loaded successfully from {file_path}"
    except Exception as e:
        return f"Error loading model: {str(e)}"
# Function to save the current model
def save_model():
    """Save the current model under a timestamped file name and report the result."""
    try:
        # The timestamp keeps successive saves from overwriting each other.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = "chatbot_model_" + stamp + ".json"
        chatbot.save_model(filename)
    except Exception as e:
        return f"Error saving model: {str(e)}"
    else:
        return f"Model saved as {filename}"
# Function to update settings
def update_settings(threshold, default_response):
    """Apply a new confidence threshold and default response, then persist them.

    The settings are written, together with the current intent names, to
    ``chatbot_model_intents.json``. Returns a status message.
    """
    try:
        new_threshold = float(threshold)
        chatbot.confidence_threshold = new_threshold
        chatbot.default_response = default_response
        with open("chatbot_model_intents.json", "w") as f:
            settings = {
                "intent_names": chatbot.intent_names,
                "confidence_threshold": chatbot.confidence_threshold,
                "default_response": chatbot.default_response,
            }
            f.write(json.dumps(settings))
    except Exception as e:
        return f"Error updating settings: {str(e)}"
    else:
        return "Settings updated successfully"
# Function to list intents
def list_intents():
    """Return a Markdown summary of every intent.

    At most three patterns and three responses are shown per intent; an
    ellipsis marks lists that were shortened.
    """
    if not chatbot.intents:
        return "No intents available"

    def preview(items):
        # First three items, with "..." appended when more exist.
        shown = ", ".join(items[:3])
        return shown + "..." if len(items) > 3 else shown

    sections = []
    for intent_name, intent_data in chatbot.intents.items():
        patterns = preview(intent_data["patterns"])
        responses = preview(intent_data["responses"])
        sections.append(
            f"**Intent**: {intent_name}\n"
            f"**Patterns**: {patterns}\n"
            f"**Responses**: {responses}\n\n"
        )
    return "".join(sections)
# Function to edit an intent
def edit_intent(intent_name, new_patterns, new_responses):
    """Replace the patterns and/or responses of an existing intent and save.

    Each argument is a newline-separated string. A field that is empty, or
    contains only blank lines, leaves the corresponding list unchanged.
    """
    if not intent_name or intent_name not in chatbot.intents:
        return f"Intent '{intent_name}' not found"

    target = chatbot.intents[intent_name]
    for key, raw_text in (("patterns", new_patterns), ("responses", new_responses)):
        if not raw_text:
            continue
        cleaned = [line.strip() for line in raw_text.split("\n") if line.strip()]
        if cleaned:
            target[key] = cleaned

    chatbot.save_intents("intents.json")
    return f"Intent '{intent_name}' updated successfully"
| # Function to delete an intent | |
def delete_intent(intent_name):
    """Remove an intent from the chatbot and persist the remaining intents.

    Args:
        intent_name: Name of the intent to delete; must be a key of
            ``chatbot.intents``.

    Returns:
        A status message: "not found" for an unknown/empty name, an
        "Error saving intents: ..." message if persisting fails, otherwise
        a success message.
    """
    if not intent_name or intent_name not in chatbot.intents:
        return f"Intent '{intent_name}' not found"

    del chatbot.intents[intent_name]
    # `intents` and `intent_names` are separate collections; guard the removal
    # so a name missing from the list cannot raise ValueError after the dict
    # entry has already been deleted.
    if intent_name in chatbot.intent_names:
        chatbot.intent_names.remove(intent_name)
    # NOTE(review): an already-trained model presumably still maps its output
    # indices to the previous intent_names ordering -- confirm whether a
    # retrain is required after deleting an intent.

    # Report persistence failures as a status string, matching the other
    # UI handlers in this file, instead of letting the exception escape.
    try:
        chatbot.save_intents("intents.json")
    except Exception as e:
        return f"Error saving intents: {e}"
    return f"Intent '{intent_name}' deleted successfully"
| # Get the list of intents for dropdown | |
def get_intent_list():
    """Return the chatbot's intent names (used for the intent dropdown).

    Returns:
        The ``chatbot.intent_names`` object itself, not a copy.
    """
    intent_names = chatbot.intent_names
    return intent_names
| # Function to export intents | |
def export_intents():
    """Write all chatbot intents to a timestamped JSON file.

    The file is named ``intents_<YYYYmmdd_HHMMSS>.json`` (local time) and is
    created in the current working directory with 4-space indentation.

    Returns:
        "Intents exported as <filename>" on success, otherwise a message
        starting with "Error exporting intents:" followed by the exception
        text.
    """
    try:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        export_name = f"intents_{stamp}.json"
        with open(export_name, "w") as out_file:
            json.dump(chatbot.intents, out_file, indent=4)
    except Exception as exc:
        return f"Error exporting intents: {exc}"
    return f"Intents exported as {export_name}"
| # Function to import intents | |
def import_intents_from_file(file_obj):
    """Load intents from an uploaded JSON file into the chatbot.

    Args:
        file_obj: The value delivered by the upload component. Either a
            filesystem path string or an object exposing the path as
            ``.name``; both forms are accepted. Falsy values mean "nothing
            uploaded".

    Returns:
        A status message describing the outcome. All failures are reported
        as text (never raised) so the UI can display them.
    """
    if not file_obj:
        return "No file uploaded"
    try:
        # Accept both a plain path string and a file-like wrapper with .name;
        # which one arrives depends on how the upload component is configured.
        file_path = file_obj if isinstance(file_obj, str) else file_obj.name

        # Case-insensitive so "INTENTS.JSON" is accepted too.
        if not file_path.lower().endswith(".json"):
            return "Please upload a JSON intents file"

        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(file_path, "r", encoding="utf-8") as f:
            intents_data = json.load(f)

        # The loop below needs a mapping of intent name -> definition; reject
        # other JSON roots (list, string, number) with a clear message.
        if not isinstance(intents_data, dict):
            return (
                "Invalid intents file: expected a JSON object mapping "
                "intent names to their definitions"
            )

        for intent_name, intent_data in intents_data.items():
            if (
                not isinstance(intent_data, dict)
                or "patterns" not in intent_data
                or "responses" not in intent_data
            ):
                return f"Invalid intent format for '{intent_name}'"

        # Only touch the chatbot once the whole file has been validated.
        chatbot.load_intents(intents_data)
        chatbot.save_intents("intents.json")
        return f"Imported {len(intents_data)} intents from {file_path}"
    except Exception as e:
        return f"Error importing intents: {e}"
| # Function to get intent details | |
def get_intent_details(intent_name):
    """Return an intent's patterns and responses as newline-joined text.

    Args:
        intent_name: Name of the intent to look up in ``chatbot.intents``.

    Returns:
        A ``(patterns, responses)`` tuple of strings with one entry per line,
        or ``("", "")`` when the name is empty or unknown.
    """
    if intent_name and intent_name in chatbot.intents:
        pattern_text = "\n".join(chatbot.intents[intent_name]["patterns"])
        response_text = "\n".join(chatbot.intents[intent_name]["responses"])
        return pattern_text, response_text
    return "", ""
| # Create the Gradio interface with multiple tabs | |
with gr.Blocks(title="Neural Network Chatbot", theme=gr.themes.Soft()) as demo:
    # Top-level UI definition: a header followed by four tabs (Chat, Intents
    # Management, Training, About). Handlers such as add_intent, train_model
    # and load_model_from_file are defined earlier in this file.
    gr.Markdown("# 🤖 Neural Network Chatbot")
    gr.Markdown(
        """
        This chatbot uses a neural network to understand and respond to your messages.

        This chatbot application was developed by:

        | **Name** | **Student ID** | **Email** |
        |----------|----------------|-----------|
        | AARJEYAN SHRESTHA | C0927422 | C0927422@mylambton.ca |
        | PRAJWAL LUITEL | C0927658 | C0927658@mylambton.ca |
        | RAJAN GHIMIRE | C0924991 | C0924991@mylambton.ca |
        | RISHABH JHA | C0923563 | C0923563@mylambton.ca |
        | SUDIP CHAUDHARY | C0922310 | C0922310@mylambton.ca |

        - **Course**: Software Tools and Emerging Technologies for AI and ML
        - **Term**: 3rd
        - **Instructor**: [Peter Sigurdson](https://www.linkedin.com/in/petersigurdson/)
        """
    )

    def _refresh_intent_dropdown(selected=None):
        """Return a dropdown update carrying the current intent names.

        Returning a bare list to a Dropdown output sets its *value*, not its
        *choices*, so the refresh has to go through ``gr.update``. The current
        selection is kept when it still exists and cleared otherwise (for
        example right after that intent was deleted).
        """
        names = list(get_intent_list())
        return gr.update(
            choices=names,
            value=selected if selected in names else None,
        )

    with gr.Tabs():
        # Chat tab
        with gr.Tab("Chat"):
            with gr.Row():
                with gr.Column(scale=3):
                    chatbot_interface = gr.Chatbot(label="Conversation", height=400)
                    with gr.Row():
                        msg = gr.Textbox(
                            placeholder="Type your message here...",
                            label="Your message",
                            lines=2,
                            show_label=False,
                        )
                        send_btn = gr.Button("Send", variant="primary")
                    with gr.Accordion("Examples", open=False):
                        gr.Examples(
                            examples=[
                                "Hello!",
                                "How are you?",
                                "What can you help me with?",
                                "Thank you",
                                "Goodbye",
                            ],
                            inputs=msg,
                        )
                with gr.Column(scale=1):
                    gr.Markdown("### Analysis")
                    intent_label = gr.Label(label="Predicted Intent")
                    confidence_score = gr.Number(label="Confidence Score")
                    gr.Markdown("### Settings")
                    confidence_slider = gr.Slider(
                        label="Confidence Threshold",
                        minimum=0.0,
                        maximum=1.0,
                        step=0.05,
                        value=chatbot.confidence_threshold,
                    )
                    default_resp = gr.Textbox(
                        label="Default Response",
                        value=chatbot.default_response,
                        lines=2,
                    )
                    update_settings_btn = gr.Button("Update Settings")
                    # Declared explicitly (instead of inline in the click
                    # call) so its position in the layout is intentional.
                    settings_status = gr.Textbox(label="Status")

            # Event handlers for chat
            def user_message(user_message, history):
                """Clear the input box and append the user's turn to the history."""
                # `history or []` tolerates a chat component with no value yet.
                return "", (history or []) + [[user_message, None]]

            def bot_message(history):
                """Fill in the bot reply for the most recent user turn.

                Returns the updated history plus the predicted intent and its
                confidence, or ``"N/A"`` / ``0.0`` when the history is empty.
                """
                if history:
                    latest_message = history[-1][0]
                    intent, response, confidence = chatbot.get_response(latest_message)
                    history[-1][1] = response
                    return history, intent, confidence
                return history, "N/A", 0.0

            # Pressing Enter in the textbox and clicking "Send" run the same
            # two-step chain: record the user turn, then generate the reply.
            for trigger in (msg.submit, send_btn.click):
                trigger(
                    user_message,
                    [msg, chatbot_interface],
                    [msg, chatbot_interface],
                    queue=False,
                ).then(
                    bot_message,
                    chatbot_interface,
                    [chatbot_interface, intent_label, confidence_score],
                )

            update_settings_btn.click(
                update_settings,
                [confidence_slider, default_resp],
                settings_status,
            )

        # Intents Management tab
        with gr.Tab("Intents Management"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add New Intent")
                    new_intent_name = gr.Textbox(label="Intent Name")
                    new_patterns = gr.Textbox(label="Patterns (one per line)", lines=5)
                    new_responses = gr.Textbox(
                        label="Responses (one per line)", lines=5
                    )
                    add_intent_btn = gr.Button("Add Intent", variant="primary")
                    add_intent_status = gr.Textbox(label="Status")
                with gr.Column():
                    gr.Markdown("### Edit Intent")
                    edit_intent_dropdown = gr.Dropdown(
                        label="Select Intent to Edit",
                        choices=get_intent_list(),
                        interactive=True,
                    )
                    edit_patterns = gr.Textbox(label="Patterns (one per line)", lines=5)
                    edit_responses = gr.Textbox(
                        label="Responses (one per line)", lines=5
                    )
                    with gr.Row():
                        update_intent_btn = gr.Button("Update Intent")
                        delete_intent_btn = gr.Button("Delete Intent", variant="stop")
                    edit_intent_status = gr.Textbox(label="Status")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Import/Export Intents")
                    with gr.Row():
                        export_intents_btn = gr.Button("Export Intents")
                        import_intents_file = gr.File(
                            label="Import Intents (JSON file)"
                        )
                    import_export_status = gr.Textbox(label="Status")
                with gr.Column():
                    gr.Markdown("### Current Intents")
                    refresh_intents_btn = gr.Button("Refresh Intents List")
                    intents_list = gr.Markdown()

            # Event handlers for intents management.
            # The dropdown refresh is chained with .then() so it only runs
            # after the add/delete/import handler has finished.
            add_intent_btn.click(
                add_intent,
                [new_intent_name, new_patterns, new_responses],
                add_intent_status,
            ).then(_refresh_intent_dropdown, edit_intent_dropdown, edit_intent_dropdown)
            edit_intent_dropdown.change(
                get_intent_details,
                edit_intent_dropdown,
                [edit_patterns, edit_responses],
            )
            update_intent_btn.click(
                edit_intent,
                [edit_intent_dropdown, edit_patterns, edit_responses],
                edit_intent_status,
            )
            delete_intent_btn.click(
                delete_intent, edit_intent_dropdown, edit_intent_status
            ).then(_refresh_intent_dropdown, edit_intent_dropdown, edit_intent_dropdown)
            export_intents_btn.click(export_intents, [], import_export_status)
            import_intents_file.change(
                import_intents_from_file, import_intents_file, import_export_status
            ).then(_refresh_intent_dropdown, edit_intent_dropdown, edit_intent_dropdown)
            refresh_intents_btn.click(list_intents, [], intents_list)

        # Training tab
        with gr.Tab("Training"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Train Model")
                    epochs_input = gr.Number(
                        label="Epochs", value=500, minimum=100, maximum=5000, step=100
                    )
                    learning_rate_input = gr.Number(
                        label="Learning Rate",
                        value=0.01,
                        minimum=0.0001,
                        maximum=0.1,
                        step=0.001,
                    )
                    hidden_layers_input = gr.Textbox(
                        label="Hidden Layers (comma-separated)", value="32, 16"
                    )
                    dropout_rate_input = gr.Number(
                        label="Dropout Rate",
                        value=0.2,
                        minimum=0.0,
                        maximum=0.5,
                        step=0.05,
                    )
                    train_btn = gr.Button("Train Model", variant="primary")
                with gr.Column():
                    training_status = gr.Textbox(label="Training Status", lines=6)
                    training_plot = gr.Image(label="Training History")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Model Management")
                    save_model_btn = gr.Button("Save Current Model")
                    load_model_file = gr.File(label="Load Model (JSON file)")
                    model_status = gr.Textbox(label="Status")

            # Event handlers for training
            train_btn.click(
                train_model,
                [
                    epochs_input,
                    learning_rate_input,
                    hidden_layers_input,
                    dropout_rate_input,
                ],
                [training_status, training_plot],
            )
            save_model_btn.click(save_model, [], model_status)
            load_model_file.change(load_model_from_file, load_model_file, model_status)

        # About tab
        with gr.Tab("About"):
            gr.Markdown(
                """
                ## Neural Network Chatbot

                This chatbot uses a neural network to understand and respond to user messages.
                The model is trained on a set of intents, each with patterns and responses.

                ### Features:

                - **Neural Network Backend**: The chatbot uses a fully-connected neural network with configurable layers.
                - **Intent Recognition**: Recognizes user intents based on trained patterns.
                - **Customizable Responses**: Each intent has multiple possible responses for variety.
                - **Training Interface**: Train the model directly from the web interface.
                - **Intent Management**: Add, edit, delete, import, and export intents.
                - **Model Management**: Save and load models for future use.

                ### How to Use:

                1. **Chat Tab**: Interact with the chatbot.
                2. **Intents Management Tab**: Manage the chatbot's knowledge.
                3. **Training Tab**: Train the neural network model.
                4. **About Tab**: Learn about the chatbot and its features.

                ### Technical Details:

                - Built with Python, NumPy, and Gradio.
                - Uses a bag-of-words approach for text representation.
                - Neural network with configurable hidden layers and activation functions.
                - Cross-entropy loss for multi-class classification.

                Created for deployment on Hugging Face Spaces.
                """
            )
# Read the intent names once more after the UI has been defined.
# NOTE(review): nothing visible in this part of the file reads `chat_intents`
# or feeds it back into the dropdown -- confirm whether it is still needed.
chat_intents = get_intent_list()

# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()