Spaces:
Paused
Paused
Upload core/autonomous_scaling.py with huggingface_hub
Browse files- core/autonomous_scaling.py +129 -43
core/autonomous_scaling.py
CHANGED
|
@@ -89,7 +89,9 @@ class ResourceMetrics:
|
|
| 89 |
|
| 90 |
def headroom_percentage(self) -> float:
|
| 91 |
"""Get available headroom as percentage"""
|
| 92 |
-
return (
|
|
|
|
|
|
|
| 93 |
|
| 94 |
|
| 95 |
class AutonomousScalingEngine:
|
|
@@ -239,8 +241,12 @@ class AutonomousScalingEngine:
|
|
| 239 |
"""Get current compute instance count"""
|
| 240 |
# In a real implementation, this would query cloud provider APIs
|
| 241 |
# For simulation, return current value with some variance
|
| 242 |
-
current = self.resource_metrics[
|
| 243 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 244 |
return current * variance
|
| 245 |
|
| 246 |
async def _get_database_connections(self) -> float:
|
|
@@ -248,14 +254,18 @@ class AutonomousScalingEngine:
|
|
| 248 |
# Simulate database connection monitoring
|
| 249 |
base_connections = 45
|
| 250 |
time_factor = datetime.now().hour / 24 # Daily pattern
|
| 251 |
-
load_factor =
|
|
|
|
|
|
|
| 252 |
return base_connections * load_factor
|
| 253 |
|
| 254 |
async def _get_cache_memory_usage(self) -> float:
|
| 255 |
"""Get current cache memory usage"""
|
| 256 |
# Simulate cache memory monitoring
|
| 257 |
base_memory = 2.1
|
| 258 |
-
variance = 0.9 + 0.2 * (
|
|
|
|
|
|
|
| 259 |
return base_memory * variance
|
| 260 |
|
| 261 |
async def _get_worker_process_count(self) -> float:
|
|
@@ -263,7 +273,9 @@ class AutonomousScalingEngine:
|
|
| 263 |
# Simulate worker process monitoring
|
| 264 |
base_workers = 8
|
| 265 |
queue_length = await self._get_queue_length()
|
| 266 |
-
utilization_factor = min(
|
|
|
|
|
|
|
| 267 |
return base_workers * utilization_factor
|
| 268 |
|
| 269 |
async def _get_queue_length(self) -> float:
|
|
@@ -299,7 +311,9 @@ class AutonomousScalingEngine:
|
|
| 299 |
|
| 300 |
return scaling_events
|
| 301 |
|
| 302 |
-
async def _evaluate_resource_scaling(
|
|
|
|
|
|
|
| 303 |
"""Evaluate scaling decision for a specific resource"""
|
| 304 |
|
| 305 |
utilization_pct = metrics.utilization_percentage()
|
|
@@ -307,7 +321,11 @@ class AutonomousScalingEngine:
|
|
| 307 |
# Check cooldown periods
|
| 308 |
last_action = self.last_scaling_actions.get(resource_type)
|
| 309 |
if last_action:
|
| 310 |
-
cooldown_period =
|
|
|
|
|
|
|
|
|
|
|
|
|
| 311 |
if datetime.now(UTC) - last_action < timedelta(seconds=cooldown_period):
|
| 312 |
return {
|
| 313 |
"decision": ScalingDecision.NO_CHANGE,
|
|
@@ -321,10 +339,14 @@ class AutonomousScalingEngine:
|
|
| 321 |
if utilization_pct > self.emergency_threshold:
|
| 322 |
return {
|
| 323 |
"decision": ScalingDecision.EMERGENCY_SCALE,
|
| 324 |
-
"target_capacity": min(
|
|
|
|
|
|
|
| 325 |
"reason": f"Emergency: Utilization at {utilization_pct:.1f}% exceeds threshold",
|
| 326 |
"confidence": 0.95,
|
| 327 |
-
"cost_impact": self._calculate_cost_impact(
|
|
|
|
|
|
|
| 328 |
}
|
| 329 |
|
| 330 |
# Normal scaling logic
|
|
@@ -333,7 +355,9 @@ class AutonomousScalingEngine:
|
|
| 333 |
if utilization_pct > self.scale_up_threshold:
|
| 334 |
# Scale up
|
| 335 |
increment = policy.get("scale_up_increment", 1)
|
| 336 |
-
new_capacity = min(
|
|
|
|
|
|
|
| 337 |
|
| 338 |
return {
|
| 339 |
"decision": ScalingDecision.SCALE_UP,
|
|
@@ -343,10 +367,15 @@ class AutonomousScalingEngine:
|
|
| 343 |
"cost_impact": self._calculate_cost_impact(resource_type, new_capacity),
|
| 344 |
}
|
| 345 |
|
| 346 |
-
elif
|
|
|
|
|
|
|
|
|
|
| 347 |
# Scale down
|
| 348 |
decrement = policy.get("scale_down_increment", 1)
|
| 349 |
-
new_capacity = max(
|
|
|
|
|
|
|
| 350 |
|
| 351 |
# Only scale down if we're significantly below target
|
| 352 |
if utilization_pct < self.scale_down_threshold * 0.7:
|
|
@@ -355,7 +384,9 @@ class AutonomousScalingEngine:
|
|
| 355 |
"target_capacity": new_capacity,
|
| 356 |
"reason": f"Low utilization: {utilization_pct:.1f}% < {self.scale_down_threshold}%",
|
| 357 |
"confidence": 0.7,
|
| 358 |
-
"cost_impact": self._calculate_cost_impact(
|
|
|
|
|
|
|
| 359 |
}
|
| 360 |
|
| 361 |
return {
|
|
@@ -366,7 +397,9 @@ class AutonomousScalingEngine:
|
|
| 366 |
"cost_impact": 0.0,
|
| 367 |
}
|
| 368 |
|
| 369 |
-
def _calculate_cost_impact(
|
|
|
|
|
|
|
| 370 |
"""Calculate cost impact of scaling decision"""
|
| 371 |
metrics = self.resource_metrics[resource_type]
|
| 372 |
capacity_change = new_capacity - metrics.current_utilization
|
|
@@ -376,11 +409,15 @@ class AutonomousScalingEngine:
|
|
| 376 |
return capacity_change * metrics.cost_per_unit * 24 # Daily cost
|
| 377 |
elif capacity_change < 0:
|
| 378 |
# Scaling down savings
|
| 379 |
-
return
|
|
|
|
|
|
|
| 380 |
else:
|
| 381 |
return 0.0
|
| 382 |
|
| 383 |
-
async def execute_scaling_decisions(
|
|
|
|
|
|
|
| 384 |
"""Execute approved scaling decisions"""
|
| 385 |
logger.info(f"Executing {len(scaling_events)} scaling decisions...")
|
| 386 |
|
|
@@ -400,7 +437,9 @@ class AutonomousScalingEngine:
|
|
| 400 |
self.last_scaling_actions[event.resource_type] = datetime.now(UTC)
|
| 401 |
|
| 402 |
# Update resource metrics
|
| 403 |
-
self.resource_metrics[event.resource_type].current_utilization =
|
|
|
|
|
|
|
| 404 |
|
| 405 |
logger.info(f"Successfully executed scaling: {event.event_id}")
|
| 406 |
else:
|
|
@@ -427,7 +466,9 @@ class AutonomousScalingEngine:
|
|
| 427 |
elif event.resource_type == ResourceType.WORKER_PROCESSES:
|
| 428 |
return await self._scale_worker_processes(event)
|
| 429 |
else:
|
| 430 |
-
logger.warning(
|
|
|
|
|
|
|
| 431 |
return False
|
| 432 |
|
| 433 |
except Exception as e:
|
|
@@ -437,53 +478,69 @@ class AutonomousScalingEngine:
|
|
| 437 |
async def _scale_compute_instances(self, event: ScalingEvent) -> bool:
|
| 438 |
"""Scale compute instances (AWS EC2, etc.)"""
|
| 439 |
# In a real implementation, this would call cloud provider APIs
|
| 440 |
-
logger.info(
|
|
|
|
|
|
|
| 441 |
|
| 442 |
# Simulate scaling operation only in development
|
| 443 |
if IS_DEVELOPMENT:
|
| 444 |
await asyncio.sleep(2) # Simulate API call delay
|
| 445 |
|
| 446 |
# Update internal tracking
|
| 447 |
-
self.resource_metrics[ResourceType.COMPUTE_INSTANCES].capacity =
|
|
|
|
|
|
|
| 448 |
|
| 449 |
return True
|
| 450 |
|
| 451 |
async def _scale_database_connections(self, event: ScalingEvent) -> bool:
|
| 452 |
"""Scale database connections"""
|
| 453 |
-
logger.info(
|
|
|
|
|
|
|
| 454 |
|
| 455 |
# Simulate connection pool adjustment only in development
|
| 456 |
if IS_DEVELOPMENT:
|
| 457 |
await asyncio.sleep(1)
|
| 458 |
|
| 459 |
# Update metrics
|
| 460 |
-
self.resource_metrics[ResourceType.DATABASE_CONNECTIONS].capacity =
|
|
|
|
|
|
|
| 461 |
|
| 462 |
return True
|
| 463 |
|
| 464 |
async def _scale_cache_memory(self, event: ScalingEvent) -> bool:
|
| 465 |
"""Scale cache memory"""
|
| 466 |
-
logger.info(
|
|
|
|
|
|
|
| 467 |
|
| 468 |
# Simulate Redis/memory scaling only in development
|
| 469 |
if IS_DEVELOPMENT:
|
| 470 |
await asyncio.sleep(1.5)
|
| 471 |
|
| 472 |
# Update metrics
|
| 473 |
-
self.resource_metrics[ResourceType.CACHE_MEMORY].capacity =
|
|
|
|
|
|
|
| 474 |
|
| 475 |
return True
|
| 476 |
|
| 477 |
async def _scale_worker_processes(self, event: ScalingEvent) -> bool:
|
| 478 |
"""Scale worker processes"""
|
| 479 |
-
logger.info(
|
|
|
|
|
|
|
| 480 |
|
| 481 |
# Simulate process scaling only in development
|
| 482 |
if IS_DEVELOPMENT:
|
| 483 |
await asyncio.sleep(1)
|
| 484 |
|
| 485 |
# Update metrics
|
| 486 |
-
self.resource_metrics[ResourceType.WORKER_PROCESSES].capacity =
|
|
|
|
|
|
|
| 487 |
|
| 488 |
return True
|
| 489 |
|
|
@@ -506,31 +563,41 @@ class AutonomousScalingEngine:
|
|
| 506 |
|
| 507 |
return optimizations
|
| 508 |
|
| 509 |
-
async def _optimize_resource(
|
|
|
|
|
|
|
| 510 |
"""Optimize a specific resource"""
|
| 511 |
utilization = metrics.utilization_percentage()
|
| 512 |
|
| 513 |
if utilization < 40 and metrics.current_utilization > metrics.min_capacity:
|
| 514 |
# Under-utilized - consider rightsizing
|
| 515 |
-
recommended_capacity = max(
|
|
|
|
|
|
|
| 516 |
|
| 517 |
return {
|
| 518 |
"action": "rightsize",
|
| 519 |
"current_capacity": metrics.current_utilization,
|
| 520 |
"recommended_capacity": recommended_capacity,
|
| 521 |
-
"estimated_savings": self._calculate_cost_impact(
|
|
|
|
|
|
|
| 522 |
"reason": f"Resource under-utilized at {utilization:.1f}%",
|
| 523 |
}
|
| 524 |
|
| 525 |
elif utilization > 85:
|
| 526 |
# Over-utilized - consider scaling
|
| 527 |
-
recommended_capacity = min(
|
|
|
|
|
|
|
| 528 |
|
| 529 |
return {
|
| 530 |
"action": "scale_up",
|
| 531 |
"current_capacity": metrics.current_utilization,
|
| 532 |
"recommended_capacity": recommended_capacity,
|
| 533 |
-
"estimated_cost": self._calculate_cost_impact(
|
|
|
|
|
|
|
| 534 |
"reason": f"Resource over-utilized at {utilization:.1f}%",
|
| 535 |
}
|
| 536 |
|
|
@@ -539,8 +606,12 @@ class AutonomousScalingEngine:
|
|
| 539 |
async def _optimize_cross_resources(self) -> dict[str, Any] | None:
|
| 540 |
"""Optimize across multiple resources"""
|
| 541 |
# Analyze compute vs memory ratio
|
| 542 |
-
compute_util = self.resource_metrics[
|
| 543 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 544 |
|
| 545 |
if compute_util > 80 and memory_util < 50:
|
| 546 |
return {
|
|
@@ -550,7 +621,9 @@ class AutonomousScalingEngine:
|
|
| 550 |
}
|
| 551 |
|
| 552 |
# Analyze worker vs queue ratio
|
| 553 |
-
worker_util = self.resource_metrics[
|
|
|
|
|
|
|
| 554 |
queue_length = await self._get_queue_length()
|
| 555 |
|
| 556 |
if worker_util > 90 and queue_length > 50:
|
|
@@ -582,14 +655,21 @@ class AutonomousScalingEngine:
|
|
| 582 |
}
|
| 583 |
|
| 584 |
# Overall system health
|
| 585 |
-
avg_utilization = sum(
|
| 586 |
-
self.resource_metrics
|
| 587 |
-
)
|
| 588 |
report["system_health"] = {
|
| 589 |
"average_utilization": avg_utilization,
|
| 590 |
-
"overall_status":
|
|
|
|
|
|
|
|
|
|
|
|
|
| 591 |
"total_capacity": sum(m.capacity for m in self.resource_metrics.values()),
|
| 592 |
-
"total_cost_per_hour": sum(
|
|
|
|
|
|
|
|
|
|
| 593 |
}
|
| 594 |
|
| 595 |
return report
|
|
@@ -616,7 +696,9 @@ class AutonomousScalingEngine:
|
|
| 616 |
# Generate report
|
| 617 |
report = {
|
| 618 |
"cycle_timestamp": datetime.now(UTC).isoformat(),
|
| 619 |
-
"resource_metrics": {
|
|
|
|
|
|
|
| 620 |
"scaling_actions": len(executed_events),
|
| 621 |
"successful_scalings": sum(1 for e in executed_events if e.success),
|
| 622 |
"optimizations": optimizations,
|
|
@@ -669,7 +751,9 @@ async def demonstrate_autonomous_scaling():
|
|
| 669 |
executed_events = await scaling_engine.execute_scaling_decisions(scaling_events)
|
| 670 |
|
| 671 |
successful = sum(1 for e in executed_events if e.success)
|
| 672 |
-
logger.info(
|
|
|
|
|
|
|
| 673 |
else:
|
| 674 |
logger.info("No scaling actions required at this time")
|
| 675 |
|
|
@@ -683,7 +767,9 @@ async def demonstrate_autonomous_scaling():
|
|
| 683 |
logger.info(f" - {resource}: {optimization.get('action', 'unknown')}")
|
| 684 |
logger.info(f" Reason: {optimization.get('reason', 'N/A')}")
|
| 685 |
if "estimated_savings" in optimization:
|
| 686 |
-
logger.info(
|
|
|
|
|
|
|
| 687 |
logger.info("")
|
| 688 |
else:
|
| 689 |
logger.info("No optimization recommendations at this time")
|
|
|
|
| 89 |
|
| 90 |
def headroom_percentage(self) -> float:
|
| 91 |
"""Get available headroom as percentage"""
|
| 92 |
+
return (
|
| 93 |
+
(self.max_capacity - self.current_utilization) / self.max_capacity
|
| 94 |
+
) * 100
|
| 95 |
|
| 96 |
|
| 97 |
class AutonomousScalingEngine:
|
|
|
|
| 241 |
"""Get current compute instance count"""
|
| 242 |
# In a real implementation, this would query cloud provider APIs
|
| 243 |
# For simulation, return current value with some variance
|
| 244 |
+
current = self.resource_metrics[
|
| 245 |
+
ResourceType.COMPUTE_INSTANCES
|
| 246 |
+
].current_utilization
|
| 247 |
+
variance = (
|
| 248 |
+
0.95 + 0.1 * (datetime.now().timestamp() % 10) / 10
|
| 249 |
+
) # 95-105% variance
|
| 250 |
return current * variance
|
| 251 |
|
| 252 |
async def _get_database_connections(self) -> float:
|
|
|
|
| 254 |
# Simulate database connection monitoring
|
| 255 |
base_connections = 45
|
| 256 |
time_factor = datetime.now().hour / 24 # Daily pattern
|
| 257 |
+
load_factor = (
|
| 258 |
+
0.8 + 0.4 * abs(time_factor - 0.5) * 2
|
| 259 |
+
) # Peak during business hours
|
| 260 |
return base_connections * load_factor
|
| 261 |
|
| 262 |
async def _get_cache_memory_usage(self) -> float:
|
| 263 |
"""Get current cache memory usage"""
|
| 264 |
# Simulate cache memory monitoring
|
| 265 |
base_memory = 2.1
|
| 266 |
+
variance = 0.9 + 0.2 * (
|
| 267 |
+
(datetime.now().timestamp() % 3600) / 3600
|
| 268 |
+
) # Hourly variance
|
| 269 |
return base_memory * variance
|
| 270 |
|
| 271 |
async def _get_worker_process_count(self) -> float:
|
|
|
|
| 273 |
# Simulate worker process monitoring
|
| 274 |
base_workers = 8
|
| 275 |
queue_length = await self._get_queue_length()
|
| 276 |
+
utilization_factor = min(
|
| 277 |
+
1.5, max(0.5, queue_length / 20)
|
| 278 |
+
) # Adjust based on queue
|
| 279 |
return base_workers * utilization_factor
|
| 280 |
|
| 281 |
async def _get_queue_length(self) -> float:
|
|
|
|
| 311 |
|
| 312 |
return scaling_events
|
| 313 |
|
| 314 |
+
async def _evaluate_resource_scaling(
|
| 315 |
+
self, resource_type: ResourceType, metrics: ResourceMetrics
|
| 316 |
+
) -> dict[str, Any]:
|
| 317 |
"""Evaluate scaling decision for a specific resource"""
|
| 318 |
|
| 319 |
utilization_pct = metrics.utilization_percentage()
|
|
|
|
| 321 |
# Check cooldown periods
|
| 322 |
last_action = self.last_scaling_actions.get(resource_type)
|
| 323 |
if last_action:
|
| 324 |
+
cooldown_period = (
|
| 325 |
+
self.scale_up_cooldown
|
| 326 |
+
if "up" in str(last_action)
|
| 327 |
+
else self.scale_down_cooldown
|
| 328 |
+
)
|
| 329 |
if datetime.now(UTC) - last_action < timedelta(seconds=cooldown_period):
|
| 330 |
return {
|
| 331 |
"decision": ScalingDecision.NO_CHANGE,
|
|
|
|
| 339 |
if utilization_pct > self.emergency_threshold:
|
| 340 |
return {
|
| 341 |
"decision": ScalingDecision.EMERGENCY_SCALE,
|
| 342 |
+
"target_capacity": min(
|
| 343 |
+
metrics.current_utilization * 1.5, metrics.max_capacity
|
| 344 |
+
),
|
| 345 |
"reason": f"Emergency: Utilization at {utilization_pct:.1f}% exceeds threshold",
|
| 346 |
"confidence": 0.95,
|
| 347 |
+
"cost_impact": self._calculate_cost_impact(
|
| 348 |
+
resource_type, metrics.current_utilization * 1.5
|
| 349 |
+
),
|
| 350 |
}
|
| 351 |
|
| 352 |
# Normal scaling logic
|
|
|
|
| 355 |
if utilization_pct > self.scale_up_threshold:
|
| 356 |
# Scale up
|
| 357 |
increment = policy.get("scale_up_increment", 1)
|
| 358 |
+
new_capacity = min(
|
| 359 |
+
metrics.current_utilization + increment, metrics.max_capacity
|
| 360 |
+
)
|
| 361 |
|
| 362 |
return {
|
| 363 |
"decision": ScalingDecision.SCALE_UP,
|
|
|
|
| 367 |
"cost_impact": self._calculate_cost_impact(resource_type, new_capacity),
|
| 368 |
}
|
| 369 |
|
| 370 |
+
elif (
|
| 371 |
+
utilization_pct < self.scale_down_threshold
|
| 372 |
+
and metrics.current_utilization > metrics.min_capacity
|
| 373 |
+
):
|
| 374 |
# Scale down
|
| 375 |
decrement = policy.get("scale_down_increment", 1)
|
| 376 |
+
new_capacity = max(
|
| 377 |
+
metrics.current_utilization - decrement, metrics.min_capacity
|
| 378 |
+
)
|
| 379 |
|
| 380 |
# Only scale down if we're significantly below target
|
| 381 |
if utilization_pct < self.scale_down_threshold * 0.7:
|
|
|
|
| 384 |
"target_capacity": new_capacity,
|
| 385 |
"reason": f"Low utilization: {utilization_pct:.1f}% < {self.scale_down_threshold}%",
|
| 386 |
"confidence": 0.7,
|
| 387 |
+
"cost_impact": self._calculate_cost_impact(
|
| 388 |
+
resource_type, new_capacity
|
| 389 |
+
),
|
| 390 |
}
|
| 391 |
|
| 392 |
return {
|
|
|
|
| 397 |
"cost_impact": 0.0,
|
| 398 |
}
|
| 399 |
|
| 400 |
+
def _calculate_cost_impact(
|
| 401 |
+
self, resource_type: ResourceType, new_capacity: float
|
| 402 |
+
) -> float:
|
| 403 |
"""Calculate cost impact of scaling decision"""
|
| 404 |
metrics = self.resource_metrics[resource_type]
|
| 405 |
capacity_change = new_capacity - metrics.current_utilization
|
|
|
|
| 409 |
return capacity_change * metrics.cost_per_unit * 24 # Daily cost
|
| 410 |
elif capacity_change < 0:
|
| 411 |
# Scaling down savings
|
| 412 |
+
return (
|
| 413 |
+
capacity_change * metrics.cost_per_unit * 24
|
| 414 |
+
) # Daily savings (negative)
|
| 415 |
else:
|
| 416 |
return 0.0
|
| 417 |
|
| 418 |
+
async def execute_scaling_decisions(
|
| 419 |
+
self, scaling_events: list[ScalingEvent]
|
| 420 |
+
) -> list[ScalingEvent]:
|
| 421 |
"""Execute approved scaling decisions"""
|
| 422 |
logger.info(f"Executing {len(scaling_events)} scaling decisions...")
|
| 423 |
|
|
|
|
| 437 |
self.last_scaling_actions[event.resource_type] = datetime.now(UTC)
|
| 438 |
|
| 439 |
# Update resource metrics
|
| 440 |
+
self.resource_metrics[event.resource_type].current_utilization = (
|
| 441 |
+
event.target_capacity
|
| 442 |
+
)
|
| 443 |
|
| 444 |
logger.info(f"Successfully executed scaling: {event.event_id}")
|
| 445 |
else:
|
|
|
|
| 466 |
elif event.resource_type == ResourceType.WORKER_PROCESSES:
|
| 467 |
return await self._scale_worker_processes(event)
|
| 468 |
else:
|
| 469 |
+
logger.warning(
|
| 470 |
+
f"Unsupported resource type for scaling: {event.resource_type}"
|
| 471 |
+
)
|
| 472 |
return False
|
| 473 |
|
| 474 |
except Exception as e:
|
|
|
|
| 478 |
async def _scale_compute_instances(self, event: ScalingEvent) -> bool:
|
| 479 |
"""Scale compute instances (AWS EC2, etc.)"""
|
| 480 |
# In a real implementation, this would call cloud provider APIs
|
| 481 |
+
logger.info(
|
| 482 |
+
f"Scaling compute instances from {event.current_capacity} to {event.target_capacity}"
|
| 483 |
+
)
|
| 484 |
|
| 485 |
# Simulate scaling operation only in development
|
| 486 |
if IS_DEVELOPMENT:
|
| 487 |
await asyncio.sleep(2) # Simulate API call delay
|
| 488 |
|
| 489 |
# Update internal tracking
|
| 490 |
+
self.resource_metrics[ResourceType.COMPUTE_INSTANCES].capacity = (
|
| 491 |
+
event.target_capacity
|
| 492 |
+
)
|
| 493 |
|
| 494 |
return True
|
| 495 |
|
| 496 |
async def _scale_database_connections(self, event: ScalingEvent) -> bool:
|
| 497 |
"""Scale database connections"""
|
| 498 |
+
logger.info(
|
| 499 |
+
f"Scaling database connections from {event.current_capacity} to {event.target_capacity}"
|
| 500 |
+
)
|
| 501 |
|
| 502 |
# Simulate connection pool adjustment only in development
|
| 503 |
if IS_DEVELOPMENT:
|
| 504 |
await asyncio.sleep(1)
|
| 505 |
|
| 506 |
# Update metrics
|
| 507 |
+
self.resource_metrics[ResourceType.DATABASE_CONNECTIONS].capacity = (
|
| 508 |
+
event.target_capacity
|
| 509 |
+
)
|
| 510 |
|
| 511 |
return True
|
| 512 |
|
| 513 |
async def _scale_cache_memory(self, event: ScalingEvent) -> bool:
|
| 514 |
"""Scale cache memory"""
|
| 515 |
+
logger.info(
|
| 516 |
+
f"Scaling cache memory from {event.current_capacity}GB to {event.target_capacity}GB"
|
| 517 |
+
)
|
| 518 |
|
| 519 |
# Simulate Redis/memory scaling only in development
|
| 520 |
if IS_DEVELOPMENT:
|
| 521 |
await asyncio.sleep(1.5)
|
| 522 |
|
| 523 |
# Update metrics
|
| 524 |
+
self.resource_metrics[ResourceType.CACHE_MEMORY].capacity = (
|
| 525 |
+
event.target_capacity
|
| 526 |
+
)
|
| 527 |
|
| 528 |
return True
|
| 529 |
|
| 530 |
async def _scale_worker_processes(self, event: ScalingEvent) -> bool:
|
| 531 |
"""Scale worker processes"""
|
| 532 |
+
logger.info(
|
| 533 |
+
f"Scaling worker processes from {event.current_capacity} to {event.target_capacity}"
|
| 534 |
+
)
|
| 535 |
|
| 536 |
# Simulate process scaling only in development
|
| 537 |
if IS_DEVELOPMENT:
|
| 538 |
await asyncio.sleep(1)
|
| 539 |
|
| 540 |
# Update metrics
|
| 541 |
+
self.resource_metrics[ResourceType.WORKER_PROCESSES].capacity = (
|
| 542 |
+
event.target_capacity
|
| 543 |
+
)
|
| 544 |
|
| 545 |
return True
|
| 546 |
|
|
|
|
| 563 |
|
| 564 |
return optimizations
|
| 565 |
|
| 566 |
+
async def _optimize_resource(
|
| 567 |
+
self, resource_type: ResourceType, metrics: ResourceMetrics
|
| 568 |
+
) -> dict[str, Any] | None:
|
| 569 |
"""Optimize a specific resource"""
|
| 570 |
utilization = metrics.utilization_percentage()
|
| 571 |
|
| 572 |
if utilization < 40 and metrics.current_utilization > metrics.min_capacity:
|
| 573 |
# Under-utilized - consider rightsizing
|
| 574 |
+
recommended_capacity = max(
|
| 575 |
+
metrics.min_capacity, metrics.current_utilization * 0.8
|
| 576 |
+
)
|
| 577 |
|
| 578 |
return {
|
| 579 |
"action": "rightsize",
|
| 580 |
"current_capacity": metrics.current_utilization,
|
| 581 |
"recommended_capacity": recommended_capacity,
|
| 582 |
+
"estimated_savings": self._calculate_cost_impact(
|
| 583 |
+
resource_type, recommended_capacity
|
| 584 |
+
),
|
| 585 |
"reason": f"Resource under-utilized at {utilization:.1f}%",
|
| 586 |
}
|
| 587 |
|
| 588 |
elif utilization > 85:
|
| 589 |
# Over-utilized - consider scaling
|
| 590 |
+
recommended_capacity = min(
|
| 591 |
+
metrics.max_capacity, metrics.current_utilization * 1.2
|
| 592 |
+
)
|
| 593 |
|
| 594 |
return {
|
| 595 |
"action": "scale_up",
|
| 596 |
"current_capacity": metrics.current_utilization,
|
| 597 |
"recommended_capacity": recommended_capacity,
|
| 598 |
+
"estimated_cost": self._calculate_cost_impact(
|
| 599 |
+
resource_type, recommended_capacity
|
| 600 |
+
),
|
| 601 |
"reason": f"Resource over-utilized at {utilization:.1f}%",
|
| 602 |
}
|
| 603 |
|
|
|
|
| 606 |
async def _optimize_cross_resources(self) -> dict[str, Any] | None:
|
| 607 |
"""Optimize across multiple resources"""
|
| 608 |
# Analyze compute vs memory ratio
|
| 609 |
+
compute_util = self.resource_metrics[
|
| 610 |
+
ResourceType.COMPUTE_INSTANCES
|
| 611 |
+
].utilization_percentage()
|
| 612 |
+
memory_util = self.resource_metrics[
|
| 613 |
+
ResourceType.CACHE_MEMORY
|
| 614 |
+
].utilization_percentage()
|
| 615 |
|
| 616 |
if compute_util > 80 and memory_util < 50:
|
| 617 |
return {
|
|
|
|
| 621 |
}
|
| 622 |
|
| 623 |
# Analyze worker vs queue ratio
|
| 624 |
+
worker_util = self.resource_metrics[
|
| 625 |
+
ResourceType.WORKER_PROCESSES
|
| 626 |
+
].utilization_percentage()
|
| 627 |
queue_length = await self._get_queue_length()
|
| 628 |
|
| 629 |
if worker_util > 90 and queue_length > 50:
|
|
|
|
| 655 |
}
|
| 656 |
|
| 657 |
# Overall system health
|
| 658 |
+
avg_utilization = sum(
|
| 659 |
+
m.utilization_percentage() for m in self.resource_metrics.values()
|
| 660 |
+
) / len(self.resource_metrics)
|
| 661 |
report["system_health"] = {
|
| 662 |
"average_utilization": avg_utilization,
|
| 663 |
+
"overall_status": (
|
| 664 |
+
"healthy"
|
| 665 |
+
if avg_utilization < 80
|
| 666 |
+
else "warning" if avg_utilization < 90 else "critical"
|
| 667 |
+
),
|
| 668 |
"total_capacity": sum(m.capacity for m in self.resource_metrics.values()),
|
| 669 |
+
"total_cost_per_hour": sum(
|
| 670 |
+
m.current_utilization * m.cost_per_unit
|
| 671 |
+
for m in self.resource_metrics.values()
|
| 672 |
+
),
|
| 673 |
}
|
| 674 |
|
| 675 |
return report
|
|
|
|
| 696 |
# Generate report
|
| 697 |
report = {
|
| 698 |
"cycle_timestamp": datetime.now(UTC).isoformat(),
|
| 699 |
+
"resource_metrics": {
|
| 700 |
+
k.value: v.current_utilization for k, v in resource_metrics.items()
|
| 701 |
+
},
|
| 702 |
"scaling_actions": len(executed_events),
|
| 703 |
"successful_scalings": sum(1 for e in executed_events if e.success),
|
| 704 |
"optimizations": optimizations,
|
|
|
|
| 751 |
executed_events = await scaling_engine.execute_scaling_decisions(scaling_events)
|
| 752 |
|
| 753 |
successful = sum(1 for e in executed_events if e.success)
|
| 754 |
+
logger.info(
|
| 755 |
+
f"Executed {successful}/{len(executed_events)} scaling actions successfully"
|
| 756 |
+
)
|
| 757 |
else:
|
| 758 |
logger.info("No scaling actions required at this time")
|
| 759 |
|
|
|
|
| 767 |
logger.info(f" - {resource}: {optimization.get('action', 'unknown')}")
|
| 768 |
logger.info(f" Reason: {optimization.get('reason', 'N/A')}")
|
| 769 |
if "estimated_savings" in optimization:
|
| 770 |
+
logger.info(
|
| 771 |
+
f" Savings: ${optimization['estimated_savings']:.2f}/day"
|
| 772 |
+
)
|
| 773 |
logger.info("")
|
| 774 |
else:
|
| 775 |
logger.info("No optimization recommendations at this time")
|