Spaces:
Running
Running
| """Continuous training system for Veda Programming LLM""" | |
| import os | |
| import json | |
| import shutil | |
| from datetime import datetime | |
| from typing import Optional | |
| import threading | |
| import time | |
| import tensorflow as tf | |
| from tensorflow import keras | |
| from model import VedaProgrammingLLM | |
| from tokenizer import VedaTokenizer | |
| from database import db | |
| from data_collector import collector | |
| from config import ( | |
| MODEL_DIR, VERSIONS_DIR, VOCAB_SIZE, MAX_LENGTH, | |
| D_MODEL, NUM_HEADS, NUM_LAYERS, FF_DIM, BATCH_SIZE, | |
| MIN_SAMPLES_FOR_TRAINING, EPOCHS_PER_RETRAIN, | |
| AUTO_TRAIN_INTERVAL_HOURS | |
| ) | |
| class ContinuousTrainer: | |
| """Handles continuous learning and model updates""" | |
| def __init__(self): | |
| self.model: Optional[VedaProgrammingLLM] = None | |
| self.tokenizer: Optional[VedaTokenizer] = None | |
| self.is_training = False | |
| self.training_progress = 0 | |
| self.last_training_time = None | |
| self.model_version = self._get_current_version() | |
| # Background training thread | |
| self._training_thread = None | |
| self._stop_background = False | |
| def _get_current_version(self) -> str: | |
| """Get current model version""" | |
| config_path = os.path.join(MODEL_DIR, "config.json") | |
| if os.path.exists(config_path): | |
| with open(config_path, 'r') as f: | |
| config = json.load(f) | |
| return config.get('version', 'v1.0') | |
| return 'v1.0' | |
| def _generate_version(self) -> str: | |
| """Generate new version string""" | |
| return f"v{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
| def load_model(self) -> bool: | |
| """Load the current model""" | |
| config_path = os.path.join(MODEL_DIR, "config.json") | |
| if not os.path.exists(config_path): | |
| print("No existing model found.") | |
| return False | |
| try: | |
| # Load config | |
| with open(config_path, 'r') as f: | |
| config = json.load(f) | |
| # Load tokenizer | |
| self.tokenizer = VedaTokenizer() | |
| self.tokenizer.load(os.path.join(MODEL_DIR, "tokenizer.json")) | |
| # Create model | |
| self.model = VedaProgrammingLLM( | |
| vocab_size=config['vocab_size'], | |
| max_length=config['max_length'], | |
| d_model=config['d_model'], | |
| num_heads=config['num_heads'], | |
| num_layers=config['num_layers'], | |
| ff_dim=config['ff_dim'] | |
| ) | |
| # Build and load weights | |
| dummy = tf.zeros((1, config['max_length']), dtype=tf.int32) | |
| self.model(dummy) | |
| self.model.load_weights(os.path.join(MODEL_DIR, "weights.h5")) | |
| self.model_version = config.get('version', 'v1.0') | |
| print(f"Model loaded: {self.model_version}") | |
| return True | |
| except Exception as e: | |
| print(f"Error loading model: {e}") | |
| return False | |
| def save_model(self, version: str = None): | |
| """Save the current model""" | |
| if self.model is None or self.tokenizer is None: | |
| return | |
| version = version or self._generate_version() | |
| # Save to main directory | |
| os.makedirs(MODEL_DIR, exist_ok=True) | |
| self.model.save_weights(os.path.join(MODEL_DIR, "weights.h5")) | |
| self.tokenizer.save(os.path.join(MODEL_DIR, "tokenizer.json")) | |
| config = self.model.get_config() | |
| config['version'] = version | |
| config['last_trained'] = datetime.now().isoformat() | |
| with open(os.path.join(MODEL_DIR, "config.json"), 'w') as f: | |
| json.dump(config, f, indent=2) | |
| # Save version backup | |
| version_dir = os.path.join(VERSIONS_DIR, version) | |
| os.makedirs(version_dir, exist_ok=True) | |
| shutil.copy( | |
| os.path.join(MODEL_DIR, "weights.h5"), | |
| os.path.join(version_dir, "weights.h5") | |
| ) | |
| shutil.copy( | |
| os.path.join(MODEL_DIR, "tokenizer.json"), | |
| os.path.join(version_dir, "tokenizer.json") | |
| ) | |
| shutil.copy( | |
| os.path.join(MODEL_DIR, "config.json"), | |
| os.path.join(version_dir, "config.json") | |
| ) | |
| self.model_version = version | |
| print(f"Model saved: {version}") | |
| def should_retrain(self) -> bool: | |
| """Check if retraining is needed""" | |
| pending = collector.get_pending_count() | |
| return pending >= MIN_SAMPLES_FOR_TRAINING | |
| def prepare_training_data(self) -> tf.data.Dataset: | |
| """Prepare dataset for training""" | |
| # Get all training samples | |
| samples = collector.get_training_data(include_base=True) | |
| if not samples: | |
| return None | |
| # Combine all samples | |
| all_text = '\n\n'.join(samples) | |
| # Fit or update tokenizer | |
| if self.tokenizer is None: | |
| self.tokenizer = VedaTokenizer(vocab_size=VOCAB_SIZE) | |
| self.tokenizer.fit([all_text]) | |
| # Encode | |
| all_tokens = self.tokenizer.encode(all_text) | |
| # Create sequences | |
| sequences = [] | |
| stride = MAX_LENGTH // 2 | |
| for i in range(0, len(all_tokens) - MAX_LENGTH - 1, stride): | |
| seq = all_tokens[i:i + MAX_LENGTH + 1] | |
| if len(seq) == MAX_LENGTH + 1: | |
| sequences.append(seq) | |
| if len(sequences) < 5: | |
| stride = max(1, MAX_LENGTH // 8) | |
| sequences = [] | |
| for i in range(0, len(all_tokens) - MAX_LENGTH - 1, stride): | |
| seq = all_tokens[i:i + MAX_LENGTH + 1] | |
| if len(seq) == MAX_LENGTH + 1: | |
| sequences.append(seq) | |
| import numpy as np | |
| sequences = np.array(sequences) | |
| X = sequences[:, :-1] | |
| y = sequences[:, 1:] | |
| dataset = tf.data.Dataset.from_tensor_slices((X, y)) | |
| dataset = dataset.shuffle(buffer_size=min(1000, len(sequences))) | |
| dataset = dataset.batch(BATCH_SIZE) | |
| dataset = dataset.prefetch(tf.data.AUTOTUNE) | |
| print(f"Prepared {len(sequences)} training sequences") | |
| return dataset | |
| def train( | |
| self, | |
| epochs: int = EPOCHS_PER_RETRAIN, | |
| callback=None | |
| ) -> dict: | |
| """Train/retrain the model""" | |
| if self.is_training: | |
| return {'status': 'error', 'message': 'Training already in progress'} | |
| self.is_training = True | |
| self.training_progress = 0 | |
| try: | |
| # Prepare data | |
| dataset = self.prepare_training_data() | |
| if dataset is None: | |
| self.is_training = False | |
| return {'status': 'error', 'message': 'No training data available'} | |
| # Create/update model | |
| if self.model is None: | |
| self.model = VedaProgrammingLLM( | |
| vocab_size=self.tokenizer.vocabulary_size, | |
| max_length=MAX_LENGTH, | |
| d_model=D_MODEL, | |
| num_heads=NUM_HEADS, | |
| num_layers=NUM_LAYERS, | |
| ff_dim=FF_DIM | |
| ) | |
| # Compile | |
| self.model.compile( | |
| optimizer=keras.optimizers.Adam(learning_rate=1e-4), | |
| loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), | |
| metrics=['accuracy'] | |
| ) | |
| # Build | |
| dummy = tf.zeros((1, MAX_LENGTH), dtype=tf.int32) | |
| self.model(dummy) | |
| # Custom callback for progress | |
| class ProgressCallback(keras.callbacks.Callback): | |
| def __init__(self, trainer, total_epochs): | |
| self.trainer = trainer | |
| self.total_epochs = total_epochs | |
| def on_epoch_end(self, epoch, logs=None): | |
| self.trainer.training_progress = (epoch + 1) / self.total_epochs * 100 | |
| callbacks = [ProgressCallback(self, epochs)] | |
| if callback: | |
| callbacks.append(callback) | |
| # Train | |
| history = self.model.fit( | |
| dataset, | |
| epochs=epochs, | |
| callbacks=callbacks, | |
| verbose=1 | |
| ) | |
| # Save model | |
| new_version = self._generate_version() | |
| self.save_model(new_version) | |
| # Mark samples as used | |
| new_samples = collector.get_new_training_data() | |
| if new_samples: | |
| sample_ids = [s['id'] for s in new_samples] | |
| db.mark_as_used_for_training(sample_ids) | |
| # Record training run | |
| final_loss = history.history['loss'][-1] | |
| final_acc = history.history.get('accuracy', [0])[-1] | |
| samples_count = len(new_samples) if new_samples else 0 | |
| db.save_training_run( | |
| samples_used=samples_count, | |
| epochs=epochs, | |
| final_loss=final_loss, | |
| final_accuracy=final_acc, | |
| model_version=new_version | |
| ) | |
| self.last_training_time = datetime.now() | |
| self.is_training = False | |
| self.training_progress = 100 | |
| return { | |
| 'status': 'success', | |
| 'version': new_version, | |
| 'loss': final_loss, | |
| 'accuracy': final_acc, | |
| 'samples_used': samples_count | |
| } | |
| except Exception as e: | |
| self.is_training = False | |
| import traceback | |
| traceback.print_exc() | |
| return {'status': 'error', 'message': str(e)} | |
| def train_async(self, epochs: int = EPOCHS_PER_RETRAIN): | |
| """Start training in background thread""" | |
| if self.is_training: | |
| return False | |
| def train_thread(): | |
| result = self.train(epochs=epochs) | |
| print(f"Background training completed: {result}") | |
| self._training_thread = threading.Thread(target=train_thread) | |
| self._training_thread.start() | |
| return True | |
| def start_auto_training(self): | |
| """Start automatic retraining scheduler""" | |
| def auto_train_loop(): | |
| while not self._stop_background: | |
| # Check every hour | |
| time.sleep(3600) | |
| if self._stop_background: | |
| break | |
| # Check if retraining needed | |
| if self.should_retrain(): | |
| print("Auto-training triggered...") | |
| self.train() | |
| self._stop_background = False | |
| thread = threading.Thread(target=auto_train_loop, daemon=True) | |
| thread.start() | |
| print("Auto-training scheduler started") | |
| def stop_auto_training(self): | |
| """Stop automatic retraining""" | |
| self._stop_background = True | |
| def get_status(self) -> dict: | |
| """Get trainer status""" | |
| return { | |
| 'model_loaded': self.model is not None, | |
| 'model_version': self.model_version, | |
| 'is_training': self.is_training, | |
| 'training_progress': self.training_progress, | |
| 'last_training': self.last_training_time.isoformat() if self.last_training_time else None, | |
| 'pending_samples': collector.get_pending_count(), | |
| 'min_samples_for_training': MIN_SAMPLES_FOR_TRAINING | |
| } | |
| def generate( | |
| self, | |
| prompt: str, | |
| max_tokens: int = 100, | |
| temperature: float = 0.7, | |
| repetition_penalty: float = 1.2, | |
| top_k: int = 50 | |
| ) -> str: | |
| """Generate code using the model""" | |
| if self.model is None or self.tokenizer is None: | |
| raise ValueError("Model not loaded") | |
| tokens = self.tokenizer.encode(prompt) | |
| if len(tokens) == 0: | |
| tokens = [ord(' ')] | |
| generated = self.model.generate( | |
| tokens, | |
| max_new_tokens=max_tokens, | |
| temperature=temperature, | |
| top_k=top_k, | |
| repetition_penalty=repetition_penalty | |
| ) | |
| return self.tokenizer.decode(generated) | |
| # Global trainer instance | |
| trainer = ContinuousTrainer() |