Spaces:
Sleeping
Sleeping
| # gradio_app.py | |
| import gradio as gr | |
| import torch | |
| import os | |
| import math | |
| import torch.nn as nn | |
| import re | |
| import sys | |
| import asyncio | |
# On Windows, install the selector-based asyncio event loop policy.
if sys.platform.startswith('win'):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

# Prefer CUDA when torch reports it as available, otherwise use the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")

# Sequence length used for padding inputs and as the default decode limit.
MAX_LEN = 128

# Model hyper-parameters; these are the values the model below is built with
# before the checkpoint's state dict is loaded into it.
EMBED_DIM = 256
NHEAD = 4
NUM_ENCODER_LAYERS = 2
NUM_DECODER_LAYERS = 2
FF_DIM = 512

# Special vocabulary entries looked up in the stoi dictionaries.
PAD_TOKEN, SOS_TOKEN, EOS_TOKEN, UNK_TOKEN = "<pad>", "<sos>", "<eos>", "<unk>"
def tokenize_line(text: str):
    """Split a line into tokens.

    A token is either a maximal run of ASCII letters/digits, or a single
    character that is neither whitespace nor an ASCII letter/digit.
    Whitespace is discarded.

    Args:
        text: The line to tokenize.

    Returns:
        The tokens as a list of strings, in order of appearance.
    """
    matches = re.finditer(r"[A-Za-z0-9]+|[^\sA-Za-z0-9]", text)
    return [match.group(0) for match in matches]
def numericalize(text: str, stoi: dict):
    """Convert a line of text into a list of vocabulary ids.

    Args:
        text: The line to encode; it is split with ``tokenize_line``.
        stoi: Mapping from token string to id. It must contain
            ``UNK_TOKEN``, whose id stands in for unknown tokens.

    Returns:
        One id per token, in token order.
    """
    ids = []
    for token in tokenize_line(text):
        # The UNK id is looked up on every token, exactly as the fallback
        # argument of dict.get requires.
        ids.append(stoi.get(token, stoi[UNK_TOKEN]))
    return ids
def pad_sequence(seq, max_len, pad_id, eos_id=None):
    """Truncate ``seq``, append an end-of-sequence id and pad to ``max_len``.

    The sequence is cut to at most ``max_len - 1`` ids so the EOS id always
    fits, then right-padded with ``pad_id`` up to exactly ``max_len`` ids.
    The input sequence is never modified.

    Args:
        seq: Sequence of token ids (list or tuple).
        max_len: Length of the returned list; must be at least 1.
        pad_id: Id used for right-padding.
        eos_id: Id appended after the (possibly truncated) sequence. When
            omitted, the module-level ``tgt_stoi[EOS_TOKEN]`` is used, which
            is what this function always did before the parameter existed.
            Pass the id explicitly when padding with a different vocabulary.

    Returns:
        A new list of exactly ``max_len`` ids.

    Raises:
        ValueError: If ``max_len`` is smaller than 1 (no room for EOS).
    """
    if max_len < 1:
        raise ValueError(f"max_len must be at least 1, got {max_len}")
    if eos_id is None:
        # Backward-compatible default: the target vocabulary's EOS id.
        eos_id = tgt_stoi[EOS_TOKEN]

    padded = list(seq[:max_len - 1])
    padded.append(eos_id)
    # A non-positive repeat count yields an empty list, so no length check
    # is needed here.
    padded.extend([pad_id] * (max_len - len(padded)))
    return padded
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch of embeddings.

    The table is precomputed for ``max_len`` positions and stored as the
    buffer ``pe`` with shape ``(1, max_len, d_model)``. The buffer is
    registered with the default persistence, so it is part of the module's
    ``state_dict``.

    Args:
        d_model: Size of the last (feature) dimension of the inputs. Both
            even and odd sizes are supported.
        max_len: Number of positions to precompute.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term
        # Even feature indices carry the sine, odd indices the cosine.
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even ones,
        # so the cosine table is trimmed to d_model // 2 columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer("pe", pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: Tensor indexed as ``(batch, position, feature)``; the feature
                size must equal ``d_model``.
        """
        return x + self.pe[:, :x.size(1), :]
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention computed over several heads.

    Queries, keys and values are projected with separate linear layers,
    split into ``n_heads`` heads of size ``d_model // n_heads``, attended
    per head, merged again and passed through an output projection.

    Args:
        d_model: Feature size of the inputs and of the output.
        n_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, d_model, n_heads):
        super().__init__()
        # Raised explicitly (rather than asserted) so the check still runs
        # when Python is started with -O.
        if d_model % n_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by n_heads ({n_heads})"
            )
        self.d_model = d_model
        self.n_heads = n_heads
        self.head_dim = d_model // n_heads
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)
        self.out_linear = nn.Linear(d_model, d_model)

    def _split_heads(self, x, batch_size, seq_len):
        """Reshape (batch, seq, d_model) into (batch, n_heads, seq, head_dim)."""
        return x.view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` positions to ``key``/``value`` positions.

        Args:
            query: Tensor of shape ``(batch, Q_len, d_model)``.
            key: Tensor of shape ``(batch, K_len, d_model)``.
            value: Tensor with the same batch and length as ``key``.
            mask: Optional tensor broadcastable to
                ``(batch, n_heads, Q_len, K_len)``. Positions where the mask
                equals 0 (or ``False``) are excluded from attention.

        Returns:
            Tensor of shape ``(batch, Q_len, d_model)``.
        """
        batch_size, q_len, _ = query.size()
        _, k_len, _ = key.size()

        q = self._split_heads(self.query_linear(query), batch_size, q_len)
        k = self._split_heads(self.key_linear(key), batch_size, k_len)
        v = self._split_heads(self.value_linear(value), batch_size, k_len)

        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if mask is not None:
            # Fill with the most negative finite value instead of -inf: the
            # masked weights still come out as exactly 0 after softmax, but
            # a row with every position masked no longer turns into NaN.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)
        attn = torch.softmax(scores, dim=-1)

        context = torch.matmul(attn, v)
        # Merge the heads back into a single feature dimension.
        context = context.transpose(1, 2).contiguous().view(batch_size, q_len, self.d_model)
        return self.out_linear(context)
