Spaces:
Sleeping
Sleeping
| import os | |
| # ✅ Fix PermissionError on Hugging Face Spaces | |
| os.environ["HF_HOME"] = "/tmp" | |
| os.environ["HF_DATASETS_CACHE"] = "/tmp" | |
| import streamlit as st | |
| from datasets import load_dataset | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from collections import defaultdict, Counter | |
| from sklearn.tree import DecisionTreeClassifier | |
| from sklearn.ensemble import GradientBoostingClassifier | |
| import random | |
st.title("🧠 Language Model Explorer")

###################################
# Sidebar configuration
###################################
# Corpus, tokenizer granularity and model family are picked from fixed lists.
dataset_name = st.sidebar.selectbox(
    label="Choose Dataset",
    options=["squad", "tiny_shakespeare"],
)
tokenizer_type = st.sidebar.selectbox(
    label="Choose Tokenizer",
    options=["character", "word"],
)
model_type = st.sidebar.selectbox(
    label="Choose Model",
    options=[
        "N-gram",
        "Feed Forward NN",
        "Decision Tree",
        "Gradient Boosted Tree",
        "RNN",
    ],
)

# Sampling temperature forwarded to the predictors that accept one.
temperature = st.sidebar.slider(
    label="Sampling Temperature",
    min_value=0.1,
    max_value=2.0,
    value=1.0,
)

# The trainers use ``context_size - 1`` tokens as input and the next token as
# the target.
context_size = st.sidebar.slider(
    label="Context Size (how many tokens to look back)",
    min_value=2,
    max_value=10,
    value=3,
    step=1,
)

# Number of tokens from dataset to use for training (minimum 100 tokens)
num_train_tokens = st.sidebar.slider(
    label="Number of tokens from dataset to train on",
    min_value=100,
    max_value=100_000,
    value=1000,
    step=100,
)

train_button = st.sidebar.button(label="Train Model")

# All tensors and models are placed on the CPU.
device = torch.device("cpu")
| ################################### | |
| # Load dataset | |
| ################################### | |
def load_text(dataset_name):
    """Return the training corpus for ``dataset_name`` as a single string.

    Args:
        dataset_name: ``"squad"`` joins the ``context`` field of the first 1%
            of the SQuAD train split with single spaces;
            ``"tiny_shakespeare"`` uses the ``text`` field of the first train
            record. Any other value yields a tiny placeholder corpus.

    Returns:
        The corpus text.
    """
    if dataset_name == "squad":
        records = load_dataset("squad", split="train[:1%]")
        return " ".join([record["context"] for record in records])

    if dataset_name == "tiny_shakespeare":
        corpus = load_dataset("tiny_shakespeare")
        first_record = corpus["train"][0]
        return " ".join([first_record["text"]])

    # Unknown dataset name: fall back to a placeholder corpus.
    return "hello world"
# Raw corpus for the dataset chosen in the sidebar; evaluated at module level,
# i.e. every time this script is executed.
text_data = load_text(dataset_name)
| ################################### | |
| # Tokenization | |
| ################################### | |
def tokenize(text, tokenizer_type):
    """Split ``text`` into a list of tokens.

    Args:
        text: The string to tokenize.
        tokenizer_type: ``"character"`` yields one token per character
            (whitespace included); ``"word"`` splits on runs of whitespace.

    Returns:
        A new list of string tokens.

    Raises:
        ValueError: If ``tokenizer_type`` is not one of the supported values.
            Previously this case fell through and failed with an
            ``UnboundLocalError`` on the return statement.
    """
    if tokenizer_type == "character":
        return list(text)
    if tokenizer_type == "word":
        return text.split()
    raise ValueError(
        f"Unknown tokenizer_type {tokenizer_type!r}; expected 'character' or 'word'."
    )
# Tokenize the full corpus, then keep only the leading slice requested for
# training.
tokens_all = tokenize(text_data, tokenizer_type)
tokens = tokens_all[:num_train_tokens]

# Vocabulary = distinct training tokens plus a dedicated padding symbol.
PAD_TOKEN = "<PAD>"
vocab = [*set(tokens)]
if PAD_TOKEN not in vocab:
    vocab.append(PAD_TOKEN)

