| |
| """ |
| Test MarkovSpline + BitTransformerLM Integration |
| |
| Validates the integration between MarkovSpline and BitTransformerLM |
| using actual datasets and training procedures. |
| """ |
|
|
| import os |
| import sys |
| import time |
| import torch |
| import json |
| import numpy as np |
| from pathlib import Path |
|
|
| |
| sys.path.insert(0, '/data/MarkovSpline') |
| from bitpipe_integration import create_markov_spline_bitpipe_module |
|
|
| |
| from bit_transformer.model import BitTransformerLM |
| from markov_spline_training import MarkovSplineEnhancedTrainer, MarkovSplineEnhancedDataset |
| from markov_spline_cli import MarkovSplineBitTransformerCLI |
|
|
| |
def create_simple_dataset(num_samples=100, seq_length=128):
    """Build a list of random bit-sequence samples for testing.

    Args:
        num_samples: Number of samples to generate.
        seq_length: Number of bits in each input and target sequence.

    Returns:
        A list of ``num_samples`` dicts. Each dict maps ``'input_bits'`` and
        ``'target_bits'`` to independent 1-D ``torch.long`` tensors of length
        ``seq_length`` holding values drawn from {0, 1}.
    """
    shape = (seq_length,)
    # Dict displays evaluate their entries in order, so for every sample the
    # input tensor is drawn from the RNG before the target tensor.
    return [
        {
            'input_bits': torch.randint(0, 2, shape, dtype=torch.long),
            'target_bits': torch.randint(0, 2, shape, dtype=torch.long),
        }
        for _ in range(num_samples)
    ]
|
|
| |
class TextToBitsConverter:
    """Simple text to bits converter for testing."""

    def text_to_bits(self, text, max_length=128):
        """Convert text to a fixed-length bit sequence.

        The text is encoded as UTF-8 and every byte is expanded to exactly
        eight bits, most significant bit first. Working on bytes rather than
        on ``ord(char)`` guarantees eight bits per unit: code points above
        255 would otherwise format to more than eight binary digits and
        misalign everything that follows. Pure-ASCII text produces the same
        bits either way.

        Args:
            text: String to convert.
            max_length: Exact length of the returned list. Only the first
                ``max_length // 8`` bytes of the encoded text are used.
                Non-positive values yield an empty list.

        Returns:
            A list of ``max_length`` ints (each 0 or 1): the encoded bits,
            right-padded with zeros when the text is too short.
        """
        if max_length <= 0:
            return []

        byte_budget = max_length // 8
        # Each character encodes to at least one byte, so pre-slicing the
        # text bounds the encoding work without dropping any byte we keep.
        # errors='replace' keeps lone surrogates from raising.
        payload = text[:byte_budget].encode('utf-8', errors='replace')[:byte_budget]

        bit_sequence = [
            (byte >> shift) & 1
            for byte in payload
            for shift in range(7, -1, -1)
        ]

        # len(bit_sequence) is a multiple of 8 that never exceeds max_length,
        # so only padding (never truncation) can be required here.
        bit_sequence.extend([0] * (max_length - len(bit_sequence)))
        return bit_sequence
|
|
|
|
def test_markov_spline_preprocessing():
    """Exercise MarkovSpline binary prediction and training preprocessing.

    Converts a handful of fixed sentences to 64-bit sequences, asks the
    MarkovSpline module for 8 binary predictions on the first sequence, then
    runs the ``'preprocess_training'`` operation on all of them.

    Returns:
        True when both operations report ``result['success']``; False as
        soon as either one reports failure.
    """
    print("\n🧪 Testing MarkovSpline Data Preprocessing")
    print("=" * 50)

    markov_module = create_markov_spline_bitpipe_module()

    converter = TextToBitsConverter()
    test_texts = [
        "The quick brown fox jumps over the lazy dog.",
        "Machine learning transforms raw data into insights.",
        "BitTransformerLM processes information at the bit level.",
        "MarkovSpline provides smooth data transitions."
    ]
    bit_sequences = [
        converter.text_to_bits(sentence, max_length=64)
        for sentence in test_texts
    ]

    print(f"📝 Created {len(bit_sequences)} test bit sequences")

    # --- Stage 1: binary sequence prediction on the first sequence ---------
    print("\n🔮 Testing Binary Sequence Prediction")
    prediction = markov_module.process_data(
        bit_sequences[0], 'predict_binary', num_predictions=8
    )

    if not prediction['success']:
        print(f" ❌ Prediction failed: {prediction.get('error', 'Unknown error')}")
        return False

    print(" ✅ Prediction successful")
    print(f" 🎯 Predictions: {prediction['predictions']}")
    print(f" 📊 Avg confidence: {prediction['prediction_metrics']['avg_confidence']:.3f}")
    print(f" 📈 Entropy: {prediction['prediction_metrics']['prediction_entropy']:.3f}")

    # --- Stage 2: preprocessing of the whole batch -------------------------
    print("\n🔄 Testing Data Preprocessing")
    preprocessed = markov_module.process_data(
        bit_sequences, 'preprocess_training', binary_data=True
    )

    if not preprocessed['success']:
        print(f" ❌ Preprocessing failed: {preprocessed.get('error', 'Unknown error')}")
        return False

    print(" ✅ Preprocessing successful")
    print(f" 📦 Processed {len(preprocessed['processed_sequences'])} sequences")
    print(f" 📋 Summary: {preprocessed['preprocessing_summary']}")

    return True