class FeedForward(nn.Module):
    """Two linear layers with a ReLU in between.

    Args:
        d_model: Feature size of the input and of the output.
        dim_feedforward: Feature size of the hidden layer.
    """

    def __init__(self, d_model, dim_feedforward):
        super().__init__()
        # Attribute names are kept as-is because they become state_dict keys.
        self.fc1 = nn.Linear(d_model, dim_feedforward)
        self.fc2 = nn.Linear(dim_feedforward, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply ``fc1``, ReLU and ``fc2`` to ``x`` in that order."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)
class EncoderLayer(nn.Module):
    """One encoder block: self-attention followed by a feed-forward network.

    Each sub-layer's output goes through dropout, is added to the
    sub-layer's input, and the sum is layer-normalized.

    Args:
        d_model: Feature size of the inputs and outputs.
        n_heads: Number of attention heads.
        dim_feedforward: Hidden size of the feed-forward network.
        dropout: Dropout probability applied to each sub-layer output.
            Defaults to 0.1, the value that used to be hard-coded.
    """

    def __init__(self, d_model, n_heads, dim_feedforward, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.ff = FeedForward(d_model, dim_feedforward)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask=None):
        """Run the block on ``src``.

        Args:
            src: Input tensor; it is used as query, key and value of the
                self-attention.
            src_mask: Optional mask forwarded to the self-attention.

        Returns:
            Tensor with the same shape as ``src``.
        """
        attn_out = self.self_attn(src, src, src, mask=src_mask)
        src = self.norm1(src + self.dropout(attn_out))
        ff_out = self.ff(src)
        return self.norm2(src + self.dropout(ff_out))
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention, feed-forward.

    Each sub-layer's output goes through dropout, is added to the
    sub-layer's input, and the sum is layer-normalized.

    Args:
        d_model: Feature size of the inputs and outputs.
        n_heads: Number of attention heads.
        dim_feedforward: Hidden size of the feed-forward network.
        dropout: Dropout probability applied to each sub-layer output.
            Defaults to 0.1, the value that used to be hard-coded.
    """

    def __init__(self, d_model, n_heads, dim_feedforward, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.cross_attn = MultiHeadAttention(d_model, n_heads)
        self.ff = FeedForward(d_model, dim_feedforward)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        """Run the block on ``tgt`` while attending to ``memory``.

        Args:
            tgt: Decoder-side input tensor.
            memory: Tensor used as key and value of the cross-attention.
            tgt_mask: Optional mask for the self-attention over ``tgt``.
            memory_mask: Optional mask for the cross-attention over
                ``memory``.

        Returns:
            Tensor with the same shape as ``tgt``.
        """
        self_out = self.self_attn(tgt, tgt, tgt, mask=tgt_mask)
        tgt = self.norm1(tgt + self.dropout(self_out))
        cross_out = self.cross_attn(tgt, memory, memory, mask=memory_mask)
        tgt = self.norm2(tgt + self.dropout(cross_out))
        ff_out = self.ff(tgt)
        return self.norm3(tgt + self.dropout(ff_out))
class Encoder(nn.Module):
    """Token embedding, positional encoding and a stack of encoder layers.

    Args:
        vocab_size: Number of rows of the embedding table.
        d_model: Embedding / feature size.
        n_heads: Number of attention heads per layer.
        num_layers: Number of ``EncoderLayer`` blocks.
        dim_feedforward: Hidden size of each layer's feed-forward network.
    """

    def __init__(self, vocab_size, d_model, n_heads, num_layers, dim_feedforward):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model)
        stack = []
        for _ in range(num_layers):
            stack.append(EncoderLayer(d_model, n_heads, dim_feedforward))
        self.layers = nn.ModuleList(stack)

    def forward(self, src, src_mask=None):
        """Encode token ids ``src``; ``src_mask`` is handed to every layer."""
        hidden = self.pos_encoding(self.embedding(src))
        for block in self.layers:
            hidden = block(hidden, src_mask)
        return hidden
class Decoder(nn.Module):
    """Token embedding, positional encoding, decoder layers and vocab projection.

    Args:
        vocab_size: Number of rows of the embedding table and size of the
            output projection.
        d_model: Embedding / feature size.
        n_heads: Number of attention heads per layer.
        num_layers: Number of ``DecoderLayer`` blocks.
        dim_feedforward: Hidden size of each layer's feed-forward network.
    """

    def __init__(self, vocab_size, d_model, n_heads, num_layers, dim_feedforward):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model)
        stack = []
        for _ in range(num_layers):
            stack.append(DecoderLayer(d_model, n_heads, dim_feedforward))
        self.layers = nn.ModuleList(stack)
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        """Decode token ids ``tgt`` against ``memory``.

        Both masks are handed unchanged to every layer. The result of the
        last layer is projected with ``fc_out`` (no softmax is applied).
        """
        hidden = self.pos_encoding(self.embedding(tgt))
        for block in self.layers:
            hidden = block(hidden, memory, tgt_mask, memory_mask)
        return self.fc_out(hidden)
class TransformerSeq2Seq(nn.Module):
    """Encoder-decoder model built from ``Encoder`` and ``Decoder``.

    Args:
        src_vocab_size: Vocabulary size for the encoder embedding.
        tgt_vocab_size: Vocabulary size for the decoder embedding and output.
        d_model: Embedding / feature size shared by both halves.
        n_heads: Number of attention heads per layer.
        num_encoder_layers: Number of encoder layers.
        num_decoder_layers: Number of decoder layers.
        dim_feedforward: Hidden size of the feed-forward networks.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, n_heads,
                 num_encoder_layers, num_decoder_layers, dim_feedforward):
        super().__init__()
        self.encoder = Encoder(src_vocab_size, d_model, n_heads, num_encoder_layers, dim_feedforward)
        self.decoder = Decoder(tgt_vocab_size, d_model, n_heads, num_decoder_layers, dim_feedforward)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):
        """Encode ``src`` and decode ``tgt`` against the encoder output.

        Args:
            src: Source token ids.
            tgt: Target token ids.
            src_mask: Optional mask for the encoder self-attention.
            tgt_mask: Optional mask for the decoder self-attention.
            memory_mask: Optional mask for the decoder's cross-attention
                over the encoder output. Defaults to ``None`` (no masking),
                which is what happened before this argument existed.

        Returns:
            The decoder output for every target position.
        """
        memory = self.encoder(src, src_mask)
        return self.decoder(tgt, memory, tgt_mask, memory_mask)
