Spaces:
Paused
Paused
| """ | |
| Performance-based Auto-Scaling Implementation | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import subprocess | |
| import time | |
| from enum import Enum | |
| from typing import Any | |
| import psutil | |
| logger = logging.getLogger(__name__) | |
class ScalingDirection(Enum):
    """Possible outcomes of an auto-scaling decision."""

    SCALE_UP = "scale_up"
    SCALE_DOWN = "scale_down"
    NO_CHANGE = "no_change"
| class ScalingDecision: | |
| """Auto-scaling decision engine""" | |
| def __init__(self, redis_client=None): | |
| self.redis = redis_client | |
| self.scaling_enabled = os.getenv("AUTO_SCALING_ENABLED", "true").lower() == "true" | |
| self.min_replicas = int(os.getenv("MIN_REPLICAS", "1")) | |
| self.max_replicas = int(os.getenv("MAX_REPLICAS", "10")) | |
| self.scale_up_threshold = float(os.getenv("SCALE_UP_THRESHOLD", "80.0")) # CPU % | |
| self.scale_down_threshold = float(os.getenv("SCALE_DOWN_THRESHOLD", "20.0")) # CPU % | |
| self.scale_up_memory_threshold = float(os.getenv("SCALE_UP_MEMORY_THRESHOLD", "85.0")) # Memory % | |
| self.scale_down_memory_threshold = float(os.getenv("SCALE_DOWN_MEMORY_THRESHOLD", "30.0")) # Memory % | |
| self.scale_up_cooldown = int(os.getenv("SCALE_UP_COOLDOWN", "300")) # 5 minutes | |
| self.scale_down_cooldown = int(os.getenv("SCALE_DOWN_COOLDOWN", "600")) # 10 minutes | |
| self.last_scaling_action = {} | |
| self.metrics_history = [] | |
| def _get_current_metrics(self) -> dict[str, Any]: | |
| """Get current system and application metrics""" | |
| try: | |
| # System metrics | |
| cpu_percent = psutil.cpu_percent(interval=1) | |
| memory = psutil.virtual_memory() | |
| disk = psutil.disk_usage("/") | |
| # Process metrics for our application | |
| try: | |
| process = psutil.Process() | |
| process_memory = process.memory_info() | |
| process_cpu = process.cpu_percent() | |
| except Exception: | |
| process_memory = process_memory = None | |
| process_cpu = 0 | |
| return { | |
| "cpu_percent": cpu_percent, | |
| "memory_percent": memory.percent, | |
| "memory_available_gb": memory.available / (1024**3), | |
| "memory_used_gb": memory.used / (1024**3), | |
| "disk_usage_percent": disk.percent, | |
| "process_memory_mb": process_memory.rss / (1024**2) if process_memory else 0, | |
| "process_cpu_percent": process_cpu, | |
| "timestamp": time.time(), | |
| "load_average": psutil.getloadavg()[0], # 1-minute load average | |
| } | |
| except Exception as e: | |
| logger.error(f"Error collecting metrics: {e}") | |
| return {} | |
| async def _get_application_metrics(self) -> dict[str, Any]: | |
| """Get application-specific metrics from Redis""" | |
| try: | |
| if not self.redis: | |
| return {} | |
| # Get Redis metrics for application | |
| metrics = {} | |
| # Response time metrics | |
| response_times = await self.redis.lrange("app_response_times", -100) # Last 100 responses | |
| if response_times: | |
| times = [float(rt) for rt in response_times] | |
| metrics["avg_response_time_ms"] = sum(times) / len(times) | |
| metrics["p95_response_time_ms"] = sorted(times)[int(len(times) * 0.95)] | |
| metrics["p99_response_time_ms"] = sorted(times)[int(len(times) * 0.99)] | |
| # Request rate metrics | |
| request_rate = await self.redis.get("app_request_rate") | |
| if request_rate: | |
| metrics["requests_per_second"] = float(request_rate) | |
| # Error rate metrics | |
| error_rate = await self.redis.get("app_error_rate") | |
| if error_rate: | |
| metrics["error_rate_percent"] = float(error_rate) * 100 | |
| # Active connections | |
| active_connections = await self.redis.get("app_active_connections") | |
| if active_connections: | |
| metrics["active_connections"] = int(active_connections) | |
| return metrics | |
| except Exception as e: | |
| logger.error(f"Error getting application metrics: {e}") | |
| return {} | |
| async def _should_scale_up(self, system_metrics: dict[str, Any], app_metrics: dict[str, Any]) -> bool: | |
| """Determine if we should scale up""" | |
| if not self.scaling_enabled: | |
| return False | |
| # Check cooldown | |
| last_scale_up = self.last_scaling_action.get("scale_up", 0) | |
| if time.time() - last_scale_up < self.scale_up_cooldown: | |
| return False | |
| # Check if we're at max replicas | |
| current_replicas = await self._get_current_replica_count() | |
| if current_replicas >= self.max_replicas: | |
| return False | |
| # Check system metrics thresholds | |
| cpu_reason = system_metrics.get("cpu_percent", 0) >= self.scale_up_threshold | |
| memory_reason = system_metrics.get("memory_percent", 0) >= self.scale_up_memory_threshold | |
| load_reason = system_metrics.get("load_average", 0) >= 2.0 | |
| # Check application metrics thresholds | |
| response_time_reason = app_metrics.get("avg_response_time_ms", 0) > 500 # > 500ms average | |
| error_rate_reason = app_metrics.get("error_rate_percent", 0) > 5.0 # > 5% error rate | |
| return cpu_reason or memory_reason or load_reason or response_time_reason or error_rate_reason | |
| async def _should_scale_down(self, system_metrics: dict[str, Any], app_metrics: dict[str, Any]) -> bool: | |
| """Determine if we should scale down""" | |
| if not self.scaling_enabled: | |
| return False | |
| # Check cooldown | |
| last_scale_down = self.last_scaling_action.get("scale_down", 0) | |
| if time.time() - last_scale_down < self.scale_down_cooldown: | |
| return False | |
| # Check if we're at min replicas | |
| current_replicas = await self._get_current_replica_count() | |
| if current_replicas <= self.min_replicas: | |
| return False | |
| # Check system metrics thresholds | |
| cpu_reason = system_metrics.get("cpu_percent", 0) <= self.scale_down_threshold | |
| memory_reason = system_metrics.get("memory_percent", 0) <= self.scale_down_memory_threshold | |
| load_reason = system_metrics.get("load_average", 0) <= 0.5 | |
| # Check application metrics thresholds | |
| response_time_reason = app_metrics.get("avg_response_time_ms", 0) < 100 # < 100ms average | |
| error_rate_reason = app_metrics.get("error_rate_percent", 0) < 1.0 # < 1% error rate | |
| active_connections_reason = app_metrics.get("active_connections", 999) < 50 # Low activity | |
| # Scale down only if multiple conditions are met | |
| conditions_met = sum( | |
| [cpu_reason, memory_reason, load_reason, response_time_reason, error_rate_reason, active_connections_reason] | |
| ) | |
| return conditions_met >= 4 # At least 4 conditions | |
| async def _get_current_replica_count(self) -> int: | |
| """Get current number of running replicas""" | |
| try: | |
| # In a real implementation, this would query the orchestration system | |
| # For now, we'll check Docker containers | |
| result = subprocess.run( | |
| ["docker", "ps", "--filter", "name=backend-service", "--format", "{{.Names}}"], | |
| capture_output=True, | |
| text=True, | |
| ) | |
| if result.returncode == 0: | |
| containers = result.stdout.strip().split("\n") | |
| return len([c for c in containers if c.strip()]) | |
| except Exception as e: | |
| logger.error(f"Error getting replica count: {e}") | |
| return 1 # Default fallback | |
| async def _execute_scaling_action(self, direction: ScalingDirection, target_replicas: int) -> bool: | |
| """Execute scaling action""" | |
| try: | |
| action = { | |
| "direction": direction.value, | |
| "current_replicas": await self._get_current_replica_count(), | |
| "target_replicas": target_replicas, | |
| "timestamp": time.time(), | |
| "metrics": await self._get_current_metrics(), | |
| } | |
| if direction == ScalingDirection.SCALE_UP: | |
| result = await self._scale_up(target_replicas) | |
| self.last_scaling_action["scale_up"] = time.time() | |
| elif direction == ScalingDirection.SCALE_DOWN: | |
| result = await self._scale_down(target_replicas) | |
| self.last_scaling_action["scale_down"] = time.time() | |
| else: | |
| logger.info("No scaling action needed") | |
| return True | |
| if result: | |
| # Store scaling action in Redis | |
| if self.redis: | |
| await self.redis.lpush("scaling_history", json.dumps(action)) | |
| await self.redis.ltrim("scaling_history", 0, 100) # Keep last 100 actions | |
| logger.info(f"Scaling action executed: {direction} to {target_replicas} replicas") | |
| return True | |
| else: | |
| logger.error(f"Scaling action failed: {direction}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error executing scaling action: {e}") | |
| return False | |
| async def _scale_up(self, target_replicas: int) -> bool: | |
| """Scale up to target number of replicas""" | |
| try: | |
| # In a real implementation, this would orchestrate with Kubernetes/ECS/etc. | |
| # For Docker Compose, we'll simulate the action | |
| logger.info(f"Scaling up to {target_replicas} replicas") | |
| # Simulate scaling action (in production, this would be real) | |
| current_replicas = await self._get_current_replica_count() | |
| if target_replicas > current_replicas: | |
| # This would typically call the orchestration API | |
| # docker-compose up --scale backend-service=target_replicas | |
| result = subprocess.run( | |
| ["docker-compose", "up", "--scale", f"backend-service={target_replicas}", "-d"], | |
| capture_output=True, | |
| text=True, | |
| ) | |
| return result.returncode == 0 | |
| except Exception as e: | |
| logger.error(f"Error scaling up: {e}") | |
| return False | |
| async def _scale_down(self, target_replicas: int) -> bool: | |
| """Scale down to target number of replicas""" | |
| try: | |
| # In a real implementation, this would orchestrate with Kubernetes/ECS/etc. | |
| logger.info(f"Scaling down to {target_replicas} replicas") | |
| current_replicas = await self._get_current_replica_count() | |
| if target_replicas < current_replicas: | |
| # This would typically call the orchestration API | |
| # docker-compose up --scale backend-service=target_replicas | |
| result = subprocess.run( | |
| ["docker-compose", "up", "--scale", f"backend-service={target_replicas}", "-d"], | |
| capture_output=True, | |
| text=True, | |
| ) | |
| return result.returncode == 0 | |
| except Exception as e: | |
| logger.error(f"Error scaling down: {e}") | |
| return False | |
| async def make_scaling_decision(self) -> dict[str, Any]: | |
| """Make auto-scaling decision based on metrics""" | |
| if not self.scaling_enabled: | |
| return { | |
| "decision": ScalingDirection.NO_CHANGE, | |
| "reason": "Auto-scaling disabled", | |
| "current_replicas": await self._get_current_replica_count(), | |
| } | |
| try: | |
| # Get current metrics | |
| system_metrics = self._get_current_metrics() | |
| app_metrics = await self._get_application_metrics() | |
| current_replicas = await self._get_current_replica_count() | |
| # Add to history | |
| self.metrics_history.append({**system_metrics, **app_metrics, "timestamp": time.time()}) | |
| # Keep only last 100 entries | |
| if len(self.metrics_history) > 100: | |
| self.metrics_history = self.metrics_history[-100:] | |
| # Make decision | |
| if await self._should_scale_up(system_metrics, app_metrics): | |
| decision = ScalingDirection.SCALE_UP | |
| target_replicas = min(current_replicas + 1, self.max_replicas) | |
| reason = "High CPU/Memory/Response time detected" | |
| elif await self._should_scale_down(system_metrics, app_metrics): | |
| decision = ScalingDirection.SCALE_DOWN | |
| target_replicas = max(current_replicas - 1, self.min_replicas) | |
| reason = "Low resource utilization detected" | |
| else: | |
| decision = ScalingDirection.NO_CHANGE | |
| target_replicas = current_replicas | |
| reason = "Resource utilization within normal range" | |
| scaling_decision = { | |
| "decision": decision.value, | |
| "current_replicas": current_replicas, | |
| "target_replicas": target_replicas, | |
| "reason": reason, | |
| "metrics": {"system": system_metrics, "application": app_metrics}, | |
| "timestamp": time.time(), | |
| } | |
| # Store decision in Redis | |
| if self.redis: | |
| await self.redis.setex( | |
| "last_scaling_decision", | |
| 300, # 5 minutes TTL | |
| json.dumps(scaling_decision), | |
| ) | |
| return scaling_decision | |
| except Exception as e: | |
| logger.error(f"Error making scaling decision: {e}") | |
| return { | |
| "decision": ScalingDirection.NO_CHANGE, | |
| "reason": f"Error: {e}", | |
| "current_replicas": await self._get_current_replica_count(), | |
| } | |
| async def get_scaling_metrics(self) -> dict[str, Any]: | |
| """Get comprehensive auto-scaling metrics""" | |
| try: | |
| # Get current decision | |
| if self.redis: | |
| last_decision = await self.redis.get("last_scaling_decision") | |
| if last_decision: | |
| last_decision = json.loads(last_decision) | |
| else: | |
| last_decision = {} | |
| # Get scaling history | |
| if self.redis: | |
| scaling_history = await self.redis.lrange("scaling_history", -20) # Last 20 actions | |
| history = [json.loads(action) for action in scaling_history if action] | |
| else: | |
| history = [] | |
| current_metrics = self._get_current_metrics() | |
| app_metrics = await self._get_application_metrics() | |
| return { | |
| "current_metrics": current_metrics, | |
| "application_metrics": app_metrics, | |
| "last_decision": last_decision, | |
| "scaling_history": history, | |
| "configuration": { | |
| "auto_scaling_enabled": self.scaling_enabled, | |
| "min_replicas": self.min_replicas, | |
| "max_replicas": self.max_replicas, | |
| "scale_up_threshold": self.scale_up_threshold, | |
| "scale_down_threshold": self.scale_down_threshold, | |
| "scale_up_cooldown": self.scale_up_cooldown, | |
| "scale_down_cooldown": self.scale_down_cooldown, | |
| }, | |
| "current_replicas": await self._get_current_replica_count(), | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting scaling metrics: {e}") | |
| return {"error": str(e)} | |
| async def start_auto_scaling(self): | |
| """Start continuous auto-scaling loop""" | |
| async def scaling_loop(): | |
| while True: | |
| try: | |
| decision = await self.make_scaling_decision() | |
| if decision["decision"] != ScalingDirection.NO_CHANGE.value: | |
| # Execute scaling action | |
| success = await self._execute_scaling_action( | |
| ScalingDirection(decision["decision"]), decision["target_replicas"] | |
| ) | |
| if success: | |
| logger.info(f"Auto-scaling action completed: {decision['decision']}") | |
| else: | |
| logger.error(f"Auto-scaling action failed: {decision['decision']}") | |
| # Wait before next decision | |
| await asyncio.sleep(60) # Check every minute | |
| except Exception as e: | |
| logger.error(f"Auto-scaling loop error: {e}") | |
| await asyncio.sleep(60) | |
| # Start auto-scaling loop | |
| asyncio.create_task(scaling_loop()) | |
| logger.info("Auto-scaling monitoring started") | |
| async def manual_scale(self, target_replicas: int) -> bool: | |
| """Manual scaling to specific replica count""" | |
| try: | |
| target_replicas = max(self.min_replicas, min(target_replicas, self.max_replicas)) | |
| current_replicas = await self._get_current_replica_count() | |
| if target_replicas == current_replicas: | |
| logger.info(f"Already at target replica count: {target_replicas}") | |
| return True | |
| if target_replicas > current_replicas: | |
| direction = ScalingDirection.SCALE_UP | |
| else: | |
| direction = ScalingDirection.SCALE_DOWN | |
| return await self._execute_scaling_action(direction, target_replicas) | |
| except Exception as e: | |
| logger.error(f"Manual scaling failed: {e}") | |
| return False | |
class AutoScalingManager:
    """Lifecycle wrapper around the ScalingDecision engine.

    Owns starting/stopping the background monitoring task and exposes
    status and manual-scaling entry points.
    """

    def __init__(self, redis_client=None):
        """
        Args:
            redis_client: optional async Redis client, forwarded to the
                decision engine for metrics and history persistence.
        """
        self.redis = redis_client
        self.decision_engine = ScalingDecision(redis_client)
        self.is_running = False
        # Background monitoring task (None until start_monitoring runs).
        self.monitoring_task = None

    async def start_monitoring(self):
        """Start auto-scaling monitoring (idempotent)."""
        if not self.is_running:
            self.is_running = True
            # Keep the loop task handle so stop_monitoring() can cancel it.
            self.monitoring_task = await self.decision_engine.start_auto_scaling()
            logger.info("Auto-scaling manager started")

    async def stop_monitoring(self):
        """Stop auto-scaling monitoring and cancel the background task."""
        self.is_running = False
        if self.monitoring_task:
            self.monitoring_task.cancel()
        logger.info("Auto-scaling manager stopped")

    async def get_status(self) -> dict[str, Any]:
        """Return comprehensive auto-scaling metrics and configuration."""
        return await self.decision_engine.get_scaling_metrics()

    async def scale_to(self, target_replicas: int) -> bool:
        """Manually scale to *target_replicas* (clamped by the engine)."""
        return await self.decision_engine.manual_scale(target_replicas)
# Module-level singleton for callers that don't supply a Redis client
# (in that mode application metrics and history persistence are skipped).
auto_scaling_manager = AutoScalingManager()
# Auto-scaling initialization function
async def initialize_auto_scaling(redis_client=None):
    """Create an AutoScalingManager, start its monitoring loop, and return it."""
    scaling_manager = AutoScalingManager(redis_client)
    await scaling_manager.start_monitoring()
    logger.info("Auto-scaling system initialized")
    return scaling_manager