#!/usr/bin/env python3
"""
Automated Performance Tuning Engine
Optimizes application and infrastructure performance using ML
"""

import asyncio
import logging
import math
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional

import aiohttp
import joblib
import numpy as np
from prometheus_client import Counter, Gauge, Histogram

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Prometheus Metrics
OPTIMIZATION_SCORE = Gauge("performance_optimization_score", "Current optimization score")
PERFORMANCE_IMPROVEMENT = Gauge("performance_improvement_percent", "Performance improvement percentage")
TUNING_ATTEMPTS = Counter("performance_tuning_attempts_total", "Total tuning attempts", ["success"])
OPTIMIZATION_DURATION = Histogram("performance_optimization_duration_seconds", "Optimization duration")
RESOURCE_UTILIZATION = Gauge("performance_resource_utilization_percent", "Resource utilization", ["resource"])

# Parameter categories, in the order they are turned into model features.
_PARAM_CATEGORIES = ("application", "infrastructure")


@dataclass
class PerformanceMetrics:
    """Current performance metrics"""

    timestamp: datetime
    response_time_p50: float
    response_time_p95: float
    response_time_p99: float
    throughput_rps: float
    error_rate: float
    cpu_utilization: float
    memory_utilization: float
    cache_hit_rate: float
    connection_pool_utilization: float


@dataclass
class TuningConfiguration:
    """Performance tuning configuration"""

    parameter_name: str
    current_value: Any
    min_value: Any
    max_value: Any
    value_type: str  # 'integer', 'float', 'boolean'
    step_size: Any


@dataclass
class OptimizationResult:
    """Optimization result"""

    timestamp: datetime
    configuration: dict[str, Any]
    baseline_metrics: PerformanceMetrics
    optimized_metrics: PerformanceMetrics
    improvement_score: float
    success: bool
    recommendation: str


