| """ |
| Helion-OSC Comprehensive Benchmark Suite |
| Performance benchmarking and comparison with other models |
| """ |

import argparse
import json
import logging
import os
import time
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psutil
import seaborn as sns
import torch
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class BenchmarkResult:
    """Single benchmark result"""
    model_name: str
    task: str
    prompt_length: int
    generation_length: int
    temperature: float
    inference_time: float
    tokens_per_second: float
    memory_used_mb: float
    gpu_memory_mb: Optional[float]
    success: bool
    error: Optional[str] = None


@dataclass
class AggregatedResults:
    """Aggregated benchmark results"""
    model_name: str
    total_tests: int
    successful_tests: int
    failed_tests: int
    avg_inference_time: float
    avg_tokens_per_second: float
    avg_memory_mb: float
    min_inference_time: float
    max_inference_time: float
    std_inference_time: float


class PerformanceBenchmark:
    """Performance benchmarking utilities"""

    def __init__(self, model_name: str = "DeepXR/Helion-OSC"):
        self.model_name = model_name
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        logger.info(f"Loading model: {model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16 if self.device == "cuda" else torch.float32,
            device_map="auto" if self.device == "cuda" else None
        )

        if self.device == "cpu":
            self.model = self.model.to(self.device)

        self.model.eval()
        self.results: List[BenchmarkResult] = []

    def get_memory_usage(self) -> Tuple[float, Optional[float]]:
        """Return current (RAM MB, GPU MB) usage; the GPU value is None when CUDA is unavailable."""
        process = psutil.Process()
        ram_mb = process.memory_info().rss / 1024 / 1024

        gpu_mb = None
        if torch.cuda.is_available():
            gpu_mb = torch.cuda.memory_allocated() / 1024 / 1024

        return ram_mb, gpu_mb

    def benchmark_inference(
        self,
        prompt: str,
        task: str,
        max_length: int = 512,
        temperature: float = 0.7,
        num_runs: int = 1
    ) -> List[BenchmarkResult]:
        """Benchmark inference performance over num_runs generations of a single prompt."""
        run_results = []

        for run in range(num_runs):
            try:
                # Tokenize the prompt and move it to the model's device
                inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
                prompt_length = inputs.input_ids.shape[1]

                # Warm-up generation on the first CUDA run so kernel and cache
                # initialization does not skew the timed measurement
                if run == 0 and self.device == "cuda":
                    with torch.no_grad():
                        _ = self.model.generate(**inputs, max_length=prompt_length + 10)
                    torch.cuda.synchronize()

                # Release cached blocks so memory deltas are comparable between runs
                if self.device == "cuda":
                    torch.cuda.empty_cache()

                # Memory snapshot before generation
                ram_before, gpu_before = self.get_memory_usage()

                # Timed generation (max_length counts prompt plus generated tokens)
                start_time = time.time()

                with torch.no_grad():
                    outputs = self.model.generate(
                        **inputs,
                        max_length=max_length,
                        temperature=temperature,
                        do_sample=temperature > 0,
                        pad_token_id=self.tokenizer.eos_token_id
                    )

                if self.device == "cuda":
                    torch.cuda.synchronize()

                end_time = time.time()

                # Memory snapshot after generation
                ram_after, gpu_after = self.get_memory_usage()

                # Derive throughput and memory metrics
                inference_time = end_time - start_time
                generation_length = outputs.shape[1] - prompt_length
                tokens_per_second = generation_length / inference_time if inference_time > 0 else 0
                memory_used = ram_after - ram_before
                gpu_memory = (
                    gpu_after - gpu_before
                    if gpu_after is not None and gpu_before is not None
                    else None
                )

                result = BenchmarkResult(
                    model_name=self.model_name,
                    task=task,
                    prompt_length=prompt_length,
                    generation_length=generation_length,
                    temperature=temperature,
                    inference_time=inference_time,
                    tokens_per_second=tokens_per_second,
                    memory_used_mb=memory_used,
                    gpu_memory_mb=gpu_memory,
                    success=True
                )

                run_results.append(result)
                self.results.append(result)

            except Exception as e:
                logger.error(f"Benchmark failed: {e}")
                result = BenchmarkResult(
                    model_name=self.model_name,
                    task=task,
                    prompt_length=0,
                    generation_length=0,
                    temperature=temperature,
                    inference_time=0,
                    tokens_per_second=0,
                    memory_used_mb=0,
                    gpu_memory_mb=None,
                    success=False,
                    error=str(e)
                )
                run_results.append(result)
                self.results.append(result)

        return run_results

    def run_benchmark_suite(self) -> List[BenchmarkResult]:
        """Run comprehensive benchmark suite"""
        logger.info("Starting comprehensive benchmark suite...")

        test_cases = [
            {
                "prompt": "def fibonacci(n):",
                "task": "simple_function",
                "max_length": 256,
                "temperature": 0.7
            },
            {
                "prompt": "Write a Python class for a binary search tree with insert, search, and delete methods:",
                "task": "complex_class",
                "max_length": 1024,
                "temperature": 0.7
            },
            {
                "prompt": "Implement quicksort algorithm in Python with detailed comments:",
                "task": "algorithm",
                "max_length": 512,
                "temperature": 0.5
            },
            {
                "prompt": "Solve: What is the derivative of f(x) = x^3 + 2x^2 - 5x + 3?",
                "task": "math_simple",
                "max_length": 256,
                "temperature": 0.3
            },
            {
                "prompt": "Prove using mathematical induction that the sum of first n natural numbers is n(n+1)/2:",
                "task": "math_proof",
                "max_length": 1024,
                "temperature": 0.2
            },
            {
                "prompt": "Design a RESTful API for a todo list application with proper documentation:",
                "task": "system_design",
                "max_length": 2048,
                "temperature": 0.7
            },
        ]

        all_results = []

        for test_case in tqdm(test_cases, desc="Running benchmarks"):
            results = self.benchmark_inference(
                prompt=test_case["prompt"],
                task=test_case["task"],
                max_length=test_case["max_length"],
                temperature=test_case["temperature"],
                num_runs=3
            )
            all_results.extend(results)

        logger.info("Benchmark suite completed!")
        return all_results

    def aggregate_results(self) -> AggregatedResults:
        """Aggregate benchmark results"""
        if not self.results:
            raise ValueError("No benchmark results available")

        successful = [r for r in self.results if r.success]

        if not successful:
            raise ValueError("No successful benchmark runs")

        inference_times = [r.inference_time for r in successful]
        tokens_per_sec = [r.tokens_per_second for r in successful]
        memory_usage = [r.memory_used_mb for r in successful]

        return AggregatedResults(
            model_name=self.model_name,
            total_tests=len(self.results),
            successful_tests=len(successful),
            failed_tests=len(self.results) - len(successful),
            avg_inference_time=np.mean(inference_times),
            avg_tokens_per_second=np.mean(tokens_per_sec),
            avg_memory_mb=np.mean(memory_usage),
            min_inference_time=np.min(inference_times),
            max_inference_time=np.max(inference_times),
            std_inference_time=np.std(inference_times)
        )

    def save_results(self, output_file: str = "benchmark_results.json"):
        """Save benchmark results to file"""
        results_dict = [asdict(r) for r in self.results]

        with open(output_file, 'w') as f:
            json.dump(results_dict, f, indent=2)

        logger.info(f"Results saved to {output_file}")

    def generate_report(self, output_file: str = "benchmark_report.txt"):
        """Generate human-readable benchmark report"""
        agg = self.aggregate_results()

        report = f"""
{'='*80}
HELION-OSC BENCHMARK REPORT
{'='*80}

Model: {agg.model_name}
Device: {self.device}

OVERALL STATISTICS
{'='*80}
Total Tests: {agg.total_tests}
Successful: {agg.successful_tests}
Failed: {agg.failed_tests}
Success Rate: {(agg.successful_tests/agg.total_tests)*100:.2f}%

PERFORMANCE METRICS
{'='*80}
Average Inference Time: {agg.avg_inference_time:.4f} seconds
Min Inference Time: {agg.min_inference_time:.4f} seconds
Max Inference Time: {agg.max_inference_time:.4f} seconds
Std Inference Time: {agg.std_inference_time:.4f} seconds

Average Tokens/Second: {agg.avg_tokens_per_second:.2f}
Average Memory Usage: {agg.avg_memory_mb:.2f} MB

PER-TASK BREAKDOWN
{'='*80}
"""

        # Per-task statistics are computed from successful runs only
        df = pd.DataFrame([asdict(r) for r in self.results if r.success])
        if not df.empty:
            task_stats = df.groupby('task').agg({
                'inference_time': ['mean', 'min', 'max'],
                'tokens_per_second': 'mean',
                'memory_used_mb': 'mean'
            })

            report += task_stats.to_string()

        report += f"\n\n{'='*80}\n"

        with open(output_file, 'w') as f:
            f.write(report)

        logger.info(f"Report saved to {output_file}")
        print(report)

    def plot_results(self, output_dir: str = "./benchmark_plots"):
        """Generate visualization plots"""
        os.makedirs(output_dir, exist_ok=True)

        df = pd.DataFrame([asdict(r) for r in self.results if r.success])

        if df.empty:
            logger.warning("No data to plot")
            return

        sns.set_style("whitegrid")

        # Inference time by task
        plt.figure(figsize=(12, 6))
        sns.barplot(data=df, x='task', y='inference_time')
        plt.xticks(rotation=45, ha='right')
        plt.title('Inference Time by Task')
        plt.ylabel('Time (seconds)')
        plt.tight_layout()
        plt.savefig(f"{output_dir}/inference_time_by_task.png", dpi=300)
        plt.close()

        # Tokens per second by task
        plt.figure(figsize=(12, 6))
        sns.barplot(data=df, x='task', y='tokens_per_second')
        plt.xticks(rotation=45, ha='right')
        plt.title('Tokens Per Second by Task')
        plt.ylabel('Tokens/Second')
        plt.tight_layout()
        plt.savefig(f"{output_dir}/tokens_per_second_by_task.png", dpi=300)
        plt.close()

        # Memory usage by task
        plt.figure(figsize=(12, 6))
        sns.barplot(data=df, x='task', y='memory_used_mb')
        plt.xticks(rotation=45, ha='right')
        plt.title('Memory Usage by Task')
        plt.ylabel('Memory (MB)')
        plt.tight_layout()
        plt.savefig(f"{output_dir}/memory_usage_by_task.png", dpi=300)
        plt.close()

        # Generation length vs. inference time
        plt.figure(figsize=(10, 6))
        sns.scatterplot(data=df, x='generation_length', y='inference_time', hue='task', s=100)
        plt.title('Generation Length vs Inference Time')
        plt.xlabel('Generation Length (tokens)')
        plt.ylabel('Inference Time (seconds)')
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.tight_layout()
        plt.savefig(f"{output_dir}/length_vs_time.png", dpi=300)
        plt.close()

        logger.info(f"Plots saved to {output_dir}")
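

# Illustrative sketch (commented out so importing this module has no side
# effects): driving PerformanceBenchmark programmatically for a quick single
# check. The prompt and task name are placeholders, not part of the suite.
#
#   bench = PerformanceBenchmark("DeepXR/Helion-OSC")
#   bench.benchmark_inference("def add(a, b):", task="smoke_test", max_length=128)
#   print(bench.aggregate_results())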


class ComparisonBenchmark:
    """Compare multiple models"""

    def __init__(self, model_names: List[str]):
        self.model_names = model_names
        self.benchmarks: Dict[str, PerformanceBenchmark] = {}

    def run_comparison(self):
        """Run benchmarks for all models"""
        for model_name in self.model_names:
            logger.info(f"\nBenchmarking {model_name}...")
            try:
                benchmark = PerformanceBenchmark(model_name)
                benchmark.run_benchmark_suite()
                self.benchmarks[model_name] = benchmark
            except Exception as e:
                logger.error(f"Failed to benchmark {model_name}: {e}")

    def generate_comparison_report(self, output_file: str = "comparison_report.txt"):
        """Generate comparison report"""
        report = f"""
{'='*80}
MODEL COMPARISON REPORT
{'='*80}

"""

        for model_name, benchmark in self.benchmarks.items():
            agg = benchmark.aggregate_results()
            report += f"""
Model: {model_name}
{'='*80}
Avg Inference Time: {agg.avg_inference_time:.4f}s
Avg Tokens/Second: {agg.avg_tokens_per_second:.2f}
Avg Memory Usage: {agg.avg_memory_mb:.2f} MB
Success Rate: {(agg.successful_tests/agg.total_tests)*100:.2f}%

"""

        with open(output_file, 'w') as f:
            f.write(report)

        print(report)
        logger.info(f"Comparison report saved to {output_file}")
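

# Illustrative sketch (commented out): comparing two checkpoints side by side.
# The second model name below is a made-up placeholder.
#
#   cmp = ComparisonBenchmark(["DeepXR/Helion-OSC", "another-org/another-model"])
#   cmp.run_comparison()
#   cmp.generate_comparison_report("comparison_report.txt")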


def main():
    """Main benchmark script"""
    parser = argparse.ArgumentParser(description="Benchmark Helion-OSC model")
    parser.add_argument("--model", type=str, default="DeepXR/Helion-OSC")
    parser.add_argument("--output-dir", type=str, default="./benchmark_results")
    parser.add_argument("--compare", nargs='+', help="List of models to compare")
    parser.add_argument("--plot", action="store_true", help="Generate plots")

    args = parser.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)

    if args.compare:
        # Compare several models against the same test suite
        comparison = ComparisonBenchmark(args.compare)
        comparison.run_comparison()
        comparison.generate_comparison_report(
            os.path.join(args.output_dir, "comparison_report.txt")
        )
    else:
        # Benchmark a single model
        benchmark = PerformanceBenchmark(args.model)
        benchmark.run_benchmark_suite()
        benchmark.save_results(os.path.join(args.output_dir, "benchmark_results.json"))
        benchmark.generate_report(os.path.join(args.output_dir, "benchmark_report.txt"))

        if args.plot:
            benchmark.plot_results(os.path.join(args.output_dir, "plots"))


if __name__ == "__main__":
    main()