"""
Helion-2.5-Rnd Model Optimizer

Advanced optimization utilities for inference performance
"""

import json
import logging
import time
from pathlib import Path
from typing import Dict, List

import torch
from safetensors.torch import load_file, save_file

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
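
# Example invocation (the script name and model path are illustrative, not
# fixed by this module; see main() for the full argument list):
#   python model_optimizer.py --model-path ./Helion-2.5-Rnd --action report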


class ModelOptimizer:
    """Optimize model for inference performance"""

    def __init__(self, model_path: str):
        """
        Initialize optimizer

        Args:
            model_path: Path to model directory
        """
        self.model_path = Path(model_path)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info(f"Initializing optimizer for {model_path}")

    def analyze_memory_footprint(self) -> Dict:
        """
        Analyze model memory requirements

        Returns:
            Memory analysis results
        """
        logger.info("Analyzing memory footprint...")

        index_path = self.model_path / "model.safetensors.index.json"
        if index_path.exists():
            with open(index_path, 'r') as f:
                index = json.load(f)

            total_size_bf16 = 0
            if 'metadata' in index and 'total_size' in index['metadata']:
                # The index reports the on-disk size of the bf16 checkpoint.
                total_size_bf16 = index['metadata']['total_size']

            num_shards = len(set(index.get('weight_map', {}).values()))

            # Estimate the parameter count from the bf16 checkpoint size
            # (2 bytes per parameter).
            total_params = total_size_bf16 // 2

            return {
                'total_parameters': f"{total_params / 1e9:.1f}B",
                'num_shards': num_shards,
                'memory_requirements': {
                    'bf16': f"{total_size_bf16 / (1024**3):.2f} GB",
                    'fp16': f"{total_size_bf16 / (1024**3):.2f} GB",
                    'fp32': f"{total_size_bf16 * 2 / (1024**3):.2f} GB",
                },
                'gpu_requirements': {
                    'minimum': '2x A100 80GB',
                    'recommended': '4x H100 80GB',
                }
            }

        return {'error': 'Model index not found'}
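
    # Worked example of the sizing arithmetic above: a 70B-parameter model
    # stored in bf16 needs roughly 70e9 * 2 bytes ≈ 140 GB (about 130 GiB),
    # and twice that in fp32, which is why a single 80 GB GPU is not enough.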

    def validate_safetensors(self, verify_checksums: bool = False) -> Dict:
        """
        Validate SafeTensors files

        Args:
            verify_checksums: Whether to verify SHA256 checksums

        Returns:
            Validation results
        """
        logger.info("Validating SafeTensors files...")

        results = {
            'valid': True,
            'files_checked': 0,
            'issues': []
        }

        safetensors_files = list(self.model_path.glob("*.safetensors"))

        if not safetensors_files:
            results['valid'] = False
            results['issues'].append("No SafeTensors files found")
            return results

        for file_path in safetensors_files:
            try:
                tensors = load_file(file_path, device="cpu")
                results['files_checked'] += 1
                logger.info(f"✓ {file_path.name}: {len(tensors)} tensors")

                if verify_checksums:
                    import hashlib
                    sha256 = hashlib.sha256()
                    with open(file_path, 'rb') as f:
                        for chunk in iter(lambda: f.read(4096), b''):
                            sha256.update(chunk)

                    checksum = sha256.hexdigest()
                    logger.info(f"  Checksum: {checksum}")

            except Exception as e:
                results['valid'] = False
                results['issues'].append(f"{file_path.name}: {str(e)}")
                logger.error(f"✗ {file_path.name}: {e}")

        return results
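
    # A lighter-weight validity check is possible without materializing every
    # tensor: safetensors' safe_open reads just the file header. A minimal
    # sketch, assuming the standard safetensors API:
    #
    #   from safetensors import safe_open
    #
    #   with safe_open(file_path, framework="pt", device="cpu") as f:
    #       tensor_names = list(f.keys())   # parses the header only
    #
    # Swapping this in trades full data validation for much lower memory use.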

    def profile_inference_speed(
        self,
        num_iterations: int = 10,
        prompt_length: int = 512,
        generation_length: int = 128
    ) -> Dict:
        """
        Profile inference speed

        Args:
            num_iterations: Number of iterations to run
            prompt_length: Input prompt length
            generation_length: Output generation length

        Returns:
            Performance metrics
        """
        logger.info("Profiling inference speed...")

        try:
            from transformers import AutoModelForCausalLM, AutoTokenizer

            model = AutoModelForCausalLM.from_pretrained(
                self.model_path,
                torch_dtype=torch.bfloat16,
                device_map="auto"
            )
            tokenizer = AutoTokenizer.from_pretrained(self.model_path)

            # Roughly approximate the requested prompt length in tokens.
            test_prompt = "The quick brown fox jumps over the lazy dog. " * (prompt_length // 10)

            latencies = []
            tokens_per_second = []

            # Warmup run so compilation and cache setup do not skew the first timing.
            inputs = tokenizer(test_prompt, return_tensors="pt").to(self.device)
            _ = model.generate(**inputs, max_new_tokens=10)

            for i in range(num_iterations):
                if torch.cuda.is_available():
                    torch.cuda.synchronize()
                start_time = time.time()

                inputs = tokenizer(test_prompt, return_tensors="pt").to(self.device)
                outputs = model.generate(**inputs, max_new_tokens=generation_length)

                if torch.cuda.is_available():
                    torch.cuda.synchronize()
                end_time = time.time()

                duration = end_time - start_time
                # Count the tokens that were actually generated; generation can
                # stop early at an EOS token.
                new_tokens = outputs.shape[1] - inputs["input_ids"].shape[1]
                tps = new_tokens / duration

                latencies.append(duration)
                tokens_per_second.append(tps)

                logger.info(f"Iteration {i+1}/{num_iterations}: {duration:.2f}s, {tps:.2f} tokens/s")

            return {
                'avg_latency': sum(latencies) / len(latencies),
                'min_latency': min(latencies),
                'max_latency': max(latencies),
                'avg_tokens_per_second': sum(tokens_per_second) / len(tokens_per_second),
                'prompt_length': prompt_length,
                'generation_length': generation_length,
                'iterations': num_iterations
            }

        except Exception as e:
            logger.error(f"Profiling failed: {e}")
            return {'error': str(e)}
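
    # If peak GPU memory is also of interest, the timing loop above could be
    # bracketed with torch's allocator statistics (a sketch, CUDA-only):
    #
    #   torch.cuda.reset_peak_memory_stats()
    #   outputs = model.generate(**inputs, max_new_tokens=generation_length)
    #   peak_gb = torch.cuda.max_memory_allocated() / (1024 ** 3)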

    def optimize_for_inference(self) -> Dict:
        """
        Apply optimization techniques for inference

        Returns:
            Optimization results
        """
        logger.info("Applying inference optimizations...")

        optimizations = []

        # Skip work if a previous run already marked this model as optimized.
        if (self.model_path / ".optimized").exists():
            return {
                'status': 'already_optimized',
                'message': 'Model already optimized'
            }

        try:
            # Step 1: make sure the checkpoint files load cleanly.
            validation = self.validate_safetensors()
            if validation['valid']:
                optimizations.append("SafeTensors validation passed")
            else:
                return {
                    'status': 'error',
                    'message': 'SafeTensors validation failed',
                    'issues': validation['issues']
                }

            # Step 2: record the memory footprint.
            memory_info = self.analyze_memory_footprint()
            optimizations.append(f"Memory footprint: {memory_info.get('memory_requirements', {}).get('bf16', 'unknown')}")

            # Step 3: derive a tensor-parallelism recommendation from the GPUs present.
            gpu_count = torch.cuda.device_count()
            if gpu_count > 0:
                recommended_tp = min(gpu_count, 4)
                optimizations.append(f"Recommended tensor parallelism: {recommended_tp}")

            # Mark the model so repeated runs can short-circuit.
            (self.model_path / ".optimized").touch()

            return {
                'status': 'success',
                'optimizations_applied': optimizations,
                'recommendations': [
                    'Use tensor parallelism for multi-GPU setups',
                    'Enable Flash Attention 2 for faster inference',
                    'Set gpu_memory_utilization=0.95 for optimal memory usage',
                    'Use vLLM for production deployments'
                ]
            }

        except Exception as e:
            logger.error(f"Optimization failed: {e}")
            return {
                'status': 'error',
                'message': str(e)
            }
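
    # A minimal sketch of how the recommendations above might be applied with
    # vLLM (the model path and TP degree are illustrative assumptions, not
    # values fixed by this module):
    #
    #   from vllm import LLM, SamplingParams
    #
    #   llm = LLM(
    #       model="/path/to/Helion-2.5-Rnd",
    #       tensor_parallel_size=4,          # match the recommended TP degree
    #       gpu_memory_utilization=0.95,
    #       dtype="bfloat16",
    #   )
    #   outputs = llm.generate(["Hello"], SamplingParams(max_tokens=128))
    #
    # With plain transformers, Flash Attention 2 can be requested at load time:
    #   AutoModelForCausalLM.from_pretrained(..., attn_implementation="flash_attention_2")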

    def benchmark_throughput(
        self,
        batch_sizes: List[int] = [1, 4, 8, 16],
        sequence_length: int = 512
    ) -> Dict:
        """
        Benchmark throughput at different batch sizes

        Args:
            batch_sizes: List of batch sizes to test
            sequence_length: Sequence length for testing

        Returns:
            Throughput results
        """
        logger.info("Benchmarking throughput...")

        results = {}

        for batch_size in batch_sizes:
            try:
                logger.info(f"Testing batch size: {batch_size}")

                # Rough placeholder estimate only; this does not run the model.
                # Replace with a real measurement for your hardware.
                estimated_tps = 50 / batch_size

                results[f"batch_{batch_size}"] = {
                    'tokens_per_second': estimated_tps,
                    'requests_per_second': estimated_tps / sequence_length,
                    'latency_ms': (1000 * batch_size) / estimated_tps
                }

            except Exception as e:
                logger.error(f"Batch size {batch_size} failed: {e}")
                results[f"batch_{batch_size}"] = {'error': str(e)}

        return results
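
    # A real throughput measurement could reuse the model and tokenizer loaded
    # in profile_inference_speed with batched inputs. A sketch, assuming the
    # tokenizer has a pad token configured and elapsed_seconds is timed as in
    # the profiling loop:
    #
    #   batch = tokenizer([test_prompt] * batch_size, return_tensors="pt",
    #                     padding=True).to(self.device)
    #   outputs = model.generate(**batch, max_new_tokens=128)
    #   tokens_per_second = batch_size * 128 / elapsed_seconds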

    def generate_optimization_report(self, output_file: str = "optimization_report.json"):
        """
        Generate comprehensive optimization report

        Args:
            output_file: Path to output JSON file
        """
        logger.info("Generating optimization report...")

        report = {
            'model_path': str(self.model_path),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'memory_analysis': self.analyze_memory_footprint(),
            'validation': self.validate_safetensors(),
            'gpu_info': {
                'available': torch.cuda.is_available(),
                'device_count': torch.cuda.device_count() if torch.cuda.is_available() else 0,
                'device_name': torch.cuda.get_device_name(0) if torch.cuda.is_available() else None
            }
        }

        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, 'w') as f:
            json.dump(report, f, indent=2)

        logger.info(f"Report saved to {output_path}")
        return report


class SafeTensorsConverter:
    """Convert between different model formats"""

    @staticmethod
    def merge_shards(
        input_dir: str,
        output_file: str,
        max_shard_size: str = "5GB"
    ):
        """
        Merge multiple SafeTensors shards

        Args:
            input_dir: Directory containing shards
            output_file: Output merged file
            max_shard_size: Maximum size per shard (currently not enforced)
        """
        logger.info("Merging SafeTensors shards...")

        input_path = Path(input_dir)
        shard_files = sorted(input_path.glob("*.safetensors"))

        if not shard_files:
            raise ValueError("No SafeTensors files found")

        # Note: every tensor is held in CPU memory at once, so merging a large
        # checkpoint needs roughly as much RAM as the model's on-disk size.
        all_tensors = {}
        for shard_file in shard_files:
            logger.info(f"Loading {shard_file.name}...")
            tensors = load_file(shard_file, device="cpu")
            all_tensors.update(tensors)

        logger.info(f"Saving merged file to {output_file}...")
        save_file(all_tensors, output_file)

        logger.info("Merge complete!")

    @staticmethod
    def split_model(
        input_file: str,
        output_dir: str,
        num_shards: int = 96
    ):
        """
        Split model into multiple shards

        Args:
            input_file: Input model file
            output_dir: Output directory
            num_shards: Number of shards to create
        """
        logger.info(f"Splitting model into {num_shards} shards...")

        tensors = load_file(input_file, device="cpu")

        tensor_names = list(tensors.keys())
        # Ceiling division so every tensor lands in exactly one shard.
        tensors_per_shard = (len(tensor_names) + num_shards - 1) // num_shards

        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        for i in range(num_shards):
            start_idx = i * tensors_per_shard
            end_idx = min((i + 1) * tensors_per_shard, len(tensor_names))
            if start_idx >= end_idx:
                # No tensors left; avoid writing empty shards.
                break

            shard_tensors = {
                name: tensors[name]
                for name in tensor_names[start_idx:end_idx]
            }

            shard_file = output_path / f"model-{i+1:05d}-of-{num_shards:05d}.safetensors"
            save_file(shard_tensors, str(shard_file))
            logger.info(f"Saved {shard_file.name}")

        logger.info("Split complete!")
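
    # split_model writes HF-style shard names but no index file. A sketch of
    # how a matching model.safetensors.index.json could be produced (the keys
    # follow the format analyze_memory_footprint reads above; the weight_map
    # construction is pseudocode to be filled in from the loop):
    #
    #   weight_map = {name: shard_file.name for each tensor written to shard_file}
    #   index = {
    #       "metadata": {"total_size": sum(t.numel() * t.element_size()
    #                                      for t in tensors.values())},
    #       "weight_map": weight_map,
    #   }
    #   (output_path / "model.safetensors.index.json").write_text(json.dumps(index, indent=2))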


def main():
    """Main entry point for optimizer"""
    import argparse

    parser = argparse.ArgumentParser(description="Helion Model Optimizer")
    parser.add_argument("--model-path", type=str, required=True, help="Path to model")
    parser.add_argument("--action", type=str, required=True,
                        choices=['analyze', 'validate', 'profile', 'optimize', 'report'],
                        help="Action to perform")
    parser.add_argument("--output", type=str, default="optimization_report.json",
                        help="Output file for report")

    args = parser.parse_args()

    optimizer = ModelOptimizer(args.model_path)

    if args.action == 'analyze':
        result = optimizer.analyze_memory_footprint()
        print(json.dumps(result, indent=2))

    elif args.action == 'validate':
        result = optimizer.validate_safetensors(verify_checksums=True)
        print(json.dumps(result, indent=2))

    elif args.action == 'profile':
        result = optimizer.profile_inference_speed()
        print(json.dumps(result, indent=2))

    elif args.action == 'optimize':
        result = optimizer.optimize_for_inference()
        print(json.dumps(result, indent=2))

    elif args.action == 'report':
        optimizer.generate_optimization_report(args.output)
        print(f"Report generated: {args.output}")


if __name__ == "__main__":
    main()