Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import torch | |
| import torch.nn as nn | |
| import time | |
| # Define the custom model class with detailed layer structures | |
class Head(nn.Module):
    """One head of causal (masked) self-attention.

    Args:
        head_size: Output width of the key/query/value projections.
        n_embd: Width of the incoming embeddings. Defaults to 64, the value
            that used to be hard-coded.
        block_size: Side length of the causal mask, i.e. the longest
            sequence this head can process. Defaults to 32.
        dropout: Dropout probability applied to the attention weights.
            Defaults to 0.1.
    """

    def __init__(self, head_size: int, *, n_embd: int = 64,
                 block_size: int = 32, dropout: float = 0.1):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        # Registered as a buffer: it is not a trainable parameter, but it is
        # part of the module state under the name 'tril'.
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Attend over ``x`` of shape (B, T, C); return (B, T, head_size).

        Position ``t`` only receives weight from positions ``<= t``.
        """
        B, T, C = x.shape
        k = self.key(x)
        q = self.query(x)
        # NOTE(review): the scores are scaled by the input width C rather than
        # by head_size. Left unchanged on purpose: altering the scale would
        # change the outputs of weights trained with it.
        wei = q @ k.transpose(-2, -1) * C ** -0.5
        # Future positions get -inf so softmax assigns them zero weight.
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        wei = nn.functional.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)
        return wei @ v
class MultiHeadAttention(nn.Module):
    """Several attention heads run side by side, then mixed by a linear layer.

    Args:
        num_heads: How many ``Head`` modules to create.
        head_size: Output width of each head. The concatenated head outputs
            are fed to a 64 -> 64 projection.
    """

    def __init__(self, num_heads, head_size):
        super().__init__()
        head_modules = []
        for _ in range(num_heads):
            head_modules.append(Head(head_size))
        self.heads = nn.ModuleList(head_modules)
        self.proj = nn.Linear(64, 64)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        """Concatenate every head's output on the last axis, project, drop out."""
        per_head = [head(x) for head in self.heads]
        merged = torch.cat(per_head, dim=-1)
        projected = self.proj(merged)
        return self.dropout(projected)