|
|
|
|
def test_enhanced_dataset():
    """Check that the MarkovSpline dataset wrapper reports smoothing.

    Wraps a 20-sample random dataset in ``MarkovSplineEnhancedDataset`` and
    inspects up to the first five samples, counting those whose
    ``'smoothing_applied'`` entry is truthy.

    Returns:
        True when more than half of the inspected samples report smoothing.
    """
    print("\n📦 Testing Enhanced Dataset Wrapper")
    print("=" * 50)

    base_dataset = create_simple_dataset(num_samples=20, seq_length=32)
    print(f"📝 Created base dataset with {len(base_dataset)} samples")

    markov_module = create_markov_spline_bitpipe_module()

    enhanced_dataset = MarkovSplineEnhancedDataset(
        base_dataset,
        markov_module,
        smoothing_strength=0.15,
        enable_smoothing=True,
    )

    print("🌊 Created enhanced dataset with MarkovSpline preprocessing")

    inspected = min(5, len(enhanced_dataset))
    smoothed = 0

    for idx in range(inspected):
        applied = enhanced_dataset[idx].get('smoothing_applied', False)
        if applied:
            smoothed += 1
        print(f" Sample {idx}: smoothing_applied = {applied}")

    # An empty wrapper yields a rate of 0 rather than dividing by zero.
    success_rate = smoothed / inspected if inspected else 0
    print(f"✅ Smoothing success rate: {success_rate:.2%}")

    return success_rate > 0.5
|
|
|
|
def test_gradient_smoothing():
    """Run one enhanced training step with gradient smoothing enabled.

    Builds a small ``BitTransformerLM``, wraps it in a
    ``MarkovSplineEnhancedTrainer`` (gradient smoothing on, data smoothing
    off), executes a single ``train_step`` on a random batch of shape
    (4, 64), and prints the reported metrics.

    Returns:
        True once the step and the metric lookups complete. Failures surface
        as exceptions raised by the trainer or by missing metric keys.
    """
    print("\n⚡ Testing Gradient Smoothing")
    print("=" * 50)

    model = BitTransformerLM(
        d_model=32, nhead=2, num_layers=2,
        dim_feedforward=64, max_seq_len=128
    )

    batch_size = 4
    seq_length = 64
    test_batch = {
        'input_bits': torch.randint(0, 2, (batch_size, seq_length), dtype=torch.long),
        'target_bits': torch.randint(0, 2, (batch_size, seq_length), dtype=torch.long)
    }

    trainer = MarkovSplineEnhancedTrainer(
        model=model,
        gradient_smoothing=True,
        data_smoothing=False,
        smoothing_strength=0.2,
        learning_rate=1e-3
    )

    print("🧠 Initialized enhanced trainer")

    # perf_counter is monotonic and high-resolution; time.time() follows the
    # wall clock, which can be coarse or jump when the system clock changes.
    start_time = time.perf_counter()
    metrics = trainer.train_step(test_batch)
    end_time = time.perf_counter()

    print(f" ✅ Training step completed in {end_time - start_time:.3f}s")
    print(f" 📊 Loss: {metrics['loss']:.4f}")
    print(f" 🌊 Smoothing applied: {metrics.get('smoothing_applied', 0):.3f}")

    markov_metrics = trainer.get_markov_spline_metrics()
    print(f" 📈 MarkovSpline operations: {markov_metrics['processing_operations']}")

    return True
