# Model/model.py
"""GPT-style decoder-only language model built from pre-norm Transformer blocks.

Defines the attention, feed-forward and block modules plus
``GPTLanguageModel``, which adds token/position embeddings, a tied output
head, sampling, checkpoint save/load and AdamW optimizer construction.
"""
import inspect

import torch
import torch.nn as nn
from huggingface_hub import PyTorchModelHubMixin

# Define hyperparameters and constants
# NOTE(review): BATCH_SIZE, MAX_ITERS, EVAL_INTERVAL, EVAL_ITERS and DROPOUT are
# not referenced anywhere in this file; presumably a training script imports
# them -- confirm before removing.
BATCH_SIZE = 16
BLOCK_SIZE = 1024
MAX_ITERS = 5
EVAL_INTERVAL = 500
LEARNING_RATE = 6e-4
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
EVAL_ITERS = 200
N_EMBD = 768
N_HEAD = 12
N_LAYER = 12
DROPOUT = 0.2
# Raw string: the backslash followed by "m" is not a valid escape sequence, so
# a plain literal triggers a SyntaxWarning on recent Pythons. The value is
# unchanged.
MODEL_PATH = r"Naive_gpt\model_weights_llama"  # Where to save weights


class CausalSelfAttention(nn.Module):
    """Multi-head causal self-attention with a fused q/k/v projection.

    Args:
        n_embd: Embedding (channel) width of the input and output.
        n_head: Number of attention heads; must divide ``n_embd``.
    """

    def __init__(self, n_embd=N_EMBD, n_head=N_HEAD):
        super().__init__()
        assert n_embd % n_head == 0, "n_embd must be divisible by n_head"
        # key, query, value projections for all heads, but in a batch
        self.c_attn = nn.Linear(n_embd, 3 * n_embd)
        # output projection
        self.c_proj = nn.Linear(n_embd, n_embd)
        # Marker read by GPTLanguageModel._init_weights to shrink the init std
        # of this projection.
        self.c_proj.NANOGPT_SCALE_INIT = 1
        self.n_head = n_head
        self.n_embd = n_embd

    def forward(self, x):
        """Apply causal self-attention to ``x`` of shape (B, T, C).

        Returns a tensor of the same (B, T, C) shape.
        """
        # batch size, sequence length, embedding dimensionality (n_embd)
        B, T, C = x.size()
        # calculate query, key, values for all heads in batch and move head
        # forward to be the batch dim.
        # nh is "number of heads", hs is "head size", and C (number of
        # channels) = nh * hs
        # e.g. in GPT-2 (124M), n_head=12, hs=64, so nh*hs=C=768 channels
        qkv = self.c_attn(x)
        q, k, v = qkv.split(self.n_embd, dim=2)
        head_size = C // self.n_head
        k = k.view(B, T, self.n_head, head_size).transpose(1, 2)  # (B, nh, T, hs)
        q = q.view(B, T, self.n_head, head_size).transpose(1, 2)  # (B, nh, T, hs)
        v = v.view(B, T, self.n_head, head_size).transpose(1, 2)  # (B, nh, T, hs)
        # Fused attention kernel with causal masking
        y = nn.functional.scaled_dot_product_attention(q, k, v, is_causal=True)
        # re-assemble all head outputs side by side
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        # output projection
        y = self.c_proj(y)
        return y


class FeedFoward(nn.Module):
    """A simple linear layer followed by a non-linearity and a projection.

    This is the block called "MLP" in Karpathy's code and "FeedForward" in
    Sebastian's.

    Args:
        n_embd: Embedding width; the hidden layer is four times wider.
    """

    def __init__(self, n_embd=N_EMBD):
        super().__init__()
        self.c_fc = nn.Linear(n_embd, 4 * n_embd)
        self.gelu = nn.GELU(approximate='tanh')
        self.c_proj = nn.Linear(4 * n_embd, n_embd)
        # Marker read by GPTLanguageModel._init_weights to shrink the init std
        # of this projection.
        self.c_proj.NANOGPT_SCALE_INIT = 1

    def forward(self, x):
        """Expand, apply GELU, and project back to the input width."""
        x = self.c_fc(x)
        x = self.gelu(x)
        x = self.c_proj(x)
        return x


