# Kirim-Safeguard-R1-10B / benchmark.py
# Author: Song Yi
# Commit: Create benchmark.py (2224b06, verified)
"""
Kirim OSS Safeguard R1 10B - Benchmarking Script
Performance testing and evaluation for the model
"""
import time
import torch
import numpy as np
from typing import List, Dict, Tuple
import json
from datetime import datetime
from dataclasses import dataclass, asdict
import argparse
@dataclass
class BenchmarkResult:
    """Results from one benchmark test.

    Produced by the benchmark methods in this module and serialized to JSON
    via dataclasses.asdict in run_full_benchmark.
    """
    test_name: str            # benchmark identifier, e.g. "latency", "throughput_batch_4"
    num_samples: int          # number of generation attempts in the test
    total_time: float         # summed wall-clock time across runs, seconds
    avg_time: float           # mean time per run, seconds
    median_time: float        # median time per run, seconds
    p95_time: float           # 95th-percentile time per run, seconds
    p99_time: float           # 99th-percentile time per run, seconds
    tokens_per_second: float  # generation throughput estimate
    memory_used_gb: float     # peak CUDA memory allocated, GiB (0.0 on CPU)
    success_rate: float       # percentage of runs that completed (0-100)
    errors: int               # number of runs that raised an exception
class ModelBenchmark:
    """Benchmark suite for model performance.

    Measures single-prompt latency, batched throughput, and peak memory use
    for a HuggingFace-style causal LM (any object exposing ``generate``)
    together with its tokenizer.
    """

    def __init__(self, model, tokenizer, device='cuda'):
        """
        Initialize benchmark suite.

        Args:
            model: Model instance exposing a ``generate`` method.
            tokenizer: Tokenizer instance (HF-style callable returning a
                batch with a ``.to(device)`` method).
            device: Device to run on, e.g. ``'cuda'``, ``'cuda:0'`` or ``'cpu'``.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.device = device

    # ------------------------------------------------------------------
    # Internal helpers (CUDA calls are no-ops on CPU)
    # ------------------------------------------------------------------

    def _is_cuda(self) -> bool:
        """True for any CUDA device spec ('cuda', 'cuda:0', torch.device)."""
        # str() handles torch.device objects; startswith handles 'cuda:N',
        # which the original equality check ('cuda' ==) missed.
        return str(self.device).startswith("cuda")

    def _sync(self) -> None:
        """Block until queued CUDA kernels finish so wall-clock timing is valid."""
        if self._is_cuda():
            torch.cuda.synchronize()

    def _reset_peak_memory(self) -> None:
        """Reset the CUDA peak-memory counter so each test reports its own peak."""
        if self._is_cuda():
            torch.cuda.reset_peak_memory_stats()

    def _peak_memory_gb(self) -> float:
        """Peak CUDA memory allocated since the last reset, in GiB (0.0 on CPU)."""
        if self._is_cuda():
            return torch.cuda.max_memory_allocated() / (1024 ** 3)
        return 0.0

    @staticmethod
    def _stats(latencies: np.ndarray) -> Tuple[float, float, float, float, float]:
        """Return (total, mean, median, p95, p99) seconds; all zeros if empty.

        Guarding the empty case avoids NaN warnings / bogus results when
        every run errored out.
        """
        if latencies.size == 0:
            return 0.0, 0.0, 0.0, 0.0, 0.0
        return (
            float(np.sum(latencies)),
            float(np.mean(latencies)),
            float(np.median(latencies)),
            float(np.percentile(latencies, 95)),
            float(np.percentile(latencies, 99)),
        )

    # ------------------------------------------------------------------
    # Benchmarks
    # ------------------------------------------------------------------

    def benchmark_latency(
        self,
        prompts: List[str],
        max_tokens: int = 256,
        num_runs: int = 10
    ) -> "BenchmarkResult":
        """
        Benchmark single-prompt generation latency.

        Args:
            prompts: List of test prompts.
            max_tokens: Maximum tokens to generate per run.
            num_runs: Number of runs per prompt.

        Returns:
            BenchmarkResult with latency statistics.
        """
        print(f"Running latency benchmark ({num_runs} runs per prompt)...")
        self._reset_peak_memory()  # report this test's peak, not a stale one
        latencies = []
        errors = 0
        generated_tokens = 0
        for prompt in prompts:
            for _ in range(num_runs):
                try:
                    inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
                    start_time = time.time()
                    with torch.no_grad():
                        outputs = self.model.generate(
                            **inputs,
                            max_new_tokens=max_tokens,
                            do_sample=False
                        )
                    self._sync()  # ensure generation is complete before stopping the clock
                    latencies.append(time.time() - start_time)
                    # Count tokens actually generated (output length minus
                    # prompt length) rather than assuming 80% of max_tokens.
                    generated_tokens += int(outputs.shape[1]) - int(inputs["input_ids"].shape[1])
                except Exception as e:
                    print(f"Error: {e}")
                    errors += 1
        total, avg, median, p95, p99 = self._stats(np.asarray(latencies, dtype=float))
        attempts = len(prompts) * num_runs
        return BenchmarkResult(
            test_name="latency",
            num_samples=attempts,
            total_time=total,
            avg_time=avg,
            median_time=median,
            p95_time=p95,
            p99_time=p99,
            tokens_per_second=generated_tokens / total if total > 0 else 0.0,
            memory_used_gb=self._peak_memory_gb(),
            success_rate=(len(latencies) / attempts) * 100 if attempts else 0.0,
            errors=errors
        )

    def benchmark_throughput(
        self,
        prompts: List[str],
        max_tokens: int = 256,
        batch_sizes: List[int] = None
    ) -> "List[BenchmarkResult]":
        """
        Benchmark throughput with different batch sizes.

        Args:
            prompts: List of test prompts.
            max_tokens: Maximum tokens to generate.
            batch_sizes: Batch sizes to test (defaults to [1, 2, 4, 8]).

        Returns:
            List of BenchmarkResults, one per batch size.
        """
        # None-default avoids the shared-mutable-default pitfall; behavior
        # is unchanged for callers relying on the old default.
        if batch_sizes is None:
            batch_sizes = [1, 2, 4, 8]
        print("Running throughput benchmark...")
        results = []
        for batch_size in batch_sizes:
            print(f"Testing batch size: {batch_size}")
            self._reset_peak_memory()  # per-batch-size peak, not cumulative
            latencies = []
            errors = 0
            total_tokens = 0
            prompts_done = 0  # prompts actually completed (last batch may be short)
            for i in range(0, len(prompts), batch_size):
                batch = prompts[i:i + batch_size]
                try:
                    inputs = self.tokenizer(
                        batch,
                        return_tensors="pt",
                        padding=True,
                        truncation=True
                    ).to(self.device)
                    start_time = time.time()
                    with torch.no_grad():
                        outputs = self.model.generate(
                            **inputs,
                            max_new_tokens=max_tokens,
                            do_sample=False,
                            pad_token_id=self.tokenizer.eos_token_id
                        )
                    self._sync()
                    latencies.append(time.time() - start_time)
                    # Only newly generated tokens count toward throughput;
                    # the original also counted prompt and padding tokens.
                    new_tokens = int(outputs.shape[1]) - int(inputs["input_ids"].shape[1])
                    total_tokens += int(outputs.shape[0]) * new_tokens
                    prompts_done += len(batch)
                except Exception as e:
                    print(f"Error: {e}")
                    errors += 1
            total, avg, median, p95, p99 = self._stats(np.asarray(latencies, dtype=float))
            results.append(BenchmarkResult(
                test_name=f"throughput_batch_{batch_size}",
                num_samples=len(prompts),
                total_time=total,
                avg_time=avg,
                median_time=median,
                p95_time=p95,
                p99_time=p99,
                tokens_per_second=total_tokens / total if total > 0 else 0.0,
                memory_used_gb=self._peak_memory_gb(),
                # Based on prompts completed, so a short final batch can no
                # longer push the rate above 100% (old formula multiplied by
                # batch_size regardless of the last batch's true size).
                success_rate=(prompts_done / len(prompts)) * 100 if prompts else 0.0,
                errors=errors
            ))
        return results

    def benchmark_memory(
        self,
        prompt_lengths: List[int] = None,
        max_tokens: int = 256
    ) -> "List[BenchmarkResult]":
        """
        Benchmark memory usage with different prompt lengths.

        Args:
            prompt_lengths: Prompt lengths to test, in characters
                (defaults to [128, 256, 512, 1024, 2048]).
            max_tokens: Maximum tokens to generate.

        Returns:
            List of BenchmarkResults, one per prompt length that succeeded
            (failed lengths are logged and skipped).
        """
        if prompt_lengths is None:
            prompt_lengths = [128, 256, 512, 1024, 2048]
        print("Running memory benchmark...")
        results = []
        for length in prompt_lengths:
            print(f"Testing prompt length: {length}")
            # Build a filler prompt of roughly `length` characters.
            # NOTE(review): length is measured in characters, not tokens.
            prompt = ("test " * (length // 5))[:length]
            self._reset_peak_memory()
            try:
                inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
                start_time = time.time()
                with torch.no_grad():
                    self.model.generate(
                        **inputs,
                        max_new_tokens=max_tokens,
                        do_sample=False
                    )
                self._sync()
                latency = time.time() - start_time
                results.append(BenchmarkResult(
                    test_name=f"memory_length_{length}",
                    num_samples=1,
                    total_time=latency,
                    avg_time=latency,
                    median_time=latency,
                    p95_time=latency,
                    p99_time=latency,
                    tokens_per_second=max_tokens / latency if latency > 0 else 0.0,
                    memory_used_gb=self._peak_memory_gb(),
                    success_rate=100.0,
                    errors=0
                ))
            except Exception as e:
                print(f"Error at length {length}: {e}")
        return results

    def run_full_benchmark(
        self,
        test_prompts: List[str],
        output_file: str = "benchmark_results.json"
    ) -> Dict:
        """
        Run the complete benchmark suite and persist the results as JSON.

        Args:
            test_prompts: List of test prompts.
            output_file: Path of the JSON file to write.

        Returns:
            Dictionary with all results (also written to ``output_file``).
        """
        print("\n" + "=" * 80)
        print("STARTING FULL BENCHMARK SUITE")
        print("=" * 80 + "\n")
        all_results = {
            "timestamp": datetime.now().isoformat(),
            "device": str(self.device),
            "model_name": self.model.__class__.__name__,
            "results": {}
        }
        # Latency benchmark (subset of prompts / runs to keep runtime bounded).
        print("\n1. LATENCY BENCHMARK")
        print("-" * 80)
        latency_result = self.benchmark_latency(test_prompts[:5], num_runs=5)
        all_results["results"]["latency"] = asdict(latency_result)
        self._print_result(latency_result)
        # Throughput benchmark across a few batch sizes.
        print("\n2. THROUGHPUT BENCHMARK")
        print("-" * 80)
        throughput_results = self.benchmark_throughput(
            test_prompts,
            batch_sizes=[1, 2, 4]
        )
        all_results["results"]["throughput"] = [asdict(r) for r in throughput_results]
        for result in throughput_results:
            self._print_result(result)
        # Memory benchmark across increasing prompt lengths.
        print("\n3. MEMORY BENCHMARK")
        print("-" * 80)
        memory_results = self.benchmark_memory(
            prompt_lengths=[128, 256, 512, 1024]
        )
        all_results["results"]["memory"] = [asdict(r) for r in memory_results]
        for result in memory_results:
            self._print_result(result)
        # Persist results.
        with open(output_file, 'w') as f:
            json.dump(all_results, f, indent=2)
        print("\n" + "=" * 80)
        print(f"BENCHMARK COMPLETE - Results saved to {output_file}")
        print("=" * 80 + "\n")
        return all_results

    def _print_result(self, result: "BenchmarkResult") -> None:
        """Print one BenchmarkResult in a human-readable form."""
        print(f"\nTest: {result.test_name}")
        print(f"  Samples: {result.num_samples}")
        print(f"  Avg Time: {result.avg_time:.3f}s")
        print(f"  Median Time: {result.median_time:.3f}s")
        print(f"  P95 Time: {result.p95_time:.3f}s")
        print(f"  P99 Time: {result.p99_time:.3f}s")
        print(f"  Tokens/sec: {result.tokens_per_second:.1f}")
        print(f"  Memory: {result.memory_used_gb:.2f} GB")
        print(f"  Success Rate: {result.success_rate:.1f}%")
        # Only surface the error count when something actually failed.
        if result.errors > 0:
            print(f"  Errors: {result.errors}")
def main():
    """CLI entry point: parse arguments and (when enabled) run the suite.

    Model loading is intentionally commented out so the script can be
    inspected without downloading weights; uncomment the marked section
    to run the real benchmark.
    """
    parser = argparse.ArgumentParser(description="Benchmark Kirim OSS Safeguard")
    parser.add_argument("--model", type=str, default="Kirim-ai/Kirim-OSS-Safeguard-R1-10B")
    # dest is required because "8bit" is not a valid Python identifier;
    # without it the value is only reachable via args.__dict__['8bit'].
    parser.add_argument("--8bit", dest="load_8bit", action="store_true", help="Load in 8-bit")
    parser.add_argument("--output", type=str, default="benchmark_results.json")
    args = parser.parse_args()

    print("Loading model for benchmarking...")
    # In production, uncomment this:
    # from transformers import AutoTokenizer, AutoModelForCausalLM
    #
    # tokenizer = AutoTokenizer.from_pretrained(args.model)
    # model = AutoModelForCausalLM.from_pretrained(
    #     args.model,
    #     load_in_8bit=args.load_8bit,
    #     device_map="auto"
    # )
    #
    # device = "cuda" if torch.cuda.is_available() else "cpu"
    # benchmark = ModelBenchmark(model, tokenizer, device)

    # Test prompts covering a spread of topics and answer lengths.
    test_prompts = [
        "Explain quantum computing in simple terms.",
        "Write a short story about a robot.",
        "What are the benefits of exercise?",
        "Describe the process of photosynthesis.",
        "How does the internet work?",
        "What is artificial intelligence?",
        "Explain climate change.",
        "What is machine learning?",
        "Describe the water cycle.",
        "What is blockchain technology?"
    ]

    # Run benchmarks
    # results = benchmark.run_full_benchmark(test_prompts, args.output)
    print("Benchmark script ready. Uncomment model loading code to run.")


if __name__ == "__main__":
    main()