# Two-way lookup between tokens and their integer ids (position in ``vocab``).
token_to_idx = dict(zip(vocab, range(len(vocab))))
idx_to_token = dict(enumerate(vocab))
| ################################### | |
| # Helper to pad context | |
| ################################### | |
def pad_context(context, size):
    """Return ``context`` as a list of exactly ``size`` tokens.

    Short contexts are left-padded with ``PAD_TOKEN``; long contexts keep only
    their last ``size`` tokens.

    Args:
        context: Sequence of tokens (most recent token last).
        size: Desired length of the result.

    Returns:
        A new list of length ``size`` (empty when ``size`` is not positive).
    """
    if size <= 0:
        # ``context[-0:]`` is the *whole* sequence, so a zero-length request
        # must be handled explicitly instead of going through the slice below.
        return []

    missing = size - len(context)
    if missing > 0:
        return [PAD_TOKEN] * missing + list(context)
    return list(context[-size:])
| ################################### | |
| # Models | |
| ################################### | |
class NGramModel:
    """Count-based n-gram language model.

    ``model`` maps a context tuple of ``n - 1`` tokens to a ``Counter`` of the
    tokens observed immediately after that context.
    """

    def __init__(self, tokens, n=3):
        """Count every n-gram in ``tokens``.

        Args:
            tokens: Sequence of training tokens.
            n: Order of the model (context length is ``n - 1``).

        Raises:
            ValueError: If ``n`` is smaller than 1.
        """
        if n < 1:
            raise ValueError(f"n must be >= 1, got {n}")
        self.n = n
        self.model = defaultdict(Counter)
        # "+ 1" so the last complete n-gram (ending on the final token) is
        # counted too; without it the final window was silently dropped.
        for start in range(len(tokens) - n + 1):
            context = tuple(tokens[start:start + n - 1])
            next_token = tokens[start + n - 1]
            self.model[context][next_token] += 1

    def predict(self, context, temperature=1.0):
        """Sample the next token given the most recent tokens.

        Args:
            context: Sequence of preceding tokens; only the last ``n - 1`` are
                used.
            temperature: Counts are raised to ``1 / temperature`` before being
                normalised, so values below 1 sharpen the distribution and
                values above 1 flatten it.

        Returns:
            A token seen after this context during training, or a uniformly
            random key of the module-level ``token_to_idx`` when the context
            was never observed.
        """
        order = self.n - 1
        # ``context[-0:]`` would keep the whole sequence, so the unigram case
        # (order 0) needs the empty context spelled out.
        key = tuple(context[-order:]) if order > 0 else ()
        counts = self.model.get(key)
        if counts is None:
            return random.choice(list(token_to_idx.keys()))

        candidates, freqs = zip(*counts.items())
        probs = np.array(freqs, dtype=float) ** (1.0 / temperature)
        probs /= probs.sum()
        # Sample an index rather than the token itself so the original token
        # object is returned instead of a NumPy scalar.
        return candidates[np.random.choice(len(candidates), p=probs)]
| ################################### | |
| # Feed Forward NN | |
| ################################### | |
class FFNN(nn.Module):
    """Feed-forward next-token classifier over a fixed-length context.

    Each context token is embedded into ``hidden_size`` dimensions, the
    embeddings are concatenated, passed through one ReLU hidden layer and
    projected to one logit per vocabulary entry.
    """

    def __init__(self, vocab_size, context_size, hidden_size=128):
        """Build the layers.

        Args:
            vocab_size: Number of distinct token ids (also the output width).
            context_size: Number of token ids per input row.
            hidden_size: Width of both the embeddings and the hidden layer.
        """
        super().__init__()
        flattened_width = hidden_size * context_size
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=hidden_size)
        self.fc1 = nn.Linear(in_features=flattened_width, out_features=hidden_size)
        self.fc2 = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, x):
        """Map a batch of token-id rows to a batch of vocabulary logits."""
        embedded = self.embed(x)
        # Concatenate the per-token embeddings of each row into one vector.
        flat = embedded.view(embedded.shape[0], -1)
        hidden = torch.relu(self.fc1(flat))
        return self.fc2(hidden)