class Block(nn.Module):
    """ Transformer block: communication followed by computation

    Args:
        n_embd: Embedding width used by every sub-layer of the block.
        n_head: Number of attention heads.
    """

    def __init__(self, n_embd, n_head):
        super().__init__()
        # Pass the sizes through so the block honours its arguments instead of
        # silently falling back to the module-level defaults.
        self.sa = CausalSelfAttention(n_embd, n_head)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Apply pre-norm attention and feed-forward, each with a residual."""
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x


class GPTLanguageModel(nn.Module, PyTorchModelHubMixin):
    """Decoder-only language model with learned position embeddings.

    Args:
        vocab_size: Number of tokens in the vocabulary.
        block_size: Maximum sequence length the model can attend over.
        n_embd: Embedding width.
        n_head: Number of attention heads per block.
        n_layer: Number of Transformer blocks.
    """

    def __init__(self, vocab_size=20000, block_size=1024, n_embd=768, n_head=12, n_layer=12):
        super().__init__()
        print("This is vocab size:", vocab_size)
        # Kept on the instance so forward/generate/_init_weights use this
        # model's sizes rather than the module-level defaults.
        self.block_size = block_size
        self.n_layer = n_layer
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(
            *[Block(n_embd, n_head=n_head) for _ in range(n_layer)]
        )
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)
        # Weight tying: the input embedding and output head share one tensor.
        self.token_embedding_table.weight = self.lm_head.weight
        self.apply(self._init_weights)
        self.config = {"BLOCK_SIZE": block_size, "N_EMBD": n_embd, "N_HEAD": n_head, "N_LAYER": n_layer}

    def _init_weights(self, module):
        """Initialise Linear and Embedding weights from N(0, 0.02^2).

        Linear layers tagged with ``NANOGPT_SCALE_INIT`` get their std scaled
        by ``(2 * n_layer) ** -0.5``; Linear biases are zeroed.
        """
        if isinstance(module, nn.Linear):
            std = 0.02
            if hasattr(module, 'NANOGPT_SCALE_INIT'):
                std *= (2 * self.n_layer) ** -0.5
            torch.nn.init.normal_(module.weight, mean=0.0, std=std)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """Compute logits, and the cross-entropy loss when targets are given.

        Args:
            idx: Token indices of shape (B, T) with T <= block_size.
            targets: Optional target indices; flattened before the loss.

        Returns:
            Tuple ``(logits, loss)``; ``loss`` is ``None`` without targets.
        """
        B, T = idx.shape
        assert T <= self.block_size, f"Cannot forward sequence of length {T}, block size is only {self.block_size}"
        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.position_embedding_table(torch.arange(0, T, dtype=torch.long, device=idx.device))
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)
        if targets is None:
            loss = None
        else:
            loss = nn.functional.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
        return logits, loss

    @torch.no_grad()  # sampling never needs gradients; avoids building graphs
    def generate(self, idx, max_new_tokens, temperature=1.0):
        """
        Generate tokens using the language model.

        Args:
            idx: Input token indices
            max_new_tokens: Number of tokens to generate
            temperature: Controls randomness in generation
                - temperature > 1.0 increases randomness
                - temperature < 1.0 decreases randomness
                - temperature = 0 makes it deterministic (always picks highest probability)

        Returns:
            ``idx`` with ``max_new_tokens`` new tokens appended along dim 1.

        Raises:
            ValueError: If ``temperature`` is negative.
        """
        if temperature < 0:
            raise ValueError(f"temperature must be non-negative, got {temperature}")
        for _ in range(max_new_tokens):
            # Truncate the sequence to the last block_size tokens
            idx_cond = idx[:, -self.block_size:]
            # Get logits from the model
            logits, _ = self(idx_cond)
            # Focus only on the last time step
            logits = logits[:, -1, :]
            if temperature == 0.0:
                # For temperature = 0, simply take the argmax
                idx_next = torch.argmax(logits, dim=-1, keepdim=True)
            else:
                # Apply temperature scaling
                logits = logits / temperature
                # Convert to probabilities
                probs = torch.softmax(logits, dim=-1)
                # Sample from the distribution
                idx_next = torch.multinomial(probs, num_samples=1)
            # Append the new token to the sequence
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

    def save(self, path=MODEL_PATH):
        """Write this model's state dict to ``path``."""
        torch.save(self.state_dict(), path)

    def load(self, path=MODEL_PATH):
        """Load weights from ``path`` into this model.

        Accepts either a bare state dict (what ``save`` writes) or a
        checkpoint dict that stores the state dict under the ``"model"`` key.
        """
        # NOTE: torch.load unpickles the file; only load trusted checkpoints.
        # map_location="cpu" lets a checkpoint saved on CUDA load on a
        # CPU-only host; load_state_dict copies onto the parameters' device.
        checkpoint = torch.load(path, map_location="cpu")
        if isinstance(checkpoint, dict) and "model" in checkpoint:
            state_dict = checkpoint["model"]
        else:
            state_dict = checkpoint
        # Remove the '_orig_mod.' prefix from parameter names
        new_state_dict = {
            key.replace('_orig_mod.', ''): value
            for key, value in state_dict.items()
        }
        self.load_state_dict(new_state_dict)

    def configure_optimizers(self, weight_decay=0.1, learning_rate=LEARNING_RATE, device=DEVICE):
        """Build an AdamW optimizer with weight decay on >=2-D tensors only.

        Args:
            weight_decay: Decay applied to parameters with two or more dims.
            learning_rate: Optimizer learning rate.
            device: Device string or ``torch.device``; the fused kernel is
                requested only for CUDA devices when this torch supports it.

        Returns:
            The configured ``torch.optim.AdamW`` instance.
        """
        trainable = [p for _, p in self.named_parameters() if p.requires_grad]
        decay_parameters = [p for p in trainable if p.dim() >= 2]
        nodecay_parameters = [p for p in trainable if p.dim() < 2]
        optim_groups = [
            {"params": decay_parameters, "weight_decay": weight_decay},
            {"params": nodecay_parameters, "weight_decay": 0.0},
        ]
        fused_available = 'fused' in inspect.signature(torch.optim.AdamW).parameters
        # Compare the device *type* so "cuda:0" and torch.device objects count.
        use_fused = fused_available and torch.device(device).type == "cuda"
        # Only pass `fused` when it is supported; older builds reject the kwarg.
        extra_args = {"fused": True} if use_fused else {}
        optimizer = torch.optim.AdamW(
            optim_groups, lr=learning_rate, betas=(0.9, 0.95), eps=1e-8, **extra_args
        )
        return optimizer