import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

class PositionalEncoding(layers.Layer):
    """Positional encoding layer for transformer"""

    def __init__(self, max_length: int, d_model: int, **kwargs):
        super().__init__(**kwargs)
        self.max_length = max_length
        self.d_model = d_model
        # Create positional encoding matrix
        position = np.arange(max_length)[:, np.newaxis]
        div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
        pe = np.zeros((max_length, d_model))
        pe[:, 0::2] = np.sin(position * div_term)
        pe[:, 1::2] = np.cos(position * div_term)
        self.positional_encoding = tf.constant(pe, dtype=tf.float32)

    def call(self, x):
        seq_length = tf.shape(x)[1]
        return x + self.positional_encoding[:seq_length, :]

    def get_config(self):
        config = super().get_config()
        config.update({
            'max_length': self.max_length,
            'd_model': self.d_model
        })
        return config
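
# Illustration of the encoding above: with d_model = 4, position p maps to
# [sin(p), cos(p), sin(p / 100), cos(p / 100)], since the frequencies are
# 10000^(-2i / d_model) for i in {0, 2}. Each position thus gets a unique,
# smoothly varying signature that the attention layers can learn to exploit.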

class TransformerBlock(layers.Layer):
    """Transformer decoder block"""

    def __init__(self, d_model: int, num_heads: int, ff_dim: int,
                 dropout_rate: float = 0.1, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout_rate
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=d_model // num_heads,
            dropout=dropout_rate
        )
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation='gelu'),
            layers.Dropout(dropout_rate),
            layers.Dense(d_model),
            layers.Dropout(dropout_rate)
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout = layers.Dropout(dropout_rate)

    def call(self, x, training=False, mask=None):
        # Causal self-attention with residual connection
        attn_output = self.attention(
            query=x,
            value=x,
            key=x,
            attention_mask=mask,
            training=training
        )
        attn_output = self.dropout(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)
        # Feed-forward network with residual connection
        ffn_output = self.ffn(out1, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'dropout_rate': self.dropout_rate
        })
        return config

class VedaProgrammingLLM(keras.Model):
    """Veda Programming Language Model"""

    def __init__(
        self,
        vocab_size: int,
        max_length: int = 512,
        d_model: int = 256,
        num_heads: int = 8,
        num_layers: int = 6,
        ff_dim: int = 1024,
        dropout_rate: float = 0.1,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.max_length = max_length
        self.d_model = d_model
        self.num_heads = num_heads
        self.num_layers = num_layers
        self.ff_dim = ff_dim
        self.dropout_rate = dropout_rate
        # Embedding layers
        self.token_embedding = layers.Embedding(
            input_dim=vocab_size,
            output_dim=d_model
        )
        self.positional_encoding = PositionalEncoding(max_length, d_model)
        self.dropout = layers.Dropout(dropout_rate)
        # Transformer blocks
        self.transformer_blocks = [
            TransformerBlock(d_model, num_heads, ff_dim, dropout_rate)
            for _ in range(num_layers)
        ]
        # Output layer
        self.output_layer = layers.Dense(vocab_size)

    def _create_causal_mask(self, seq_length):
        """Create a causal attention mask (1 = may attend, 0 = masked)."""
        # Lower-triangular matrix: position i attends only to positions j <= i
        mask = tf.linalg.band_part(
            tf.ones((seq_length, seq_length)), -1, 0
        )
        return mask

    def call(self, inputs, training=False):
        seq_length = tf.shape(inputs)[1]
        # Create causal mask
        mask = self._create_causal_mask(seq_length)
        # Embeddings, scaled by sqrt(d_model) as in the original transformer
        x = self.token_embedding(inputs)
        x = x * tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = self.positional_encoding(x)
        x = self.dropout(x, training=training)
        # Transformer blocks
        for transformer_block in self.transformer_blocks:
            x = transformer_block(x, training=training, mask=mask)
        # Output projection
        logits = self.output_layer(x)
        return logits

    def generate(
        self,
        prompt_tokens: list,
        max_new_tokens: int = 100,
        temperature: float = 0.7,
        top_k: int = 50,
        top_p: float = 0.9
    ):
        """Generate code given a prompt"""
        generated = list(prompt_tokens)
        for _ in range(max_new_tokens):
            # Truncate the context to the model's maximum length
            context = generated[-self.max_length:]
            # Get next-token logits and apply temperature
            input_tensor = tf.expand_dims(context, 0)
            logits = self(input_tensor, training=False)
            next_token_logits = logits[0, -1, :] / temperature
            # Apply top-k filtering: mask everything below the k-th largest logit
            if top_k > 0:
                top_k_logits, _ = tf.math.top_k(
                    next_token_logits, k=min(top_k, self.vocab_size)
                )
                indices_to_remove = next_token_logits < top_k_logits[-1]
                next_token_logits = tf.where(
                    indices_to_remove,
                    tf.fill(tf.shape(next_token_logits), float('-inf')),
                    next_token_logits
                )
            # Apply top-p (nucleus) filtering: drop the low-probability tail
            # whose cumulative mass exceeds top_p, keeping at least one token
            if top_p < 1.0:
                sorted_indices = tf.argsort(next_token_logits, direction='DESCENDING')
                sorted_logits = tf.gather(next_token_logits, sorted_indices)
                cumulative_probs = tf.cumsum(tf.nn.softmax(sorted_logits))
                sorted_indices_to_remove = cumulative_probs > top_p
                sorted_indices_to_remove = tf.concat(
                    [[False], sorted_indices_to_remove[:-1]], axis=0
                )
                # Scatter the removal mask back to original vocabulary order
                indices_to_remove = tf.scatter_nd(
                    tf.expand_dims(sorted_indices, 1),
                    tf.cast(sorted_indices_to_remove, tf.int32),
                    tf.shape(next_token_logits)
                )
                next_token_logits = tf.where(
                    tf.cast(indices_to_remove, tf.bool),
                    tf.fill(tf.shape(next_token_logits), float('-inf')),
                    next_token_logits
                )
            # Sample the next token from the filtered distribution
            next_token = tf.random.categorical(
                tf.expand_dims(next_token_logits, 0),
                num_samples=1
            )[0, 0]
            generated.append(int(next_token.numpy()))
            # Stop if end token
            if next_token == 3:  # END token
                break
        return generated

    def get_config(self):
        return {
            'vocab_size': self.vocab_size,
            'max_length': self.max_length,
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'num_layers': self.num_layers,
            'ff_dim': self.ff_dim,
            'dropout_rate': self.dropout_rate
        }

    @classmethod
    def from_config(cls, config):
        return cls(**config)
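
# Note: to reload a saved model, the custom classes above must be passed as
# custom_objects (a sketch, assuming the model was saved with model.save()):
#   keras.models.load_model(path, custom_objects={
#       'PositionalEncoding': PositionalEncoding,
#       'TransformerBlock': TransformerBlock,
#       'VedaProgrammingLLM': VedaProgrammingLLM,
#   })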

def create_veda_model(
    vocab_size: int,
    max_length: int = 512,
    model_size: str = "small"
) -> VedaProgrammingLLM:
    """Factory function to create Veda Programming model"""
    configs = {
        "small": {
            "d_model": 256,
            "num_heads": 4,
            "num_layers": 4,
            "ff_dim": 512
        },
        "medium": {
            "d_model": 512,
            "num_heads": 8,
            "num_layers": 6,
            "ff_dim": 1024
        },
        "large": {
            "d_model": 768,
            "num_heads": 12,
            "num_layers": 12,
            "ff_dim": 2048
        }
    }
    config = configs.get(model_size, configs["small"])
    model = VedaProgrammingLLM(
        vocab_size=vocab_size,
        max_length=max_length,
        **config
    )
    return model
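
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only -- the vocab size, prompt token ids,
# and END-token id 3 are placeholder assumptions, not tied to a tokenizer).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    model = create_veda_model(vocab_size=1000, max_length=128, model_size="small")

    # Run a dummy batch through the model to build its weights
    dummy_batch = tf.zeros((1, 16), dtype=tf.int32)
    logits = model(dummy_batch, training=False)
    print("logits shape:", logits.shape)  # expected: (1, 16, 1000)

    # Sample a short continuation from an arbitrary prompt
    generated_ids = model.generate(
        [1, 42, 7], max_new_tokens=20, temperature=0.8, top_k=50, top_p=0.9
    )
    print("generated token ids:", generated_ids)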