def train_ffnn(tokens, context_size=3, epochs=3):
    """Train an ``FFNN`` to predict a token from the ``context_size - 1`` before it.

    Training is plain per-sample Adam (learning rate 0.01) with cross-entropy
    loss; samples are reshuffled at the start of every epoch. Progress and the
    mean loss of each epoch are reported through Streamlit.

    Args:
        tokens: Sequence of training tokens.
        context_size: Window length including the target token.
        epochs: Number of passes over the samples.

    Returns:
        The trained model, or ``None`` (after a Streamlit warning) when no
        training window fits into ``tokens``.
    """
    window = context_size - 1
    pad_idx = token_to_idx[PAD_TOKEN]

    # One (context ids, target id) pair per sliding window over the tokens.
    samples = []
    for start in range(len(tokens) - window):
        ctx_tokens = pad_context(tokens[start:start + window], window)
        target_token = tokens[start + window]
        ctx_ids = torch.tensor(
            [token_to_idx.get(tok, pad_idx) for tok in ctx_tokens],
            device=device,
        )
        samples.append((ctx_ids, token_to_idx.get(target_token, pad_idx)))

    if not samples:
        st.warning("No training data generated. Increase dataset size or reduce context size.")
        return None

    model = FFNN(len(vocab), window).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()

    progress_bar = st.progress(0)
    total_steps = len(samples) * epochs
    step = 0

    model.train()
    for epoch in range(epochs):
        running_loss = 0
        random.shuffle(samples)
        for ctx_ids, target_id in samples:
            batch = ctx_ids.unsqueeze(0)  # add the batch dimension
            label = torch.tensor([target_id], device=device)

            optimizer.zero_grad()
            loss = criterion(model(batch), label)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            step += 1
            progress_bar.progress(step / total_steps)
        st.write(f"Epoch {epoch+1}, Loss: {running_loss/len(samples):.4f}")

    progress_bar.empty()
    return model
def ffnn_predict(model, context, temperature=1.0):
    """Sample the next token from a trained ``FFNN``.

    Args:
        model: Network returned by ``train_ffnn``.
        context: Sequence of preceding tokens (most recent last).
        temperature: Logits are divided by this value before the softmax.

    Returns:
        A token drawn from ``vocab`` according to the predicted distribution.
    """
    # Derive the input length from the network itself rather than from the
    # live sidebar slider: the slider may have moved since training, and a
    # different length would not match the width ``fc1`` was built for.
    window = model.fc1.in_features // model.embed.embedding_dim
    context = pad_context(context, window)

    pad_idx = token_to_idx[PAD_TOKEN]
    x = torch.tensor(
        [token_to_idx.get(tok, pad_idx) for tok in context],
        device=device,
    ).unsqueeze(0)

    with torch.no_grad():
        # Drop only the batch dimension so the result stays one-dimensional.
        logits = model(x).squeeze(0)
    probs = torch.softmax(logits / temperature, dim=0).cpu().numpy()
    return np.random.choice(vocab, p=probs)
| ################################### | |
| # Decision Tree | |
| ################################### | |
def train_dt(tokens, context_size=3):
    """Fit a decision tree that predicts a token id from the ids before it.

    Args:
        tokens: Sequence of training tokens.
        context_size: Window length including the target token; the features
            are the ``context_size - 1`` preceding token ids.

    Returns:
        The fitted ``DecisionTreeClassifier``, or ``None`` (after a Streamlit
        warning) when no training window fits into ``tokens`` — the same
        contract as ``train_ffnn`` and ``train_rnn``, which the caller already
        handles.
    """
    window = context_size - 1
    pad_idx = token_to_idx[PAD_TOKEN]

    X, y = [], []
    for start in range(len(tokens) - window):
        context = pad_context(tokens[start:start + window], window)
        target = tokens[start + window]
        X.append([token_to_idx.get(tok, pad_idx) for tok in context])
        y.append(token_to_idx.get(target, pad_idx))

    if not X:
        # Fitting on an empty sample set raises inside scikit-learn; report it
        # the same way the neural trainers do instead.
        st.warning("No training data generated. Increase dataset size or reduce context size.")
        return None

    with st.spinner("Training Decision Tree..."):
        model = DecisionTreeClassifier()
        model.fit(X, y)
    return model
def dt_predict(model, context):
    """Predict the next token with a fitted decision tree.

    Args:
        model: Classifier returned by ``train_dt``.
        context: Sequence of preceding tokens (most recent last).

    Returns:
        The token whose id the tree predicts.
    """
    # Use the feature count recorded at fit time, not the live sidebar slider,
    # so a slider change after training cannot produce a wrong-width row.
    context = pad_context(context, model.n_features_in_)
    pad_idx = token_to_idx[PAD_TOKEN]
    features = [token_to_idx.get(tok, pad_idx) for tok in context]
    predicted_id = model.predict([features])[0]
    return idx_to_token[int(predicted_id)]
