|
|
""" |
|
|
Comprehensive test and demo script for the enhanced optimization system. |
|
|
""" |
|
|
import json |
|
|
import time |
|
|
import os |
|
|
import sys |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
sys.path.append(str(Path(__file__).parent.parent)) |
|
|
|
|
|
from greedyOptim import ( |
|
|
optimize_trainset_schedule, |
|
|
compare_optimization_methods, |
|
|
OptimizationConfig, |
|
|
TrainsetSchedulingOptimizer |
|
|
) |
|
|
from greedyOptim.error_handling import safe_optimize, DataValidator |
|
|
from greedyOptim.hybrid_optimizers import optimize_with_hybrid_methods |
|
|
|
|
|
|
|
|
def generate_test_data():
    """Produce synthetic metro data, preferring the enhanced generator.

    Tries to import the project's enhanced data generator; when that
    package is unavailable, falls back to create_basic_test_data().

    Returns:
        dict: The generated dataset.
    """
    print("π Generating enhanced synthetic data...")

    try:
        # Make the DataService package importable relative to this script.
        sys.path.append(str(Path(__file__).parent.parent / "DataService"))
        from mlservice.DataService import enhanced_generator

        # Fixed seed so the enhanced dataset is reproducible across runs.
        gen = enhanced_generator.EnhancedMetroDataGenerator(num_trainsets=25, seed=42)
        return gen.save_to_json("test_data_enhanced.json")

    except ImportError:
        print("Enhanced generator not available, using basic data...")
        return create_basic_test_data()
|
|
|
|
|
|
|
|
def create_basic_test_data(seed=None):
    """Create a basic synthetic dataset for testing the optimizer.

    Args:
        seed: Optional int used to seed a private random generator so the
            output is reproducible. Defaults to None, which keeps the
            original non-deterministic behavior.

    Returns:
        dict with a "metadata" section plus the lists "trainset_status",
        "fitness_certificates", "job_cards", "component_health" and
        "branding_contracts" (the last is always left empty here).
    """
    from datetime import datetime, timedelta
    import random

    # Private generator: avoids perturbing the global random state and
    # makes seeded runs reproducible.
    rng = random.Random(seed)
    now = datetime.now()

    num_trainsets = 25
    trainset_ids = [f"TS-{i + 1:03d}" for i in range(num_trainsets)]

    data = {
        "metadata": {
            "generated_at": now.isoformat(),
            "num_trainsets": num_trainsets,
            "system": "Test System",
        },
        "trainset_status": [],
        "fitness_certificates": [],
        "job_cards": [],
        "component_health": [],
        "branding_contracts": [],
    }

    # Operational status weighted 3:1:1 toward "Available".
    for ts_id in trainset_ids:
        data["trainset_status"].append({
            "trainset_id": ts_id,
            "operational_status": rng.choice(
                ["Available", "Available", "Available", "Maintenance", "Standby"]),
            "total_mileage_km": rng.randint(50000, 200000),
            "last_service_date": (now - timedelta(days=rng.randint(1, 30))).isoformat(),
        })

    # One certificate per (trainset, department); mostly "Valid".
    # Expiry may be up to 5 days in the past to exercise expired handling.
    departments = ["Rolling Stock", "Signalling", "Telecom"]
    for ts_id in trainset_ids:
        for dept in departments:
            data["fitness_certificates"].append({
                "trainset_id": ts_id,
                "department": dept,
                "status": rng.choice(["Valid", "Valid", "Valid", "Expired"]),
                "expiry_date": (now + timedelta(days=rng.randint(-5, 90))).isoformat(),
            })

    # Roughly 30% of trainsets receive a job card.
    for ts_id in trainset_ids:
        if rng.random() < 0.3:
            data["job_cards"].append({
                "trainset_id": ts_id,
                "priority": rng.choice(["Critical", "High", "Medium", "Low"]),
                "status": rng.choice(["Open", "Closed", "In-Progress"]),
            })

    # One health record per (trainset, component).
    components = ["Bogie", "Brake_Pad", "HVAC", "Door_System"]
    for ts_id in trainset_ids:
        for comp in components:
            data["component_health"].append({
                "trainset_id": ts_id,
                "component": comp,
                "status": rng.choice(["Good", "Good", "Fair", "Warning"]),
                "wear_level": rng.randint(20, 90),
            })

    return data
|
|
|
|
|
|
|
|
def test_data_validation(data):
    """Validate *data* with DataValidator and report any problems found.

    Args:
        data: Dataset dict as produced by the generators.

    Returns:
        bool: True when validation passes, False when errors were found.
    """
    print("\nπ Testing Data Validation...")
    print("="*50)

    errors = DataValidator.validate_data(data)
    if errors:
        print("β Validation errors found:")
        # Cap the listing at five errors to keep console output readable.
        for error in errors[:5]:
            print(f"   β’ {error}")
        if len(errors) > 5:
            print(f"   ... and {len(errors) - 5} more errors")
        return False
    else:
        # Fix: the original success message was a string literal broken
        # across two lines (a SyntaxError); it is rejoined here.
        print("β Data validation passed!")
        return True
