import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np


class PositionalEncoding(layers.Layer):
    """Sinusoidal positional encoding layer for the transformer."""

    def __init__(self, max_length: int, d_model: int, **kwargs):
        super().__init__(**kwargs)
        self.max_length = max_length
        self.d_model = d_model

        # Precompute the sinusoidal encoding matrix (assumes an even d_model,
        # which holds for every configuration in create_veda_model below)
        position = np.arange(max_length)[:, np.newaxis]
        div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
        pe = np.zeros((max_length, d_model))
        pe[:, 0::2] = np.sin(position * div_term)
        pe[:, 1::2] = np.cos(position * div_term)
        self.positional_encoding = tf.constant(pe, dtype=tf.float32)

    def call(self, x):
        seq_length = tf.shape(x)[1]
        return x + self.positional_encoding[:seq_length, :]

    def get_config(self):
        config = super().get_config()
        config.update({
            'max_length': self.max_length,
            'd_model': self.d_model
        })
        return config


class TransformerBlock(layers.Layer):
    """Transformer decoder block: masked self-attention plus a feed-forward network."""

    def __init__(self, d_model: int, num_heads: int, ff_dim: int,
                 dropout_rate: float = 0.1, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout_rate

        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=d_model // num_heads,
            dropout=dropout_rate
        )
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation='gelu'),
            layers.Dropout(dropout_rate),
            layers.Dense(d_model),
            layers.Dropout(dropout_rate)
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout = layers.Dropout(dropout_rate)

    def call(self, x, training=False, mask=None):
        # Causal self-attention with residual connection
        attn_output = self.attention(
            query=x, value=x, key=x,
            attention_mask=mask,
            training=training
        )
        attn_output = self.dropout(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        # Feed-forward network with residual connection
        ffn_output = self.ffn(out1, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'dropout_rate': self.dropout_rate
        })
        return config


class VedaProgrammingLLM(keras.Model):
    """Veda Programming Language Model: a decoder-only transformer."""

    def __init__(
        self,
        vocab_size: int,
        max_length: int = 512,
        d_model: int = 256,
        num_heads: int = 8,
        num_layers: int = 6,
        ff_dim: int = 1024,
        dropout_rate: float = 0.1,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.max_length = max_length
        self.d_model = d_model
        self.num_heads = num_heads
        self.num_layers = num_layers
        self.ff_dim = ff_dim
        self.dropout_rate = dropout_rate

        # Embedding layers
        self.token_embedding = layers.Embedding(
            input_dim=vocab_size,
            output_dim=d_model
        )
        self.positional_encoding = PositionalEncoding(max_length, d_model)
        self.dropout = layers.Dropout(dropout_rate)

        # Transformer blocks
        self.transformer_blocks = [
            TransformerBlock(d_model, num_heads, ff_dim, dropout_rate)
            for _ in range(num_layers)
        ]

        # Output projection to vocabulary logits
        self.output_layer = layers.Dense(vocab_size)

    def _create_causal_mask(self, seq_length):
        """Lower-triangular causal mask of shape (1, T, T).

        The leading batch dimension lets MultiHeadAttention broadcast the
        mask across both the batch and head dimensions.
        """
        mask = tf.linalg.band_part(
            tf.ones((seq_length, seq_length)), -1, 0
        )
        return tf.cast(mask[tf.newaxis, :, :], tf.bool)

    def call(self, inputs, training=False):
        seq_length = tf.shape(inputs)[1]

        # Create causal mask
        mask = self._create_causal_mask(seq_length)

        # Embeddings, scaled by sqrt(d_model) as in the original transformer
        x = self.token_embedding(inputs)
        x = x * tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = self.positional_encoding(x)
        x = self.dropout(x, training=training)

        # Transformer blocks
        for transformer_block in self.transformer_blocks:
            x = transformer_block(x, training=training, mask=mask)

        # Output projection
        logits = self.output_layer(x)
        return logits

    def generate(
        self,
        prompt_tokens: list,
        max_new_tokens: int = 100,
        temperature: float = 0.7,
        top_k: int = 50,
        top_p: float = 0.9
    ):
        """Generate code token ids given a prompt."""
        generated = list(prompt_tokens)

        for _ in range(max_new_tokens):
            # Truncate the context to the model's maximum sequence length
            context = generated[-self.max_length:]

            # Get next-token logits and apply temperature
            input_tensor = tf.expand_dims(context, 0)
            logits = self(input_tensor, training=False)
            next_token_logits = logits[0, -1, :] / temperature

            # Top-k filtering: mask out everything below the k-th largest logit
            if top_k > 0:
                top_k_logits, _ = tf.math.top_k(
                    next_token_logits, k=min(top_k, self.vocab_size)
                )
                indices_to_remove = tf.less(
                    next_token_logits, top_k_logits[-1]
                )
                next_token_logits = tf.where(
                    indices_to_remove,
                    tf.ones_like(next_token_logits) * float('-inf'),
                    next_token_logits
                )

            # Top-p (nucleus) filtering: keep the smallest set of tokens whose
            # cumulative probability exceeds top_p
            if top_p < 1.0:
                sorted_indices = tf.argsort(next_token_logits, direction='DESCENDING')
                sorted_logits = tf.gather(next_token_logits, sorted_indices)
                cumulative_probs = tf.cumsum(tf.nn.softmax(sorted_logits))

                # Shift right so the first token that crosses the threshold is kept
                sorted_indices_to_remove = cumulative_probs > top_p
                sorted_indices_to_remove = tf.concat([
                    [False], sorted_indices_to_remove[:-1]
                ], axis=0)

                # Scatter the mask back into vocabulary order before applying it
                indices_to_remove = tf.scatter_nd(
                    tf.expand_dims(sorted_indices, 1),
                    tf.cast(sorted_indices_to_remove, tf.int32),
                    tf.shape(next_token_logits)
                )
                next_token_logits = tf.where(
                    tf.cast(indices_to_remove, tf.bool),
                    tf.ones_like(next_token_logits) * float('-inf'),
                    next_token_logits
                )

            # Sample from the filtered distribution (tf.random.categorical takes
            # unnormalized log-probabilities, so no explicit softmax is needed)
            next_token = tf.random.categorical(
                tf.expand_dims(next_token_logits, 0),
                num_samples=1
            )[0, 0]

            next_id = int(next_token.numpy())
            generated.append(next_id)

            # Stop at the end-of-sequence token (id 3 in this vocabulary)
            if next_id == 3:
                break

        return generated

    def get_config(self):
        return {
            'vocab_size': self.vocab_size,
            'max_length': self.max_length,
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'num_layers': self.num_layers,
            'ff_dim': self.ff_dim,
            'dropout_rate': self.dropout_rate
        }

    @classmethod
    def from_config(cls, config):
        return cls(**config)


def create_veda_model(
    vocab_size: int,
    max_length: int = 512,
    model_size: str = "small"
) -> VedaProgrammingLLM:
    """Factory function to create a Veda Programming model of a given size."""
    configs = {
        "small": {"d_model": 256, "num_heads": 4, "num_layers": 4, "ff_dim": 512},
        "medium": {"d_model": 512, "num_heads": 8, "num_layers": 6, "ff_dim": 1024},
        "large": {"d_model": 768, "num_heads": 12, "num_layers": 12, "ff_dim": 2048}
    }

    # Unknown sizes fall back to "small"
    config = configs.get(model_size, configs["small"])

    return VedaProgrammingLLM(
        vocab_size=vocab_size,
        max_length=max_length,
        **config
    )
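

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the model definition): builds a
# "small" model and samples a short continuation from the untrained network.
# The vocabulary size and prompt token ids below are hypothetical; a real
# tokenizer for the Veda language is assumed to exist elsewhere, with id 3
# as the END token (matching the stop condition in generate()).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    VOCAB_SIZE = 8000  # hypothetical vocabulary size

    model = create_veda_model(vocab_size=VOCAB_SIZE, model_size="small")

    # Run a dummy batch through the model once so all weights are built
    _ = model(tf.zeros((1, 16), dtype=tf.int32))
    model.summary()

    # Sample a continuation from placeholder prompt token ids
    prompt = [1, 42, 7]  # hypothetical token ids
    completion = model.generate(prompt, max_new_tokens=20, temperature=0.8)
    print("Generated token ids:", completion)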