|
|
|
|
def test_cli_interface():
    """Smoke-test the MarkovSpline command-line wrapper.

    Initializes ``MarkovSplineBitTransformerCLI`` and requests five binary
    predictions for a short hard-coded bit sequence.

    Returns:
        True when initialization succeeds and the prediction result reports
        ``result['success']``; False otherwise.
    """
    print("\n💻 Testing CLI Interface")
    print("=" * 50)

    cli = MarkovSplineBitTransformerCLI()

    initialized = cli.initialize_markov_spline()
    if initialized:
        print(" ✅ CLI initialized successfully")
    else:
        print(" ❌ CLI initialization failed")
        return False

    test_bits = [0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0, 1]
    outcome = cli.smooth_bit_sequence(test_bits, 'predict_binary', num_predictions=5)

    if not outcome['success']:
        print(f" ❌ Bit sequence smoothing failed: {outcome.get('error', 'Unknown')}")
        return False

    print(" ✅ Bit sequence smoothing successful")
    print(f" 🎯 Predictions: {outcome['predictions']}")
    return True
|
|
|
|
def _overhead_percent(baseline, measured):
    """Return how much slower ``measured`` is than ``baseline``, in percent.

    A non-positive baseline yields 0 instead of dividing by zero.
    """
    if baseline > 0:
        return ((measured - baseline) / baseline) * 100
    return 0


def run_integration_benchmark():
    """Measure the overhead of MarkovSpline smoothing on loading and training.

    Times ten sample fetches from a plain random dataset against ten fetches
    from the same dataset wrapped in ``MarkovSplineEnhancedDataset``, then
    times one ``train_step`` of a ``BitwiseTrainer`` against one of a
    ``MarkovSplineEnhancedTrainer`` on the same random batch.

    Returns:
        A dict with keys ``'data_loading_overhead'`` and
        ``'training_overhead'`` (percent relative to the baseline) plus
        ``'standard_loss'`` and ``'enhanced_loss'`` as reported by the two
        trainers.
    """
    print("\n🏃 Running Integration Benchmark")
    print("=" * 50)

    dataset = create_simple_dataset(num_samples=50, seq_length=64)

    markov_module = create_markov_spline_bitpipe_module()

    enhanced_dataset = MarkovSplineEnhancedDataset(
        dataset, markov_module, smoothing_strength=0.1, enable_smoothing=True
    )

    print("\n⏱️ Benchmarking Data Loading Performance")

    # perf_counter is the monotonic, highest-resolution clock available.
    # Indexing a plain list ten times can finish inside one tick of
    # time.time(), which made the baseline read 0 and the overhead 0%.
    start_time = time.perf_counter()
    for i in range(10):
        dataset[i % len(dataset)]
    base_time = time.perf_counter() - start_time

    start_time = time.perf_counter()
    for i in range(10):
        enhanced_dataset[i % len(enhanced_dataset)]
    enhanced_time = time.perf_counter() - start_time

    overhead = _overhead_percent(base_time, enhanced_time)

    print(f" 📊 Base loading time: {base_time:.3f}s")
    print(f" 🌊 Enhanced loading time: {enhanced_time:.3f}s")
    print(f" 📈 Smoothing overhead: {overhead:.1f}%")

    print("\n🏋️ Benchmarking Training Performance")

    model = BitTransformerLM(d_model=64, nhead=4, num_layers=2, dim_feedforward=128, max_seq_len=128)

    from markov_spline_training import BitwiseTrainer
    standard_trainer = BitwiseTrainer(model, learning_rate=1e-3)

    # NOTE(review): both trainers wrap the same model instance. If train_step
    # updates weights, the enhanced step starts from parameters the standard
    # step already changed, so the two losses are not strictly comparable.
    # Confirm whether that is intended.
    enhanced_trainer = MarkovSplineEnhancedTrainer(
        model, gradient_smoothing=True, data_smoothing=True,
        smoothing_strength=0.15, learning_rate=1e-3
    )

    test_batch = {
        'input_bits': torch.randint(0, 2, (4, 64), dtype=torch.long),
        'target_bits': torch.randint(0, 2, (4, 64), dtype=torch.long)
    }

    start_time = time.perf_counter()
    standard_metrics = standard_trainer.train_step(test_batch)
    standard_time = time.perf_counter() - start_time

    start_time = time.perf_counter()
    enhanced_metrics = enhanced_trainer.train_step(test_batch)
    enhanced_time = time.perf_counter() - start_time

    training_overhead = _overhead_percent(standard_time, enhanced_time)

    print(f" 📊 Standard training step: {standard_time:.3f}s")
    print(f" 🌊 Enhanced training step: {enhanced_time:.3f}s")
    print(f" 📈 Enhancement overhead: {training_overhead:.1f}%")
    print(f" 🎯 Standard loss: {standard_metrics['loss']:.4f}")
    print(f" 🎯 Enhanced loss: {enhanced_metrics['loss']:.4f}")

    return {
        'data_loading_overhead': overhead,
        'training_overhead': training_overhead,
        'standard_loss': standard_metrics['loss'],
        'enhanced_loss': enhanced_metrics['loss']
    }
