# zenith-backend / app / infrastructure / auto_scaling.py
# Author: teoat — fix(backend): fix port and health check robustness
# Commit: d29a5a0 (verified)
"""
Performance-based Auto-Scaling Implementation
"""
import asyncio
import json
import logging
import os
import subprocess
import time
from enum import Enum
from typing import Any
import psutil
logger = logging.getLogger(__name__)
class ScalingDirection(Enum):
    """Direction of a scaling action.

    The string values are what gets stored in Redis and compared by the
    scaling loop, so they must stay stable.
    """

    SCALE_UP = "scale_up"      # add a replica
    SCALE_DOWN = "scale_down"  # remove a replica
    NO_CHANGE = "no_change"    # leave the replica count as-is
class ScalingDecision:
"""Auto-scaling decision engine"""
def __init__(self, redis_client=None):
self.redis = redis_client
self.scaling_enabled = os.getenv("AUTO_SCALING_ENABLED", "true").lower() == "true"
self.min_replicas = int(os.getenv("MIN_REPLICAS", "1"))
self.max_replicas = int(os.getenv("MAX_REPLICAS", "10"))
self.scale_up_threshold = float(os.getenv("SCALE_UP_THRESHOLD", "80.0")) # CPU %
self.scale_down_threshold = float(os.getenv("SCALE_DOWN_THRESHOLD", "20.0")) # CPU %
self.scale_up_memory_threshold = float(os.getenv("SCALE_UP_MEMORY_THRESHOLD", "85.0")) # Memory %
self.scale_down_memory_threshold = float(os.getenv("SCALE_DOWN_MEMORY_THRESHOLD", "30.0")) # Memory %
self.scale_up_cooldown = int(os.getenv("SCALE_UP_COOLDOWN", "300")) # 5 minutes
self.scale_down_cooldown = int(os.getenv("SCALE_DOWN_COOLDOWN", "600")) # 10 minutes
self.last_scaling_action = {}
self.metrics_history = []
def _get_current_metrics(self) -> dict[str, Any]:
"""Get current system and application metrics"""
try:
# System metrics
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage("/")
# Process metrics for our application
try:
process = psutil.Process()
process_memory = process.memory_info()
process_cpu = process.cpu_percent()
except Exception:
process_memory = process_memory = None
process_cpu = 0
return {
"cpu_percent": cpu_percent,
"memory_percent": memory.percent,
"memory_available_gb": memory.available / (1024**3),
"memory_used_gb": memory.used / (1024**3),
"disk_usage_percent": disk.percent,
"process_memory_mb": process_memory.rss / (1024**2) if process_memory else 0,
"process_cpu_percent": process_cpu,
"timestamp": time.time(),
"load_average": psutil.getloadavg()[0], # 1-minute load average
}
except Exception as e:
logger.error(f"Error collecting metrics: {e}")
return {}
async def _get_application_metrics(self) -> dict[str, Any]:
"""Get application-specific metrics from Redis"""
try:
if not self.redis:
return {}
# Get Redis metrics for application
metrics = {}
# Response time metrics
response_times = await self.redis.lrange("app_response_times", -100) # Last 100 responses
if response_times:
times = [float(rt) for rt in response_times]
metrics["avg_response_time_ms"] = sum(times) / len(times)
metrics["p95_response_time_ms"] = sorted(times)[int(len(times) * 0.95)]
metrics["p99_response_time_ms"] = sorted(times)[int(len(times) * 0.99)]
# Request rate metrics
request_rate = await self.redis.get("app_request_rate")
if request_rate:
metrics["requests_per_second"] = float(request_rate)
# Error rate metrics
error_rate = await self.redis.get("app_error_rate")
if error_rate:
metrics["error_rate_percent"] = float(error_rate) * 100
# Active connections
active_connections = await self.redis.get("app_active_connections")
if active_connections:
metrics["active_connections"] = int(active_connections)
return metrics
except Exception as e:
logger.error(f"Error getting application metrics: {e}")
return {}
async def _should_scale_up(self, system_metrics: dict[str, Any], app_metrics: dict[str, Any]) -> bool:
"""Determine if we should scale up"""
if not self.scaling_enabled:
return False
# Check cooldown
last_scale_up = self.last_scaling_action.get("scale_up", 0)
if time.time() - last_scale_up < self.scale_up_cooldown:
return False
# Check if we're at max replicas
current_replicas = await self._get_current_replica_count()
if current_replicas >= self.max_replicas:
return False
# Check system metrics thresholds
cpu_reason = system_metrics.get("cpu_percent", 0) >= self.scale_up_threshold
memory_reason = system_metrics.get("memory_percent", 0) >= self.scale_up_memory_threshold
load_reason = system_metrics.get("load_average", 0) >= 2.0
# Check application metrics thresholds
response_time_reason = app_metrics.get("avg_response_time_ms", 0) > 500 # > 500ms average
error_rate_reason = app_metrics.get("error_rate_percent", 0) > 5.0 # > 5% error rate
return cpu_reason or memory_reason or load_reason or response_time_reason or error_rate_reason
async def _should_scale_down(self, system_metrics: dict[str, Any], app_metrics: dict[str, Any]) -> bool:
"""Determine if we should scale down"""
if not self.scaling_enabled:
return False
# Check cooldown
last_scale_down = self.last_scaling_action.get("scale_down", 0)
if time.time() - last_scale_down < self.scale_down_cooldown:
return False
# Check if we're at min replicas
current_replicas = await self._get_current_replica_count()
if current_replicas <= self.min_replicas:
return False
# Check system metrics thresholds
cpu_reason = system_metrics.get("cpu_percent", 0) <= self.scale_down_threshold
memory_reason = system_metrics.get("memory_percent", 0) <= self.scale_down_memory_threshold
load_reason = system_metrics.get("load_average", 0) <= 0.5
# Check application metrics thresholds
response_time_reason = app_metrics.get("avg_response_time_ms", 0) < 100 # < 100ms average
error_rate_reason = app_metrics.get("error_rate_percent", 0) < 1.0 # < 1% error rate
active_connections_reason = app_metrics.get("active_connections", 999) < 50 # Low activity
# Scale down only if multiple conditions are met
conditions_met = sum(
[cpu_reason, memory_reason, load_reason, response_time_reason, error_rate_reason, active_connections_reason]
)
return conditions_met >= 4 # At least 4 conditions
async def _get_current_replica_count(self) -> int:
"""Get current number of running replicas"""
try:
# In a real implementation, this would query the orchestration system
# For now, we'll check Docker containers
result = subprocess.run(
["docker", "ps", "--filter", "name=backend-service", "--format", "{{.Names}}"],
capture_output=True,
text=True,
)
if result.returncode == 0:
containers = result.stdout.strip().split("\n")
return len([c for c in containers if c.strip()])
except Exception as e:
logger.error(f"Error getting replica count: {e}")
return 1 # Default fallback
async def _execute_scaling_action(self, direction: ScalingDirection, target_replicas: int) -> bool:
"""Execute scaling action"""
try:
action = {
"direction": direction.value,
"current_replicas": await self._get_current_replica_count(),
"target_replicas": target_replicas,
"timestamp": time.time(),
"metrics": await self._get_current_metrics(),
}
if direction == ScalingDirection.SCALE_UP:
result = await self._scale_up(target_replicas)
self.last_scaling_action["scale_up"] = time.time()
elif direction == ScalingDirection.SCALE_DOWN:
result = await self._scale_down(target_replicas)
self.last_scaling_action["scale_down"] = time.time()
else:
logger.info("No scaling action needed")
return True
if result:
# Store scaling action in Redis
if self.redis:
await self.redis.lpush("scaling_history", json.dumps(action))
await self.redis.ltrim("scaling_history", 0, 100) # Keep last 100 actions
logger.info(f"Scaling action executed: {direction} to {target_replicas} replicas")
return True
else:
logger.error(f"Scaling action failed: {direction}")
return False
except Exception as e:
logger.error(f"Error executing scaling action: {e}")
return False
async def _scale_up(self, target_replicas: int) -> bool:
"""Scale up to target number of replicas"""
try:
# In a real implementation, this would orchestrate with Kubernetes/ECS/etc.
# For Docker Compose, we'll simulate the action
logger.info(f"Scaling up to {target_replicas} replicas")
# Simulate scaling action (in production, this would be real)
current_replicas = await self._get_current_replica_count()
if target_replicas > current_replicas:
# This would typically call the orchestration API
# docker-compose up --scale backend-service=target_replicas
result = subprocess.run(
["docker-compose", "up", "--scale", f"backend-service={target_replicas}", "-d"],
capture_output=True,
text=True,
)
return result.returncode == 0
except Exception as e:
logger.error(f"Error scaling up: {e}")
return False
async def _scale_down(self, target_replicas: int) -> bool:
"""Scale down to target number of replicas"""
try:
# In a real implementation, this would orchestrate with Kubernetes/ECS/etc.
logger.info(f"Scaling down to {target_replicas} replicas")
current_replicas = await self._get_current_replica_count()
if target_replicas < current_replicas:
# This would typically call the orchestration API
# docker-compose up --scale backend-service=target_replicas
result = subprocess.run(
["docker-compose", "up", "--scale", f"backend-service={target_replicas}", "-d"],
capture_output=True,
text=True,
)
return result.returncode == 0
except Exception as e:
logger.error(f"Error scaling down: {e}")
return False
async def make_scaling_decision(self) -> dict[str, Any]:
"""Make auto-scaling decision based on metrics"""
if not self.scaling_enabled:
return {
"decision": ScalingDirection.NO_CHANGE,
"reason": "Auto-scaling disabled",
"current_replicas": await self._get_current_replica_count(),
}
try:
# Get current metrics
system_metrics = self._get_current_metrics()
app_metrics = await self._get_application_metrics()
current_replicas = await self._get_current_replica_count()
# Add to history
self.metrics_history.append({**system_metrics, **app_metrics, "timestamp": time.time()})
# Keep only last 100 entries
if len(self.metrics_history) > 100:
self.metrics_history = self.metrics_history[-100:]
# Make decision
if await self._should_scale_up(system_metrics, app_metrics):
decision = ScalingDirection.SCALE_UP
target_replicas = min(current_replicas + 1, self.max_replicas)
reason = "High CPU/Memory/Response time detected"
elif await self._should_scale_down(system_metrics, app_metrics):
decision = ScalingDirection.SCALE_DOWN
target_replicas = max(current_replicas - 1, self.min_replicas)
reason = "Low resource utilization detected"
else:
decision = ScalingDirection.NO_CHANGE
target_replicas = current_replicas
reason = "Resource utilization within normal range"
scaling_decision = {
"decision": decision.value,
"current_replicas": current_replicas,
"target_replicas": target_replicas,
"reason": reason,
"metrics": {"system": system_metrics, "application": app_metrics},
"timestamp": time.time(),
}
# Store decision in Redis
if self.redis:
await self.redis.setex(
"last_scaling_decision",
300, # 5 minutes TTL
json.dumps(scaling_decision),
)
return scaling_decision
except Exception as e:
logger.error(f"Error making scaling decision: {e}")
return {
"decision": ScalingDirection.NO_CHANGE,
"reason": f"Error: {e}",
"current_replicas": await self._get_current_replica_count(),
}
async def get_scaling_metrics(self) -> dict[str, Any]:
"""Get comprehensive auto-scaling metrics"""
try:
# Get current decision
if self.redis:
last_decision = await self.redis.get("last_scaling_decision")
if last_decision:
last_decision = json.loads(last_decision)
else:
last_decision = {}
# Get scaling history
if self.redis:
scaling_history = await self.redis.lrange("scaling_history", -20) # Last 20 actions
history = [json.loads(action) for action in scaling_history if action]
else:
history = []
current_metrics = self._get_current_metrics()
app_metrics = await self._get_application_metrics()
return {
"current_metrics": current_metrics,
"application_metrics": app_metrics,
"last_decision": last_decision,
"scaling_history": history,
"configuration": {
"auto_scaling_enabled": self.scaling_enabled,
"min_replicas": self.min_replicas,
"max_replicas": self.max_replicas,
"scale_up_threshold": self.scale_up_threshold,
"scale_down_threshold": self.scale_down_threshold,
"scale_up_cooldown": self.scale_up_cooldown,
"scale_down_cooldown": self.scale_down_cooldown,
},
"current_replicas": await self._get_current_replica_count(),
}
except Exception as e:
logger.error(f"Error getting scaling metrics: {e}")
return {"error": str(e)}
async def start_auto_scaling(self):
"""Start continuous auto-scaling loop"""
async def scaling_loop():
while True:
try:
decision = await self.make_scaling_decision()
if decision["decision"] != ScalingDirection.NO_CHANGE.value:
# Execute scaling action
success = await self._execute_scaling_action(
ScalingDirection(decision["decision"]), decision["target_replicas"]
)
if success:
logger.info(f"Auto-scaling action completed: {decision['decision']}")
else:
logger.error(f"Auto-scaling action failed: {decision['decision']}")
# Wait before next decision
await asyncio.sleep(60) # Check every minute
except Exception as e:
logger.error(f"Auto-scaling loop error: {e}")
await asyncio.sleep(60)
# Start auto-scaling loop
asyncio.create_task(scaling_loop())
logger.info("Auto-scaling monitoring started")
async def manual_scale(self, target_replicas: int) -> bool:
"""Manual scaling to specific replica count"""
try:
target_replicas = max(self.min_replicas, min(target_replicas, self.max_replicas))
current_replicas = await self._get_current_replica_count()
if target_replicas == current_replicas:
logger.info(f"Already at target replica count: {target_replicas}")
return True
if target_replicas > current_replicas:
direction = ScalingDirection.SCALE_UP
else:
direction = ScalingDirection.SCALE_DOWN
return await self._execute_scaling_action(direction, target_replicas)
except Exception as e:
logger.error(f"Manual scaling failed: {e}")
return False
class AutoScalingManager:
    """Comprehensive auto-scaling manager.

    Thin lifecycle wrapper around ScalingDecision: starts/stops the
    background monitoring loop and exposes status and manual scaling.
    """

    def __init__(self, redis_client=None):
        self.redis = redis_client
        self.decision_engine = ScalingDecision(redis_client)
        self.is_running = False
        # Handle of the background scaling-loop task (None until started).
        self.monitoring_task = None

    async def start_monitoring(self):
        """Start auto-scaling monitoring (idempotent while running)."""
        if not self.is_running:
            self.is_running = True
            # Capture the loop task (when the engine returns one) so
            # stop_monitoring() can actually cancel it.
            task = await self.decision_engine.start_auto_scaling()
            if task is not None:
                self.monitoring_task = task
            logger.info("Auto-scaling manager started")

    async def stop_monitoring(self):
        """Stop auto-scaling monitoring and cancel the background task."""
        self.is_running = False
        if self.monitoring_task:
            self.monitoring_task.cancel()
            self.monitoring_task = None
        logger.info("Auto-scaling manager stopped")

    async def get_status(self) -> dict[str, Any]:
        """Return the decision engine's full metrics snapshot."""
        return await self.decision_engine.get_scaling_metrics()

    async def scale_to(self, target_replicas: int) -> bool:
        """Manually scale to *target_replicas* (clamped by the engine)."""
        return await self.decision_engine.manual_scale(target_replicas)
# Global auto-scaling manager instance
# NOTE(review): built eagerly at import time with no Redis client, so its
# Redis-backed features are no-ops; initialize_auto_scaling() creates a
# separately configured manager — confirm which instance callers rely on.
auto_scaling_manager = AutoScalingManager()
# Auto-scaling initialization function
async def initialize_auto_scaling(redis_client=None):
    """Initialize the auto-scaling system with a Redis client.

    Creates a manager bound to *redis_client*, starts its monitoring loop,
    and rebinds the module-level ``auto_scaling_manager`` name so code that
    resolves the global through this module sees the configured instance
    instead of the eager Redis-less default.

    Returns the started AutoScalingManager.
    """
    global auto_scaling_manager
    manager = AutoScalingManager(redis_client)
    await manager.start_monitoring()
    # Keep the module-level global in sync with the configured manager.
    auto_scaling_manager = manager
    logger.info("Auto-scaling system initialized")
    return manager