#!/usr/bin/env python3
"""
Benchmark script for comparing optimizer performance
Measures schedule generation time and computational efficiency
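
Example usage (script path is illustrative):
    python benchmarks/benchmark_optimizers.py --fleet-sizes 10 20 30 --runs 3
    python benchmarks/benchmark_optimizers.py --quick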
"""
import time
import json
import statistics
from datetime import datetime
from typing import Dict, List, Any, Optional
import sys
import os
# Add the project root to sys.path (this file is assumed to live two
# directory levels below the root, matching the '../..' here)
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from DataService.enhanced_generator import EnhancedMetroDataGenerator
from greedyOptim.scheduler import TrainsetSchedulingOptimizer
# --- Adapters for Uniform Interface ---
class OptimizerAdapter:
"""Base adapter for different optimizers"""
def optimize(self, data: Dict) -> Any:
raise NotImplementedError
class GeneticAdapter(OptimizerAdapter):
"""Adapter for Genetic Algorithm"""
def optimize(self, data: Dict) -> Any:
optimizer = TrainsetSchedulingOptimizer(data)
return optimizer.optimize(method='ga')
class PSOAdapter(OptimizerAdapter):
"""Adapter for Particle Swarm Optimization"""
def optimize(self, data: Dict) -> Any:
optimizer = TrainsetSchedulingOptimizer(data)
return optimizer.optimize(method='pso')
class SAAdapter(OptimizerAdapter):
"""Adapter for Simulated Annealing"""
def optimize(self, data: Dict) -> Any:
optimizer = TrainsetSchedulingOptimizer(data)
return optimizer.optimize(method='sa')
class CMAESAdapter(OptimizerAdapter):
"""Adapter for CMA-ES"""
def optimize(self, data: Dict) -> Any:
optimizer = TrainsetSchedulingOptimizer(data)
return optimizer.optimize(method='cmaes')
class NSGA2Adapter(OptimizerAdapter):
"""Adapter for NSGA-II"""
def optimize(self, data: Dict) -> Any:
optimizer = TrainsetSchedulingOptimizer(data)
return optimizer.optimize(method='nsga2')
class AdaptiveAdapter(OptimizerAdapter):
"""Adapter for Adaptive Algorithm"""
def optimize(self, data: Dict) -> Any:
optimizer = TrainsetSchedulingOptimizer(data)
return optimizer.optimize(method='adaptive')
class EnsembleAdapter(OptimizerAdapter):
"""Adapter for Ensemble Method"""
def optimize(self, data: Dict) -> Any:
optimizer = TrainsetSchedulingOptimizer(data)
return optimizer.optimize(method='ensemble')
class ORToolsAdapter(OptimizerAdapter):
"""Adapter for OR-Tools CP-SAT"""
def optimize(self, data: Dict) -> Any:
optimizer = TrainsetSchedulingOptimizer(data)
return optimizer.optimize(method='cp-sat')
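
# The explicit adapters above keep each supported algorithm visible at a
# glance. An equivalent, more compact construction (a sketch, not wired into
# the benchmark below) would derive an adapter class from the optimizer's
# method string:
def make_adapter(method: str) -> type:
    """Build an OptimizerAdapter subclass bound to the given method string."""
    class _MethodAdapter(OptimizerAdapter):
        def optimize(self, data: Dict) -> Any:
            optimizer = TrainsetSchedulingOptimizer(data)
            return optimizer.optimize(method=method)
    _MethodAdapter.__name__ = f"{method.upper()}Adapter"
    return _MethodAdapter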
class OptimizerBenchmark:
"""Benchmark different optimization algorithms"""
def __init__(self):
self.results = {
"benchmark_info": {
"date": datetime.now().isoformat(),
"description": "Metro Schedule Optimization Performance Comparison"
},
"test_configurations": [],
"results": []
}
def generate_test_data(self, num_trains: int) -> Dict:
"""Generate consistent test data for all optimizers"""
generator = EnhancedMetroDataGenerator(num_trainsets=num_trains)
# We need the full dataset as expected by TrainsetSchedulingEvaluator
full_data = generator.generate_complete_enhanced_dataset()
return full_data
def benchmark_optimizer(
self,
optimizer_name: str,
adapter_class,
num_trains: int,
num_runs: int = 3
) -> Dict[str, Any]:
"""Benchmark a single optimizer"""
print(f"\n{'='*70}")
print(f"Benchmarking: {optimizer_name}")
print(f"Fleet Size: {num_trains} trains")
print(f"{'='*70}")
run_times = []
success_count = 0
for run in range(num_runs):
print(f"Run {run + 1}/{num_runs}...", end=" ", flush=True)
try:
# Generate fresh data for each run
data = self.generate_test_data(num_trains)
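                # Note: unless EnhancedMetroDataGenerator seeds its RNG, each
                # run (and each optimizer) sees a different random instance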
# Time the optimization
start_time = time.perf_counter()
adapter = adapter_class()
                solution = adapter.optimize(data)
end_time = time.perf_counter()
elapsed = end_time - start_time
run_times.append(elapsed)
success_count += 1
print(f"✓ Completed in {elapsed:.4f}s | Fitness: {result.fitness_score:.2f}")
except Exception as e:
print(f"✗ Failed: {str(e)[:100]}")
# import traceback
# traceback.print_exc()
# Calculate statistics
if run_times:
result = {
"optimizer": optimizer_name,
"fleet_size": num_trains,
"num_runs": num_runs,
"successful_runs": success_count,
"success_rate": f"{(success_count/num_runs)*100:.1f}%",
"execution_times": {
"min_seconds": min(run_times),
"max_seconds": max(run_times),
"mean_seconds": statistics.mean(run_times),
"stdev_seconds": statistics.stdev(run_times) if len(run_times) > 1 else 0
}
}
else:
result = {
"optimizer": optimizer_name,
"fleet_size": num_trains,
"num_runs": num_runs,
"successful_runs": 0,
"success_rate": "0%",
"error": "All runs failed"
}
print(f"\nSummary:")
print(f" Success Rate: {result['success_rate']}")
if run_times:
print(f" Average Time: {result['execution_times']['mean_seconds']:.4f}s")
return result
    def run_comprehensive_benchmark(
        self,
        fleet_sizes: Optional[List[int]] = None,
        num_runs: int = 3
    ):
        """Run a comprehensive benchmark across all optimizers and fleet sizes"""
        # Avoid a mutable default argument; fall back to the standard fleet sizes
        if fleet_sizes is None:
            fleet_sizes = [10, 20, 30]
        print("\n" + "="*70)
print("COMPREHENSIVE OPTIMIZER BENCHMARK")
print("="*70)
print(f"Fleet Sizes to Test: {fleet_sizes}")
print(f"Runs per Configuration: {num_runs}")
print("="*70)
# Define optimizers to test
optimizers = [
("Genetic Algorithm", GeneticAdapter),
("Particle Swarm", PSOAdapter),
("Simulated Annealing", SAAdapter),
("CMA-ES", CMAESAdapter),
("NSGA-II", NSGA2Adapter),
("Adaptive Algorithm", AdaptiveAdapter),
("Ensemble Method", EnsembleAdapter),
# ("OR-Tools CP-SAT", ORToolsAdapter), # Uncomment if OR-Tools is installed
]
# Run benchmarks
for fleet_size in fleet_sizes:
print(f"\n{'#'*70}")
print(f"# FLEET SIZE: {fleet_size} TRAINS")
print(f"{'#'*70}")
for optimizer_name, adapter_class in optimizers:
result = self.benchmark_optimizer(
optimizer_name,
adapter_class,
fleet_size,
num_runs=num_runs
)
self.results["results"].append(result)
# Small delay between tests
time.sleep(0.5)
# Generate comparison summary
self._generate_summary()
# Save results
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"optimizer_benchmark_{timestamp}.json"
with open(filename, 'w') as f:
json.dump(self.results, f, indent=2)
print(f"\nResults saved to: {filename}")
def _generate_summary(self):
"""Generate comparative summary of results"""
print("\n" + "="*70)
print("BENCHMARK SUMMARY")
print("="*70)
# Group by fleet size
fleet_sizes = sorted(set(r["fleet_size"] for r in self.results["results"]))
summary = {
"by_fleet_size": {},
"overall_rankings": {}
}
for fleet_size in fleet_sizes:
fleet_results = [r for r in self.results["results"] if r["fleet_size"] == fleet_size]
print(f"\nFleet Size: {fleet_size} trains")
print("-" * 70)
print(f"{'Optimizer':<25} {'Avg Time (s)':<15} {'Success Rate':<15}")
print("-" * 70)
fleet_summary = []
for result in fleet_results:
optimizer = result["optimizer"]
avg_time = result["execution_times"]["mean_seconds"] if "execution_times" in result else "N/A"
success = result["success_rate"]
if isinstance(avg_time, float):
time_str = f"{avg_time:.4f}"
else:
time_str = str(avg_time)
print(f"{optimizer:<25} {time_str:<15} {success:<15}")
if isinstance(avg_time, float):
fleet_summary.append({
"optimizer": optimizer,
"time": avg_time
})
# Rank for this fleet size
fleet_summary.sort(key=lambda x: x["time"])
summary["by_fleet_size"][fleet_size] = fleet_summary
# Update overall stats
for item in fleet_summary:
opt = item["optimizer"]
if opt not in summary["overall_rankings"]:
summary["overall_rankings"][opt] = []
summary["overall_rankings"][opt].append(item["time"])
# Print overall rankings
print("\n" + "="*70)
print("OVERALL PERFORMANCE RANKINGS (by average time)")
print("="*70)
print(f"{'Rank':<8} {'Optimizer/Method':<30} {'Avg Time (s)':<15}")
print("-" * 70)
overall_stats = []
for opt, times in summary["overall_rankings"].items():
if times:
overall_stats.append({
"optimizer": opt,
"avg_time": statistics.mean(times)
})
overall_stats.sort(key=lambda x: x["avg_time"])
        for rank, stat in enumerate(overall_stats, start=1):
            print(f"{rank:<8} {stat['optimizer']:<30} {stat['avg_time']:.4f}")
# Save report to text file
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_file = f"optimizer_performance_report_{timestamp}.txt"
with open(report_file, "w") as f:
f.write("OPTIMIZER PERFORMANCE BENCHMARK REPORT\n")
f.write(f"Date: {datetime.now().isoformat()}\n")
f.write("="*70 + "\n\n")
for fleet_size in fleet_sizes:
f.write(f"Fleet Size: {fleet_size} trains\n")
f.write("-" * 70 + "\n")
f.write(f"{'Optimizer':<25} {'Avg Time (s)':<15} {'Success Rate':<15}\n")
f.write("-" * 70 + "\n")
fleet_results = [r for r in self.results["results"] if r["fleet_size"] == fleet_size]
for result in fleet_results:
optimizer = result["optimizer"]
avg_time = result["execution_times"]["mean_seconds"] if "execution_times" in result else "N/A"
success = result["success_rate"]
if isinstance(avg_time, float):
time_str = f"{avg_time:.4f}"
else:
time_str = str(avg_time)
f.write(f"{optimizer:<25} {time_str:<15} {success:<15}\n")
f.write("\n")
f.write("="*70 + "\n")
f.write("OVERALL RANKINGS\n")
f.write("="*70 + "\n")
            for rank, stat in enumerate(overall_stats, start=1):
                f.write(f"{rank}. {stat['optimizer']}: {stat['avg_time']:.4f}s\n")
print(f"\nPerformance report saved to: {report_file}")
def main():
import argparse
parser = argparse.ArgumentParser(description="Benchmark metro schedule optimizers")
parser.add_argument("--fleet-sizes", type=int, nargs="+", default=[10, 20, 30],
help="Fleet sizes to test (default: 10 20 30)")
parser.add_argument("--runs", type=int, default=3,
help="Number of runs per configuration (default: 3)")
parser.add_argument("--quick", action="store_true",
help="Quick test with fewer configurations")
args = parser.parse_args()
if args.quick:
print("\n*** QUICK BENCHMARK MODE ***")
fleet_sizes = [10, 20]
runs = 1
else:
fleet_sizes = args.fleet_sizes
runs = args.runs
benchmark = OptimizerBenchmark()
benchmark.run_comprehensive_benchmark(
fleet_sizes=fleet_sizes,
num_runs=runs
)
if __name__ == "__main__":
main()