def generate_subsequent_mask(size):
    """Build a ``size`` x ``size`` boolean mask that hides later positions.

    Entry ``[i, j]`` is ``True`` when ``j <= i`` (position ``i`` may look at
    ``j``) and ``False`` otherwise.

    Args:
        size: Number of positions.

    Returns:
        A CPU ``torch.bool`` tensor of shape ``(size, size)``.
    """
    positions = torch.arange(size)
    # Compare column indices against row indices via broadcasting.
    return positions.unsqueeze(0) <= positions.unsqueeze(1)
def greedy_decode(model, src, src_stoi, tgt_stoi, tgt_itos, max_len=MAX_LEN):
    """Translate one encoded source sequence with greedy (argmax) decoding.

    The source is encoded once; the target is then grown one token at a
    time, always taking the highest-scoring next token, until the EOS token
    is produced or ``max_len - 1`` tokens have been generated.

    Args:
        model: Object exposing ``eval()``, ``encoder(src)`` and
            ``decoder(tgt, memory, tgt_mask)``.
        src: Source token ids as a flat sequence of ints.
        src_stoi: Source vocabulary; accepted for interface compatibility
            and not used here.
        tgt_stoi: Target vocabulary mapping token to id; must contain
            ``SOS_TOKEN`` and ``EOS_TOKEN``.
        tgt_itos: Lookup from target id to token string.
        max_len: Upper bound on the decoded length including the SOS token.

    Returns:
        The generated tokens joined with single spaces, without the SOS and
        EOS tokens.
    """
    model.eval()
    sos_id = tgt_stoi[SOS_TOKEN]
    eos_id = tgt_stoi[EOS_TOKEN]
    src_tensor = torch.tensor(src, dtype=torch.long, device=DEVICE).unsqueeze(0)

    generated = []
    # Inference only: skip autograd bookkeeping for the encoder pass and for
    # every decoder step.
    with torch.no_grad():
        memory = model.encoder(src_tensor)
        ys = torch.tensor([[sos_id]], dtype=torch.long, device=DEVICE)
        for _ in range(max_len - 1):
            tgt_mask = generate_subsequent_mask(ys.size(1)).to(DEVICE)
            logits = model.decoder(ys, memory, tgt_mask)
            # Only the scores of the last position decide the next token.
            next_token = torch.argmax(logits[:, -1, :], dim=1).item()
            if next_token == eos_id:
                break
            generated.append(next_token)
            next_column = torch.tensor([[next_token]], dtype=torch.long, device=DEVICE)
            ys = torch.cat([ys, next_column], dim=1)

    return " ".join(tgt_itos[token_id] for token_id in generated)
