"""
Test implementation script for the Compact AI Model with Interleaved Thinking.

This script tests the core functionality of the model, including:

- Model creation and initialization
- Forward pass with interleaved thinking
- Basic text generation
- Memory usage and performance metrics
- API endpoints (if available)
"""

import os
import sys
import time
from pathlib import Path
from typing import Dict

import psutil
import torch
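
# Typical invocation (the file name is whatever this script is saved as;
# run it from the directory that contains the compact_ai_model package):
#   python test_implementation.py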


def get_memory_usage() -> Dict[str, float]:
    """Get current memory usage for this process, in MB and percent."""
    process = psutil.Process(os.getpid())
    memory_info = process.memory_info()

    return {
        "rss_mb": memory_info.rss / 1024 / 1024,
        "vms_mb": memory_info.vms / 1024 / 1024,
        "percent": process.memory_percent(),
    }
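
# Illustrative return value (numbers vary by machine and load):
#   {"rss_mb": 412.3, "vms_mb": 1024.0, "percent": 2.6}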


def test_model_creation():
    """Test model creation and basic properties."""
    print("🧪 Testing model creation...")

    try:
        # Make the package importable when the script is run directly.
        sys.path.insert(0, str(Path(__file__).parent))

        # CompactAIModel and Config are imported as an availability check even
        # though only create_compact_model is used directly here.
        from compact_ai_model.architecture.model import create_compact_model, CompactAIModel
        from compact_ai_model.configs.config import Config

        for size in ["tiny", "small", "medium"]:
            print(f"   Creating {size} model...")
            model = create_compact_model(size)

            num_params = model.get_num_params()
            print(f"   {size} model: {num_params:,} parameters")

            # Each preset must stay within its parameter budget.
            if size == "tiny":
                assert num_params < 100_000_000, f"Tiny model too large: {num_params}"
            elif size == "small":
                assert num_params < 250_000_000, f"Small model too large: {num_params}"
            elif size == "medium":
                assert num_params < 400_000_000, f"Medium model too large: {num_params}"

        print("✅ Model creation tests passed!")
        return True

    except Exception as e:
        print(f"❌ Model creation test failed: {e}")
        return False


def test_forward_pass():
    """Test the forward pass with and without interleaved thinking."""
    print("🧪 Testing forward pass...")

    try:
        from compact_ai_model.architecture.model import create_compact_model

        model = create_compact_model("tiny")
        model.eval()

        vocab_size = model.model_config.vocab_size
        seq_len = 32
        batch_size = 1

        input_ids = torch.randint(0, min(1000, vocab_size), (batch_size, seq_len))

        # Baseline: a standard forward pass with the reasoning machinery off.
        print("   Testing forward pass without thinking...")
        with torch.no_grad():
            start_time = time.time()
            outputs = model(input_ids, use_thinking=False)
            inference_time = time.time() - start_time

        assert "logits" in outputs, "Missing logits in output"
        assert outputs["logits"].shape == (batch_size, seq_len, vocab_size), f"Wrong logits shape: {outputs['logits'].shape}"
        print(f"   Inference time: {inference_time:.4f}s")

        # Same input with interleaved thinking enabled.
        print("   Testing forward pass with thinking...")
        with torch.no_grad():
            start_time = time.time()
            outputs = model(input_ids, use_thinking=True, max_reasoning_depth=2)
            inference_time = time.time() - start_time

        assert "logits" in outputs, "Missing logits in output"
        assert "thinking_results" in outputs, "Missing thinking results"
        assert "final_tokens" in outputs, "Missing token count"
        print(f"   Inference time with thinking: {inference_time:.4f}s")
        print(f"   Reasoning tokens used: {outputs['final_tokens']}")

        print("✅ Forward pass tests passed!")
        return True

    except Exception as e:
        print(f"❌ Forward pass test failed: {e}")
        return False
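

# The module docstring lists "basic text generation", which no test above
# exercises directly. A greedy-decoding smoke test could look like the sketch
# below; it relies only on the model(...) -> {"logits": ...} contract used in
# test_forward_pass, not on any dedicated generate() helper, which may or may
# not exist in the model class:
#
#   def test_generation():
#       model = create_compact_model("tiny")
#       model.eval()
#       ids = torch.randint(0, 1000, (1, 8))
#       with torch.no_grad():
#           for _ in range(16):  # append 16 greedily-chosen tokens
#               next_id = model(ids, use_thinking=False)["logits"][:, -1].argmax(-1)
#               ids = torch.cat([ids, next_id.unsqueeze(-1)], dim=-1)
#       assert ids.shape == (1, 24)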


def test_interleaved_thinking():
    """Test the interleaved thinking mechanism."""
    print("🧪 Testing interleaved thinking...")

    try:
        from compact_ai_model.architecture.model import CompactAIModel
        from compact_ai_model.configs.config import ModelConfig, InterleavedThinkingConfig

        # A deliberately small model keeps this test fast.
        model_config = ModelConfig(dim=128, layers=4, heads=4, vocab_size=1000)
        thinking_config = InterleavedThinkingConfig(
            max_reasoning_paths=2,
            reasoning_depth=3,
            early_stop_threshold=0.8,
        )

        model = CompactAIModel(model_config, thinking_config)
        model.eval()

        input_ids = torch.randint(0, 1000, (1, 16))

        with torch.no_grad():
            outputs = model(input_ids, use_thinking=True, max_reasoning_depth=2)

        thinking_results = outputs["thinking_results"]
        assert isinstance(thinking_results, list), "Thinking results should be a list"

        # Inspect the first thinking layer's outputs, if any were produced.
        if thinking_results:
            first_result = thinking_results[0]
            assert "path_logits" in first_result, "Missing path logits"
            assert "confidence_scores" in first_result, "Missing confidence scores"
            assert "complexity" in first_result, "Missing complexity scores"
            print(f"   Generated {len(thinking_results)} thinking layers")
            print(f"   Path logits shape: {first_result['path_logits'].shape}")

        print("✅ Interleaved thinking tests passed!")
        return True

    except Exception as e:
        print(f"❌ Interleaved thinking test failed: {e}")
        return False


def test_memory_usage():
    """Test memory usage during model creation and inference."""
    print("🧪 Testing memory usage...")

    try:
        from compact_ai_model.architecture.model import create_compact_model

        initial_memory = get_memory_usage()
        print(f"   Initial memory: {initial_memory['rss_mb']:.1f}MB")

        model = create_compact_model("tiny")
        model.eval()

        model_loaded_memory = get_memory_usage()
        memory_increase = model_loaded_memory["rss_mb"] - initial_memory["rss_mb"]
        print(f"   Memory increase: {memory_increase:.1f}MB")

        # Run one inference with thinking enabled and measure the extra RSS.
        input_ids = torch.randint(0, 1000, (1, 32))
        with torch.no_grad():
            _ = model(input_ids, use_thinking=True)

        inference_memory = get_memory_usage()
        inference_increase = inference_memory["rss_mb"] - model_loaded_memory["rss_mb"]
        print(f"   Inference memory increase: {inference_increase:.1f}MB")

        # Coarse budgets: loading the tiny model and a single inference pass
        # should stay well under these ceilings.
        assert memory_increase < 500, f"Model memory usage too high: {memory_increase:.1f}MB"
        assert inference_increase < 100, f"Inference memory usage too high: {inference_increase:.1f}MB"

        print("✅ Memory usage tests passed!")
        return True

    except Exception as e:
        print(f"❌ Memory usage test failed: {e}")
        return False
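
# Note on the assertions above: psutil reports process-wide RSS, so the deltas
# include allocator overhead and lazily-initialized torch state, not just the
# model's own tensors; the thresholds are deliberately coarse.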


def test_configuration():
    """Test configuration loading and validation."""
    print("🧪 Testing configuration...")

    try:
        from compact_ai_model.configs.config import (
            get_balanced_config,
            load_config_from_dict,
            save_config_to_dict,
        )

        # Build one config per preset, derived from the balanced baseline.
        config_obj = get_balanced_config()
        configs = {
            "balanced": config_obj,
            "tiny": config_obj.get_tiny_config(),
            "large": config_obj.get_large_config(),
        }

        for name, config in configs.items():
            print(f"   Testing {name} config...")
            assert config.model.dim > 0, f"Invalid model dim for {name}"
            assert config.thinking.max_reasoning_paths > 0, f"Invalid reasoning paths for {name}"
            assert 0 <= config.thinking.early_stop_threshold <= 1, f"Invalid early stop threshold for {name}"

        # Round-trip the config through a plain dict and compare key fields.
        config = get_balanced_config()
        config_dict = save_config_to_dict(config)
        loaded_config = load_config_from_dict(config_dict)

        assert loaded_config.model.dim == config.model.dim, "Config serialization failed"
        assert loaded_config.thinking.max_reasoning_paths == config.thinking.max_reasoning_paths, "Config serialization failed"

        print("✅ Configuration tests passed!")
        return True

    except Exception as e:
        print(f"❌ Configuration test failed: {e}")
        return False
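
# Persisting a config to disk is a natural extension of the round-trip above.
# A sketch using the same helpers, assuming save_config_to_dict returns a
# JSON-serializable dict (the file name is arbitrary):
#
#   import json
#   with open("config.json", "w") as f:
#       json.dump(save_config_to_dict(get_balanced_config()), f, indent=2)
#   with open("config.json") as f:
#       restored = load_config_from_dict(json.load(f))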


def test_training_components():
    """Test training-related components."""
    print("🧪 Testing training components...")

    try:
        from compact_ai_model.training.train import create_sample_data, TextDataset

        print("   Testing sample data creation...")
        data = create_sample_data(100)
        assert len(data) == 100, "Wrong number of samples created"
        assert "text" in data[0], "Missing text field in sample data"

        print("   Testing dataset creation...")
        dataset = TextDataset(data)
        assert len(dataset) == 100, "Wrong dataset length"

        # Indexing should yield the same record structure as the raw data.
        sample = dataset[0]
        assert "text" in sample, "Missing text in dataset sample"

        print("✅ Training component tests passed!")
        return True

    except Exception as e:
        print(f"❌ Training component test failed: {e}")
        return False


def test_api_endpoints():
    """Test API endpoints if available."""
    print("🧪 Testing API endpoints...")

    try:
        # Importing the API stack doubles as a dependency check; the actual
        # endpoint test is skipped because it needs a running server.
        import requests  # noqa: F401
        import uvicorn  # noqa: F401
        from compact_ai_model.api.main import app  # noqa: F401
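
        # An in-process alternative that avoids standing up uvicorn entirely is
        # FastAPI's TestClient (assuming `app` is a FastAPI application; the
        # /health route below is a placeholder, not a confirmed endpoint):
        #
        #   from fastapi.testclient import TestClient
        #   client = TestClient(app)
        #   response = client.get("/health")  # hypothetical route
        #   assert response.status_code == 200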
print(" β οΈ Skipping API endpoint tests (requires running server)") |
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e: |
|
|
print(f"β API endpoint test failed: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
def run_performance_benchmarks(): |
|
|
"""Run performance benchmarks.""" |
|
|
print("π Running performance benchmarks...") |
|
|
|
|
|
try: |
|
|
from compact_ai_model.architecture.model import create_compact_model |
|
|
|
|
|
model = create_compact_model("tiny") |
|
|
model.eval() |
|
|
|
|
|
|
|
|
sequence_lengths = [32, 64, 128, 256] |
|
|
batch_sizes = [1, 4, 8] |
|
|
|
|
|
print(" Sequence Length | Batch Size | Inference Time (ms) | Memory (MB)") |
|
|
print(" ----------------|------------|---------------------|------------") |
|
|
|
|
|
for seq_len in sequence_lengths: |
|
|
for batch_size in batch_sizes: |
|
|
try: |
|
|
input_ids = torch.randint(0, 1000, (batch_size, seq_len)) |
|
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
_ = model(input_ids, use_thinking=False) |
|
|
|
|
|
|
|
|
torch.cuda.synchronize() if torch.cuda.is_available() else None |
|
|
start_time = time.time() |
|
|
|
|
|
with torch.no_grad(): |
|
|
outputs = model(input_ids, use_thinking=False) |
|
|
|
|
|
torch.cuda.synchronize() if torch.cuda.is_available() else None |
|
|
inference_time = (time.time() - start_time) * 1000 |
|
|
|
|
|
memory = get_memory_usage()["rss_mb"] |
|
|
|
|
|
print(f" {seq_len:8d} | {batch_size:10d} | {inference_time:19.2f} | {memory:10.1f}") |
|
|
|
|
|
except Exception as e: |
|
|
print(f" {seq_len:8d} | {batch_size:10d} | Failed | N/A ") |
|
|
|
|
|
print("β
Performance benchmarks completed!") |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Performance benchmark failed: {e}") |
|
|
return False |
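
# Throughput, if wanted, follows directly from the columns above:
#   tokens_per_second = (batch_size * seq_len) / (inference_time_ms / 1000.0)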


def main():
    """Run all tests and report a summary."""
    print("🚀 Starting Compact AI Model Implementation Tests")
    print("=" * 60)

    test_results = []
    total_memory_before = get_memory_usage()

    tests = [
        ("Model Creation", test_model_creation),
        ("Forward Pass", test_forward_pass),
        ("Interleaved Thinking", test_interleaved_thinking),
        ("Memory Usage", test_memory_usage),
        ("Configuration", test_configuration),
        ("Training Components", test_training_components),
        ("API Endpoints", test_api_endpoints),
        ("Performance Benchmarks", run_performance_benchmarks),
    ]

    # Each test reports pass/fail; a crash counts as a failure rather than
    # aborting the whole run.
    for test_name, test_func in tests:
        print(f"\n🧪 Running {test_name}...")
        try:
            result = test_func()
            test_results.append((test_name, result))
        except Exception as e:
            print(f"❌ {test_name} crashed: {e}")
            test_results.append((test_name, False))

    print("\n" + "=" * 60)
    print("📊 Test Results Summary")
    print("=" * 60)

    passed = 0
    total = len(test_results)

    for test_name, result in test_results:
        status = "✅ PASS" if result else "❌ FAIL"
        print(f"   {test_name:30} | {status}")
        if result:
            passed += 1

    print(f"\n📊 Overall: {passed}/{total} tests passed")

    total_memory_after = get_memory_usage()
    memory_used = total_memory_after["rss_mb"] - total_memory_before["rss_mb"]
    print(f"Total memory used: {memory_used:.1f}MB")

    if passed == total:
        print("🎉 All tests passed! Implementation is ready.")
        return 0
    else:
        print("⚠️ Some tests failed. Please check the implementation.")
        return 1


if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)