Spaces:
Running
Running
import json
import os
from typing import List, Tuple, Optional

import numpy as np
import tensorflow as tf
from tensorflow import keras

from model import VedaProgrammingLLM, create_veda_model
from tokenizer import VedaTokenizer
class VedaTrainer:
    """Trainer class for Veda Programming LLM.

    Bundles the whole training pipeline: reading raw code from a text file,
    fitting the tokenizer, cutting the token stream into fixed-length
    next-token training windows, building and compiling the model, fitting
    it, and writing the resulting artifacts to disk.
    """

    def __init__(
        self,
        data_path: str = "programming.txt",
        vocab_size: int = 10000,
        max_length: int = 256,
        batch_size: int = 32,
        model_size: str = "small",
    ):
        """Store the configuration and create an unfitted tokenizer.

        Args:
            data_path: Text file holding the training code. ``load_data``
                creates it with built-in samples when it does not exist.
            vocab_size: Vocabulary size handed to ``VedaTokenizer``.
            max_length: Number of input tokens in each training sequence.
            batch_size: Batch size of the training dataset.
            model_size: Size preset forwarded to ``create_veda_model``.
        """
        self.data_path = data_path
        self.vocab_size = vocab_size
        self.max_length = max_length
        self.batch_size = batch_size
        self.model_size = model_size
        self.tokenizer = VedaTokenizer(vocab_size=vocab_size)
        # Populated by build_model(); generate() refuses to run while None.
        self.model: Optional[VedaProgrammingLLM] = None

    def load_data(self) -> List[str]:
        """Load programming data from file.

        If ``self.data_path`` does not exist it is first created with the
        built-in sample code. The file is then split into samples at blank
        (empty or whitespace-only) lines.

        Returns:
            The non-empty samples, each stripped of leading and trailing
            whitespace, in file order.
        """
        if not os.path.exists(self.data_path):
            print(f"Creating sample {self.data_path}...")
            self._create_sample_data()

        with open(self.data_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # A blank line closes the sample being collected; blank lines that
        # occur while nothing is being collected are simply skipped.
        samples = []
        current_sample = []
        for line in content.split('\n'):
            if line.strip():
                current_sample.append(line)
            elif current_sample:
                samples.append('\n'.join(current_sample).strip())
                current_sample = []
        if current_sample:
            samples.append('\n'.join(current_sample).strip())

        print(f"Loaded {len(samples)} code samples")
        return samples

    def _create_sample_data(self):
        """Create sample programming data at ``self.data_path``.

        Overwrites the file with a fixed collection of small Python
        snippets separated by blank lines, so ``load_data`` has something
        to split into samples.
        """
        # The snippets are training text only; they are never executed, so
        # references such as Node, merge or aiohttp need not resolve.
        sample_code = '''
def hello_world():
    print("Hello, World!")
    return True

def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

def factorial(n):
    if n == 0:
        return 1
    return n * factorial(n-1)

class Calculator:
    def __init__(self):
        self.result = 0

    def add(self, a, b):
        self.result = a + b
        return self.result

    def subtract(self, a, b):
        self.result = a - b
        return self.result

    def multiply(self, a, b):
        self.result = a * b
        return self.result

    def divide(self, a, b):
        if b != 0:
            self.result = a / b
        return self.result

def bubble_sort(arr):
    n = len(arr)
    for i in range(n):
        for j in range(0, n-i-1):
            if arr[j] > arr[j+1]:
                arr[j], arr[j+1] = arr[j+1], arr[j]
    return arr

def binary_search(arr, target):
    left, right = 0, len(arr) - 1
    while left <= right:
        mid = (left + right) // 2
        if arr[mid] == target:
            return mid
        elif arr[mid] < target:
            left = mid + 1
        else:
            right = mid - 1
    return -1

def quicksort(arr):
    if len(arr) <= 1:
        return arr
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    return quicksort(left) + middle + quicksort(right)

class LinkedList:
    def __init__(self):
        self.head = None

    def append(self, data):
        new_node = Node(data)
        if not self.head:
            self.head = new_node
            return
        current = self.head
        while current.next:
            current = current.next
        current.next = new_node

def merge_sort(arr):
    if len(arr) <= 1:
        return arr
    mid = len(arr) // 2
    left = merge_sort(arr[:mid])
    right = merge_sort(arr[mid:])
    return merge(left, right)

def is_palindrome(s):
    s = s.lower().replace(" ", "")
    return s == s[::-1]

def count_words(text):
    words = text.split()
    return len(words)

async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

def read_file(filename):
    with open(filename, 'r') as f:
        return f.read()

def write_file(filename, content):
    with open(filename, 'w') as f:
        f.write(content)
'''
        with open(self.data_path, 'w', encoding='utf-8') as f:
            f.write(sample_code)
        print(f"Created sample {self.data_path}")

    def prepare_dataset(self, samples: List[str]) -> "tf.data.Dataset":
        """Prepare TensorFlow dataset for training.

        Fits the tokenizer on ``samples``, concatenates their token ids
        into one stream and cuts it into overlapping windows of
        ``max_length + 1`` tokens. Each window yields an input (all but the
        last token) and a target (all but the first token).

        Args:
            samples: Code samples, e.g. as returned by ``load_data``.

        Returns:
            A shuffled, batched and prefetched dataset of ``(X, y)`` pairs.

        Raises:
            ValueError: If ``samples`` is empty.
        """
        if not samples:
            raise ValueError(
                "Cannot prepare a dataset from an empty list of samples."
            )

        # Fit tokenizer
        self.tokenizer.fit(samples)

        # Encode all samples into a single token stream
        all_tokens = []
        for sample in samples:
            all_tokens.extend(self.tokenizer.encode(sample))

        # One extra token per window so inputs and targets can be shifted
        # by one position. Consecutive windows overlap by about half; the
        # stride is clamped so a max_length below 2 cannot produce a zero
        # range step.
        window = self.max_length + 1
        stride = max(1, self.max_length // 2)
        sequences = [
            all_tokens[start:start + window]
            for start in range(0, len(all_tokens) - self.max_length, stride)
        ]

        if not sequences:
            # Not enough tokens for a single full window: fall back to one
            # sequence per sample.
            # NOTE(review): assumes encode(..., max_length=...) pads or
            # truncates to exactly that length so the rows stack into a
            # rectangular array - confirm against VedaTokenizer.
            sequences = [
                self.tokenizer.encode(sample, max_length=window)
                for sample in samples
            ]

        print(f"Created {len(sequences)} training sequences")

        # Convert to numpy arrays
        sequences = np.array(sequences)

        # Split into input and target (target is the input shifted by one)
        X = sequences[:, :-1]
        y = sequences[:, 1:]

        # Create dataset
        dataset = tf.data.Dataset.from_tensor_slices((X, y))
        dataset = dataset.shuffle(buffer_size=len(sequences))
        dataset = dataset.batch(self.batch_size)
        dataset = dataset.prefetch(tf.data.AUTOTUNE)
        return dataset

    def build_model(self):
        """Build the Veda Programming model.

        Creates the model via ``create_veda_model`` using the tokenizer's
        ``vocabulary_size``, compiles it with Adam (learning rate 1e-4) and
        sparse categorical cross-entropy on logits, runs one dummy batch
        through it and prints the summary.

        Returns:
            The compiled model, which is also stored on ``self.model``.
        """
        self.model = create_veda_model(
            vocab_size=self.tokenizer.vocabulary_size,
            max_length=self.max_length,
            model_size=self.model_size,
        )

        # Compile model
        optimizer = keras.optimizers.Adam(learning_rate=1e-4)
        loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        self.model.compile(
            optimizer=optimizer,
            loss=loss_fn,
            metrics=['accuracy'],
        )

        # Build model with dummy input so summary() has concrete shapes
        dummy_input = tf.zeros((1, self.max_length), dtype=tf.int32)
        self.model(dummy_input)
        self.model.summary()
        return self.model

    def train(
        self,
        epochs: int = 10,
        save_path: str = "veda_model",
    ):
        """Train the model.

        Loads the data, prepares the dataset, builds the model and fits it.
        Afterwards the weights, the tokenizer and a JSON config are written
        into ``save_path``.

        Args:
            epochs: Maximum number of epochs; early stopping may end
                training sooner.
            save_path: Directory that receives the checkpoint, weights,
                tokenizer and config files. Created if missing.

        Returns:
            The history object returned by ``self.model.fit``.
        """
        # Load and prepare data
        samples = self.load_data()
        dataset = self.prepare_dataset(samples)

        # Build model
        self.build_model()

        # Create the save directory before anything tries to write into it
        os.makedirs(save_path, exist_ok=True)

        # Callbacks - all of them watch the training loss, since no
        # validation data is passed to fit().
        callbacks = [
            keras.callbacks.ModelCheckpoint(
                filepath=os.path.join(save_path, "model_checkpoint.keras"),
                save_best_only=True,
                monitor='loss',
            ),
            keras.callbacks.EarlyStopping(
                monitor='loss',
                patience=5,
                restore_best_weights=True,
            ),
            keras.callbacks.ReduceLROnPlateau(
                monitor='loss',
                factor=0.5,
                patience=2,
            ),
        ]

        # Train
        history = self.model.fit(
            dataset,
            epochs=epochs,
            callbacks=callbacks,
        )

        # Save final model and tokenizer
        # NOTE(review): Keras 3 requires weight files to end in
        # ".weights.h5"; the name is kept as-is because loaders elsewhere
        # may rely on it - confirm against the installed Keras version.
        self.model.save_weights(os.path.join(save_path, "model_weights.h5"))
        self.tokenizer.save(os.path.join(save_path, "tokenizer.json"))

        # Save model config
        config = self.model.get_config()
        config['tokenizer_vocab_size'] = self.tokenizer.vocabulary_size
        with open(os.path.join(save_path, "config.json"), 'w') as f:
            json.dump(config, f)

        print(f"Model saved to {save_path}")
        return history

    def generate(
        self,
        prompt: str,
        max_new_tokens: int = 100,
        temperature: float = 0.7,
    ) -> str:
        """Generate code from prompt.

        Args:
            prompt: Text to encode and hand to the model as the start.
            max_new_tokens: Forwarded to ``self.model.generate``.
            temperature: Forwarded to ``self.model.generate``.

        Returns:
            The decoded text for the tokens returned by the model.

        Raises:
            ValueError: If no model has been built or loaded yet.
        """
        if self.model is None:
            raise ValueError("Model not loaded. Train or load a model first.")

        # Encode prompt
        prompt_tokens = self.tokenizer.encode(prompt)

        # Generate
        generated_tokens = self.model.generate(
            prompt_tokens,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
        )

        # Decode
        generated_text = self.tokenizer.decode(generated_tokens)
        return generated_text
def main():
    """Main training function.

    Trains a small Veda model on ``programming.txt`` for up to 20 epochs,
    saving artifacts under ``veda_model``, then prints a short generation
    for a fixed test prompt.
    """
    trainer_config = {
        "data_path": "programming.txt",
        "vocab_size": 10000,
        "max_length": 256,
        "batch_size": 16,
        "model_size": "small",
    }
    trainer = VedaTrainer(**trainer_config)

    # Train model
    trainer.train(epochs=20, save_path="veda_model")

    # Test generation
    generated = trainer.generate("def calculate", max_new_tokens=50)
    print(f"\nGenerated code:\n{generated}")


if __name__ == "__main__":
    main()