"""
Visible LLM - A Language Model built with TensorFlow
Trained on veda.txt
"""

import json
import os
import pickle
import re
from collections import Counter
from datetime import datetime

import numpy as np
import tensorflow as tf
from flask import Flask, jsonify, render_template_string, request
from tensorflow import keras
from tensorflow.keras import layers


# ============================================================
# CONFIGURATION
# ============================================================

class VisibleConfig:
    """Configuration for Visible LLM.

    All values are class attributes, so they can be read either from the
    class itself or from an instance.
    """

    MODEL_NAME = "Visible"
    VERSION = "1.0.0"

    # Model Architecture
    VOCAB_SIZE = 10000
    EMBEDDING_DIM = 256
    NUM_HEADS = 8
    NUM_LAYERS = 6
    FF_DIM = 512
    MAX_SEQ_LENGTH = 128
    DROPOUT_RATE = 0.1

    # Training
    BATCH_SIZE = 32
    EPOCHS = 50
    LEARNING_RATE = 0.0001
    # NOTE(review): WARMUP_STEPS is not referenced anywhere in this file.
    WARMUP_STEPS = 4000

    # Paths
    DATA_FILE = "veda.txt"
    MODEL_DIR = "models"
    MODEL_PATH = "models/visible_model"
    TOKENIZER_PATH = "models/visible_tokenizer.pkl"
    CONFIG_PATH = "models/visible_config.json"


# ============================================================
# CUSTOM TOKENIZER
# ============================================================

