import math
import json

import torch
import torch.nn as nn
from tokenizers import Tokenizer


class EmbeddingLayer(nn.Module):
    def __init__(self, vocab_size: int, d_model: int):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.d_model = d_model

    def forward(self, x):
        return self.embedding(x) * math.sqrt(self.d_model)


class PositionalEncoding(nn.Module):
    def __init__(self, max_seq_len: int, d_model: int, dropout_rate: float):
        super().__init__()
        self.dropout = nn.Dropout(dropout_rate)
        # Precompute the sinusoidal positional encodings once for all positions.
        pe = torch.zeros(max_seq_len, d_model)
        pos = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(pos * div_term)  # even feature indices: sine
        pe[:, 1::2] = torch.cos(pos * div_term)  # odd feature indices: cosine
        pe = pe.unsqueeze(0)  # add a batch dimension: (1, max_seq_len, d_model)
        # Register as a buffer so it moves with the model but is not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, input_embedding):
        # Add the fixed positional encodings for the current sequence length.
        input_embedding = input_embedding + self.pe[:, :input_embedding.shape[1], :].requires_grad_(False)
        return self.dropout(input_embedding)


class MultiHeadAttention(nn.Module):
    def __init__(self, d_model: int, num_heads: int, dropout_rate: float):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.dropout = nn.Dropout(dropout_rate)
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # dimension of each attention head

    def forward(self, q, k, v, encoder_mask=None):
        query = self.W_q(q)
        key = self.W_k(k)
        value = self.W_v(v)
        # Split into heads: (batch, seq_len, d_model) -> (batch, num_heads, seq_len, d_k)
        query = query.view(query.shape[0], query.shape[1], self.num_heads, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.num_heads, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.num_heads, self.d_k).transpose(1, 2)
        # Scaled dot-product attention scores: (batch, num_heads, seq_len, seq_len)
        attention_score = (query @ key.transpose(-2, -1)) / math.sqrt(self.d_k)
        if encoder_mask is not None:
            # Masked positions get a large negative score so softmax assigns them ~0 weight.
            attention_score = attention_score.masked_fill(encoder_mask == 0, -1e9)
        attention_weight = torch.softmax(attention_score, dim=-1)
        attention_weight = self.dropout(attention_weight)
        attention_output = attention_weight @ value
        # Merge heads back: (batch, num_heads, seq_len, d_k) -> (batch, seq_len, d_model)
        attention_output = attention_output.transpose(1, 2).contiguous().view(attention_output.shape[0], -1, self.num_heads * self.d_k)
        multihead_output = self.W_o(attention_output)
        return multihead_output


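# Quick shape sanity check for MultiHeadAttention (illustrative only; d_model = 512,
# num_heads = 8, and the tensor sizes below are assumed values, not taken from this file):
#
#   mha = MultiHeadAttention(d_model=512, num_heads=8, dropout_rate=0.1)
#   x = torch.rand(2, 10, 512)   # (batch, seq_len, d_model)
#   mha(x, x, x).shape           # torch.Size([2, 10, 512]) -- same shape in, same shape out

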
class FeedForward(nn.Module):
    def __init__(self, d_model: int, d_ff: int, dropout_rate: float):
        super().__init__()
        self.layer_1 = nn.Linear(d_model, d_ff)
        self.activation_1 = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)
        self.layer_2 = nn.Linear(d_ff, d_model)

    def forward(self, input):
        return self.layer_2(self.dropout(self.activation_1(self.layer_1(input))))


class LayerNorm(nn.Module):
    def __init__(self, eps: float = 1e-5):
        super().__init__()
        self.eps = eps
        # Learnable scale and shift. Scalar parameters are used so they broadcast over
        # the feature dimension and the layer works for any d_model.
        self.gamma = nn.Parameter(torch.ones(1))
        self.beta = nn.Parameter(torch.zeros(1))

    def forward(self, input):
        # Normalize over the last (feature) dimension, then rescale and shift.
        mean = input.mean(dim=-1, keepdim=True)
        std = input.std(dim=-1, keepdim=True)
        return self.gamma * ((input - mean) / (std + self.eps)) + self.beta


class AddAndNorm(nn.Module):
    def __init__(self, dropout_rate: float):
        super().__init__()
        self.dropout = nn.Dropout(dropout_rate)
        self.layer_norm = LayerNorm()

    def forward(self, input, sub_layer):
        # Pre-norm residual connection: normalize, apply the sub-layer, dropout, then add the input back.
        return input + self.dropout(sub_layer(self.layer_norm(input)))


class EncoderBlock(nn.Module):
    def __init__(self, multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float):
        super().__init__()
        self.multihead_attention = multihead_attention
        self.feed_forward = feed_forward
        self.add_and_norm_list = nn.ModuleList([AddAndNorm(dropout_rate) for _ in range(2)])

    def forward(self, encoder_input, encoder_mask):
        encoder_input = self.add_and_norm_list[0](encoder_input, lambda x: self.multihead_attention(x, x, x, encoder_mask))
        encoder_input = self.add_and_norm_list[1](encoder_input, self.feed_forward)
        return encoder_input


class Encoder(nn.Module):
    def __init__(self, encoderblocklist: nn.ModuleList):
        super().__init__()
        self.encoderblocklist = encoderblocklist
        self.layer_norm = LayerNorm()

    def forward(self, encoder_input, encoder_mask):
        for encoderblock in self.encoderblocklist:
            encoder_input = encoderblock(encoder_input, encoder_mask)
        encoder_output = self.layer_norm(encoder_input)
        return encoder_output


class DecoderBlock(nn.Module):
    def __init__(self, masked_multihead_attention: MultiHeadAttention, multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float):
        super().__init__()
        self.masked_multihead_attention = masked_multihead_attention
        self.multihead_attention = multihead_attention
        self.feed_forward = feed_forward
        self.add_and_norm_list = nn.ModuleList([AddAndNorm(dropout_rate) for _ in range(3)])

    def forward(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
        # Masked self-attention over the target, cross-attention over the encoder output,
        # then the position-wise feed-forward network, each with its own residual connection.
        decoder_input = self.add_and_norm_list[0](decoder_input, lambda x: self.masked_multihead_attention(x, x, x, decoder_mask))
        decoder_input = self.add_and_norm_list[1](decoder_input, lambda x: self.multihead_attention(x, encoder_output, encoder_output, encoder_mask))
        decoder_input = self.add_and_norm_list[2](decoder_input, self.feed_forward)
        return decoder_input


class Decoder(nn.Module):
    def __init__(self, decoderblocklist: nn.ModuleList):
        super().__init__()
        self.decoderblocklist = decoderblocklist
        self.layer_norm = LayerNorm()

    def forward(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
        for decoderblock in self.decoderblocklist:
            decoder_input = decoderblock(decoder_input, decoder_mask, encoder_output, encoder_mask)
        decoder_output = self.layer_norm(decoder_input)
        return decoder_output


class ProjectionLayer(nn.Module):
    def __init__(self, vocab_size: int, d_model: int):
        super().__init__()
        self.projection_layer = nn.Linear(d_model, vocab_size)

    def forward(self, decoder_output):
        # Map decoder states to log-probabilities over the target vocabulary
        # (log_softmax output is suitable for nn.NLLLoss).
        output = self.projection_layer(decoder_output)
        return torch.log_softmax(output, dim=-1)


class Transformer(nn.Module):
    def __init__(self, source_embed, target_embed, positional_encoding, multihead_attention, masked_multihead_attention, feed_forward, encoder, decoder, projection_layer, dropout_rate):
        super().__init__()
        self.source_embed = source_embed
        self.target_embed = target_embed
        self.positional_encoding = positional_encoding
        # These single attention/feed-forward references are kept on the model but are not
        # used in encode/decode; the encoder and decoder stacks hold their own sub-layers.
        self.multihead_attention = multihead_attention
        self.masked_multihead_attention = masked_multihead_attention
        self.feed_forward = feed_forward
        self.encoder = encoder
        self.decoder = decoder
        self.projection_layer = projection_layer
        self.dropout = nn.Dropout(dropout_rate)

    def encode(self, encoder_input, encoder_mask):
        # Embed the source tokens, add positional encodings, then run the encoder stack.
        encoder_input = self.source_embed(encoder_input)
        encoder_input = self.positional_encoding(encoder_input)
        encoder_output = self.encoder(encoder_input, encoder_mask)
        return encoder_output

    def decode(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
        # Embed the target tokens, add positional encodings, then run the decoder stack
        # against the encoder output.
        decoder_input = self.target_embed(decoder_input)
        decoder_input = self.positional_encoding(decoder_input)
        decoder_output = self.decoder(decoder_input, decoder_mask, encoder_output, encoder_mask)
        return decoder_output

    def project(self, decoder_output):
        return self.projection_layer(decoder_output)


def build_model(source_vocab_size, target_vocab_size, max_seq_len, d_model, d_ff, num_heads, num_blocks, dropout_rate):
    # Embeddings and positional encoding shared between the encode and decode paths.
    source_embed = EmbeddingLayer(source_vocab_size, d_model)
    target_embed = EmbeddingLayer(target_vocab_size, d_model)
    positional_encoding = PositionalEncoding(max_seq_len, d_model, dropout_rate)

    # Single reference instances passed to the Transformer object; each encoder/decoder
    # block below is built with its own fresh attention and feed-forward modules.
    multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
    masked_multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
    feed_forward = FeedForward(d_model, d_ff, dropout_rate)
    projection_layer = ProjectionLayer(target_vocab_size, d_model)

    # Stack num_blocks encoder and decoder blocks.
    encoderblocklist = nn.ModuleList([EncoderBlock(MultiHeadAttention(d_model, num_heads, dropout_rate), FeedForward(d_model, d_ff, dropout_rate), dropout_rate) for _ in range(num_blocks)])
    decoderblocklist = nn.ModuleList([DecoderBlock(MultiHeadAttention(d_model, num_heads, dropout_rate), MultiHeadAttention(d_model, num_heads, dropout_rate), FeedForward(d_model, d_ff, dropout_rate), dropout_rate) for _ in range(num_blocks)])
    encoder = Encoder(encoderblocklist)
    decoder = Decoder(decoderblocklist)

    model = Transformer(source_embed, target_embed, positional_encoding, multihead_attention, masked_multihead_attention, feed_forward, encoder, decoder, projection_layer, dropout_rate)

    # Xavier-initialize all weight matrices (parameters with more than one dimension).
    for param in model.parameters():
        if param.dim() > 1:
            nn.init.xavier_uniform_(param)
    return model


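# Example of wiring up the model (illustrative only; the vocabulary sizes and
# hyperparameters below are assumed values, not defined in this file):
#
#   model = build_model(source_vocab_size=30000, target_vocab_size=30000,
#                       max_seq_len=155, d_model=512, d_ff=2048,
#                       num_heads=8, num_blocks=6, dropout_rate=0.1)
#   print(sum(p.numel() for p in model.parameters()))  # rough parameter count

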
def causal_mask(size):
    # Build a (1, size, size) boolean mask that is True where attention is allowed
    # (the current and earlier positions) and False for future positions.
    mask = torch.triu(torch.ones(1, size, size), diagonal=1).type(torch.int)
    return mask == 0


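# For example, causal_mask(3) produces:
#
#   tensor([[[ True, False, False],
#            [ True,  True, False],
#            [ True,  True,  True]]])
#
# so position i may attend to positions 0..i but not to later ones.

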
def hindishpt(user_input_text, model, tokenizer_en, tokenizer_my, max_seq_len, device):
    """Greedy-decode a translation of user_input_text with the trained model."""
    model.eval()
    with torch.inference_mode():
        user_input_text = user_input_text.strip()
        # Encode the source sentence, truncating so that [CLS] ... [SEP] fits in max_seq_len.
        user_input_text_encoded = torch.tensor(tokenizer_en.encode(user_input_text).ids[:max_seq_len - 2], dtype=torch.int64).to(device)

        PAD_ID = tokenizer_my.token_to_id("[PAD]")
        CLS_ID = torch.tensor([tokenizer_my.token_to_id("[CLS]")], dtype=torch.int64).to(device)
        SEP_ID = torch.tensor([tokenizer_my.token_to_id("[SEP]")], dtype=torch.int64).to(device)

        # Pad the source sequence to max_seq_len: [CLS] tokens [SEP] [PAD]...
        num_source_padding = max_seq_len - len(user_input_text_encoded) - 2
        encoder_padding = torch.tensor([PAD_ID] * num_source_padding, dtype=torch.int64).to(device)
        encoder_input = torch.cat([CLS_ID, user_input_text_encoded, SEP_ID, encoder_padding], dim=0).unsqueeze(0).to(device)
        # Mask out padding positions so the encoder cannot attend to them.
        encoder_mask = (encoder_input != PAD_ID).unsqueeze(1).unsqueeze(1).int().to(device)

        # Encode the source once; the decoder reuses this output at every step.
        encoder_output = model.encode(encoder_input, encoder_mask)

        # Start from [CLS] and append one greedily chosen token per step.
        decoder_input = torch.tensor([[tokenizer_my.token_to_id('[CLS]')]], dtype=torch.int64, device=device)
        while True:
            if decoder_input.size(1) == max_seq_len:
                break
            decoder_mask = causal_mask(decoder_input.size(1)).type_as(encoder_mask).to(device)
            decoder_output = model.decode(decoder_input, decoder_mask, encoder_output, encoder_mask)
            # Project only the last position and pick the most probable next token.
            projection = model.project(decoder_output[:, -1])
            _, new_token = torch.max(projection, dim=1)
            new_token = new_token.unsqueeze(1)
            decoder_input = torch.cat([decoder_input, new_token], dim=1)
            # Stop as soon as the model emits the end-of-sentence token.
            if new_token.item() == tokenizer_my.token_to_id('[SEP]'):
                break

        decoder_output = decoder_input.squeeze(0)
        model_predicted_text = tokenizer_my.decode(decoder_output.detach().cpu().numpy())
        return model_predicted_text
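

# Illustrative call (the tokenizer file names, max_seq_len, and hyperparameters are
# assumptions, not defined in this file):
#
#   device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#   tokenizer_en = Tokenizer.from_file("tokenizer_en.json")
#   tokenizer_my = Tokenizer.from_file("tokenizer_my.json")
#   model = build_model(tokenizer_en.get_vocab_size(), tokenizer_my.get_vocab_size(),
#                       max_seq_len=155, d_model=512, d_ff=2048,
#                       num_heads=8, num_blocks=6, dropout_rate=0.1).to(device)
#   print(hindishpt("How are you?", model, tokenizer_en, tokenizer_my, 155, device))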