# Load model and vocabulary
# NOTE(review): torch.load unpickles the file, so only load a trusted
# 'model.pth'; consider weights_only=True if the installed torch supports it.
if not os.path.exists("model.pth"):
    raise FileNotFoundError("Model file 'model.pth' not found. Please train first.")
checkpoint = torch.load("model.pth", map_location=DEVICE)

# The vocabularies are stored in the checkpoint next to the weights.
src_stoi, src_itos, tgt_stoi, tgt_itos = (
    checkpoint[key] for key in ('src_stoi', 'src_itos', 'tgt_stoi', 'tgt_itos')
)

# Vocabulary sizes come from the checkpoint, the rest from the constants above.
model_kwargs = dict(
    src_vocab_size=len(src_stoi),
    tgt_vocab_size=len(tgt_stoi),
    d_model=EMBED_DIM,
    n_heads=NHEAD,
    num_encoder_layers=NUM_ENCODER_LAYERS,
    num_decoder_layers=NUM_DECODER_LAYERS,
    dim_feedforward=FF_DIM,
)
model = TransformerSeq2Seq(**model_kwargs)
model = model.to(DEVICE)
model.load_state_dict(checkpoint['model_state_dict'])
# Switch to evaluation mode (disables dropout) for serving.
model.eval()
def convert_pseudocode(text):
    """Translate pseudocode to C++ line by line and wrap it in ``main``.

    Blank lines and lines consisting only of ``}`` are copied through
    unchanged. Every other line is encoded, padded to ``MAX_LEN`` and
    decoded with the loaded model. A line that fails to convert is replaced
    by a ``//`` comment carrying its 1-based line number and the error.

    Args:
        text: The pseudocode, one statement per line.

    Returns:
        The generated lines joined with newlines, placed between
        ``int main() {`` and ``return 0;`` / ``}``.
    """
    translated = []
    for i, raw_line in enumerate(text.strip().split('\n')):
        stripped = raw_line.strip()
        if stripped in ("", "}"):
            # Nothing to translate: keep blank lines and closing braces.
            translated.append(stripped)
            continue
        try:
            token_ids = numericalize(stripped, src_stoi)
            token_ids = pad_sequence(token_ids, MAX_LEN, src_stoi[PAD_TOKEN])
            translated.append(
                greedy_decode(model, token_ids, src_stoi, tgt_stoi, tgt_itos)
            )
        except Exception as e:
            # Keep converting the remaining lines; report the failure inline.
            translated.append(f"// [Error in line {i+1}]: {e}")
    body = '\n'.join(translated)
    return "int main() {\n" + body + "\nreturn 0;\n}"
# Input: a multi-line text box for the pseudocode.
pseudocode_input = gr.Textbox(label="Enter pseudocode (line-by-line)", lines=10)
# Output: a code viewer with C++ highlighting.
cpp_output = gr.Code(language="cpp", label="Generated C++ Code")

iface = gr.Interface(
    fn=convert_pseudocode,
    inputs=pseudocode_input,
    outputs=cpp_output,
    title="PseudoCode to C++ Converter (Transformer from Scratch)",
)

# Start the web UI only when the file is run as a script.
if __name__ == "__main__":
    iface.launch(share=True)