class VisibleTokenizer:
    """Word-level tokenizer with four reserved special tokens.

    Ids 0-3 are reserved for padding, unknown, start and end markers; the
    remaining ids are assigned to corpus words in descending frequency order.
    """

    def __init__(self, vocab_size=10000):
        """Create an empty tokenizer.

        Args:
            vocab_size: Maximum vocabulary size, including the four special
                tokens.
        """
        self.vocab_size = vocab_size
        self.word_to_idx = {}
        self.idx_to_word = {}
        self.vocab = []

        # Special tokens. They must be distinct strings: if they collide, the
        # word->id mapping keeps only the last one and the start marker would
        # encode to the end id. "<" and ">" are stripped by _preprocess_text,
        # so corpus text can never produce these strings.
        self.pad_token = "<PAD>"
        self.unk_token = "<UNK>"
        self.start_token = "<START>"
        self.end_token = "<END>"

        self.pad_token_id = 0
        self.unk_token_id = 1
        self.start_token_id = 2
        self.end_token_id = 3

    def _preprocess_text(self, text):
        """Lower-case *text*, drop unsupported characters, collapse whitespace."""
        text = text.lower()
        text = re.sub(r'[^\w\s\.\,\!\?\;\:\'\"\-]', '', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def _tokenize(self, text):
        """Split *text* into word tokens and single punctuation tokens."""
        text = self._preprocess_text(text)
        # Simple word-level tokenization with punctuation handling
        return re.findall(r'\w+|[^\w\s]', text)

    def fit(self, texts):
        """Build the vocabulary from an iterable of strings.

        Args:
            texts: Iterable of raw text segments.

        Returns:
            This tokenizer, to allow chaining.
        """
        print("Building vocabulary...")

        word_counts = Counter()
        for text in texts:
            word_counts.update(self._tokenize(text))

        # Most frequent words first; four slots are reserved for the special
        # tokens. max() guards against vocab_size < 4.
        word_budget = max(self.vocab_size - 4, 0)
        frequent_words = [word for word, _ in word_counts.most_common(word_budget)]

        self.vocab = [self.pad_token, self.unk_token, self.start_token, self.end_token]
        self.vocab.extend(frequent_words)

        self.word_to_idx = {word: idx for idx, word in enumerate(self.vocab)}
        self.idx_to_word = dict(enumerate(self.vocab))

        print(f"Vocabulary size: {len(self.vocab)}")
        return self

    def encode(self, text, max_length=None, add_special_tokens=True):
        """Encode *text* to a list of token ids.

        Args:
            text: Raw text to encode.
            max_length: If truthy, the result is truncated or right-padded
                with the pad id to exactly this length. Truncation may drop
                the end marker.
            add_special_tokens: Whether to wrap the ids in start/end markers.

        Returns:
            A list of integer token ids.
        """
        token_ids = [
            self.word_to_idx.get(token, self.unk_token_id)
            for token in self._tokenize(text)
        ]

        if add_special_tokens:
            # Use the reserved ids directly rather than a vocabulary lookup,
            # so encoding does not depend on the stored special-token strings.
            token_ids = [self.start_token_id, *token_ids, self.end_token_id]

        if max_length:
            if len(token_ids) > max_length:
                token_ids = token_ids[:max_length]
            else:
                token_ids.extend([self.pad_token_id] * (max_length - len(token_ids)))

        return token_ids

    def decode(self, token_ids, skip_special_tokens=True):
        """Decode token ids back to text.

        Args:
            token_ids: Iterable of integer token ids.
            skip_special_tokens: When true, pad/start/end and unknown ids are
                omitted from the output; otherwise every id is rendered.

        Returns:
            The decoded string, with whitespace before ``.,!?;:`` removed.
        """
        if skip_special_tokens:
            skipped = {
                self.pad_token_id,
                self.unk_token_id,
                self.start_token_id,
                self.end_token_id,
            }
        else:
            skipped = set()

        tokens = []
        for idx in token_ids:
            idx = int(idx)  # accept numpy integer scalars as well as ints
            if idx in skipped:
                continue
            tokens.append(self.idx_to_word.get(idx, self.unk_token))

        # Join tokens properly
        text = ' '.join(tokens)
        # Fix punctuation spacing
        text = re.sub(r'\s+([.,!?;:])', r'\1', text)
        return text

    def save(self, path):
        """Pickle the vocabulary and vocabulary size to *path*."""
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

        with open(path, 'wb') as f:
            pickle.dump({
                'vocab': self.vocab,
                'vocab_size': self.vocab_size,
            }, f)
        print(f"Tokenizer saved to {path}")

    def load(self, path):
        """Load a tokenizer previously written by :meth:`save`.

        Args:
            path: Path to the pickle file.

        Returns:
            This tokenizer, to allow chaining.
        """
        # SECURITY: pickle.load executes arbitrary code from the file. Only
        # load tokenizer files that this application wrote itself.
        with open(path, 'rb') as f:
            data = pickle.load(f)

        self.vocab = data['vocab']
        self.vocab_size = data['vocab_size']
        self.word_to_idx = {word: idx for idx, word in enumerate(self.vocab)}
        self.idx_to_word = dict(enumerate(self.vocab))
        print(f"Tokenizer loaded from {path}")
        return self

    def __len__(self):
        return len(self.vocab)


# ============================================================
# TRANSFORMER COMPONENTS
# ============================================================

class PositionalEncoding(layers.Layer):
    """Adds fixed sinusoidal positional encodings to its input."""

    def __init__(self, max_seq_length, embed_dim, **kwargs):
        """Precompute the ``(max_seq_length, embed_dim)`` encoding table.

        Args:
            max_seq_length: Number of positions to precompute.
            embed_dim: Width of the embedding the encoding is added to.
            **kwargs: Forwarded to ``layers.Layer``.
        """
        super().__init__(**kwargs)
        self.max_seq_length = max_seq_length
        self.embed_dim = embed_dim

        # Create positional encoding matrix
        position = np.arange(max_seq_length)[:, np.newaxis]
        div_term = np.exp(np.arange(0, embed_dim, 2) * -(np.log(10000.0) / embed_dim))
        angles = position * div_term

        pe = np.zeros((max_seq_length, embed_dim))
        pe[:, 0::2] = np.sin(angles)
        # For odd embed_dim there is one fewer cosine column than sine
        # column, so trim the angles to fit.
        pe[:, 1::2] = np.cos(angles[:, :embed_dim // 2])

        self.positional_encoding = tf.constant(pe, dtype=tf.float32)

    def call(self, x):
        """Return *x* plus the encoding rows for its sequence length."""
        seq_length = tf.shape(x)[1]
        return x + self.positional_encoding[:seq_length, :]

    def get_config(self):
        """Return the layer config, including the constructor arguments."""
        config = super().get_config()
        config.update({
            'max_seq_length': self.max_seq_length,
            'embed_dim': self.embed_dim,
        })
        return config


class TransformerBlock(layers.Layer):
    """Transformer decoder block: causal self-attention followed by an FFN."""

    def __init__(self, embed_dim, num_heads, ff_dim, dropout_rate=0.1, **kwargs):
        """Create the attention, feed-forward, norm and dropout sub-layers.

        Args:
            embed_dim: Model width; also the FFN output width.
            num_heads: Number of attention heads.
            ff_dim: Hidden width of the feed-forward network.
            dropout_rate: Dropout rate used throughout the block.
            **kwargs: Forwarded to ``layers.Layer``.
        """
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout_rate

        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=embed_dim // num_heads,
            dropout=dropout_rate,
        )
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation='gelu'),
            layers.Dropout(dropout_rate),
            layers.Dense(embed_dim),
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(dropout_rate)
        self.dropout2 = layers.Dropout(dropout_rate)

    def causal_attention_mask(self, seq_length):
        """Return a lower-triangular ``(seq_length, seq_length)`` mask of ones."""
        return tf.linalg.band_part(tf.ones((seq_length, seq_length)), -1, 0)

    def call(self, x, training=False):
        """Apply masked self-attention and the FFN, each with residual + norm.

        Args:
            x: Input tensor; axis 1 is used as the sequence axis.
            training: Whether dropout should be active.

        Returns:
            A tensor produced by the second layer normalization.
        """
        seq_length = tf.shape(x)[1]
        causal_mask = self.causal_attention_mask(seq_length)

        # Self-attention with causal mask
        attention_output = self.attention(
            query=x,
            value=x,
            key=x,
            attention_mask=causal_mask,
            training=training,
        )
        attention_output = self.dropout1(attention_output, training=training)
        x = self.layernorm1(x + attention_output)

        # Feed-forward network. `training` is passed explicitly so the
        # Dropout inside the Sequential follows the caller's mode.
        ffn_output = self.ffn(x, training=training)
        ffn_output = self.dropout2(ffn_output, training=training)
        x = self.layernorm2(x + ffn_output)

        return x

    def get_config(self):
        """Return the layer config, including the constructor arguments."""
        config = super().get_config()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'dropout_rate': self.dropout_rate,
        })
        return config