|
|
|
|
|
|
|
|
def test_basic_optimization(data):
    """Run each basic optimization method on *data* and collect outcomes.

    Args:
        data: Dataset dict as produced by the generators.

    Returns:
        dict mapping method name ('ga', 'cmaes', 'pso', 'sa') to a record
        with keys 'result', 'time', 'success' and, on failure, 'error'.
    """
    print("\nπ Testing Basic Optimization Methods...")
    print("="*50)

    basic_methods = ['ga', 'cmaes', 'pso', 'sa']
    results = {}

    config = OptimizationConfig(
        required_service_trains=20,
        min_standby=2,
        population_size=30,
        generations=50
    )

    for method in basic_methods:
        print(f"\nπ Testing {method.upper()}...")
        try:
            start_time = time.time()

            # Simulated annealing uses an iteration budget instead of the
            # population/generation parameters.
            if method == 'sa':
                result = optimize_trainset_schedule(data, method, config, max_iterations=1000)
            else:
                result = optimize_trainset_schedule(data, method, config)

            elapsed = time.time() - start_time
            results[method] = {
                'result': result,
                'time': elapsed,
                'success': True
            }

            # Fix: the original completion message was a string literal
            # broken across two lines (a SyntaxError); rejoined here.
            print(f"   β {method.upper()} completed in {elapsed:.1f}s")
            print(f"   Fitness: {result.fitness_score:.2f}")
            print(f"   Service: {len(result.selected_trainsets)}")
            print(f"   Standby: {len(result.standby_trainsets)}")

        except Exception as e:
            # A failing method must not abort the whole sweep; record it.
            print(f"   β {method.upper()} failed: {str(e)}")
            results[method] = {
                'result': None,
                'time': 0,
                'success': False,
                'error': str(e)
            }

    return results
|
|
|
|
|
|
|
|
def test_hybrid_optimization(data):
    """Run the hybrid optimization methods on *data* and collect outcomes.

    Args:
        data: Dataset dict as produced by the generators.

    Returns:
        dict mapping method name ('adaptive', 'ensemble') to a record with
        keys 'result', 'time', 'success' and, on failure, 'error'.
    """
    print("\n㪠Testing Hybrid Optimization Methods...")
    print("="*50)

    hybrid_methods = ['adaptive', 'ensemble']
    results = {}

    for method in hybrid_methods:
        print(f"\nπ Testing {method.upper()}...")
        try:
            start_time = time.time()

            # Both hybrid methods share the same entry point; the original
            # if/elif branches were identical and have been collapsed.
            result = optimize_with_hybrid_methods(data, method)

            elapsed = time.time() - start_time
            results[method] = {
                'result': result,
                'time': elapsed,
                'success': True
            }

            # Fix: the original completion message was a string literal
            # broken across two lines (a SyntaxError); rejoined here.
            print(f"   β {method.upper()} completed in {elapsed:.1f}s")
            print(f"   Fitness: {result.fitness_score:.2f}")
            print(f"   Service: {len(result.selected_trainsets)}")

        except Exception as e:
            # A failing method must not abort the whole sweep; record it.
            print(f"   β {method.upper()} failed: {str(e)}")
            results[method] = {
                'result': None,
                'time': 0,
                'success': False,
                'error': str(e)
            }

    return results
|
|
|
|
|
|
|
|
def test_error_handling(data):
    """Exercise safe_optimize with both valid and deliberately bad data.

    Args:
        data: A valid dataset dict; a malformed one is built locally.

    Returns:
        None. Outcomes are reported on stdout only.
    """
    print("\nπ‘οΈ Testing Error Handling...")
    print("="*50)

    # Happy path: valid data should optimize without raising.
    print("Testing with valid data...")
    try:
        result = safe_optimize(data, method='ga', log_file='test_optimization.log')
        # Fix: the original success message was a string literal broken
        # across two lines (a SyntaxError); rejoined here.
        print("   β Safe optimization with valid data succeeded")
    except Exception as e:
        print(f"   β Safe optimization failed: {e}")

    # Failure path: malformed records must be rejected, not processed.
    print("Testing with invalid data...")
    invalid_data = {
        "trainset_status": [{"invalid": "data"}],
        "fitness_certificates": [],
        "job_cards": [],
        "component_health": []
    }

    try:
        result = safe_optimize(invalid_data, method='ga')
        print("   β Should have failed with invalid data")
    except Exception as e:
        # Fix: this message was also a broken two-line string literal.
        print(f"   β Correctly caught error: {type(e).__name__}")
|
|
|
|
|
|
|
|
def test_configuration_options(data):
    """Run the GA under several OptimizationConfig variants.

    Args:
        data: Dataset dict as produced by the generators.

    Returns:
        None. Each configuration's fitness and runtime are printed.
    """
    print("\nβοΈ Testing Configuration Options...")
    print("="*50)

    # Sweep population size and mutation rate around the defaults.
    configs = [
        ("Small Population", OptimizationConfig(population_size=20, generations=30)),
        ("Large Population", OptimizationConfig(population_size=100, generations=30)),
        ("High Mutation", OptimizationConfig(mutation_rate=0.3, generations=30)),
        ("Low Mutation", OptimizationConfig(mutation_rate=0.05, generations=30)),
    ]

    for config_name, config in configs:
        print(f"\nπ Testing {config_name}...")
        try:
            start_time = time.time()
            result = optimize_trainset_schedule(data, 'ga', config)
            elapsed = time.time() - start_time

            # Fix: the original result message was a string literal broken
            # across two lines (a SyntaxError); rejoined here.
            print(f"   β {config_name}: Fitness = {result.fitness_score:.2f} ({elapsed:.1f}s)")

        except Exception as e:
            print(f"   β {config_name} failed: {e}")