class PerformanceTuningEngine:
    """Automated performance tuning engine.

    Collects metrics from Prometheus, searches for a better parameter
    configuration using the enabled strategies (scored through a pre-trained
    prediction model), applies it, and rolls back when the measured metrics
    degrade.
    """

    def __init__(self, config_path: str = "/config/tuning-config.yaml"):
        """Load configuration and models and open the shared HTTP session.

        Args:
            config_path: Path of the YAML tuning configuration. When it cannot
                be loaded, the built-in default configuration is used.
        """
        self.config = self._load_config(config_path)
        self.models = {}
        self.scalers = {}

        self.prometheus_url = os.getenv("PROMETHEUS_URL", "http://prometheus.monitoring.svc.cluster.local:9090")
        self.jaeger_url = os.getenv(
            "JAEGER_ENDPOINT",
            "http://jaeger-query.istio-system.svc.cluster.local:16686",
        )

        self.tuning_interval = int(os.getenv("TUNING_INTERVAL", "300"))  # 5 minutes
        self.optimization_threshold = float(os.getenv("OPTIMIZATION_THRESHOLD", "0.15"))
        self.max_tuning_attempts = int(os.getenv("MAX_TUNING_ATTEMPTS", "5"))
        self.rollback_threshold = float(os.getenv("ROLLBACK_THRESHOLD", "0.05"))

        # Load models
        self._load_models()

        # Initialize HTTP session (released again by close())
        self.session = aiohttp.ClientSession()

        # Optimization state
        self.optimization_history = []
        self.current_tuning_attempt = 0
        self.baseline_metrics = None

    async def close(self) -> None:
        """Close the shared HTTP session so its connections are released."""
        if not self.session.closed:
            await self.session.close()

    def _load_config(self, config_path: str) -> dict[str, Any]:
        """Load tuning configuration.

        Returns the parsed YAML mapping, or the default configuration when the
        file is missing, unreadable, invalid, or does not contain a mapping
        (for example an empty file, which parses to ``None``).
        """
        try:
            import yaml

            with open(config_path, "r", encoding="utf-8") as f:
                loaded = yaml.safe_load(f)
        except Exception as e:
            logger.error(f"Failed to load config: {e}")
            return self._default_config()

        if not isinstance(loaded, dict):
            logger.error(f"Failed to load config: {config_path} does not contain a mapping")
            return self._default_config()
        return loaded

    def _default_config(self) -> dict[str, Any]:
        """Default configuration"""
        return {
            "optimization_targets": {
                "response_time": {"target_p95": 100, "weight": 0.4},
                "throughput": {"target_rps": 2000, "weight": 0.3},
            },
            "tuning_parameters": {
                "application": [
                    {
                        "name": "worker_threads",
                        "min": 2,
                        "max": 64,
                        "default": 8,
                        "type": "integer",
                    }
                ],
                "infrastructure": [
                    {
                        "name": "cpu_limit_millicores",
                        "min": 500,
                        "max": 8000,
                        "default": 2000,
                        "type": "integer",
                    }
                ],
            },
            "optimization_strategies": {
                "hill_climbing": {"enabled": True},
                "bayesian_optimization": {"enabled": True},
            },
        }

    def _load_models(self):
        """Load pre-trained performance models from ``/models``.

        Missing or unloadable models are logged and skipped; callers check
        ``self.models`` for the model they need.
        """
        model_files = {
            "performance_prediction": "performance_prediction_latest.pkl",
            "parameter_optimization": "parameter_optimization_latest.pkl",
            "performance_anomaly": "performance_anomaly_latest.pkl",
        }

        for model_name, filename in model_files.items():
            try:
                model_path = os.path.join("/models", filename)
                if os.path.exists(model_path):
                    # NOTE(security): joblib.load unpickles the file and can
                    # execute arbitrary code; /models must only ever contain
                    # trusted artifacts.
                    self.models[model_name] = joblib.load(model_path)
                    logger.info(f"Loaded performance model: {model_name}")
                else:
                    logger.warning(f"Performance model not found: {model_path}")
            except Exception as e:
                logger.error(f"Failed to load performance model {model_name}: {e}")

    async def collect_performance_metrics(self, service: str = "zenith-api") -> Optional[PerformanceMetrics]:
        """Collect current performance metrics.

        Args:
            service: Value of the ``service`` label used in every query.

        Returns:
            A ``PerformanceMetrics`` snapshot (latencies in milliseconds, cache
            hit rate in percent). A query that yields no usable sample
            contributes ``0.0``. Returns ``None`` on unexpected failure.
        """
        try:
            # Query Prometheus for performance metrics
            queries = {
                "response_time_p50": f'histogram_quantile(0.50, sum(rate(http_request_duration_seconds_bucket{{service="{service}"}}[5m])) by (le))',
                "response_time_p95": f'histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket{{service="{service}"}}[5m])) by (le))',
                "response_time_p99": f'histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket{{service="{service}"}}[5m])) by (le))',
                "throughput_rps": f'sum(rate(http_requests_total{{service="{service}"}}[5m]))',
                "error_rate": f'sum(rate(http_requests_total{{service="{service}",status=~"5.."}}[5m])) / sum(rate(http_requests_total{{service="{service}"}}[5m]))',
                "cpu_utilization": f'avg(rate(container_cpu_usage_seconds_total{{service="{service}"}}[5m])) * 100',
                "memory_utilization": f'avg(container_memory_usage_bytes{{service="{service}"}} / container_spec_memory_limit_bytes) * 100',
                "cache_hit_rate": f'sum(rate(cache_hits_total{{service="{service}"}}[5m])) / sum(rate(cache_requests_total{{service="{service}"}}[5m]))',
                # Both sides are aggregated: dividing a label-less avg() by a
                # labelled vector never matches and always returned no data.
                "connection_pool_utilization": f'avg(connection_pool_active{{service="{service}"}}) / avg(connection_pool_max{{service="{service}"}}) * 100',
            }

            # The queries are independent, so run them concurrently.
            # _query_prometheus never raises; it returns None on failure.
            values = await asyncio.gather(*(self._query_prometheus(query) for query in queries.values()))
            metrics = {name: (value if value is not None else 0.0) for name, value in zip(queries, values)}

            return PerformanceMetrics(
                timestamp=datetime.utcnow(),
                response_time_p50=metrics["response_time_p50"] * 1000,  # Convert to ms
                response_time_p95=metrics["response_time_p95"] * 1000,
                response_time_p99=metrics["response_time_p99"] * 1000,
                throughput_rps=metrics["throughput_rps"],
                error_rate=metrics["error_rate"],
                cpu_utilization=metrics["cpu_utilization"],
                memory_utilization=metrics["memory_utilization"],
                cache_hit_rate=metrics["cache_hit_rate"] * 100,
                connection_pool_utilization=metrics["connection_pool_utilization"],
            )

        except Exception as e:
            logger.error(f"Failed to collect performance metrics: {e}")
            return None

    async def _query_prometheus(self, query: str) -> Optional[float]:
        """Query Prometheus for metric value.

        Returns the first sample of an instant query as a float, or ``None``
        when the request fails, the status is not 200, the result is empty, or
        the sample is NaN/Inf (which would otherwise poison every score
        comparison downstream).
        """
        try:
            async with self.session.get(
                f"{self.prometheus_url}/api/v1/query",
                params={"query": query},
                # A bare number as timeout is deprecated in aiohttp.
                timeout=aiohttp.ClientTimeout(total=10),
            ) as response:
                if response.status != 200:
                    return None
                data = await response.json()

            results = data["data"]["result"]
            if not results:
                return None

            value = float(results[0]["value"][1])
            return value if math.isfinite(value) else None

        except Exception as e:
            logger.error(f"Prometheus query failed: {e}")
            return None

    async def predict_performance(self, configuration: dict[str, Any]) -> Optional[PerformanceMetrics]:
        """Predict performance for given configuration.

        Returns ``None`` when the prediction model is not loaded, when no
        features can be built, or when the prediction fails.
        """
        try:
            if "performance_prediction" not in self.models:
                logger.warning("Performance prediction model not available")
                return None

            model = self.models["performance_prediction"]

            # Prepare features from configuration
            features = self._prepare_prediction_features(configuration)
            if not features:
                return None

            # Predict metrics
            predicted_values = model.predict([features])[0]

            # Map predicted values to performance metrics
            return PerformanceMetrics(
                timestamp=datetime.utcnow(),
                response_time_p50=predicted_values[0] * 1000,
                response_time_p95=predicted_values[1] * 1000,
                response_time_p99=predicted_values[2] * 1000,
                throughput_rps=predicted_values[3],
                error_rate=predicted_values[4],
                cpu_utilization=predicted_values[5],
                memory_utilization=predicted_values[6],
                cache_hit_rate=predicted_values[7] * 100,
                connection_pool_utilization=predicted_values[8],
            )

        except Exception as e:
            logger.error(f"Failed to predict performance: {e}")
            return None

    def _prepare_prediction_features(self, configuration: dict[str, Any]) -> Optional[list[float]]:
        """Prepare features for performance prediction.

        The feature vector lists application parameters first, then
        infrastructure parameters, each in configuration-file order; parameters
        absent from ``configuration`` use their declared default.
        """
        try:
            return [
                float(configuration.get(param["name"], param["default"]))
                for category in _PARAM_CATEGORIES
                for param in self.config["tuning_parameters"][category]
            ]
        except Exception as e:
            logger.error(f"Failed to prepare prediction features: {e}")
            return None

    async def optimize_configuration(
        self, current_config: dict[str, Any], current_metrics: PerformanceMetrics
    ) -> Optional[dict[str, Any]]:
        """Find optimal configuration using various strategies.

        Runs every enabled strategy and returns the best configuration found,
        but only if its score beats the score of ``current_metrics``;
        otherwise returns ``None``.
        """
        try:
            strategies = self.config["optimization_strategies"]
            current_score = self._calculate_score(current_metrics)

            best_config = None
            best_score = float("inf")

            for strategy_name, strategy_config in strategies.items():
                if not strategy_config.get("enabled", False):
                    continue

                logger.info(f"Running optimization strategy: {strategy_name}")

                if strategy_name == "hill_climbing":
                    # Hill climbing also returns the predicted metrics.
                    config, _metrics, score = await self._hill_climbing_optimization(
                        current_config, current_metrics, strategy_config
                    )
                elif strategy_name == "bayesian_optimization":
                    config, score = await self._bayesian_optimization(current_config, current_metrics, strategy_config)
                elif strategy_name == "genetic_algorithm":
                    config, score = await self._genetic_algorithm_optimization(
                        current_config, current_metrics, strategy_config
                    )
                else:
                    continue

                if config and score < best_score:
                    best_config = config
                    best_score = score

            if best_config and best_score < current_score:
                logger.info(f"Found better configuration with improvement: {current_score - best_score:.2f}")
                return best_config

            return None

        except Exception as e:
            logger.error(f"Failed to optimize configuration: {e}")
            return None

    async def _hill_climbing_optimization(
        self,
        current_config: dict[str, Any],
        current_metrics: PerformanceMetrics,
        config: dict[str, Any],
    ) -> tuple[Optional[dict[str, Any]], Optional[PerformanceMetrics], float]:
        """Hill climbing optimization strategy.

        Returns:
            ``(best_config, best_metrics, best_score)``; on failure
            ``(None, None, inf)``.
        """
        try:
            best_config = current_config.copy()
            best_metrics = current_metrics
            best_score = self._calculate_score(current_metrics)

            step_size = config.get("step_size", 0.1)
            max_iterations = config.get("max_iterations", 50)

            for iteration in range(max_iterations):
                improved = False

                # Try to improve each parameter
                for param_category in _PARAM_CATEGORIES:
                    for param in self.config["tuning_parameters"][param_category]:
                        param_name = param["name"]
                        # Both directions step from the value seen before
                        # either move was tried.
                        current_value = best_config.get(param_name, param["default"])

                        for direction in ("increase", "decrease"):
                            candidate = best_config.copy()
                            new_value = self._adjust_parameter(current_value, param, direction, step_size)
                            candidate[param_name] = new_value

                            predicted_metrics = await self.predict_performance(candidate)
                            if not predicted_metrics:
                                continue

                            new_score = self._calculate_score(predicted_metrics)
                            if new_score < best_score:
                                best_config = candidate
                                best_metrics = predicted_metrics
                                best_score = new_score
                                improved = True
                                logger.info(f"Improved {param_name} from {current_value} to {new_value}")

                if not improved:
                    logger.info(f"Hill climbing converged at iteration {iteration}")
                    break

            return best_config, best_metrics, best_score

        except Exception as e:
            logger.error(f"Hill climbing optimization failed: {e}")
            return None, None, float("inf")

    async def _bayesian_optimization(
        self,
        current_config: dict[str, Any],
        current_metrics: PerformanceMetrics,
        config: dict[str, Any],
    ) -> tuple[Optional[dict[str, Any]], float]:
        """Bayesian optimization strategy.

        Simplified: random sampling of ``initial_points`` configurations
        followed by ``max_iterations`` perturbations of the best one found.

        Returns:
            ``(best_config, best_score)``; on failure ``(None, inf)``.
        """
        try:
            best_config = current_config.copy()
            best_score = self._calculate_score(current_metrics)

            initial_points = config.get("initial_points", 10)
            max_iterations = config.get("max_iterations", 30)

            # Generate initial random configurations
            for _ in range(initial_points):
                random_config = self._generate_random_configuration()
                predicted_metrics = await self.predict_performance(random_config)
                if predicted_metrics:
                    score = self._calculate_score(predicted_metrics)
                    if score < best_score:
                        best_config = random_config
                        best_score = score

            # Simple iterative improvement
            for iteration in range(max_iterations):
                # Generate configuration near best found so far
                new_config = self._perturb_configuration(best_config)
                predicted_metrics = await self.predict_performance(new_config)
                if predicted_metrics:
                    score = self._calculate_score(predicted_metrics)
                    if score < best_score:
                        best_config = new_config
                        best_score = score
                        logger.info(f"Iteration {iteration}: New best score {best_score:.2f}")

            return best_config, best_score

        except Exception as e:
            logger.error(f"Bayesian optimization failed: {e}")
            return None, float("inf")

    async def _genetic_algorithm_optimization(
        self,
        current_config: dict[str, Any],
        current_metrics: PerformanceMetrics,
        config: dict[str, Any],
    ) -> tuple[Optional[dict[str, Any]], float]:
        """Genetic algorithm optimization strategy.

        Returns:
            ``(best_config, best_score)`` over the final population, or
            ``(None, inf)`` when nothing could be scored or on failure.
        """
        try:
            population_size = config.get("population_size", 20)
            generations = config.get("generations", 100)
            mutation_rate = config.get("mutation_rate", 0.1)
            crossover_rate = config.get("crossover_rate", 0.8)
            # Always keep at least one survivor so parent selection never
            # draws from an empty pool.
            elite_count = max(1, population_size // 2)

            # Initialize population
            population = [self._generate_random_configuration() for _ in range(population_size)]
            population.append(current_config)  # Include current config

            for generation in range(generations):
                # Evaluate fitness for each configuration
                fitness_scores = []
                for candidate in population:
                    predicted_metrics = await self.predict_performance(candidate)
                    if predicted_metrics:
                        score = self._calculate_score(predicted_metrics)
                        fitness_scores.append(1.0 / (1.0 + score))  # Convert to fitness (higher is better)
                    else:
                        fitness_scores.append(0.0)

                # Select best individuals. Sort on fitness only: comparing the
                # (fitness, dict) tuples directly raises TypeError whenever two
                # fitness values tie, because dicts are not orderable.
                ranked = sorted(zip(fitness_scores, population), key=lambda pair: pair[0], reverse=True)
                elite = [candidate for _, candidate in ranked[:elite_count]]

                # Keep best half
                new_population = list(elite)

                # Crossover and mutation
                while len(new_population) < population_size:
                    if np.random.random() < crossover_rate:
                        parent1 = elite[np.random.randint(len(elite))]
                        parent2 = elite[np.random.randint(len(elite))]
                        child = self._crossover(parent1, parent2)
                    else:
                        child = elite[np.random.randint(len(elite))].copy()

                    if np.random.random() < mutation_rate:
                        child = self._mutate(child, mutation_rate)

                    new_population.append(child)

                population = new_population

                # Log best fitness
                best_fitness = max(fitness_scores)
                logger.debug(f"Generation {generation}: Best fitness {best_fitness:.4f}")

            # Return best configuration
            final_scores = []
            for candidate in population:
                predicted_metrics = await self.predict_performance(candidate)
                if predicted_metrics:
                    final_scores.append((self._calculate_score(predicted_metrics), candidate))

            if final_scores:
                best_score, best_config = min(final_scores, key=lambda x: x[0])
                return best_config, best_score

            return None, float("inf")

        except Exception as e:
            logger.error(f"Genetic algorithm optimization failed: {e}")
            return None, float("inf")

    @staticmethod
    def _assign_random_value(target: dict[str, Any], param: dict[str, Any]) -> None:
        """Store a random in-bounds value for ``param`` in ``target``.

        Parameters of an unrecognised type are left untouched. Values are
        converted to native Python types so configurations stay plain data.
        """
        name = param["name"]
        kind = param["type"]
        if kind == "integer":
            target[name] = int(np.random.randint(param["min"], param["max"] + 1))
        elif kind == "float":
            target[name] = float(np.random.uniform(param["min"], param["max"]))
        elif kind == "boolean":
            target[name] = bool(np.random.choice([True, False]))

    def _generate_random_configuration(self) -> dict[str, Any]:
        """Generate random configuration within bounds"""
        config = {}

        for param_category in _PARAM_CATEGORIES:
            for param in self.config["tuning_parameters"][param_category]:
                self._assign_random_value(config, param)

        return config

    def _perturb_configuration(self, config: dict[str, Any], perturbation_factor: float = 0.1) -> dict[str, Any]:
        """Perturb configuration for local search.

        Integer and float parameters are shifted by Gaussian noise whose
        standard deviation is ``perturbation_factor`` times the parameter
        range, then clipped to the bounds. Other parameter types are copied
        unchanged.
        """
        new_config = config.copy()

        for param_category in _PARAM_CATEGORIES:
            for param in self.config["tuning_parameters"][param_category]:
                kind = param["type"]
                if kind not in ("integer", "float"):
                    continue

                param_name = param["name"]
                current_value = new_config.get(param_name, param["default"])
                min_val = param["min"]
                max_val = param["max"]
                change = np.random.normal(0, (max_val - min_val) * perturbation_factor)

                if kind == "integer":
                    new_config[param_name] = int(np.clip(current_value + int(change), min_val, max_val))
                else:
                    new_config[param_name] = float(np.clip(current_value + change, min_val, max_val))

        return new_config

    def _crossover(self, parent1: dict[str, Any], parent2: dict[str, Any]) -> dict[str, Any]:
        """Crossover two parent configurations.

        Each parameter is taken from either parent with equal probability,
        falling back to the declared default when the parent lacks it.
        """
        child = {}

        for param_category in _PARAM_CATEGORIES:
            for param in self.config["tuning_parameters"][param_category]:
                param_name = param["name"]
                # Randomly choose from either parent
                donor = parent1 if np.random.random() < 0.5 else parent2
                child[param_name] = donor.get(param_name, param["default"])

        return child

    def _mutate(self, config: dict[str, Any], mutation_rate: float) -> dict[str, Any]:
        """Mutate configuration.

        Returns a copy in which each parameter is independently re-sampled
        with probability ``mutation_rate``.
        """
        mutated = config.copy()

        for param_category in _PARAM_CATEGORIES:
            for param in self.config["tuning_parameters"][param_category]:
                if np.random.random() < mutation_rate:
                    self._assign_random_value(mutated, param)

        return mutated

    def _adjust_parameter(
        self,
        current_value: Any,
        param: dict[str, Any],
        direction: str,
        step_size: float,
    ) -> Any:
        """Adjust parameter value in given direction.

        Args:
            current_value: Value to step from.
            param: Parameter definition with ``type`` and, for numeric types,
                ``min`` and ``max``.
            direction: ``"increase"`` steps up; anything else steps down.
            step_size: Step as a fraction of the parameter range. Integer
                parameters always move by at least 1.

        Returns:
            The stepped value clamped to the bounds; non-numeric parameter
            types are returned unchanged.
        """
        kind = param["type"]
        if kind not in ("integer", "float"):
            return current_value

        min_val = param["min"]
        max_val = param["max"]

        if kind == "integer":
            step = max(1, int((max_val - min_val) * step_size))
        else:
            step = (max_val - min_val) * step_size

        if direction == "increase":
            return min(current_value + step, max_val)
        return max(current_value - step, min_val)

    def _calculate_score(self, metrics: PerformanceMetrics) -> float:
        """Calculate optimization score (lower is better).

        The score is a weighted sum of: p95 latency above target, throughput
        below target, error rate (fixed weight 0.1), and the distance of CPU
        and memory utilization from their targets.
        """
        targets = self.config["optimization_targets"]
        score = 0.0

        # Response time component
        response_time_target = targets["response_time"]["target_p95"]
        response_time_weight = targets["response_time"]["weight"]
        response_time_score = max(0, (metrics.response_time_p95 - response_time_target) / response_time_target)
        score += response_time_score * response_time_weight

        # Throughput component
        throughput_target = targets["throughput"]["target_rps"]
        throughput_weight = targets["throughput"]["weight"]
        throughput_score = max(0, (throughput_target - metrics.throughput_rps) / throughput_target)
        score += throughput_score * throughput_weight

        # Error rate component (penalty)
        error_weight = 0.1
        error_score = metrics.error_rate * 100  # Convert to percentage
        score += error_score * error_weight

        # Resource efficiency component
        efficiency = targets.get("resource_efficiency", {})
        resource_weight = efficiency.get("weight", 0.1)
        cpu_target = efficiency.get("target_cpu_utilization", 70)
        memory_target = efficiency.get("target_memory_utilization", 80)

        cpu_score = abs(metrics.cpu_utilization - cpu_target) / cpu_target
        memory_score = abs(metrics.memory_utilization - memory_target) / memory_target
        resource_score = (cpu_score + memory_score) / 2
        score += resource_score * resource_weight

        return score

    async def apply_configuration(self, configuration: dict[str, Any]) -> bool:
        """Apply new configuration to the system.

        Every parameter is attempted even after an earlier one failed.

        Returns:
            ``True`` only if every known parameter was applied successfully.
        """
        try:
            logger.info(f"Applying configuration: {configuration}")

            # This would integrate with:
            # - Kubernetes API for infrastructure changes
            # - Application config management for app changes
            # - Service discovery for routing changes

            success = True

            for param_name, param_value in configuration.items():
                try:
                    # Determine if this is an application or infrastructure parameter
                    param_type = self._get_parameter_type(param_name)

                    if param_type == "infrastructure":
                        applied = await self._apply_infrastructure_change(param_name, param_value)
                    elif param_type == "application":
                        applied = await self._apply_application_change(param_name, param_value)
                    else:
                        # Unknown parameters are skipped and do not count as failures.
                        logger.warning(f"Skipping unknown parameter: {param_name}")
                        continue

                    success = success and applied

                except Exception as e:
                    logger.error(f"Failed to apply parameter {param_name}: {e}")
                    success = False

            return success

        except Exception as e:
            logger.error(f"Failed to apply configuration: {e}")
            return False

    def _get_parameter_type(self, param_name: str) -> str:
        """Get parameter type (application or infrastructure).

        Returns ``"unknown"`` when the name is declared in neither category.
        """
        for category in _PARAM_CATEGORIES:
            if any(param["name"] == param_name for param in self.config["tuning_parameters"][category]):
                return category

        return "unknown"

    async def _apply_infrastructure_change(self, param_name: str, param_value: Any) -> bool:
        """Apply infrastructure parameter change"""
        try:
            # This would use Kubernetes API to update deployments, services, etc.
            logger.info(f"Applying infrastructure change: {param_name} = {param_value}")

            # Mock implementation - would actually patch Kubernetes resources
            if param_name == "cpu_limit_millicores":
                # Update pod CPU limits
                pass
            elif param_name == "memory_limit_mb":
                # Update pod memory limits
                pass
            elif param_name == "replica_count":
                # Update deployment replica count
                pass

            return True

        except Exception as e:
            logger.error(f"Failed to apply infrastructure change {param_name}: {e}")
            return False

    async def _apply_application_change(self, param_name: str, param_value: Any) -> bool:
        """Apply application parameter change"""
        try:
            # This would update application configuration
            logger.info(f"Applying application change: {param_name} = {param_value}")

            # Mock implementation - would actually update app config
            if param_name == "worker_threads":
                # Update worker thread count
                pass
            elif param_name == "connection_pool_size":
                # Update connection pool size
                pass
            elif param_name == "cache_size_mb":
                # Update cache size
                pass

            return True

        except Exception as e:
            logger.error(f"Failed to apply application change {param_name}: {e}")
            return False

    async def run_load_test(self, test_config: dict[str, Any]) -> Optional[PerformanceMetrics]:
        """Run load test and collect metrics.

        Currently a mock: sleeps for 60 seconds and returns fixed results.
        """
        try:
            # This would integrate with load testing tools like k6, Locust, etc.
            logger.info(f"Running load test: {test_config}")

            # Mock load test execution
            await asyncio.sleep(60)  # Simulate test duration

            # Return mock test results
            return PerformanceMetrics(
                timestamp=datetime.utcnow(),
                response_time_p50=85.0,
                response_time_p95=120.0,
                response_time_p99=250.0,
                throughput_rps=1800.0,
                error_rate=0.005,
                cpu_utilization=65.0,
                memory_utilization=75.0,
                cache_hit_rate=92.0,
                connection_pool_utilization=70.0,
            )

        except Exception as e:
            logger.error(f"Failed to run load test: {e}")
            return None

    async def should_rollback(self, new_metrics: PerformanceMetrics, baseline_metrics: PerformanceMetrics) -> bool:
        """Check if rollback should be triggered.

        Returns ``True`` when p95 latency or throughput degrade by more than
        ``rollback_threshold`` (a fraction) relative to the baseline, when the
        error rate rises by more than 0.01 absolute, or when the check itself
        fails.
        """
        try:
            # Check response time degradation
            if new_metrics.response_time_p95 > baseline_metrics.response_time_p95 * (1 + self.rollback_threshold):
                logger.warning(
                    f"Response time degradation detected: {new_metrics.response_time_p95} vs {baseline_metrics.response_time_p95}"
                )
                return True

            # Check throughput degradation
            if new_metrics.throughput_rps < baseline_metrics.throughput_rps * (1 - self.rollback_threshold):
                logger.warning(
                    f"Throughput degradation detected: {new_metrics.throughput_rps} vs {baseline_metrics.throughput_rps}"
                )
                return True

            # Check error rate increase
            if new_metrics.error_rate > baseline_metrics.error_rate + 0.01:  # 1% absolute increase
                logger.warning(
                    f"Error rate increase detected: {new_metrics.error_rate} vs {baseline_metrics.error_rate}"
                )
                return True

            return False

        except Exception as e:
            logger.error(f"Failed to check rollback conditions: {e}")
            return True  # Conservative rollback on error

    async def rollback_configuration(self) -> bool:
        """Rollback to previous configuration.

        Currently a stub that only logs and reports success.
        """
        try:
            logger.info("Rolling back to previous configuration")

            # This would restore previous configuration from backup
            # or revert to known good defaults

            return True

        except Exception as e:
            logger.error(f"Failed to rollback configuration: {e}")
            return False

    async def _run_tuning_attempt(self, current_metrics: PerformanceMetrics) -> None:
        """Run one optimize / apply / verify cycle against ``current_metrics``.

        Records the outcome in the Prometheus counters and, on success, in
        ``optimization_history``.
        """
        # Get current configuration (mock)
        current_config = self._generate_random_configuration()

        # Find optimal configuration
        optimal_config = await self.optimize_configuration(current_config, current_metrics)
        if not optimal_config:
            return

        # Apply configuration
        if not await self.apply_configuration(optimal_config):
            logger.error("Failed to apply configuration")
            TUNING_ATTEMPTS.labels(success="false").inc()
            return

        # Wait for changes to take effect
        await asyncio.sleep(120)

        # Collect new metrics
        new_metrics = await self.collect_performance_metrics()
        if not new_metrics:
            logger.warning("Could not collect metrics after applying configuration; result unverified")
            return

        # Check if rollback is needed
        if await self.should_rollback(new_metrics, current_metrics):
            await self.rollback_configuration()
            logger.warning("Configuration rolled back due to performance degradation")
            TUNING_ATTEMPTS.labels(success="false").inc()
            return

        # Calculate improvement (percent of the baseline score)
        baseline_score = self._calculate_score(current_metrics)
        new_score = self._calculate_score(new_metrics)
        if baseline_score > 0:
            improvement = ((baseline_score - new_score) / baseline_score) * 100
        else:
            # A zero baseline is already optimal; nothing can improve on it.
            improvement = 0.0

        # NOTE(review): improvement is a percentage, while
        # optimization_threshold defaults to 0.15 and rollback_threshold is
        # used as a fraction - confirm the intended unit of this comparison.
        if improvement > self.optimization_threshold:
            logger.info(f"Optimization successful with {improvement:.1f}% improvement")
            PERFORMANCE_IMPROVEMENT.set(improvement)
            TUNING_ATTEMPTS.labels(success="true").inc()

            self.optimization_history.append(
                OptimizationResult(
                    timestamp=datetime.utcnow(),
                    configuration=optimal_config,
                    baseline_metrics=current_metrics,
                    optimized_metrics=new_metrics,
                    improvement_score=improvement,
                    success=True,
                    recommendation="Keep current configuration",
                )
            )
        else:
            logger.info(f"Optimization insufficient improvement: {improvement:.1f}%")
            await self.rollback_configuration()
            TUNING_ATTEMPTS.labels(success="false").inc()

    async def start_tuning_loop(self):
        """Main performance tuning loop.

        Runs until cancelled. Each cycle collects metrics, publishes them,
        and - while targets are missed and the attempt cap is not reached -
        runs one tuning attempt.
        """
        logger.info("Starting performance tuning loop")

        while True:
            try:
                # Collect current metrics
                current_metrics = await self.collect_performance_metrics()
                if not current_metrics:
                    await asyncio.sleep(30)
                    continue

                # Set baseline if not set
                if self.baseline_metrics is None:
                    self.baseline_metrics = current_metrics
                    logger.info("Baseline metrics established")

                # Update Prometheus metrics
                score = self._calculate_score(current_metrics)
                OPTIMIZATION_SCORE.set(score)
                RESOURCE_UTILIZATION.labels(resource="cpu").set(current_metrics.cpu_utilization)
                RESOURCE_UTILIZATION.labels(resource="memory").set(current_metrics.memory_utilization)

                # Check if optimization is needed
                optimization_targets = self.config["optimization_targets"]
                needs_optimization = (
                    current_metrics.response_time_p95 > optimization_targets["response_time"]["target_p95"]
                    or current_metrics.throughput_rps < optimization_targets["throughput"]["target_rps"]
                )

                if not needs_optimization:
                    # Reset attempt counter if no optimization needed
                    self.current_tuning_attempt = 0
                elif self.current_tuning_attempt < self.max_tuning_attempts:
                    logger.info(f"Starting optimization attempt {self.current_tuning_attempt + 1}")

                    with OPTIMIZATION_DURATION.time():
                        await self._run_tuning_attempt(current_metrics)

                    self.current_tuning_attempt += 1
                else:
                    # The cap must not be reset here, otherwise it would never
                    # limit anything; it clears once targets are met again.
                    logger.warning(
                        f"Maximum tuning attempts ({self.max_tuning_attempts}) reached; skipping optimization"
                    )

                # Wait for next cycle
                await asyncio.sleep(self.tuning_interval)

            except Exception as e:
                logger.error(f"Error in tuning loop: {e}")
                await asyncio.sleep(60)


async def main():
    """Main entry point"""
    tuner = PerformanceTuningEngine()
    try:
        await tuner.start_tuning_loop()
    finally:
        # Release the HTTP session even when the loop is cancelled.
        await tuner.close()


if __name__ == "__main__":
    asyncio.run(main())