# ============================================================
# VISIBLE LLM MODEL
# ============================================================

class VisibleLLM:
    """Visible Language Model: data loading, training, persistence, sampling."""

    def __init__(self, config=None):
        """Create an untrained wrapper.

        Args:
            config: Configuration object; defaults to a new ``VisibleConfig``.
        """
        self.config = config or VisibleConfig()
        self.tokenizer = None
        self.model = None
        self.history = None

    def build_model(self, vocab_size=None):
        """Build and compile the Transformer model.

        Args:
            vocab_size: Size of the embedding table and output projection;
                defaults to ``config.VOCAB_SIZE``.

        Returns:
            The compiled ``keras.Model`` (also stored on ``self.model``).
        """
        vocab_size = vocab_size or self.config.VOCAB_SIZE

        print(f"\n{'='*50}")
        print(f"Building {self.config.MODEL_NAME} LLM")
        print(f"{'='*50}")

        # Input layer
        inputs = layers.Input(shape=(None,), dtype=tf.int32, name="input_ids")

        # Token embedding
        token_embedding = layers.Embedding(
            input_dim=vocab_size,
            output_dim=self.config.EMBEDDING_DIM,
            name="token_embedding",
        )(inputs)

        # Positional encoding
        x = PositionalEncoding(
            self.config.MAX_SEQ_LENGTH,
            self.config.EMBEDDING_DIM,
            name="positional_encoding",
        )(token_embedding)

        # Dropout
        x = layers.Dropout(self.config.DROPOUT_RATE)(x)

        # Transformer blocks
        for i in range(self.config.NUM_LAYERS):
            x = TransformerBlock(
                embed_dim=self.config.EMBEDDING_DIM,
                num_heads=self.config.NUM_HEADS,
                ff_dim=self.config.FF_DIM,
                dropout_rate=self.config.DROPOUT_RATE,
                name=f"transformer_block_{i}",
            )(x)

        # Final layer normalization
        x = layers.LayerNormalization(epsilon=1e-6, name="final_layernorm")(x)

        # Output projection (logits; the loss below uses from_logits=True)
        outputs = layers.Dense(vocab_size, name="output_projection")(x)

        self.model = keras.Model(inputs=inputs, outputs=outputs, name=self.config.MODEL_NAME)

        # Compile model
        self.model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=self.config.LEARNING_RATE),
            loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            metrics=['accuracy'],
        )

        self.model.summary()
        return self.model

    def load_data(self, file_path=None):
        """Read the training file and split it into text segments.

        Args:
            file_path: Path to a UTF-8 text file; defaults to
                ``config.DATA_FILE``.

        Returns:
            A list of stripped segments longer than 10 characters.

        Raises:
            FileNotFoundError: If *file_path* does not exist.
        """
        file_path = file_path or self.config.DATA_FILE

        print(f"\nLoading data from {file_path}...")

        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Data file not found: {file_path}")

        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()

        # Split into sentences/chunks on runs of sentence-ending punctuation
        segments = (segment.strip() for segment in re.split(r'[.!?]+', text))
        sentences = [segment for segment in segments if len(segment) > 10]

        print(f"Loaded {len(sentences)} text segments")
        return sentences

    def prepare_training_data(self, texts):
        """Fit the tokenizer and build next-token prediction arrays.

        Each segment is encoded to ``MAX_SEQ_LENGTH + 1`` ids; the input is
        that sequence without its last id and the target is the same sequence
        shifted left by one.

        Args:
            texts: List of text segments.

        Returns:
            A tuple ``(X, y)`` of integer arrays of identical shape.

        Raises:
            ValueError: If no segment yields more than three non-pad ids.
        """
        print("\nPreparing training data...")

        # Initialize and fit tokenizer
        self.tokenizer = VisibleTokenizer(vocab_size=self.config.VOCAB_SIZE)
        self.tokenizer.fit(texts)
        pad_id = self.tokenizer.pad_token_id

        # Create training sequences
        input_sequences = []
        target_sequences = []

        for text in texts:
            token_ids = self.tokenizer.encode(
                text,
                max_length=self.config.MAX_SEQ_LENGTH + 1,
                add_special_tokens=True,
            )

            # Skip very short sequences
            if sum(1 for token_id in token_ids if token_id != pad_id) > 3:
                input_sequences.append(token_ids[:-1])
                target_sequences.append(token_ids[1:])

        if not input_sequences:
            # Fail here with a clear message instead of deep inside fit().
            raise ValueError("No usable training sequences were produced from the data.")

        X = np.array(input_sequences)
        y = np.array(target_sequences)

        print(f"Training samples: {len(X)}")
        print(f"Input shape: {X.shape}")
        print(f"Target shape: {y.shape}")

        return X, y

    def train(self, data_file=None, epochs=None, batch_size=None):
        """Train the model end to end and persist tokenizer and config.

        Args:
            data_file: Training text file; defaults to ``config.DATA_FILE``.
            epochs: Number of epochs; defaults to ``config.EPOCHS``.
            batch_size: Batch size; defaults to ``config.BATCH_SIZE``.

        Returns:
            The Keras ``History`` object returned by ``fit``.
        """
        epochs = epochs or self.config.EPOCHS
        batch_size = batch_size or self.config.BATCH_SIZE

        # Load and prepare data
        texts = self.load_data(data_file)
        X, y = self.prepare_training_data(texts)

        # Build model
        self.build_model(vocab_size=len(self.tokenizer))

        # Create model directory
        os.makedirs(self.config.MODEL_DIR, exist_ok=True)

        # Callbacks
        # NOTE(review): MODEL_PATH has no file extension; newer Keras releases
        # may require ".keras" or ".h5" here - confirm against the installed
        # version.
        callbacks = [
            keras.callbacks.ModelCheckpoint(
                filepath=self.config.MODEL_PATH,
                save_best_only=True,
                monitor='loss',
                mode='min',
            ),
            keras.callbacks.EarlyStopping(
                monitor='loss',
                patience=5,
                restore_best_weights=True,
            ),
            keras.callbacks.ReduceLROnPlateau(
                monitor='loss',
                factor=0.5,
                patience=3,
                min_lr=1e-7,
            ),
            keras.callbacks.TensorBoard(
                log_dir=f'logs/{datetime.now().strftime("%Y%m%d-%H%M%S")}'
            ),
        ]

        print(f"\n{'='*50}")
        print(f"Training {self.config.MODEL_NAME}")
        print(f"{'='*50}")
        print(f"Epochs: {epochs}")
        print(f"Batch Size: {batch_size}")
        print(f"{'='*50}\n")

        # Train
        self.history = self.model.fit(
            X, y,
            epochs=epochs,
            batch_size=batch_size,
            callbacks=callbacks,
            validation_split=0.1,
        )

        # Save tokenizer
        self.tokenizer.save(self.config.TOKENIZER_PATH)

        # Save config
        self.save_config()

        print(f"\n{'='*50}")
        print(f"Training Complete!")
        print(f"Model saved to: {self.config.MODEL_PATH}")
        print(f"Tokenizer saved to: {self.config.TOKENIZER_PATH}")
        print(f"{'='*50}\n")

        return self.history

    def save_config(self):
        """Write the model hyper-parameters to ``config.CONFIG_PATH`` as JSON."""
        config_dict = {
            'model_name': self.config.MODEL_NAME,
            'version': self.config.VERSION,
            'vocab_size': len(self.tokenizer),
            'embedding_dim': self.config.EMBEDDING_DIM,
            'num_heads': self.config.NUM_HEADS,
            'num_layers': self.config.NUM_LAYERS,
            'ff_dim': self.config.FF_DIM,
            'max_seq_length': self.config.MAX_SEQ_LENGTH,
            'trained_on': datetime.now().isoformat(),
        }

        with open(self.config.CONFIG_PATH, 'w', encoding='utf-8') as f:
            json.dump(config_dict, f, indent=2)

    def load_model(self, model_path=None, tokenizer_path=None):
        """Load a trained model and its tokenizer from disk.

        Args:
            model_path: Saved model location; defaults to ``config.MODEL_PATH``.
            tokenizer_path: Tokenizer pickle; defaults to
                ``config.TOKENIZER_PATH``.

        Returns:
            This instance, to allow chaining.
        """
        model_path = model_path or self.config.MODEL_PATH
        tokenizer_path = tokenizer_path or self.config.TOKENIZER_PATH

        print(f"Loading model from {model_path}...")

        # Load tokenizer
        self.tokenizer = VisibleTokenizer()
        self.tokenizer.load(tokenizer_path)

        # Load model with custom objects
        custom_objects = {
            'PositionalEncoding': PositionalEncoding,
            'TransformerBlock': TransformerBlock,
        }

        self.model = keras.models.load_model(model_path, custom_objects=custom_objects)

        print("Model loaded successfully!")
        return self

    @staticmethod
    def _softmax(logits):
        """Return float64 probabilities for *logits*; ``-inf`` entries map to 0."""
        logits = np.asarray(logits, dtype=np.float64)
        weights = np.exp(logits - np.max(logits))
        return weights / weights.sum()

    def _filter_logits(self, logits, top_k, top_p):
        """Apply top-k then nucleus (top-p) filtering to *logits* in place."""
        # Top-k: everything outside the k largest logits is removed.
        if top_k is not None and top_k > 0:
            logits[np.argsort(logits)[:-top_k]] = -np.inf

        # Top-p: keep the smallest prefix of the sorted distribution whose
        # cumulative probability exceeds top_p.
        if top_p is not None and top_p < 1.0:
            order = np.argsort(logits)[::-1]
            cumulative_probs = np.cumsum(self._softmax(logits[order]))
            remove = cumulative_probs > top_p
            # Shift right by one so the token that crosses the threshold is
            # kept, and the most likely token is always kept.
            remove[1:] = remove[:-1].copy()
            remove[0] = False
            logits[order[remove]] = -np.inf

        return logits

    def generate(self, prompt, max_length=100, temperature=0.7, top_k=50, top_p=0.9):
        """Generate text from a prompt by autoregressive sampling.

        Args:
            prompt: Text to continue.
            max_length: Maximum number of tokens to generate.
            temperature: Sampling temperature. A value of ``None`` or ``<= 0``
                selects greedy decoding (no division by zero).
            top_k: Keep only the *top_k* most likely tokens (``None`` or
                ``<= 0`` disables the filter).
            top_p: Nucleus sampling threshold (``None`` or ``>= 1.0``
                disables the filter).

        Returns:
            The decoded prompt followed by the generated continuation.

        Raises:
            ValueError: If the model or tokenizer has not been loaded.
        """
        if self.model is None or self.tokenizer is None:
            raise ValueError("Model not loaded. Call load_model() first.")

        # Encode prompt
        input_ids = self.tokenizer.encode(prompt, add_special_tokens=True)
        input_ids = input_ids[:-1]  # Remove end token for generation

        generated_ids = list(input_ids)
        greedy = temperature is None or temperature <= 0
        # Padding and start markers are never valid continuations.
        blocked_ids = [self.tokenizer.pad_token_id, self.tokenizer.start_token_id]

        for _ in range(max_length):
            # Only the most recent MAX_SEQ_LENGTH ids fit the positional table.
            window = generated_ids[-self.config.MAX_SEQ_LENGTH:]
            current_input = tf.constant([window], dtype=tf.int32)

            # Calling the model directly avoids predict()'s per-call overhead
            # for a single small batch.
            predictions = np.asarray(self.model(current_input, training=False))
            # astype() copies, so the in-place edits below stay local; float64
            # keeps the final probabilities summing to 1 within the tolerance
            # np.random.choice enforces.
            next_token_logits = predictions[0, -1, :].astype(np.float64)
            next_token_logits[blocked_ids] = -np.inf

            if greedy:
                next_token_id = int(np.argmax(next_token_logits))
            else:
                # Apply temperature
                next_token_logits = next_token_logits / temperature
                next_token_logits = self._filter_logits(next_token_logits, top_k, top_p)

                # Sample from distribution
                probs = self._softmax(next_token_logits)
                next_token_id = int(np.random.choice(len(probs), p=probs))

            # Stop if end token
            if next_token_id == self.tokenizer.end_token_id:
                break

            generated_ids.append(next_token_id)

        # Decode generated text
        return self.tokenizer.decode(generated_ids, skip_special_tokens=True)

    def chat(self, user_input, max_length=100, temperature=0.7):
        """Return the model's reply to *user_input* (thin wrapper over generate)."""
        return self.generate(
            prompt=user_input,
            max_length=max_length,
            temperature=temperature,
        )