|
|
|
|
|
|
|
|
def run_comprehensive_comparison(data):
    """Compare several optimizers head-to-head and rank their results.

    Args:
        data: Dataset dict as produced by the generators.

    Returns:
        dict mapping method name to its result object (possibly None per
        method); an empty dict when the comparison itself fails.
    """
    print("\nπ Comprehensive Method Comparison...")
    print("="*60)

    try:
        config = OptimizationConfig(population_size=40, generations=75)
        methods = ['ga', 'pso', 'cmaes']

        optimizer = TrainsetSchedulingOptimizer(data, config)
        results = optimizer.compare_methods(methods)

        print("\nπ Final Comparison Results:")
        print("-" * 60)

        # Drop methods that produced no result before ranking.
        ranked = [(name, res) for name, res in results.items() if res is not None]

        if ranked:
            # Lower fitness score ranks first.
            ranked.sort(key=lambda pair: pair[1].fitness_score)

            medals = ["π₯", "π₯", "π₯"]
            for position, (name, res) in enumerate(ranked):
                badge = medals[position] if position < len(medals) else "π"
                print(f"{badge} {name.upper()}: {res.fitness_score:.2f}")

        return results

    except Exception as e:
        print(f"β Comparison failed: {e}")
        return {}
|
|
|
|
|
|
|
|
def generate_summary_report(basic_results, hybrid_results, comparison_results):
    """Print a summary report covering all optimization test runs.

    Args:
        basic_results: dict of method -> {'result', 'time', 'success', ...}.
        hybrid_results: same shape as basic_results.
        comparison_results: currently unused; kept for interface stability.

    Returns:
        None. The report is written to stdout.
    """
    print("\nπ OPTIMIZATION SYSTEM TEST SUMMARY")
    print("="*60)

    basic_success = sum(1 for r in basic_results.values() if r.get('success', False))
    hybrid_success = sum(1 for r in hybrid_results.values() if r.get('success', False))

    print(f"Basic Methods: {basic_success}/{len(basic_results)} successful")
    print(f"Hybrid Methods: {hybrid_success}/{len(hybrid_results)} successful")

    # Collect (method, fitness, runtime) for every successful run.
    # Note: the loop variable was renamed from 'data' to 'record' to avoid
    # shadowing the dataset name used throughout this module.
    all_results = []
    for source in (basic_results, hybrid_results):
        for method, record in source.items():
            if record.get('success') and record.get('result'):
                all_results.append((method, record['result'].fitness_score, record['time']))

    if all_results:
        # Lower fitness score ranks first (matches the comparison report).
        all_results.sort(key=lambda x: x[1])

        print(f"\nπ Best Overall Results:")
        for i, (method, fitness, time_taken) in enumerate(all_results[:3]):
            rank = ["π₯", "π₯", "π₯"][i]
            print(f"   {rank} {method.upper()}: {fitness:.2f} (in {time_taken:.1f}s)")

    # Fix: the original heading was a string literal broken across two
    # lines (a SyntaxError); rejoined here.
    print(f"\nβ System Capabilities Confirmed:")
    print(f"   β’ Data validation and error handling")
    print(f"   β’ Multiple optimization algorithms")
    print(f"   β’ Hybrid and ensemble methods")
    print(f"   β’ Configurable parameters")
    print(f"   β’ Comprehensive result analysis")

    print(f"\nπ― System ready for production use!")
|
|
|
|
|
|
|
|
def main():
    """Run the full optimization test suite end to end."""
    print("π¬ METRO TRAINSET SCHEDULING OPTIMIZATION SYSTEM")
    print("=" * 60)
    print("Enhanced system with modular architecture and advanced algorithms")
    print("=" * 60)

    try:
        dataset = generate_test_data()

        # Abort early: downstream tests assume a structurally valid dataset.
        if not test_data_validation(dataset):
            print("β Cannot proceed with invalid data")
            return

        basic_results = test_basic_optimization(dataset)

        # Only exercise the hybrid methods once at least one basic run worked.
        hybrid_results = {}
        if any(r.get('success', False) for r in basic_results.values()):
            hybrid_results = test_hybrid_optimization(dataset)

        test_error_handling(dataset)
        test_configuration_options(dataset)

        comparison_results = run_comprehensive_comparison(dataset)

        generate_summary_report(basic_results, hybrid_results, comparison_results)

    except Exception as e:
        # Top-level boundary: report the failure with a full traceback.
        print(f"β Test suite failed: {e}")
        import traceback
        traceback.print_exc()
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |