| """ |
| Kirim OSS Safeguard R1 10B - Benchmarking Script |
| Performance testing and evaluation for the model |
| """ |
|
|
| import time |
| import torch |
| import numpy as np |
| from typing import List, Dict, Tuple |
| import json |
| from datetime import datetime |
| from dataclasses import dataclass, asdict |
| import argparse |
|
|
|
|
@dataclass
class BenchmarkResult:
    """Results from benchmark test"""
    test_name: str          # test identifier, e.g. "latency", "throughput_batch_4", "memory_length_512"
    num_samples: int        # number of generation calls attempted in the test
    total_time: float       # sum of all measured latencies, seconds
    avg_time: float         # mean latency per run, seconds
    median_time: float      # median latency per run, seconds
    p95_time: float         # 95th-percentile latency, seconds
    p99_time: float         # 99th-percentile latency, seconds
    tokens_per_second: float  # generation throughput (tokens / total generation time)
    memory_used_gb: float   # peak CUDA memory allocated, GiB (0.0 when running on CPU)
    success_rate: float     # percentage (0-100) of runs that completed without raising
    errors: int             # count of runs that raised an exception
|
|
|
|
class ModelBenchmark:
    """Benchmark suite for model performance.

    Measures generation latency, batched throughput, and peak memory use for a
    model exposing a Hugging Face-style ``generate(**inputs, ...)`` API.
    """

    def __init__(self, model, tokenizer, device='cuda'):
        """
        Initialize benchmark suite

        Args:
            model: Model instance (must provide a ``generate`` method)
            tokenizer: Tokenizer instance (callable; its output must support
                ``.to(device)`` and ``["input_ids"]`` indexing)
            device: Device to run on ('cuda' or anything else for CPU paths)
        """
        self.model = model
        self.tokenizer = tokenizer
        self.device = device

    def _reset_peak_memory(self) -> None:
        """Reset CUDA peak-memory counters so each test reports its own peak,
        not the global maximum accumulated across previously run tests."""
        if self.device == 'cuda':
            torch.cuda.reset_peak_memory_stats()

    def _peak_memory_gb(self) -> float:
        """Peak CUDA memory allocated since the last reset, in GiB (0.0 on CPU)."""
        if self.device == 'cuda':
            return torch.cuda.max_memory_allocated() / (1024 ** 3)
        return 0.0

    def _build_result(
        self,
        test_name: str,
        num_samples: int,
        latencies: List[float],
        total_tokens: int,
        success_rate: float,
        errors: int
    ) -> BenchmarkResult:
        """
        Aggregate raw per-run latencies into a BenchmarkResult.

        Returns a zeroed timing result instead of crashing on empty-array
        statistics when every run failed.
        """
        lat = np.asarray(latencies, dtype=float)
        if lat.size == 0:
            return BenchmarkResult(
                test_name=test_name,
                num_samples=num_samples,
                total_time=0.0,
                avg_time=0.0,
                median_time=0.0,
                p95_time=0.0,
                p99_time=0.0,
                tokens_per_second=0.0,
                memory_used_gb=self._peak_memory_gb(),
                success_rate=success_rate,
                errors=errors
            )

        total_time = float(np.sum(lat))
        return BenchmarkResult(
            test_name=test_name,
            num_samples=num_samples,
            total_time=total_time,
            avg_time=float(np.mean(lat)),
            median_time=float(np.median(lat)),
            p95_time=float(np.percentile(lat, 95)),
            p99_time=float(np.percentile(lat, 99)),
            # Guard against a zero total (possible with sub-resolution timers).
            tokens_per_second=total_tokens / total_time if total_time > 0 else 0.0,
            memory_used_gb=self._peak_memory_gb(),
            success_rate=success_rate,
            errors=errors
        )

    def benchmark_latency(
        self,
        prompts: List[str],
        max_tokens: int = 256,
        num_runs: int = 10
    ) -> BenchmarkResult:
        """
        Benchmark generation latency

        Args:
            prompts: List of test prompts
            max_tokens: Maximum tokens to generate
            num_runs: Number of runs per prompt

        Returns:
            BenchmarkResult with latency statistics; tokens/sec is computed
            from the tokens actually generated (not a fixed estimate)
        """
        print(f"Running latency benchmark ({num_runs} runs per prompt)...")

        self._reset_peak_memory()

        latencies = []
        errors = 0
        total_tokens = 0

        for prompt in prompts:
            for _ in range(num_runs):
                try:
                    inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)

                    start_time = time.time()

                    with torch.no_grad():
                        outputs = self.model.generate(
                            **inputs,
                            max_new_tokens=max_tokens,
                            do_sample=False
                        )

                    # Wait for queued CUDA kernels so the timing covers
                    # the actual generation work, not just kernel launch.
                    if self.device == 'cuda':
                        torch.cuda.synchronize()

                    latencies.append(time.time() - start_time)

                    # Count only newly generated tokens (exclude the prompt),
                    # so tokens/sec reflects real output, including early EOS.
                    total_tokens += int(outputs.shape[-1] - inputs["input_ids"].shape[-1])

                except Exception as e:
                    print(f"Error: {e}")
                    errors += 1

        attempted = len(prompts) * num_runs
        return self._build_result(
            test_name="latency",
            num_samples=attempted,
            latencies=latencies,
            total_tokens=total_tokens,
            success_rate=(len(latencies) / max(attempted, 1)) * 100,
            errors=errors
        )

    def benchmark_throughput(
        self,
        prompts: List[str],
        max_tokens: int = 256,
        batch_sizes: List[int] = None
    ) -> List[BenchmarkResult]:
        """
        Benchmark throughput with different batch sizes

        Args:
            prompts: List of test prompts
            max_tokens: Maximum tokens to generate
            batch_sizes: List of batch sizes to test (default: [1, 2, 4, 8])

        Returns:
            List of BenchmarkResults for each batch size
        """
        # Avoid a mutable default argument; expand the default here instead.
        if batch_sizes is None:
            batch_sizes = [1, 2, 4, 8]

        print(f"Running throughput benchmark...")
        results = []

        for batch_size in batch_sizes:
            print(f"Testing batch size: {batch_size}")

            self._reset_peak_memory()

            latencies = []
            errors = 0
            total_tokens = 0
            prompts_completed = 0  # actual prompt count, robust to a short final batch

            for i in range(0, len(prompts), batch_size):
                batch = prompts[i:i + batch_size]

                try:
                    inputs = self.tokenizer(
                        batch,
                        return_tensors="pt",
                        padding=True,
                        truncation=True
                    ).to(self.device)

                    start_time = time.time()

                    with torch.no_grad():
                        outputs = self.model.generate(
                            **inputs,
                            max_new_tokens=max_tokens,
                            do_sample=False,
                            pad_token_id=self.tokenizer.eos_token_id
                        )

                    if self.device == 'cuda':
                        torch.cuda.synchronize()

                    latencies.append(time.time() - start_time)

                    # Throughput counts only newly generated tokens; the
                    # prompt tokens were inputs, not model output.
                    new_tokens = int(outputs.shape[-1] - inputs["input_ids"].shape[-1])
                    total_tokens += int(outputs.shape[0]) * new_tokens
                    prompts_completed += len(batch)

                except Exception as e:
                    print(f"Error: {e}")
                    errors += 1

            results.append(self._build_result(
                test_name=f"throughput_batch_{batch_size}",
                num_samples=len(prompts),
                latencies=latencies,
                total_tokens=total_tokens,
                success_rate=(prompts_completed / max(len(prompts), 1)) * 100,
                errors=errors
            ))

        return results

    def benchmark_memory(
        self,
        prompt_lengths: List[int] = None,
        max_tokens: int = 256
    ) -> List[BenchmarkResult]:
        """
        Benchmark memory usage with different prompt lengths

        Args:
            prompt_lengths: List of prompt lengths to test, in characters
                (default: [128, 256, 512, 1024, 2048]).
                NOTE: these are character counts, not token counts.
            max_tokens: Maximum tokens to generate

        Returns:
            List of BenchmarkResults for each prompt length
        """
        # Avoid a mutable default argument; expand the default here instead.
        if prompt_lengths is None:
            prompt_lengths = [128, 256, 512, 1024, 2048]

        print(f"Running memory benchmark...")
        results = []

        for length in prompt_lengths:
            print(f"Testing prompt length: {length}")

            # Build a synthetic prompt of exactly `length` characters; the
            # "+ 1" repeat guarantees enough text even when `length` is not
            # a multiple of 5 ("test " is 5 chars).
            prompt = ("test " * (length // 5 + 1))[:length]

            self._reset_peak_memory()

            try:
                inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)

                start_time = time.time()

                with torch.no_grad():
                    outputs = self.model.generate(
                        **inputs,
                        max_new_tokens=max_tokens,
                        do_sample=False
                    )

                if self.device == 'cuda':
                    torch.cuda.synchronize()

                latency = time.time() - start_time

                # Use the number of tokens actually produced (generation may
                # stop before max_tokens) for an accurate tokens/sec figure.
                gen_tokens = int(outputs.shape[-1] - inputs["input_ids"].shape[-1])

                results.append(self._build_result(
                    test_name=f"memory_length_{length}",
                    num_samples=1,
                    latencies=[latency],
                    total_tokens=gen_tokens,
                    success_rate=100.0,
                    errors=0
                ))

            except Exception as e:
                print(f"Error at length {length}: {e}")

        return results

    def run_full_benchmark(
        self,
        test_prompts: List[str],
        output_file: str = "benchmark_results.json"
    ) -> Dict:
        """
        Run complete benchmark suite

        Runs the latency, throughput and memory benchmarks in sequence,
        prints each result, and writes everything to `output_file` as JSON.

        Args:
            test_prompts: List of test prompts
            output_file: File to save results

        Returns:
            Dictionary with all results
        """
        print("\n" + "="*80)
        print("STARTING FULL BENCHMARK SUITE")
        print("="*80 + "\n")

        all_results = {
            "timestamp": datetime.now().isoformat(),
            "device": str(self.device),
            "model_name": self.model.__class__.__name__,
            "results": {}
        }

        print("\n1. LATENCY BENCHMARK")
        print("-" * 80)
        latency_result = self.benchmark_latency(test_prompts[:5], num_runs=5)
        all_results["results"]["latency"] = asdict(latency_result)
        self._print_result(latency_result)

        print("\n2. THROUGHPUT BENCHMARK")
        print("-" * 80)
        throughput_results = self.benchmark_throughput(
            test_prompts,
            batch_sizes=[1, 2, 4]
        )
        all_results["results"]["throughput"] = [asdict(r) for r in throughput_results]
        for result in throughput_results:
            self._print_result(result)

        print("\n3. MEMORY BENCHMARK")
        print("-" * 80)
        memory_results = self.benchmark_memory(
            prompt_lengths=[128, 256, 512, 1024]
        )
        all_results["results"]["memory"] = [asdict(r) for r in memory_results]
        for result in memory_results:
            self._print_result(result)

        with open(output_file, 'w') as f:
            json.dump(all_results, f, indent=2)

        print("\n" + "="*80)
        print(f"BENCHMARK COMPLETE - Results saved to {output_file}")
        print("="*80 + "\n")

        return all_results

    def _print_result(self, result: BenchmarkResult) -> None:
        """Print formatted benchmark result"""
        print(f"\nTest: {result.test_name}")
        print(f"  Samples: {result.num_samples}")
        print(f"  Avg Time: {result.avg_time:.3f}s")
        print(f"  Median Time: {result.median_time:.3f}s")
        print(f"  P95 Time: {result.p95_time:.3f}s")
        print(f"  P99 Time: {result.p99_time:.3f}s")
        print(f"  Tokens/sec: {result.tokens_per_second:.1f}")
        print(f"  Memory: {result.memory_used_gb:.2f} GB")
        print(f"  Success Rate: {result.success_rate:.1f}%")
        if result.errors > 0:
            print(f"  Errors: {result.errors}")
|
|
|
|
def main():
    """CLI entry point: parse arguments and prepare the benchmark run.

    Model loading is intentionally stubbed out; once enabled, construct
    ModelBenchmark(model, tokenizer) and call
    run_full_benchmark(test_prompts, output_file=args.output).
    """
    parser = argparse.ArgumentParser(description="Benchmark Kirim OSS Safeguard")
    parser.add_argument("--model", type=str, default="Kirim-ai/Kirim-OSS-Safeguard-R1-10B")
    # Without an explicit dest, argparse would store this flag under the
    # attribute name '8bit', which is not a valid Python identifier and so
    # can never be read as `args.8bit` — make it reachable as args.load_8bit.
    parser.add_argument("--8bit", action="store_true", dest="load_8bit",
                        help="Load in 8-bit")
    parser.add_argument("--output", type=str, default="benchmark_results.json")

    args = parser.parse_args()

    print("Loading model for benchmarking...")

    # Placeholder: load `args.model` here (optionally in 8-bit when
    # `args.load_8bit` is set) before running the suite.

    # Prompts the full benchmark suite will use once a model is loaded.
    test_prompts = [
        "Explain quantum computing in simple terms.",
        "Write a short story about a robot.",
        "What are the benefits of exercise?",
        "Describe the process of photosynthesis.",
        "How does the internet work?",
        "What is artificial intelligence?",
        "Explain climate change.",
        "What is machine learning?",
        "Describe the water cycle.",
        "What is blockchain technology?"
    ]

    print("Benchmark script ready. Uncomment model loading code to run.")




if __name__ == "__main__":
    main()