|
|
|
|
def main():
    """Run the MarkovSpline integration tests and benchmark, then report.

    Executes the four test functions and the benchmark, prints a summary,
    and writes the collected results as JSON to a fixed path. Saving is
    best-effort: an ``OSError`` while writing is reported as a warning and
    does not change the exit status, which reflects the tests only.

    Returns:
        0 when at least 75% of the tests passed; 1 when fewer passed or when
        any test or the benchmark raised an exception.
    """
    print("🌊 MarkovSpline + BitTransformerLM Integration Tests")
    print("=" * 60)

    results = {
        'preprocessing_test': False,
        'enhanced_dataset_test': False,
        'gradient_smoothing_test': False,
        'cli_interface_test': False,
        'benchmark_results': None
    }

    try:
        results['preprocessing_test'] = test_markov_spline_preprocessing()
        results['enhanced_dataset_test'] = test_enhanced_dataset()
        results['gradient_smoothing_test'] = test_gradient_smoothing()
        results['cli_interface_test'] = test_cli_interface()

        results['benchmark_results'] = run_integration_benchmark()

        print("\n📋 Test Results Summary")
        print("=" * 60)

        # Derive the test list from the results dict so the denominator
        # cannot drift out of sync when a test is added or removed.
        test_outcomes = {
            name: outcome
            for name, outcome in results.items()
            if name != 'benchmark_results'
        }
        total_tests = len(test_outcomes)
        passed_tests = 0

        for test_name, result in test_outcomes.items():
            status = "✅ PASSED" if result else "❌ FAILED"
            print(f" {test_name}: {status}")
            if result:
                passed_tests += 1

        success_rate = (passed_tests / total_tests) * 100
        print(f"\n🎯 Overall Success Rate: {success_rate:.1f}% ({passed_tests}/{total_tests})")

        if results['benchmark_results']:
            benchmark = results['benchmark_results']
            print("\n📊 Performance Impact:")
            print(f" - Data loading overhead: {benchmark['data_loading_overhead']:.1f}%")
            print(f" - Training overhead: {benchmark['training_overhead']:.1f}%")
            print(f" - Loss comparison: {benchmark['standard_loss']:.4f} → {benchmark['enhanced_loss']:.4f}")

        results_file = '/data/BitTransformerLM/BitTransformerLM/markov_integration_test_results.json'
        # The destination is a hard-coded absolute path that may not exist or
        # be writable on this machine. A failed save must not turn a passing
        # run into a failing exit code, so it is handled here instead of
        # falling through to the blanket handler below.
        try:
            with open(results_file, 'w', encoding='utf-8') as f:
                # default=str stringifies any value json cannot encode natively.
                json.dump(results, f, indent=2, default=str)
        except OSError as save_error:
            print(f"\n⚠️ Could not save results to {results_file}: {save_error}")
        else:
            print(f"\n📁 Results saved to: {results_file}")

        if success_rate >= 75:
            print("\n🚀 Integration tests PASSED! Ready for production use.")
            return 0
        else:
            print("\n⚠️ Integration tests show issues. Review before production use.")
            return 1

    except Exception as e:
        # Top-level boundary: report the crash with a traceback and signal
        # failure through the exit code.
        print(f"\n💥 Test suite failed with error: {e}")
        import traceback
        traceback.print_exc()
        return 1
|
|
|
|
# Script entry point: run the suite and hand main()'s return value to the
# interpreter as the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())