# veda-programming / model.py

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
from typing import Optional


class PositionalEncoding(layers.Layer):
    """Positional encoding layer for transformer"""

    def __init__(self, max_length: int, d_model: int, **kwargs):
        super().__init__(**kwargs)
        self.max_length = max_length
        self.d_model = d_model

        # Create positional encoding matrix
        position = np.arange(max_length)[:, np.newaxis]
        div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
        pe = np.zeros((max_length, d_model))
        pe[:, 0::2] = np.sin(position * div_term)
        pe[:, 1::2] = np.cos(position * div_term)
        self.positional_encoding = tf.constant(pe, dtype=tf.float32)

    def call(self, x):
        seq_length = tf.shape(x)[1]
        return x + self.positional_encoding[:seq_length, :]

    def get_config(self):
        config = super().get_config()
        config.update({
            'max_length': self.max_length,
            'd_model': self.d_model
        })
        return config


class TransformerBlock(layers.Layer):
    """Transformer decoder block"""

    def __init__(self, d_model: int, num_heads: int, ff_dim: int,
                 dropout_rate: float = 0.1, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout_rate

        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=d_model // num_heads,
            dropout=dropout_rate
        )
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation='gelu'),
            layers.Dropout(dropout_rate),
            layers.Dense(d_model),
            layers.Dropout(dropout_rate)
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout = layers.Dropout(dropout_rate)

    def call(self, x, training=False, mask=None):
        # Causal self-attention
        attn_output = self.attention(
            query=x,
            value=x,
            key=x,
            attention_mask=mask,
            training=training
        )
        attn_output = self.dropout(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        # Feed forward network
        ffn_output = self.ffn(out1, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'dropout_rate': self.dropout_rate
        })
        return config


class VedaProgrammingLLM(keras.Model):
    """Veda Programming Language Model"""

    def __init__(
        self,
        vocab_size: int,
        max_length: int = 512,
        d_model: int = 256,
        num_heads: int = 8,
        num_layers: int = 6,
        ff_dim: int = 1024,
        dropout_rate: float = 0.1,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.max_length = max_length
        self.d_model = d_model
        self.num_heads = num_heads
        self.num_layers = num_layers
        self.ff_dim = ff_dim
        self.dropout_rate = dropout_rate

        # Embedding layers
        self.token_embedding = layers.Embedding(
            input_dim=vocab_size,
            output_dim=d_model
        )
        self.positional_encoding = PositionalEncoding(max_length, d_model)
        self.dropout = layers.Dropout(dropout_rate)

        # Transformer blocks
        self.transformer_blocks = [
            TransformerBlock(d_model, num_heads, ff_dim, dropout_rate)
            for _ in range(num_layers)
        ]

        # Output layer
        self.output_layer = layers.Dense(vocab_size)

    def _create_causal_mask(self, seq_length):
        """Create causal attention mask"""
        mask = tf.linalg.band_part(
            tf.ones((seq_length, seq_length)), -1, 0
        )
        return mask

    def call(self, inputs, training=False):
        seq_length = tf.shape(inputs)[1]

        # Create causal mask
        mask = self._create_causal_mask(seq_length)

        # Embeddings
        x = self.token_embedding(inputs)
        x = x * tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = self.positional_encoding(x)
        x = self.dropout(x, training=training)

        # Transformer blocks
        for transformer_block in self.transformer_blocks:
            x = transformer_block(x, training=training, mask=mask)

        # Output projection
        logits = self.output_layer(x)
        return logits

    def generate(
        self,
        prompt_tokens: list,
        max_new_tokens: int = 100,
        temperature: float = 0.7,
        top_k: int = 50,
        top_p: float = 0.9
    ):
        """Generate code given a prompt"""
        generated = list(prompt_tokens)

        for _ in range(max_new_tokens):
            # Truncate the context if it exceeds the model's maximum length
            context = generated[-self.max_length:]

            # Get predictions for the last position
            input_tensor = tf.expand_dims(context, 0)
            logits = self(input_tensor, training=False)
            next_token_logits = logits[0, -1, :] / temperature

            # Apply top-k filtering
            if top_k > 0:
                top_k_logits, _ = tf.math.top_k(
                    next_token_logits, k=min(top_k, self.vocab_size)
                )
                # Mask out tokens below the k-th largest logit
                indices_to_remove = tf.less(
                    next_token_logits,
                    top_k_logits[-1]
                )
                next_token_logits = tf.where(
                    indices_to_remove,
                    tf.ones_like(next_token_logits) * float('-inf'),
                    next_token_logits
                )

            # Apply top-p (nucleus) filtering
            if top_p < 1.0:
                sorted_logits = tf.sort(next_token_logits, direction='DESCENDING')
                sorted_probs = tf.nn.softmax(sorted_logits)
                cumulative_probs = tf.cumsum(sorted_probs)

                # Find cutoff: keep the smallest prefix whose cumulative
                # probability exceeds top_p (the top token is always kept)
                sorted_indices_to_remove = cumulative_probs > top_p
                sorted_indices_to_remove = tf.concat([
                    [False],
                    sorted_indices_to_remove[:-1]
                ], axis=0)

                # Mask the original logits that fall below the last kept token's logit
                num_kept = tf.reduce_sum(
                    tf.cast(tf.logical_not(sorted_indices_to_remove), tf.int32)
                )
                cutoff_logit = sorted_logits[num_kept - 1]
                next_token_logits = tf.where(
                    next_token_logits < cutoff_logit,
                    tf.ones_like(next_token_logits) * float('-inf'),
                    next_token_logits
                )

            # Sample the next token from the filtered logits
            next_token = tf.random.categorical(
                tf.expand_dims(next_token_logits, 0),
                num_samples=1
            )[0, 0]
            next_token_id = int(next_token.numpy())
            generated.append(next_token_id)

            # Stop if end token
            if next_token_id == 3:  # END token
                break

        return generated

    def get_config(self):
        return {
            'vocab_size': self.vocab_size,
            'max_length': self.max_length,
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'num_layers': self.num_layers,
            'ff_dim': self.ff_dim,
            'dropout_rate': self.dropout_rate
        }

    @classmethod
    def from_config(cls, config):
        return cls(**config)


def create_veda_model(
    vocab_size: int,
    max_length: int = 512,
    model_size: str = "small"
) -> VedaProgrammingLLM:
    """Factory function to create Veda Programming model"""
    configs = {
        "small": {
            "d_model": 256,
            "num_heads": 4,
            "num_layers": 4,
            "ff_dim": 512
        },
        "medium": {
            "d_model": 512,
            "num_heads": 8,
            "num_layers": 6,
            "ff_dim": 1024
        },
        "large": {
            "d_model": 768,
            "num_heads": 12,
            "num_layers": 12,
            "ff_dim": 2048
        }
    }

    config = configs.get(model_size, configs["small"])
    model = VedaProgrammingLLM(
        vocab_size=vocab_size,
        max_length=max_length,
        **config
    )
    return model
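

# Minimal usage sketch (illustrative). It assumes a tokenizer that maps source
# text to integer IDs in [0, vocab_size) and that ID 3 is the END token, as used
# in generate() above. The vocab_size and prompt IDs below are hypothetical
# placeholders, not values from the original repository.
if __name__ == "__main__":
    vocab_size = 8000  # hypothetical; use the real tokenizer's vocabulary size
    model = create_veda_model(vocab_size=vocab_size, max_length=512, model_size="small")

    # Build the model with one forward pass on dummy token IDs.
    dummy_ids = tf.random.uniform((2, 16), maxval=vocab_size, dtype=tf.int32)
    logits = model(dummy_ids, training=False)
    print("logits shape:", logits.shape)  # (2, 16, vocab_size)

    # Autoregressive sampling from a placeholder prompt of token IDs.
    prompt_tokens = [1, 42, 7]  # a real prompt would come from the tokenizer
    generated = model.generate(prompt_tokens, max_new_tokens=20, temperature=0.8)
    print("generated token IDs:", generated)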