class FeedForward(nn.Module):
    """Position-wise MLP: expand to 4x width, ReLU, project back, dropout.

    Args:
        n_embd: Width of both the input and the output.
    """

    def __init__(self, n_embd):
        super().__init__()
        hidden = 4 * n_embd
        # Order matters: the layer indices inside ``net`` become state-dict keys.
        layers = [
            nn.Linear(n_embd, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_embd),
            nn.Dropout(0.1),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP to the last dimension of ``x``."""
        return self.net(x)
class Block(nn.Module):
    """Pre-norm transformer block: self-attention, then feed-forward.

    Both sub-layers are applied to a layer-normalised input and added back
    onto the residual stream.

    Args:
        n_embd: Embedding width.
        n_head: Number of attention heads; each head gets
            ``n_embd // n_head`` channels.

    NOTE(review): the attention classes in this file hard-code a width of 64,
    so values of ``n_embd`` other than 64 will not work as written.
    """

    def __init__(self, n_embd, n_head):
        super().__init__()
        self.sa = MultiHeadAttention(n_head, n_embd // n_head)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        after_attention = x + self.sa(self.ln1(x))
        return after_attention + self.ffwd(self.ln2(after_attention))
class BigramLanguageModel(nn.Module):
    """Character-level transformer language model.

    All sizes are fixed in the constructor: 61 token ids, 64-wide embeddings,
    a 32-position context window and four ``Block`` layers with four heads
    each.
    """

    def __init__(self):
        super().__init__()
        self.token_embedding_table = nn.Embedding(61, 64)
        self.position_embedding_table = nn.Embedding(32, 64)
        self.blocks = nn.Sequential(*[Block(64, n_head=4) for _ in range(4)])
        self.ln_f = nn.LayerNorm(64)
        self.lm_head = nn.Linear(64, 61)

    def forward(self, idx, targets=None):
        """Compute next-token logits for every position of ``idx``.

        Args:
            idx: Integer tensor of token ids with shape (B, T); T must not
                exceed the size of the position table.
            targets: Accepted but ignored; no loss is computed here.

        Returns:
            ``(logits, None)`` where ``logits`` has shape (B, T, 61).
        """
        B, T = idx.shape
        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device))
        x = tok_emb + pos_emb  # (T, C) broadcasts over the batch dimension
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)
        return logits, None

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=0.7):
        """Sample ``max_new_tokens`` tokens and append them to ``idx``.

        Args:
            idx: Integer tensor (B, T) with T >= 1 holding the prompt ids.
            max_new_tokens: Number of tokens to sample.
            temperature: Positive divisor applied to the logits before
                softmax; values below 1 sharpen the distribution.

        Returns:
            Tensor of shape (B, T + max_new_tokens): prompt plus samples.

        Raises:
            ValueError: If ``temperature`` is not positive or the prompt is
                empty (either would otherwise surface as an opaque
                NaN/indexing error from deep inside the sampling loop).
        """
        if temperature <= 0:
            raise ValueError(f"temperature must be > 0, got {temperature}")
        if idx.shape[1] == 0:
            raise ValueError("idx must contain at least one token")
        # The usable context is bounded by the position-embedding table.
        block_size = self.position_embedding_table.num_embeddings
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -block_size:]  # keep only the most recent context
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :] / temperature  # last position only
            probs = nn.functional.softmax(logits, dim=-1)
            # multinomial returns indices into probs' last dimension, so the
            # sampled ids are already valid rows of the embedding table.
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx
def load_model():
    """Build the model and load its pretrained weights from the Hugging Face Hub.

    The checkpoint is loaded with ``strict=False`` so that a key mismatch does
    not abort start-up. Because that mode silently skips missing and
    unexpected keys, the mismatch report returned by ``load_state_dict`` is
    printed so that running with partly uninitialised weights is visible in
    the logs.

    Returns:
        The model in evaluation mode, with weights mapped to CPU.
    """
    model = BigramLanguageModel()
    model_url = "https://huggingface.co/yoonusajwardapiit/triptuner/resolve/main/pytorch_model.bin"
    model_weights = torch.hub.load_state_dict_from_url(
        model_url,
        map_location=torch.device('cpu'),
        weights_only=True,  # refuse arbitrary pickled objects in the download
    )
    load_report = model.load_state_dict(model_weights, strict=False)
    if load_report.missing_keys:
        print(f"Warning: keys missing from checkpoint (left at initial values): {load_report.missing_keys}")
    if load_report.unexpected_keys:
        print(f"Warning: unexpected keys in checkpoint (ignored): {load_report.unexpected_keys}")
    model.eval()
    return model


# Loaded once at import time and shared by every request.
model = load_model()
# Character-level vocabulary: lowercase letters, digits and a few punctuation
# and whitespace symbols, numbered in sorted order.
# NOTE(review): this set contains 47 symbols, while the model in this file
# embeds 61 token ids - confirm it matches the vocabulary used for training.
chars = sorted(set("abcdefghijklmnopqrstuvwxyz0123456789 .,!?-:;'\"\n"))
stoi = dict(zip(chars, range(len(chars))))
itos = dict(enumerate(chars))


def encode(s):
    """Map a string to a list of token ids.

    A character that is not in the vocabulary is looked up by its lowercase
    form; if that is unknown too, the character is dropped.
    """
    ids = []
    for c in s:
        if c in stoi:
            ids.append(stoi[c])
        elif c.lower() in stoi:
            ids.append(stoi[c.lower()])
    return ids


def decode(l):
    """Map token ids back to a string, skipping ids at or above the vocabulary size."""
    vocab_size = len(itos)
    return ''.join(itos[i] for i in l if i < vocab_size)
def generate_text(prompt):
    """Generate a short continuation of ``prompt`` with the loaded model.

    The prompt is encoded to token ids, cut to 32 tokens, extended by 20
    sampled tokens (temperature 0.7) and decoded back to text with newlines
    replaced by spaces.

    Args:
        prompt: User text from the UI; ``None`` is treated as empty.

    Returns:
        The generated text, or a string starting with ``"Error:"`` when the
        prompt is unusable or generation fails. This function never raises,
        so the UI always has something to display.
    """
    try:
        start_time = time.time()
        print(f"Received prompt: {prompt}")
        encoded_prompt = encode(prompt or "")
        # An empty prompt (or one made only of unknown characters, which
        # encode drops) would give the model a zero-length context and fail
        # with an opaque indexing error, so reject it up front.
        if not encoded_prompt:
            return "Error: Prompt is empty or contains no characters in the model vocabulary."
        # Defensive: encode currently drops unknown characters instead of
        # emitting -1, but a -1 id must never reach the embedding lookup.
        if any(idx == -1 for idx in encoded_prompt):
            return "Error: Input contains characters not in the model vocabulary."
        # Fit the block size.
        # NOTE(review): this keeps the FIRST 32 tokens, whereas generate()
        # conditions on the most recent 32 - confirm which end is intended.
        encoded_prompt = encoded_prompt[:32]
        context = torch.tensor([encoded_prompt], dtype=torch.long)
        print(f"Encoded prompt: {context}")
        with torch.no_grad():
            generated = model.generate(context, max_new_tokens=20, temperature=0.7)
        print(f"Generated tensor: {generated}")
        result = decode(generated[0].tolist())
        print(f"Decoded result: {result}")
        # Flatten newlines so the output reads as a single line in the UI.
        cleaned_result = result.replace('\n', ' ').strip()
        print(f"Cleaned result: {cleaned_result}")
        print(f"Processing time: {time.time() - start_time:.2f}s")
        return cleaned_result
    except Exception as e:
        # Top-level UI boundary: report the failure rather than crash the app.
        print(f"Error during generation: {e}")
        return f"Error: {str(e)}"
# Gradio UI: a two-line text box feeds generate_text and the returned string
# is displayed as plain text.
prompt_box = gr.Textbox(lines=2, placeholder="Enter a location or prompt...")
interface = gr.Interface(
    title="Triptuner Model",
    description="Generate itineraries for locations in Sri Lanka's Central Province.",
    fn=generate_text,
    inputs=prompt_box,
    outputs="text",
)

# Start serving the app.
interface.launch()