#!/usr/bin/env python3
"""
ML-Powered Deployment Optimization Engine
Optimizes deployment strategies using machine learning predictions
"""

import asyncio
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional

import aiohttp
import joblib
import numpy as np
from prometheus_client import Counter, Gauge, Histogram

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Prometheus Metrics
PREDICTION_ACCURACY = Gauge("ml_deployment_prediction_accuracy", "Model prediction accuracy")
OPTIMIZATION_COUNT = Counter("ml_deployment_optimizations_total", "Total optimizations performed")
OPTIMIZATION_DURATION = Histogram("ml_deployment_optimization_duration_seconds", "Optimization duration")
RESOURCE_SAVINGS = Gauge("ml_deployment_resource_savings_percent", "Resource savings percentage")
SUCCESS_RATE_IMPROVEMENT = Gauge("ml_deployment_success_rate_improvement", "Success rate improvement percentage")


@dataclass
class DeploymentMetrics:
    """Deployment performance metrics"""

    deployment_id: str
    timestamp: datetime
    cpu_utilization: float
    memory_usage: float
    request_rate: float
    error_rate: float
    response_time_p95: float
    deployment_size: int
    complexity_score: float
    test_coverage: float
    success: bool
    promotion_time: Optional[float] = None


@dataclass
class OptimizationRecommendation:
    """Optimization recommendation"""

    deployment_id: str
    strategy: str
    rollout_percentage: int
    promotion_delay: int
    resource_adjustments: dict[str, Any]
    confidence_score: float
    expected_improvement: dict[str, float]
    risk_assessment: str


class MLOptimizer:
    """ML-powered deployment optimization engine"""

    def __init__(self, config_path: str = "/config/optimization-config.yaml"):
        self.config = self._load_config(config_path)
        self.models = {}
        self.scalers = {}
        self.prometheus_url = os.getenv(
            "PROMETHEUS_URL",
            "http://prometheus.monitoring.svc.cluster.local:9090",
        )
        self.redis_url = os.getenv(
            "REDIS_URL",
            "redis://redis-cluster.zenith-production.svc.cluster.local:6379",
        )
        self.model_path = os.getenv("MODEL_PATH", "/models")

        # Load pre-trained models
        self._load_models()

        # Initialize HTTP session (requires a running event loop; the
        # optimizer is constructed inside main(), where one exists)
        self.session = aiohttp.ClientSession()

    def _load_config(self, config_path: str) -> dict:
        """Load optimization configuration"""
        try:
            import yaml

            with open(config_path, "r") as f:
                return yaml.safe_load(f)
        except Exception as e:
            logger.error(f"Failed to load config: {e}")
            return self._default_config()

    def _default_config(self) -> dict:
        """Default configuration"""
        return {
            "models": {
                "deployment_success": {"type": "gradient_boosting"},
                "performance_prediction": {"type": "lstm"},
                "resource_optimization": {"type": "neural_network"},
            },
            "optimization_strategies": {
                "rollout_strategy": {
                    "enabled": True,
                    "max_rollout_percentage": 20,
                    "min_promotion_delay": 300,
                }
            },
        }

    def _load_models(self):
        """Load pre-trained ML models"""
        model_files = {
            "deployment_success": "deployment_success_latest.pkl",
            "performance_prediction": "performance_prediction_latest.pkl",
            "resource_optimization": "resource_optimization_latest.pkl",
        }

        for model_name, filename in model_files.items():
            try:
                model_path = os.path.join(self.model_path, filename)
                if os.path.exists(model_path):
                    self.models[model_name] = joblib.load(model_path)
                    logger.info(f"Loaded model: {model_name}")
                else:
                    logger.warning(f"Model file not found: {model_path}")
            except Exception as e:
                logger.error(f"Failed to load model {model_name}: {e}")
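    # NOTE (editorial addition, not in the original module): the constructor
    # opens a shared aiohttp.ClientSession, which should be closed explicitly
    # on shutdown to release connections. A minimal cleanup hook could look
    # like this; wiring it into a shutdown path is left to the caller.
    async def close(self) -> None:
        """Close the shared HTTP session."""
        if not self.session.closed:
            await self.session.close()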
    async def collect_deployment_metrics(self, deployment_id: str) -> Optional[DeploymentMetrics]:
        """Collect current deployment metrics from Prometheus"""
        try:
            # Query Prometheus for deployment metrics
            queries = {
                "cpu_utilization": f'avg(rate(container_cpu_usage_seconds_total{{deployment="{deployment_id}"}}[5m])) * 100',
                "memory_usage": f'avg(container_memory_usage_bytes{{deployment="{deployment_id}"}}) / (1024*1024*1024)',
                "request_rate": f'sum(rate(http_requests_total{{deployment="{deployment_id}"}}[5m]))',
                "error_rate": f'sum(rate(http_requests_total{{deployment="{deployment_id}",status=~"5.."}}[5m])) / sum(rate(http_requests_total{{deployment="{deployment_id}"}}[5m]))',
                "response_time_p95": f'histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket{{deployment="{deployment_id}"}}[5m])) by (le))',
            }

            metrics = {}
            for metric_name, query in queries.items():
                result = await self._query_prometheus(query)
                metrics[metric_name] = result if result is not None else 0.0

            # Get deployment metadata
            deployment_info = await self._get_deployment_info(deployment_id)

            return DeploymentMetrics(
                deployment_id=deployment_id,
                timestamp=datetime.now(timezone.utc),
                cpu_utilization=metrics.get("cpu_utilization", 0.0),
                memory_usage=metrics.get("memory_usage", 0.0),
                request_rate=metrics.get("request_rate", 0.0),
                error_rate=metrics.get("error_rate", 0.0),
                response_time_p95=metrics.get("response_time_p95", 0.0),
                deployment_size=deployment_info.get("size", 1),
                complexity_score=deployment_info.get("complexity", 0.5),
                test_coverage=deployment_info.get("test_coverage", 0.8),
                success=True,  # Assume success for now
            )
        except Exception as e:
            logger.error(f"Failed to collect metrics for {deployment_id}: {e}")
            return None

    async def _query_prometheus(self, query: str) -> Optional[float]:
        """Query Prometheus for metric value"""
        try:
            async with self.session.get(
                f"{self.prometheus_url}/api/v1/query",
                params={"query": query},
                timeout=aiohttp.ClientTimeout(total=10),
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    if data["data"]["result"]:
                        return float(data["data"]["result"][0]["value"][1])
            return None
        except Exception as e:
            logger.error(f"Prometheus query failed: {e}")
            return None

    async def _get_deployment_info(self, deployment_id: str) -> dict:
        """Get deployment metadata from Kubernetes API"""
        try:
            # This would integrate with the Kubernetes API;
            # for now, return mock data
            return {"size": 3, "complexity": 0.6, "test_coverage": 0.85}
        except Exception as e:
            logger.error(f"Failed to get deployment info: {e}")
            return {"size": 1, "complexity": 0.5, "test_coverage": 0.8}

    async def predict_deployment_success(self, metrics: DeploymentMetrics) -> tuple[float, dict]:
        """Predict deployment success probability"""
        try:
            if "deployment_success" not in self.models:
                logger.warning("Deployment success model not available")
                return 0.5, {"confidence": 0.1}

            model = self.models["deployment_success"]

            # Prepare features
            features = np.array(
                [
                    [
                        metrics.cpu_utilization,
                        metrics.memory_usage,
                        metrics.request_rate,
                        metrics.error_rate,
                        metrics.response_time_p95,
                        metrics.deployment_size,
                        metrics.complexity_score,
                        metrics.test_coverage,
                    ]
                ]
            )

            # Predict success probability
            success_probability = model.predict_proba(features)[0][1]

            # Get feature importance
            if hasattr(model, "feature_importances_"):
                feature_names = [
                    "cpu",
                    "memory",
                    "request_rate",
                    "error_rate",
                    "response_time",
                    "size",
                    "complexity",
                    "test_coverage",
                ]
                importance = dict(zip(feature_names, model.feature_importances_))
            else:
                importance = {}

            return success_probability, importance
        except Exception as e:
            logger.error(f"Failed to predict deployment success: {e}")
            return 0.5, {}
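    # Editorial sketch (hypothetical): __init__ initializes self.scalers, but
    # the original module never populates or applies them. If fitted scalers
    # were shipped alongside the model files (the naming convention here is an
    # assumption), feature normalization before predict_proba could look like:
    def _scale_features(self, model_name: str, features: np.ndarray) -> np.ndarray:
        """Apply a fitted scaler to a feature matrix, if one is loaded."""
        scaler = self.scalers.get(model_name)
        if scaler is not None:
            # sklearn scalers expose transform() once fitted
            return scaler.transform(features)
        return features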
    async def predict_performance(self, metrics: DeploymentMetrics) -> tuple[float, float]:
        """Predict performance metrics (response time and throughput)"""
        try:
            # Simple heuristic-based prediction for now
            base_response_time = 50  # ms
            cpu_impact = metrics.cpu_utilization * 0.5
            memory_impact = metrics.memory_usage * 0.1
            complexity_impact = metrics.complexity_score * 20
            predicted_response_time = base_response_time + cpu_impact + memory_impact + complexity_impact

            # Predict throughput
            base_throughput = 1000  # requests/sec
            throughput_factor = 1.0 - (metrics.error_rate * 10) - (metrics.complexity_score * 0.2)
            predicted_throughput = base_throughput * max(0.1, throughput_factor)

            return predicted_response_time, predicted_throughput
        except Exception as e:
            logger.error(f"Failed to predict performance: {e}")
            return 100.0, 500.0

    async def optimize_resources(self, metrics: DeploymentMetrics) -> dict[str, Any]:
        """Optimize resource allocation"""
        try:
            # Default CPU limit for cost comparison
            current_cpu_limit = 2000  # millicores

            # Optimize based on utilization
            optimal_cpu = max(100, int(metrics.cpu_utilization * 1.5))  # 50% buffer
            optimal_memory = max(0.5, metrics.memory_usage * 1.2)  # 20% buffer

            # Optimize replica count based on request rate
            optimal_replicas = max(2, int(metrics.request_rate / 1000))  # 1000 req/sec per replica

            return {
                "cpu_limit": optimal_cpu,
                "memory_limit": optimal_memory,
                "replica_count": optimal_replicas,
                "cost_savings_percent": max(0, ((current_cpu_limit - optimal_cpu) / current_cpu_limit) * 100),
            }
        except Exception as e:
            logger.error(f"Failed to optimize resources: {e}")
            return {}

    async def generate_optimization_recommendation(
        self, deployment_id: str, current_metrics: DeploymentMetrics
    ) -> OptimizationRecommendation:
        """Generate comprehensive optimization recommendation"""
        with OPTIMIZATION_DURATION.time():
            try:
                # Predict deployment success
                (
                    success_prob,
                    feature_importance,
                ) = await self.predict_deployment_success(current_metrics)

                # Predict performance
                (
                    predicted_response_time,
                    predicted_throughput,
                ) = await self.predict_performance(current_metrics)

                # Optimize resources
                resource_optimization = await self.optimize_resources(current_metrics)

                # Determine optimal deployment strategy
                strategy, rollout_percentage, promotion_delay = self._determine_strategy(success_prob, current_metrics)

                # Calculate expected improvements
                expected_improvements = self._calculate_improvements(
                    current_metrics,
                    predicted_response_time,
                    predicted_throughput,
                    resource_optimization,
                )

                # Assess risk
                risk_assessment = self._assess_risk(success_prob, current_metrics.complexity_score)

                recommendation = OptimizationRecommendation(
                    deployment_id=deployment_id,
                    strategy=strategy,
                    rollout_percentage=rollout_percentage,
                    promotion_delay=promotion_delay,
                    resource_adjustments=resource_optimization,
                    confidence_score=min(0.95, success_prob + 0.1),
                    expected_improvement=expected_improvements,
                    risk_assessment=risk_assessment,
                )

                # Update metrics
                OPTIMIZATION_COUNT.inc()
                if "cost_savings_percent" in resource_optimization:
                    RESOURCE_SAVINGS.set(resource_optimization["cost_savings_percent"])

                return recommendation
            except Exception as e:
                logger.error(f"Failed to generate recommendation for {deployment_id}: {e}")
                return self._default_recommendation(deployment_id)
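    # Editorial sketch (hypothetical): the PREDICTION_ACCURACY gauge is
    # declared above but never updated in the original module. Once observed
    # outcomes are available, one plausible wiring is a rolling accuracy over
    # recent predictions:
    def _record_outcome(self, predicted_success: float, actual_success: bool) -> None:
        """Track rolling prediction accuracy and expose it via the gauge."""
        if not hasattr(self, "_outcomes"):
            self._outcomes: list[bool] = []
        # A prediction counts as correct when it lands on the right side of 0.5
        self._outcomes.append((predicted_success >= 0.5) == actual_success)
        self._outcomes = self._outcomes[-100:]  # keep the last 100 outcomes
        PREDICTION_ACCURACY.set(sum(self._outcomes) / len(self._outcomes))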
    def _determine_strategy(self, success_prob: float, metrics: DeploymentMetrics) -> tuple[str, int, int]:
        """Determine optimal deployment strategy"""
        max_rollout = self.config["optimization_strategies"]["rollout_strategy"]["max_rollout_percentage"]
        min_delay = self.config["optimization_strategies"]["rollout_strategy"]["min_promotion_delay"]

        if success_prob > 0.9 and metrics.complexity_score < 0.5:
            return "canary", 10, min_delay
        elif success_prob > 0.8 and metrics.error_rate < 0.01:
            return "rolling", 25, min_delay
        elif success_prob > 0.7:
            return "blue_green", max_rollout, min_delay * 2
        else:
            return "blue_green", 5, min_delay * 3

    def _calculate_improvements(
        self,
        current: DeploymentMetrics,
        predicted_response_time: float,
        predicted_throughput: float,
        resource_optimization: dict,
    ) -> dict[str, float]:
        """Calculate expected improvements"""
        improvements = {}

        # Performance improvement
        if current.response_time_p95 > 0:
            performance_improvement = (
                (current.response_time_p95 - predicted_response_time) / current.response_time_p95
            ) * 100
            improvements["performance_percent"] = max(-100, performance_improvement)

        # Throughput improvement
        current_throughput = current.request_rate * (1 - current.error_rate)
        if current_throughput > 0:
            throughput_improvement = ((predicted_throughput - current_throughput) / current_throughput) * 100
            improvements["throughput_percent"] = max(-100, throughput_improvement)

        # Cost improvement
        if "cost_savings_percent" in resource_optimization:
            improvements["cost_savings_percent"] = resource_optimization["cost_savings_percent"]

        return improvements

    def _assess_risk(self, success_prob: float, complexity: float) -> str:
        """Assess deployment risk level"""
        risk_score = 1.0 - success_prob + complexity * 0.5

        if risk_score > 0.7:
            return "HIGH"
        elif risk_score > 0.4:
            return "MEDIUM"
        else:
            return "LOW"

    def _default_recommendation(self, deployment_id: str) -> OptimizationRecommendation:
        """Default recommendation when optimization fails"""
        return OptimizationRecommendation(
            deployment_id=deployment_id,
            strategy="rolling",
            rollout_percentage=25,
            promotion_delay=300,
            resource_adjustments={},
            confidence_score=0.5,
            expected_improvement={},
            risk_assessment="MEDIUM",
        )

    async def apply_optimization(self, recommendation: OptimizationRecommendation) -> bool:
        """Apply optimization recommendations to deployment"""
        try:
            logger.info(f"Applying optimization to {recommendation.deployment_id}")

            # This would integrate with the Kubernetes API to apply changes;
            # for now, just log the recommendation
            logger.info(f"Strategy: {recommendation.strategy}")
            logger.info(f"Rollout percentage: {recommendation.rollout_percentage}%")
            logger.info(f"Promotion delay: {recommendation.promotion_delay}s")
            logger.info(f"Resource adjustments: {recommendation.resource_adjustments}")
            logger.info(f"Confidence: {recommendation.confidence_score:.2f}")
            logger.info(f"Risk: {recommendation.risk_assessment}")

            return True
        except Exception as e:
            logger.error(f"Failed to apply optimization: {e}")
            return False

    async def start_optimization_loop(self):
        """Main optimization loop"""
        logger.info("Starting ML deployment optimization loop")

        while True:
            try:
                # Get active deployments
                deployments = await self._get_active_deployments()

                for deployment_id in deployments:
                    # Collect current metrics
                    metrics = await self.collect_deployment_metrics(deployment_id)
                    if metrics:
                        # Generate recommendation
                        recommendation = await self.generate_optimization_recommendation(deployment_id, metrics)

                        # Apply if confidence is high enough
                        if recommendation.confidence_score > 0.8:
                            await self.apply_optimization(recommendation)

                # Wait for next iteration
                await asyncio.sleep(int(os.getenv("OPTIMIZATION_INTERVAL", "300")))
            except Exception as e:
                logger.error(f"Error in optimization loop: {e}")
                await asyncio.sleep(60)
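    # Editorial sketch (assumption): _get_active_deployments below returns
    # mock data. A real implementation could list Deployments through the
    # Kubernetes REST API using the pod's service-account token. The
    # namespace default is an assumption; ssl=False is for sketching only.
    async def _list_deployments_k8s(self, namespace: str = "zenith-production") -> list[str]:
        """List Deployment names in a namespace via the Kubernetes API."""
        # In-cluster service-account token (standard mount path)
        token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
        with open(token_path) as f:
            token = f.read().strip()
        url = f"https://kubernetes.default.svc/apis/apps/v1/namespaces/{namespace}/deployments"
        async with self.session.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            ssl=False,  # verify the cluster CA bundle in production
        ) as response:
            response.raise_for_status()
            data = await response.json()
            return [item["metadata"]["name"] for item in data.get("items", [])]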
    async def _get_active_deployments(self) -> list[str]:
        """Get list of active deployments"""
        # This would query the Kubernetes API for active deployments;
        # for now, return mock data
        return ["zenith-api-prod", "zenith-api-staging"]


async def main():
    """Main entry point"""
    optimizer = MLOptimizer()
    await optimizer.start_optimization_loop()


if __name__ == "__main__":
    asyncio.run(main())