| ################################### | |
| # Gradient Boosted Tree | |
| ################################### | |
def train_gbt(tokens, context_size=3):
    """Fit a gradient-boosted classifier that predicts a token id from the ids before it.

    Args:
        tokens: Sequence of training tokens.
        context_size: Window length including the target token; the features
            are the ``context_size - 1`` preceding token ids.

    Returns:
        The fitted ``GradientBoostingClassifier``, or ``None`` (after a
        Streamlit warning) when no training window fits into ``tokens`` — the
        same contract as ``train_ffnn`` and ``train_rnn``, which the caller
        already handles.
    """
    window = context_size - 1
    pad_idx = token_to_idx[PAD_TOKEN]

    X, y = [], []
    for start in range(len(tokens) - window):
        context = pad_context(tokens[start:start + window], window)
        target = tokens[start + window]
        X.append([token_to_idx.get(tok, pad_idx) for tok in context])
        y.append(token_to_idx.get(target, pad_idx))

    if not X:
        # Fitting on an empty sample set raises inside scikit-learn; report it
        # the same way the neural trainers do instead.
        st.warning("No training data generated. Increase dataset size or reduce context size.")
        return None

    with st.spinner("Training Gradient Boosted Tree..."):
        model = GradientBoostingClassifier()
        model.fit(X, y)
    return model
def gbt_predict(model, context):
    """Predict the next token with a fitted gradient-boosted classifier.

    Args:
        model: Classifier returned by ``train_gbt``.
        context: Sequence of preceding tokens (most recent last).

    Returns:
        The token whose id the classifier predicts.
    """
    # Use the feature count recorded at fit time, not the live sidebar slider,
    # so a slider change after training cannot produce a wrong-width row.
    context = pad_context(context, model.n_features_in_)
    pad_idx = token_to_idx[PAD_TOKEN]
    features = [token_to_idx.get(tok, pad_idx) for tok in context]
    predicted_id = model.predict([features])[0]
    return idx_to_token[int(predicted_id)]
| ################################### | |
| # RNN | |
| ################################### | |
class RNNModel(nn.Module):
    """Single-layer recurrent next-token classifier.

    Token ids are embedded, run through an ``nn.RNN`` in batch-first layout,
    and the output at the last time step is projected to vocabulary logits.
    """

    def __init__(self, vocab_size, embed_size=64, hidden_size=128):
        """Build the layers.

        Args:
            vocab_size: Number of distinct token ids (also the output width).
            embed_size: Embedding dimension.
            hidden_size: Width of the recurrent state.
        """
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.rnn = nn.RNN(input_size=embed_size, hidden_size=hidden_size, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, x, h=None):
        """Return ``(logits, hidden)`` for a batch of token-id sequences.

        Args:
            x: Token ids laid out as (batch, sequence).
            h: Optional initial hidden state, passed straight to ``nn.RNN``.
        """
        embedded = self.embed(x)
        states, h = self.rnn(embedded, h)
        # Only the state after the final token feeds the classifier.
        last_state = states[:, -1]
        return self.fc(last_state), h
def train_rnn(tokens, context_size=3, epochs=3):
    """Train an ``RNNModel`` to predict a token from the ``context_size - 1`` before it.

    Training is plain per-sample Adam (learning rate 0.01) with cross-entropy
    loss; samples are reshuffled at the start of every epoch. Progress and the
    mean loss of each epoch are reported through Streamlit.

    Every window is processed from the default initial hidden state. Feeding
    the previous sample's hidden state back in (as this function used to do)
    keeps it attached to an autograd graph that the previous ``backward()``
    already freed, so the second optimisation step failed; it also mixed state
    between shuffled, unrelated windows, while ``rnn_predict`` always starts
    from the default state.

    Args:
        tokens: Sequence of training tokens.
        context_size: Window length including the target token.
        epochs: Number of passes over the samples.

    Returns:
        The trained model, or ``None`` (after a Streamlit warning) when no
        training window fits into ``tokens``.
    """
    window = context_size - 1
    pad_idx = token_to_idx[PAD_TOKEN]

    # One (context ids, target id) pair per sliding window over the tokens.
    data = []
    for start in range(len(tokens) - window):
        context = pad_context(tokens[start:start + window], window)
        target = tokens[start + window]
        data.append((
            torch.tensor([token_to_idx.get(tok, pad_idx) for tok in context], device=device),
            token_to_idx.get(target, pad_idx),
        ))

    if not data:
        st.warning("No training data generated. Increase dataset size or reduce context size.")
        return None

    model = RNNModel(len(vocab)).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()

    progress_bar = st.progress(0)
    total_steps = len(data) * epochs
    step = 0

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        random.shuffle(data)
        for x, y in data:
            x = x.unsqueeze(0)  # add the batch dimension
            y = torch.tensor([y], device=device)

            optimizer.zero_grad()
            # No hidden state is passed in: each window is independent.
            out, _ = model(x)
            loss = criterion(out, y)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            step += 1
            progress_bar.progress(step / total_steps)
        st.write(f"Epoch {epoch+1}, Loss: {total_loss/len(data):.4f}")

    progress_bar.empty()
    return model
