import tensorflow as tf
from tensorflow import keras
import numpy as np
import os
from typing import List, Tuple, Optional

from model import VedaProgrammingLLM, create_veda_model
from tokenizer import VedaTokenizer


class VedaTrainer:
    """Trainer class for Veda Programming LLM"""

    def __init__(
        self,
        data_path: str = "programming.txt",
        vocab_size: int = 10000,
        max_length: int = 256,
        batch_size: int = 32,
        model_size: str = "small"
    ):
        self.data_path = data_path
        self.vocab_size = vocab_size
        self.max_length = max_length
        self.batch_size = batch_size
        self.model_size = model_size

        self.tokenizer = VedaTokenizer(vocab_size=vocab_size)
        self.model: Optional[VedaProgrammingLLM] = None

    def load_data(self) -> List[str]:
        """Load programming data from file"""
        if not os.path.exists(self.data_path):
            print(f"Creating sample {self.data_path}...")
            self._create_sample_data()

        with open(self.data_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Split into code samples (by double newlines or function definitions)
        samples = []
        current_sample = []

        for line in content.split('\n'):
            if line.strip() == '' and current_sample:
                samples.append('\n'.join(current_sample))
                current_sample = []
            else:
                current_sample.append(line)

        if current_sample:
            samples.append('\n'.join(current_sample))

        # Filter empty samples
        samples = [s.strip() for s in samples if s.strip()]

        print(f"Loaded {len(samples)} code samples")
        return samples

    def _create_sample_data(self):
        """Create sample programming data"""
        sample_code = '''
def hello_world():
    print("Hello, World!")
    return True

def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

def factorial(n):
    if n == 0:
        return 1
    return n * factorial(n-1)

class Calculator:
    def __init__(self):
        self.result = 0

    def add(self, a, b):
        self.result = a + b
        return self.result

    def subtract(self, a, b):
        self.result = a - b
        return self.result

    def multiply(self, a, b):
        self.result = a * b
        return self.result

    def divide(self, a, b):
        if b != 0:
            self.result = a / b
        return self.result

def bubble_sort(arr):
    n = len(arr)
    for i in range(n):
        for j in range(0, n-i-1):
            if arr[j] > arr[j+1]:
                arr[j], arr[j+1] = arr[j+1], arr[j]
    return arr

def binary_search(arr, target):
    left, right = 0, len(arr) - 1
    while left <= right:
        mid = (left + right) // 2
        if arr[mid] == target:
            return mid
        elif arr[mid] < target:
            left = mid + 1
        else:
            right = mid - 1
    return -1

def quicksort(arr):
    if len(arr) <= 1:
        return arr
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    return quicksort(left) + middle + quicksort(right)

class LinkedList:
    def __init__(self):
        self.head = None

    def append(self, data):
        new_node = Node(data)
        if not self.head:
            self.head = new_node
            return
        current = self.head
        while current.next:
            current = current.next
        current.next = new_node

def merge_sort(arr):
    if len(arr) <= 1:
        return arr
    mid = len(arr) // 2
    left = merge_sort(arr[:mid])
    right = merge_sort(arr[mid:])
    return merge(left, right)

def is_palindrome(s):
    s = s.lower().replace(" ", "")
    return s == s[::-1]

def count_words(text):
    words = text.split()
    return len(words)

async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

def read_file(filename):
    with open(filename, 'r') as f:
        return f.read()

def write_file(filename, content):
    with open(filename, 'w') as f:
        f.write(content)
'''
        with open(self.data_path, 'w', encoding='utf-8') as f:
            f.write(sample_code)
        print(f"Created sample {self.data_path}")

    def prepare_dataset(self, samples: List[str]) -> tf.data.Dataset:
        """Prepare TensorFlow dataset for training"""
        # Fit tokenizer
        self.tokenizer.fit(samples)

        # Encode all samples
        all_tokens = []
        for sample in samples:
            tokens = self.tokenizer.encode(sample)
            all_tokens.extend(tokens)

        # Create sequences
        sequences = []
        for i in range(0, len(all_tokens) - self.max_length, self.max_length // 2):
            seq = all_tokens[i:i + self.max_length + 1]
            if len(seq) == self.max_length + 1:
                sequences.append(seq)

        if not sequences:
            # Create padded sequences if not enough data
            for sample in samples:
                tokens = self.tokenizer.encode(sample, max_length=self.max_length + 1)
                sequences.append(tokens)

        print(f"Created {len(sequences)} training sequences")

        # Convert to numpy arrays
        sequences = np.array(sequences)

        # Split into input and target
        X = sequences[:, :-1]
        y = sequences[:, 1:]

        # Create dataset
        dataset = tf.data.Dataset.from_tensor_slices((X, y))
        dataset = dataset.shuffle(buffer_size=len(sequences))
        dataset = dataset.batch(self.batch_size)
        dataset = dataset.prefetch(tf.data.AUTOTUNE)

        return dataset

    def build_model(self):
        """Build the Veda Programming model"""
        self.model = create_veda_model(
            vocab_size=self.tokenizer.vocabulary_size,
            max_length=self.max_length,
            model_size=self.model_size
        )

        # Compile model
        optimizer = keras.optimizers.Adam(learning_rate=1e-4)
        loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

        self.model.compile(
            optimizer=optimizer,
            loss=loss_fn,
            metrics=['accuracy']
        )

        # Build model with dummy input
        dummy_input = tf.zeros((1, self.max_length), dtype=tf.int32)
        self.model(dummy_input)

        self.model.summary()
        return self.model

    def train(
        self,
        epochs: int = 10,
        save_path: str = "veda_model"
    ):
        """Train the model"""
        # Load and prepare data
        samples = self.load_data()
        dataset = self.prepare_dataset(samples)

        # Build model
        self.build_model()

        # Callbacks
        callbacks = [
            keras.callbacks.ModelCheckpoint(
                filepath=os.path.join(save_path, "model_checkpoint.keras"),
                save_best_only=True,
                monitor='loss'
            ),
            keras.callbacks.EarlyStopping(
                monitor='loss',
                patience=5,
                restore_best_weights=True
            ),
            keras.callbacks.ReduceLROnPlateau(
                monitor='loss',
                factor=0.5,
                patience=2
            )
        ]

        # Create save directory
        os.makedirs(save_path, exist_ok=True)

        # Train
        history = self.model.fit(
            dataset,
            epochs=epochs,
            callbacks=callbacks
        )

        # Save final model and tokenizer
        self.model.save_weights(os.path.join(save_path, "model_weights.h5"))
        self.tokenizer.save(os.path.join(save_path, "tokenizer.json"))

        # Save model config
        config = self.model.get_config()
        config['tokenizer_vocab_size'] = self.tokenizer.vocabulary_size

        import json
        with open(os.path.join(save_path, "config.json"), 'w') as f:
            json.dump(config, f)

        print(f"Model saved to {save_path}")
        return history

    def generate(
        self,
        prompt: str,
        max_new_tokens: int = 100,
        temperature: float = 0.7
    ) -> str:
        """Generate code from prompt"""
        if self.model is None:
            raise ValueError("Model not loaded. Train or load a model first.")

        # Encode prompt
        prompt_tokens = self.tokenizer.encode(prompt)

        # Generate
        generated_tokens = self.model.generate(
            prompt_tokens,
            max_new_tokens=max_new_tokens,
            temperature=temperature
        )

        # Decode
        generated_text = self.tokenizer.decode(generated_tokens)
        return generated_text


def main():
    """Main training function"""
    trainer = VedaTrainer(
        data_path="programming.txt",
        vocab_size=10000,
        max_length=256,
        batch_size=16,
        model_size="small"
    )

    # Train model
    history = trainer.train(epochs=20, save_path="veda_model")

    # Test generation
    test_prompt = "def calculate"
    generated = trainer.generate(test_prompt, max_new_tokens=50)
    print(f"\nGenerated code:\n{generated}")


if __name__ == "__main__":
    main()