# ============================================================
# FLASK WEB APPLICATION
# ============================================================

app = Flask(__name__)
visible_llm = None

# HTML Template
# NOTE(review): this template contains only plain text - its HTML/JS markup
# appears to have been lost. Restore the original page from version control
# before relying on the web UI.
HTML_TEMPLATE = """ Visible LLM

🔮 Visible

Intelligent Language Model powered by TensorFlow

Checking model status...
0.7
100
50
0.9
Hello! I am Visible, your AI assistant. Ask me anything!
"""


@app.route('/')
def home():
    """Render the main chat interface."""
    return render_template_string(HTML_TEMPLATE)


@app.route('/api/status')
def status():
    """Report whether a model is loaded, plus model name and version."""
    return jsonify({
        'model_loaded': visible_llm is not None and visible_llm.model is not None,
        'model_name': VisibleConfig.MODEL_NAME,
        'version': VisibleConfig.VERSION,
    })


@app.route('/api/generate', methods=['POST'])
def generate():
    """Generate text from the JSON request body.

    Reads ``prompt``, ``max_length``, ``temperature``, ``top_k`` and ``top_p``
    from the body. Always responds with a JSON object carrying ``success``;
    failures carry ``error`` instead of ``response``.
    """
    if visible_llm is None or visible_llm.model is None:
        return jsonify({
            'success': False,
            'error': 'Model not loaded. Please train the model first.',
        })

    try:
        # silent=True yields None for a missing or invalid JSON body instead
        # of raising, so an empty request falls back to the defaults.
        data = request.get_json(silent=True) or {}
        prompt = str(data.get('prompt', ''))
        # Coerce numeric fields: clients may send them as strings.
        max_length = int(data.get('max_length', 100))
        temperature = float(data.get('temperature', 0.7))
        top_k = int(data.get('top_k', 50))
        top_p = float(data.get('top_p', 0.9))

        response = visible_llm.generate(
            prompt=prompt,
            max_length=max_length,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p,
        )

        return jsonify({
            'success': True,
            'response': response,
            'prompt': prompt,
        })
    except Exception as e:  # request boundary: report the failure as JSON
        return jsonify({
            'success': False,
            'error': str(e),
        })