def rnn_predict(model, context, temperature=1.0):
    """Sample the next token from a trained ``RNNModel``.

    Args:
        model: Network returned by ``train_rnn``.
        context: Sequence of preceding tokens (most recent last).
        temperature: Logits are divided by this value before the softmax.

    Returns:
        A token drawn from ``vocab`` according to the predicted distribution.
    """
    # Prefer the context size saved in the session when the model was trained;
    # the live sidebar value may have changed since then. Fall back to the
    # sidebar value when nothing has been stored.
    trained_context_size = st.session_state.get("context_size", context_size)
    context = pad_context(context, trained_context_size - 1)

    pad_idx = token_to_idx[PAD_TOKEN]
    x = torch.tensor(
        [token_to_idx.get(tok, pad_idx) for tok in context],
        device=device,
    ).unsqueeze(0)

    with torch.no_grad():
        logits, _ = model(x)
    # Drop only the batch dimension so the result stays one-dimensional.
    probs = torch.softmax(logits.squeeze(0) / temperature, dim=0).cpu().numpy()
    return np.random.choice(vocab, p=probs)
| ################################### | |
| # Train and evaluate | |
| ################################### | |
if train_button:
    st.write(f"Training **{model_type}** model with context size {context_size} on {len(tokens)} tokens...")

    # Trainers that share the (tokens, context_size=...) call shape; the
    # N-gram model is built directly and gets its own spinner.
    trainers = {
        "Feed Forward NN": train_ffnn,
        "Decision Tree": train_dt,
        "Gradient Boosted Tree": train_gbt,
        "RNN": train_rnn,
    }
    if model_type == "N-gram":
        with st.spinner("Training N-gram model..."):
            model = NGramModel(tokens, n=context_size)
    else:
        model = trainers[model_type](tokens, context_size=context_size)

    if model is None:
        st.error("Training failed due to no data.")
    else:
        # Keep the model and the settings it was trained with in the session
        # state, where the chat section below reads them back.
        st.session_state["model"] = model
        st.session_state["model_type"] = model_type
        st.session_state["context_size"] = context_size
        st.success(f"{model_type} model trained.")
| ################################### | |
| # Chat interface | |
| ################################### | |
st.header("💬 Chat with the model")
if "model" not in st.session_state:
    st.info("Train a model to begin chatting.")
else:
    user_input = st.text_input("Type a prompt:")
    if user_input:
        # Model and settings saved by the training section.
        trained_model = st.session_state["model"]
        trained_type = st.session_state["model_type"]
        window = st.session_state["context_size"] - 1

        # Start from the prompt tokens and append up to 20 generated tokens.
        generated = list(tokenize(user_input, tokenizer_type))
        for _ in range(20):
            ctx = pad_context(generated, window)
            if trained_type == "N-gram":
                next_tok = trained_model.predict(ctx, temperature)
            elif trained_type == "Feed Forward NN":
                next_tok = ffnn_predict(trained_model, ctx, temperature)
            elif trained_type == "Decision Tree":
                next_tok = dt_predict(trained_model, ctx)
            elif trained_type == "Gradient Boosted Tree":
                next_tok = gbt_predict(trained_model, ctx)
            elif trained_type == "RNN":
                next_tok = rnn_predict(trained_model, ctx, temperature)
            generated.append(next_tok)
            if next_tok == "<END>":
                break

        # Characters are concatenated directly; words are separated by spaces.
        separator = "" if tokenizer_type == "character" else " "
        st.write("**Model Output:**")
        st.write(separator.join(generated))