HindishFormer-v2 / model.py
SarwarShafee's picture
Upload folder using huggingface_hub
52a08a7 verified
# model.py
import torch
import torch.nn as nn
import math
from tokenizers import Tokenizer
import json
# Define all necessary classes
class EmbeddingLayer(nn.Module):
def __init__(self, vocab_size: int, d_model: int):
super().__init__()
self.embedding = nn.Embedding(vocab_size, d_model)
self.d_model = d_model
def forward(self, x):
return self.embedding(x) * math.sqrt(self.d_model)
class PositionalEncoding(nn.Module):
def __init__(self, max_seq_len: int, d_model: int, dropout_rate: float):
super().__init__()
self.dropout = nn.Dropout(dropout_rate)
pe = torch.zeros(max_seq_len, d_model)
pos = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(pos * div_term)
pe[:, 1::2] = torch.cos(pos * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, input_embedding):
input_embedding = input_embedding + self.pe[:, :input_embedding.shape[1], :].requires_grad_(False)
return self.dropout(input_embedding)
class MultiHeadAttention(nn.Module):
def __init__(self, d_model: int, num_heads: int, dropout_rate: float):
super().__init__()
self.dropout = nn.Dropout(dropout_rate)
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
self.num_heads = num_heads
self.d_k = d_model // num_heads
def forward(self, q, k, v, encoder_mask=None):
query = self.W_q(q)
key = self.W_k(k)
value = self.W_v(v)
query = query.view(query.shape[0], query.shape[1], self.num_heads, self.d_k).transpose(1, 2)
key = key.view(key.shape[0], key.shape[1], self.num_heads, self.d_k).transpose(1, 2)
value = value.view(value.shape[0], value.shape[1], self.num_heads, self.d_k).transpose(1, 2)
attention_score = (query @ key.transpose(-2, -1)) / math.sqrt(self.d_k)
if encoder_mask is not None:
attention_score = attention_score.masked_fill(encoder_mask == 0, -1e9)
attention_weight = torch.softmax(attention_score, dim=-1)
attention_weight = self.dropout(attention_weight)
attention_output = attention_weight @ value
attention_output = attention_output.transpose(1, 2).contiguous().view(attention_output.shape[0], -1, self.num_heads * self.d_k)
multihead_output = self.W_o(attention_output)
return multihead_output
class FeedForward(nn.Module):
def __init__(self, d_model: int, d_ff: int, dropout_rate: float):
super().__init__()
self.layer_1 = nn.Linear(d_model, d_ff)
self.activation_1 = nn.ReLU()
self.dropout = nn.Dropout(dropout_rate)
self.layer_2 = nn.Linear(d_ff, d_model)
def forward(self, input):
return self.layer_2(self.dropout(self.activation_1(self.layer_1(input))))
class LayerNorm(nn.Module):
def __init__(self, eps: float = 1e-5):
super().__init__()
self.eps = eps
self.gamma = nn.Parameter(torch.ones(32))
self.beta = nn.Parameter(torch.zeros(32))
def forward(self, input):
mean = input.mean(dim=-1, keepdim=True)
std = input.std(dim=-1, keepdim=True)
return self.gamma * ((input - mean) / (std + self.eps)) + self.beta
class AddAndNorm(nn.Module):
def __init__(self, dropout_rate: float):
super().__init__()
self.dropout = nn.Dropout(dropout_rate)
self.layer_norm = LayerNorm()
def forward(self, input, sub_layer):
return input + self.dropout(sub_layer(self.layer_norm(input)))
class EncoderBlock(nn.Module):
def __init__(self, multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float):
super().__init__()
self.multihead_attention = multihead_attention
self.feed_forward = feed_forward
self.add_and_norm_list = nn.ModuleList([AddAndNorm(dropout_rate) for _ in range(2)])
def forward(self, encoder_input, encoder_mask):
encoder_input = self.add_and_norm_list[0](encoder_input, lambda encoder_input: self.multihead_attention(encoder_input, encoder_input, encoder_input, encoder_mask))
encoder_input = self.add_and_norm_list[1](encoder_input, self.feed_forward)
return encoder_input
class Encoder(nn.Module):
def __init__(self, encoderblocklist: nn.ModuleList):
super().__init__()
self.encoderblocklist = encoderblocklist
self.layer_norm = LayerNorm()
def forward(self, encoder_input, encoder_mask):
for encoderblock in self.encoderblocklist:
encoder_input = encoderblock(encoder_input, encoder_mask)
encoder_output = self.layer_norm(encoder_input)
return encoder_output
class DecoderBlock(nn.Module):
def __init__(self, masked_multihead_attention: MultiHeadAttention, multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float):
super().__init__()
self.masked_multihead_attention = masked_multihead_attention
self.multihead_attention = multihead_attention
self.feed_forward = feed_forward
self.add_and_norm_list = nn.ModuleList([AddAndNorm(dropout_rate) for _ in range(3)])
def forward(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
decoder_input = self.add_and_norm_list[0](decoder_input, lambda decoder_input: self.masked_multihead_attention(decoder_input, decoder_input, decoder_input, decoder_mask))
decoder_input = self.add_and_norm_list[1](decoder_input, lambda decoder_input: self.multihead_attention(decoder_input, encoder_output, encoder_output, encoder_mask))
decoder_input = self.add_and_norm_list[2](decoder_input, self.feed_forward)
return decoder_input
class Decoder(nn.Module):
def __init__(self, decoderblocklist: nn.ModuleList):
super().__init__()
self.decoderblocklist = decoderblocklist
self.layer_norm = LayerNorm()
def forward(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
for decoderblock in self.decoderblocklist:
decoder_input = decoderblock(decoder_input, decoder_mask, encoder_output, encoder_mask)
decoder_output = self.layer_norm(decoder_input)
return decoder_output
class ProjectionLayer(nn.Module):
def __init__(self, vocab_size: int, d_model: int):
super().__init__()
self.projection_layer = nn.Linear(d_model, vocab_size)
def forward(self, decoder_output):
output = self.projection_layer(decoder_output)
return torch.log_softmax(output, dim=-1)
class Transformer(nn.Module):
def __init__(self, source_embed, target_embed, positional_encoding, multihead_attention, masked_multihead_attention, feed_forward, encoder, decoder, projection_layer, dropout_rate):
super().__init__()
self.source_embed = source_embed
self.target_embed = target_embed
self.positional_encoding = positional_encoding
self.multihead_attention = multihead_attention
self.masked_multihead_attention = masked_multihead_attention
self.feed_forward = feed_forward
self.encoder = encoder
self.decoder = decoder
self.projection_layer = projection_layer
self.dropout = nn.Dropout(dropout_rate)
def encode(self, encoder_input, encoder_mask):
encoder_input = self.source_embed(encoder_input)
encoder_input = self.positional_encoding(encoder_input)
encoder_output = self.encoder(encoder_input, encoder_mask)
return encoder_output
def decode(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
decoder_input = self.target_embed(decoder_input)
decoder_input = self.positional_encoding(decoder_input)
decoder_output = self.decoder(decoder_input, decoder_mask, encoder_output, encoder_mask)
return decoder_output
def project(self, decoder_output):
return self.projection_layer(decoder_output)
def build_model(source_vocab_size, target_vocab_size, max_seq_len, d_model, d_ff, num_heads, num_blocks, dropout_rate):
source_embed = EmbeddingLayer(source_vocab_size, d_model)
target_embed = EmbeddingLayer(target_vocab_size, d_model)
positional_encoding = PositionalEncoding(max_seq_len, d_model, dropout_rate)
multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
masked_multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
feed_forward = FeedForward(d_model, d_ff, dropout_rate)
projection_layer = ProjectionLayer(target_vocab_size, d_model)
encoder_block = EncoderBlock(multihead_attention, feed_forward, dropout_rate)
decoder_block = DecoderBlock(masked_multihead_attention, multihead_attention, feed_forward, dropout_rate)
# encoderblocklist = nn.ModuleList([encoder_block for _ in range(num_blocks)])
# decoderblocklist = nn.ModuleList([decoder_block for _ in range(num_blocks)])
encoderblocklist = nn.ModuleList([EncoderBlock(MultiHeadAttention(d_model, num_heads, dropout_rate), FeedForward(d_model, d_ff, dropout_rate), dropout_rate) for _ in range(num_blocks)])
decoderblocklist = nn.ModuleList([DecoderBlock(MultiHeadAttention(d_model, num_heads, dropout_rate), MultiHeadAttention(d_model, num_heads, dropout_rate), FeedForward(d_model, d_ff, dropout_rate), dropout_rate) for _ in range(num_blocks)])
encoder = Encoder(encoderblocklist)
decoder = Decoder(decoderblocklist)
model = Transformer(source_embed, target_embed, positional_encoding, multihead_attention, masked_multihead_attention, feed_forward, encoder, decoder, projection_layer, dropout_rate)
for param in model.parameters():
if param.dim() > 1:
nn.init.xavier_uniform_(param)
return model
def causal_mask(size):
mask = torch.triu(torch.ones(1, size, size), diagonal=1).type(torch.int)
return mask == 0
def hindishpt(user_input_text, model, tokenizer_en, tokenizer_my, max_seq_len, device):
model.eval()
with torch.inference_mode():
user_input_text = user_input_text.strip()
user_input_text_encoded = torch.tensor(tokenizer_en.encode(user_input_text).ids, dtype=torch.int64).to(device)
PAD_ID = tokenizer_my.token_to_id("[PAD]")
CLS_ID = torch.tensor([tokenizer_my.token_to_id("[CLS]")], dtype=torch.int64).to(device)
SEP_ID = torch.tensor([tokenizer_my.token_to_id("[SEP]")], dtype=torch.int64).to(device)
num_source_padding = max_seq_len - len(user_input_text_encoded) - 2
encoder_padding = torch.tensor([PAD_ID] * num_source_padding, dtype=torch.int64).to(device)
encoder_input = torch.cat([CLS_ID, user_input_text_encoded, SEP_ID, encoder_padding], dim=0).unsqueeze(0).to(device)
encoder_mask = (encoder_input != PAD_ID).unsqueeze(1).unsqueeze(1).int().to(device)
encoder_output = model.encode(encoder_input, encoder_mask)
decoder_input = torch.tensor([[tokenizer_my.token_to_id('[CLS]')]], dtype=torch.int64, device=device)
while True:
if decoder_input.size(1) == max_seq_len:
break
decoder_mask = causal_mask(decoder_input.size(1)).type_as(encoder_mask).to(device)
decoder_output = model.decode(decoder_input, decoder_mask, encoder_output, encoder_mask)
projection = model.project(decoder_output[:, -1])
_, new_token = torch.max(projection, dim=1)
new_token = new_token.unsqueeze(1)
decoder_input = torch.cat([decoder_input, new_token], dim=1)
if new_token.item() == tokenizer_my.token_to_id('[SEP]'):
break
decoder_output = decoder_input.squeeze(0)
model_predicted_text = tokenizer_my.decode(decoder_output.detach().cpu().numpy())
return model_predicted_text
# # Example usage
# if __name__ == "__main__":
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# tokenizer_en = Tokenizer.from_file("tokenizer_en.json")
# tokenizer_my = Tokenizer.from_file("tokenizer_hn.json")
# with open("config.json", "r") as f:
# config = json.load(f)
# model = build_model(**config)
# model.to(device)
# checkpoint = torch.load("model_1.pt", map_location=device)
# model.load_state_dict(checkpoint['model_state_dict'])
# model.eval()
# print(hindishpt("अब आप कैसे हैं?", model, tokenizer_en, tokenizer_my, 128, device))