@app.route('/api/train', methods=['POST'])
def train_model():
    """Train a new model from the JSON request body (``epochs``, ``batch_size``).

    The request blocks until training finishes. The currently served model is
    replaced only after training succeeds.
    """
    global visible_llm

    try:
        data = request.get_json(silent=True) or {}
        epochs = int(data.get('epochs', 50))
        batch_size = int(data.get('batch_size', 32))

        # Train into a local so a failed run does not discard the model that
        # is currently being served.
        candidate = VisibleLLM()
        candidate.train(epochs=epochs, batch_size=batch_size)
        visible_llm = candidate

        return jsonify({
            'success': True,
            'message': 'Training complete!',
        })
    except Exception as e:  # request boundary: report the failure as JSON
        return jsonify({
            'success': False,
            'error': str(e),
        })


# ============================================================
# COMMAND LINE INTERFACE
# ============================================================

def main():
    """Command-line entry point: train, chat, serve, or print usage help."""
    import argparse

    parser = argparse.ArgumentParser(description='Visible LLM - Language Model')
    parser.add_argument('--train', action='store_true', help='Train the model')
    parser.add_argument('--serve', action='store_true', help='Start web server')
    parser.add_argument('--chat', action='store_true', help='Interactive chat mode')
    parser.add_argument('--epochs', type=int, default=50, help='Number of training epochs')
    parser.add_argument('--batch-size', type=int, default=32, help='Batch size')
    parser.add_argument('--data', type=str, default='veda.txt', help='Training data file')
    parser.add_argument('--port', type=int, default=5000, help='Server port')

    args = parser.parse_args()

    global visible_llm

    if args.train:
        print("\n" + "="*60)
        print("VISIBLE LLM - TRAINING MODE")
        print("="*60 + "\n")

        visible_llm = VisibleLLM()
        # Pass the data file explicitly instead of mutating the shared
        # VisibleConfig class attribute.
        visible_llm.train(
            data_file=args.data,
            epochs=args.epochs,
            batch_size=args.batch_size,
        )

    elif args.chat:
        print("\n" + "="*60)
        print("VISIBLE LLM - CHAT MODE")
        print("="*60 + "\n")

        visible_llm = VisibleLLM()
        visible_llm.load_model()

        print("Chat with Visible (type 'quit' to exit)\n")

        while True:
            user_input = input("You: ").strip()
            if user_input.lower() in ['quit', 'exit', 'q']:
                print("Goodbye!")
                break

            if user_input:
                response = visible_llm.chat(user_input)
                print(f"Visible: {response}\n")

    elif args.serve:
        print("\n" + "="*60)
        print("VISIBLE LLM - WEB SERVER MODE")
        print("="*60 + "\n")

        # Try to load existing model; load_model() prints its own success
        # message. The server still starts without a model so that
        # /api/status and /api/train remain reachable.
        visible_llm = VisibleLLM()
        try:
            visible_llm.load_model()
        except Exception as e:
            print(f"Could not load model: {e}")
            print("Please train the model first with: python app.py --train")
            visible_llm = None

        print(f"\nStarting server on http://localhost:{args.port}")
        # NOTE(review): host='0.0.0.0' listens on all interfaces and the
        # /api/train endpoint is unauthenticated - confirm this exposure is
        # intended.
        app.run(host='0.0.0.0', port=args.port, debug=False)

    else:
        # Default: show help
        parser.print_help()
        print("\n" + "="*60)
        print("QUICK START:")
        print("="*60)
        print("1. Train the model: python app.py --train --data veda.txt")
        print("2. Start web server: python app.py --serve")
        print("3. Interactive chat: python app.py --chat")
        print("="*60 + "\n")


if __name__ == '__main__':
    main()