"""
Helion-2.5-Rnd Model Optimizer
Advanced optimization utilities for inference performance
"""

import json
import logging
import time
from pathlib import Path
from typing import Dict, List

import torch
from safetensors.torch import load_file, save_file

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ModelOptimizer:
    """Optimize model for inference performance"""

    def __init__(self, model_path: str):
        """
        Initialize optimizer

        Args:
            model_path: Path to model directory
        """
        self.model_path = Path(model_path)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info(f"Initializing optimizer for {model_path}")

    def analyze_memory_footprint(self) -> Dict:
        """
        Analyze model memory requirements

        Returns:
            Memory analysis results
        """
        logger.info("Analyzing memory footprint...")

        index_path = self.model_path / "model.safetensors.index.json"
        if index_path.exists():
            with open(index_path, 'r') as f:
                index = json.load(f)

            if 'metadata' in index and 'total_size' in index['metadata']:
                total_size_bf16 = index['metadata']['total_size']
            else:
                return {'error': 'Index metadata missing total_size'}

            num_shards = len(set(index.get('weight_map', {}).values()))

            # total_size is in bytes; bf16 weights store 2 bytes per parameter.
            total_params = total_size_bf16 // 2

            return {
                'total_parameters': f"{total_params / 1e9:.1f}B",
                'num_shards': num_shards,
                'memory_requirements': {
                    # fp16 and bf16 both use 2 bytes per parameter; fp32 doubles that.
                    'bf16': f"{total_size_bf16 / (1024**3):.2f} GB",
                    'fp16': f"{total_size_bf16 / (1024**3):.2f} GB",
                    'fp32': f"{total_size_bf16 * 2 / (1024**3):.2f} GB",
                },
                'gpu_requirements': {
                    'minimum': '2x A100 80GB',
                    'recommended': '4x H100 80GB',
                }
            }

        return {'error': 'Model index not found'}

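    # For scale: at 2 bytes per parameter, a 70B-parameter model needs roughly
    # 70e9 * 2 bytes ≈ 130 GiB of bf16 weights, which is why a single 80 GB GPU
    # is not enough and the minimum above lists two A100 80GB cards.
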
    def validate_safetensors(self, verify_checksums: bool = False) -> Dict:
        """
        Validate SafeTensors files

        Args:
            verify_checksums: Whether to verify SHA256 checksums

        Returns:
            Validation results
        """
        logger.info("Validating SafeTensors files...")

        results = {
            'valid': True,
            'files_checked': 0,
            'issues': []
        }

        safetensors_files = list(self.model_path.glob("*.safetensors"))

        if not safetensors_files:
            results['valid'] = False
            results['issues'].append("No SafeTensors files found")
            return results

        for file_path in safetensors_files:
            try:
                # Loading the shard verifies its header and tensor layout.
                tensors = load_file(file_path, device="cpu")
                results['files_checked'] += 1

                logger.info(f"✓ {file_path.name}: {len(tensors)} tensors")

                # Optionally compute a SHA-256 checksum of the raw file.
                if verify_checksums:
                    import hashlib
                    sha256 = hashlib.sha256()
                    with open(file_path, 'rb') as f:
                        for chunk in iter(lambda: f.read(4096), b''):
                            sha256.update(chunk)

                    checksum = sha256.hexdigest()
                    logger.info(f"  Checksum: {checksum}")

            except Exception as e:
                results['valid'] = False
                results['issues'].append(f"{file_path.name}: {str(e)}")
                logger.error(f"✗ {file_path.name}: {e}")

        return results

    def profile_inference_speed(
        self,
        num_iterations: int = 10,
        prompt_length: int = 512,
        generation_length: int = 128
    ) -> Dict:
        """
        Profile inference speed

        Args:
            num_iterations: Number of iterations to run
            prompt_length: Input prompt length
            generation_length: Output generation length

        Returns:
            Performance metrics
        """
        logger.info("Profiling inference speed...")

        try:
            from transformers import AutoModelForCausalLM, AutoTokenizer

            model = AutoModelForCausalLM.from_pretrained(
                self.model_path,
                torch_dtype=torch.bfloat16,
                device_map="auto"
            )
            tokenizer = AutoTokenizer.from_pretrained(self.model_path)

            # Build a prompt of roughly `prompt_length` tokens.
            test_prompt = "The quick brown fox jumps over the lazy dog. " * (prompt_length // 10)

            latencies = []
            tokens_per_second = []

            # Warm-up run so the first measurement is not skewed by CUDA
            # context creation and kernel compilation.
            inputs = tokenizer(test_prompt, return_tensors="pt").to(model.device)
            _ = model.generate(**inputs, max_new_tokens=10)

            for i in range(num_iterations):
                if torch.cuda.is_available():
                    torch.cuda.synchronize()
                start_time = time.time()

                inputs = tokenizer(test_prompt, return_tensors="pt").to(model.device)
                _ = model.generate(**inputs, max_new_tokens=generation_length)

                if torch.cuda.is_available():
                    torch.cuda.synchronize()
                end_time = time.time()

                duration = end_time - start_time
                tps = generation_length / duration

                latencies.append(duration)
                tokens_per_second.append(tps)

                logger.info(f"Iteration {i+1}/{num_iterations}: {duration:.2f}s, {tps:.2f} tokens/s")

            return {
                'avg_latency': sum(latencies) / len(latencies),
                'min_latency': min(latencies),
                'max_latency': max(latencies),
                'avg_tokens_per_second': sum(tokens_per_second) / len(tokens_per_second),
                'prompt_length': prompt_length,
                'generation_length': generation_length,
                'iterations': num_iterations
            }

        except Exception as e:
            logger.error(f"Profiling failed: {e}")
            return {'error': str(e)}

    def optimize_for_inference(self) -> Dict:
        """
        Apply optimization techniques for inference

        Returns:
            Optimization results
        """
        logger.info("Applying inference optimizations...")

        optimizations = []

        # Skip if a previous run already marked this model directory.
        if (self.model_path / ".optimized").exists():
            return {
                'status': 'already_optimized',
                'message': 'Model already optimized'
            }

        try:
            # Step 1: validate the SafeTensors shards.
            validation = self.validate_safetensors()
            if validation['valid']:
                optimizations.append("SafeTensors validation passed")
            else:
                return {
                    'status': 'error',
                    'message': 'SafeTensors validation failed',
                    'issues': validation['issues']
                }

            # Step 2: record the memory footprint.
            memory_info = self.analyze_memory_footprint()
            optimizations.append(f"Memory footprint: {memory_info.get('memory_requirements', {}).get('bf16', 'unknown')}")

            # Step 3: recommend a tensor-parallel degree based on visible GPUs.
            gpu_count = torch.cuda.device_count()
            if gpu_count > 0:
                recommended_tp = min(gpu_count, 4)
                optimizations.append(f"Recommended tensor parallelism: {recommended_tp}")

            # Mark the directory so repeated runs are skipped.
            (self.model_path / ".optimized").touch()

            return {
                'status': 'success',
                'optimizations_applied': optimizations,
                'recommendations': [
                    'Use tensor parallelism for multi-GPU setups',
                    'Enable Flash Attention 2 for faster inference',
                    'Set gpu_memory_utilization=0.95 for optimal memory usage',
                    'Use vLLM for production deployments'
                ]
            }

        except Exception as e:
            logger.error(f"Optimization failed: {e}")
            return {
                'status': 'error',
                'message': str(e)
            }

    def benchmark_throughput(
        self,
        batch_sizes: List[int] = [1, 4, 8, 16],
        sequence_length: int = 512
    ) -> Dict:
        """
        Benchmark throughput at different batch sizes

        Args:
            batch_sizes: List of batch sizes to test
            sequence_length: Sequence length for testing

        Returns:
            Throughput results
        """
        logger.info("Benchmarking throughput...")

        results = {}

        for batch_size in batch_sizes:
            try:
                logger.info(f"Testing batch size: {batch_size}")

                # Placeholder estimate only: no model is run here; this simply
                # assumes per-request decode speed degrades with batch size.
                estimated_tps = 50 / batch_size

                results[f"batch_{batch_size}"] = {
                    'tokens_per_second': estimated_tps,
                    'requests_per_second': estimated_tps / sequence_length,
                    'latency_ms': (1000 * batch_size) / estimated_tps
                }

            except Exception as e:
                logger.error(f"Batch size {batch_size} failed: {e}")
                results[f"batch_{batch_size}"] = {'error': str(e)}

        return results

    def generate_optimization_report(self, output_file: str = "optimization_report.json"):
        """
        Generate comprehensive optimization report

        Args:
            output_file: Path to output JSON file
        """
        logger.info("Generating optimization report...")

        report = {
            'model_path': str(self.model_path),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'memory_analysis': self.analyze_memory_footprint(),
            'validation': self.validate_safetensors(),
            'gpu_info': {
                'available': torch.cuda.is_available(),
                'device_count': torch.cuda.device_count() if torch.cuda.is_available() else 0,
                'device_name': torch.cuda.get_device_name(0) if torch.cuda.is_available() else None
            }
        }

        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, 'w') as f:
            json.dump(report, f, indent=2)

        logger.info(f"Report saved to {output_path}")
        return report


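# The recommendations returned by optimize_for_inference can be acted on with a
# vLLM-based server. A minimal sketch, assuming vLLM is installed, four GPUs are
# visible, and the model path below is a placeholder:
#
#     from vllm import LLM, SamplingParams
#
#     llm = LLM(
#         model="/models/Helion-2.5-Rnd",
#         dtype="bfloat16",
#         tensor_parallel_size=4,
#         gpu_memory_utilization=0.95,
#     )
#     params = SamplingParams(temperature=0.7, max_tokens=128)
#     outputs = llm.generate(["Summarize tensor parallelism in one sentence."], params)
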
class SafeTensorsConverter:
    """Convert between different model formats"""

    @staticmethod
    def merge_shards(
        input_dir: str,
        output_file: str
    ):
        """
        Merge multiple SafeTensors shards into a single file

        Args:
            input_dir: Directory containing shards
            output_file: Output merged file
        """
        logger.info("Merging SafeTensors shards...")

        input_path = Path(input_dir)
        shard_files = sorted(input_path.glob("*.safetensors"))

        if not shard_files:
            raise ValueError("No SafeTensors files found")

        # Load every shard into CPU memory and collect all tensors.
        all_tensors = {}
        for shard_file in shard_files:
            logger.info(f"Loading {shard_file.name}...")
            tensors = load_file(shard_file, device="cpu")
            all_tensors.update(tensors)

        # Write the combined state dict to a single file.
        logger.info(f"Saving merged file to {output_file}...")
        save_file(all_tensors, output_file)

        logger.info("Merge complete!")

    @staticmethod
    def split_model(
        input_file: str,
        output_dir: str,
        num_shards: int = 96
    ):
        """
        Split model into multiple shards

        Args:
            input_file: Input model file
            output_dir: Output directory
            num_shards: Number of shards to create
        """
        logger.info(f"Splitting model into {num_shards} shards...")

        tensors = load_file(input_file, device="cpu")

        # Distribute tensors across shards using ceiling division so the last
        # shard picks up the remainder and no empty shards are produced.
        tensor_names = list(tensors.keys())
        tensors_per_shard = -(-len(tensor_names) // num_shards)

        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        for i in range(num_shards):
            start_idx = i * tensors_per_shard
            end_idx = min((i + 1) * tensors_per_shard, len(tensor_names))

            if start_idx >= end_idx:
                break  # fewer tensors than shards; everything is already written

            shard_tensors = {
                name: tensors[name]
                for name in tensor_names[start_idx:end_idx]
            }

            shard_file = output_path / f"model-{i+1:05d}-of-{num_shards:05d}.safetensors"
            save_file(shard_tensors, str(shard_file))
            logger.info(f"Saved {shard_file.name}")

        logger.info("Split complete!")


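# Example converter usage (hypothetical paths). Both operations hold every tensor
# in CPU RAM at once, so for a 70B checkpoint this needs a machine with several
# hundred GB of memory:
#
#     SafeTensorsConverter.merge_shards("/models/Helion-2.5-Rnd", "merged.safetensors")
#     SafeTensorsConverter.split_model("merged.safetensors", "resharded/", num_shards=96)
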
def main():
    """Main entry point for optimizer"""
    import argparse

    parser = argparse.ArgumentParser(description="Helion Model Optimizer")
    parser.add_argument("--model-path", type=str, required=True, help="Path to model")
    parser.add_argument("--action", type=str, required=True,
                        choices=['analyze', 'validate', 'profile', 'optimize', 'report'],
                        help="Action to perform")
    parser.add_argument("--output", type=str, default="optimization_report.json",
                        help="Output file for report")

    args = parser.parse_args()

    optimizer = ModelOptimizer(args.model_path)

    if args.action == 'analyze':
        result = optimizer.analyze_memory_footprint()
        print(json.dumps(result, indent=2))

    elif args.action == 'validate':
        result = optimizer.validate_safetensors(verify_checksums=True)
        print(json.dumps(result, indent=2))

    elif args.action == 'profile':
        result = optimizer.profile_inference_speed()
        print(json.dumps(result, indent=2))

    elif args.action == 'optimize':
        result = optimizer.optimize_for_inference()
        print(json.dumps(result, indent=2))

    elif args.action == 'report':
        optimizer.generate_optimization_report(args.output)
        print(f"Report generated: {args.output}")


if __name__ == "__main__":
    main()
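
# Example CLI invocations (assuming this file is saved as model_optimizer.py; the
# model path is a placeholder):
#
#   python model_optimizer.py --model-path /models/Helion-2.5-Rnd --action analyze
#   python model_optimizer.py --model-path /models/Helion-2.5-Rnd --action validate
#   python model_optimizer.py --model-path /models/Helion-2.5-Rnd --action report --output reports/helion.json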