"""Continuous training system for Veda Programming LLM""" import os import json import shutil from datetime import datetime from typing import Optional import threading import time import tensorflow as tf from tensorflow import keras from model import VedaProgrammingLLM from tokenizer import VedaTokenizer from database import db from data_collector import collector from config import ( MODEL_DIR, VERSIONS_DIR, VOCAB_SIZE, MAX_LENGTH, D_MODEL, NUM_HEADS, NUM_LAYERS, FF_DIM, BATCH_SIZE, MIN_SAMPLES_FOR_TRAINING, EPOCHS_PER_RETRAIN, AUTO_TRAIN_INTERVAL_HOURS ) class ContinuousTrainer: """Handles continuous learning and model updates""" def __init__(self): self.model: Optional[VedaProgrammingLLM] = None self.tokenizer: Optional[VedaTokenizer] = None self.is_training = False self.training_progress = 0 self.last_training_time = None self.model_version = self._get_current_version() # Background training thread self._training_thread = None self._stop_background = False def _get_current_version(self) -> str: """Get current model version""" config_path = os.path.join(MODEL_DIR, "config.json") if os.path.exists(config_path): with open(config_path, 'r') as f: config = json.load(f) return config.get('version', 'v1.0') return 'v1.0' def _generate_version(self) -> str: """Generate new version string""" return f"v{datetime.now().strftime('%Y%m%d_%H%M%S')}" def load_model(self) -> bool: """Load the current model""" config_path = os.path.join(MODEL_DIR, "config.json") if not os.path.exists(config_path): print("No existing model found.") return False try: # Load config with open(config_path, 'r') as f: config = json.load(f) # Load tokenizer self.tokenizer = VedaTokenizer() self.tokenizer.load(os.path.join(MODEL_DIR, "tokenizer.json")) # Create model self.model = VedaProgrammingLLM( vocab_size=config['vocab_size'], max_length=config['max_length'], d_model=config['d_model'], num_heads=config['num_heads'], num_layers=config['num_layers'], ff_dim=config['ff_dim'] ) # Build and load weights dummy = tf.zeros((1, config['max_length']), dtype=tf.int32) self.model(dummy) self.model.load_weights(os.path.join(MODEL_DIR, "weights.h5")) self.model_version = config.get('version', 'v1.0') print(f"Model loaded: {self.model_version}") return True except Exception as e: print(f"Error loading model: {e}") return False def save_model(self, version: str = None): """Save the current model""" if self.model is None or self.tokenizer is None: return version = version or self._generate_version() # Save to main directory os.makedirs(MODEL_DIR, exist_ok=True) self.model.save_weights(os.path.join(MODEL_DIR, "weights.h5")) self.tokenizer.save(os.path.join(MODEL_DIR, "tokenizer.json")) config = self.model.get_config() config['version'] = version config['last_trained'] = datetime.now().isoformat() with open(os.path.join(MODEL_DIR, "config.json"), 'w') as f: json.dump(config, f, indent=2) # Save version backup version_dir = os.path.join(VERSIONS_DIR, version) os.makedirs(version_dir, exist_ok=True) shutil.copy( os.path.join(MODEL_DIR, "weights.h5"), os.path.join(version_dir, "weights.h5") ) shutil.copy( os.path.join(MODEL_DIR, "tokenizer.json"), os.path.join(version_dir, "tokenizer.json") ) shutil.copy( os.path.join(MODEL_DIR, "config.json"), os.path.join(version_dir, "config.json") ) self.model_version = version print(f"Model saved: {version}") def should_retrain(self) -> bool: """Check if retraining is needed""" pending = collector.get_pending_count() return pending >= MIN_SAMPLES_FOR_TRAINING def prepare_training_data(self) -> tf.data.Dataset: """Prepare dataset for training""" # Get all training samples samples = collector.get_training_data(include_base=True) if not samples: return None # Combine all samples all_text = '\n\n'.join(samples) # Fit or update tokenizer if self.tokenizer is None: self.tokenizer = VedaTokenizer(vocab_size=VOCAB_SIZE) self.tokenizer.fit([all_text]) # Encode all_tokens = self.tokenizer.encode(all_text) # Create sequences sequences = [] stride = MAX_LENGTH // 2 for i in range(0, len(all_tokens) - MAX_LENGTH - 1, stride): seq = all_tokens[i:i + MAX_LENGTH + 1] if len(seq) == MAX_LENGTH + 1: sequences.append(seq) if len(sequences) < 5: stride = max(1, MAX_LENGTH // 8) sequences = [] for i in range(0, len(all_tokens) - MAX_LENGTH - 1, stride): seq = all_tokens[i:i + MAX_LENGTH + 1] if len(seq) == MAX_LENGTH + 1: sequences.append(seq) import numpy as np sequences = np.array(sequences) X = sequences[:, :-1] y = sequences[:, 1:] dataset = tf.data.Dataset.from_tensor_slices((X, y)) dataset = dataset.shuffle(buffer_size=min(1000, len(sequences))) dataset = dataset.batch(BATCH_SIZE) dataset = dataset.prefetch(tf.data.AUTOTUNE) print(f"Prepared {len(sequences)} training sequences") return dataset def train( self, epochs: int = EPOCHS_PER_RETRAIN, callback=None ) -> dict: """Train/retrain the model""" if self.is_training: return {'status': 'error', 'message': 'Training already in progress'} self.is_training = True self.training_progress = 0 try: # Prepare data dataset = self.prepare_training_data() if dataset is None: self.is_training = False return {'status': 'error', 'message': 'No training data available'} # Create/update model if self.model is None: self.model = VedaProgrammingLLM( vocab_size=self.tokenizer.vocabulary_size, max_length=MAX_LENGTH, d_model=D_MODEL, num_heads=NUM_HEADS, num_layers=NUM_LAYERS, ff_dim=FF_DIM ) # Compile self.model.compile( optimizer=keras.optimizers.Adam(learning_rate=1e-4), loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'] ) # Build dummy = tf.zeros((1, MAX_LENGTH), dtype=tf.int32) self.model(dummy) # Custom callback for progress class ProgressCallback(keras.callbacks.Callback): def __init__(self, trainer, total_epochs): self.trainer = trainer self.total_epochs = total_epochs def on_epoch_end(self, epoch, logs=None): self.trainer.training_progress = (epoch + 1) / self.total_epochs * 100 callbacks = [ProgressCallback(self, epochs)] if callback: callbacks.append(callback) # Train history = self.model.fit( dataset, epochs=epochs, callbacks=callbacks, verbose=1 ) # Save model new_version = self._generate_version() self.save_model(new_version) # Mark samples as used new_samples = collector.get_new_training_data() if new_samples: sample_ids = [s['id'] for s in new_samples] db.mark_as_used_for_training(sample_ids) # Record training run final_loss = history.history['loss'][-1] final_acc = history.history.get('accuracy', [0])[-1] samples_count = len(new_samples) if new_samples else 0 db.save_training_run( samples_used=samples_count, epochs=epochs, final_loss=final_loss, final_accuracy=final_acc, model_version=new_version ) self.last_training_time = datetime.now() self.is_training = False self.training_progress = 100 return { 'status': 'success', 'version': new_version, 'loss': final_loss, 'accuracy': final_acc, 'samples_used': samples_count } except Exception as e: self.is_training = False import traceback traceback.print_exc() return {'status': 'error', 'message': str(e)} def train_async(self, epochs: int = EPOCHS_PER_RETRAIN): """Start training in background thread""" if self.is_training: return False def train_thread(): result = self.train(epochs=epochs) print(f"Background training completed: {result}") self._training_thread = threading.Thread(target=train_thread) self._training_thread.start() return True def start_auto_training(self): """Start automatic retraining scheduler""" def auto_train_loop(): while not self._stop_background: # Check every hour time.sleep(3600) if self._stop_background: break # Check if retraining needed if self.should_retrain(): print("Auto-training triggered...") self.train() self._stop_background = False thread = threading.Thread(target=auto_train_loop, daemon=True) thread.start() print("Auto-training scheduler started") def stop_auto_training(self): """Stop automatic retraining""" self._stop_background = True def get_status(self) -> dict: """Get trainer status""" return { 'model_loaded': self.model is not None, 'model_version': self.model_version, 'is_training': self.is_training, 'training_progress': self.training_progress, 'last_training': self.last_training_time.isoformat() if self.last_training_time else None, 'pending_samples': collector.get_pending_count(), 'min_samples_for_training': MIN_SAMPLES_FOR_TRAINING } def generate( self, prompt: str, max_tokens: int = 100, temperature: float = 0.7, repetition_penalty: float = 1.2, top_k: int = 50 ) -> str: """Generate code using the model""" if self.model is None or self.tokenizer is None: raise ValueError("Model not loaded") tokens = self.tokenizer.encode(prompt) if len(tokens) == 0: tokens = [ord(' ')] generated = self.model.generate( tokens, max_new_tokens=max_tokens, temperature=temperature, top_k=top_k, repetition_penalty=repetition_penalty ) return self.tokenizer.decode(generated) # Global trainer instance trainer = ContinuousTrainer()