openenv / server /executor.py
Sandeep Suresh
feat: Implement wait action and enhance action handling in simulation environment
dfc56a2
from pydantic import BaseModel
from typing import Any, Dict, Optional
from server.actions import (
KubeAction,
ScaleAction,
DeletePodAction,
PatchAction,
RolloutRestartAction,
SetHPAAction,
DrainNodeAction,
DescribeAction,
WaitAction,
)
from server.models import ClusterObservation
class ExecutionResult(BaseModel):
observation: ClusterObservation
action_applied: str
tick_advanced: bool
describe_detail: Optional[Dict[str, Any]] = None
def execute(action: KubeAction, world) -> ExecutionResult:
if isinstance(action, ScaleAction):
return _execute_scale(action, world)
elif isinstance(action, DeletePodAction):
return _execute_delete_pod(action, world)
elif isinstance(action, PatchAction):
return _execute_patch(action, world)
elif isinstance(action, RolloutRestartAction):
return _execute_rollout_restart(action, world)
elif isinstance(action, SetHPAAction):
return _execute_set_hpa(action, world)
elif isinstance(action, DrainNodeAction):
return _execute_drain_node(action, world)
elif isinstance(action, DescribeAction):
return _execute_describe(action, world)
elif isinstance(action, WaitAction):
return _execute_wait(world)
else:
raise ValueError(f"Unknown action type: {type(action)}")
def _execute_scale(action: ScaleAction, world) -> ExecutionResult:
world.scale(action.deployment, action.replicas)
world.tick()
return ExecutionResult(
observation=world.get_observation(),
action_applied=f"Scaled '{action.deployment}' to {action.replicas} replicas",
tick_advanced=True
)
def _execute_delete_pod(action: DeletePodAction, world) -> ExecutionResult:
world.delete_pod(action.pod_name)
world.tick()
return ExecutionResult(
observation=world.get_observation(),
action_applied=f"Deleted pod '{action.pod_name}'",
tick_advanced=True
)
def _execute_patch(action: PatchAction, world) -> ExecutionResult:
world.apply_patch(action.resource_type, action.name, action.patch)
world.tick()
return ExecutionResult(
observation=world.get_observation(),
action_applied=f"Patched {action.resource_type} '{action.name}'",
tick_advanced=True
)
def _execute_rollout_restart(action: RolloutRestartAction, world) -> ExecutionResult:
world.rollout_restart(action.deployment)
world.tick()
return ExecutionResult(
observation=world.get_observation(),
action_applied=f"Rollout restarted '{action.deployment}'",
tick_advanced=True
)
def _execute_set_hpa(action: SetHPAAction, world) -> ExecutionResult:
world.set_hpa(
action.deployment,
action.min_replicas,
action.max_replicas,
action.cpu_target_percent
)
world.tick()
return ExecutionResult(
observation=world.get_observation(),
action_applied=f"Set HPA for '{action.deployment}': {action.min_replicas}-{action.max_replicas} replicas, {action.cpu_target_percent}% CPU",
tick_advanced=True
)
def _execute_drain_node(action: DrainNodeAction, world) -> ExecutionResult:
world.drain_node(action.node_name)
world.tick()
return ExecutionResult(
observation=world.get_observation(),
action_applied=f"Drained node '{action.node_name}'",
tick_advanced=True
)
def _execute_describe(action: DescribeAction, world) -> ExecutionResult:
detail = world.describe(action.resource_type, action.name)
obs = world.get_observation()
return ExecutionResult(
observation=obs,
action_applied=f"Described {action.resource_type} '{action.name}'",
tick_advanced=False,
describe_detail=detail
)
def _execute_wait(world) -> ExecutionResult:
world.tick()
return ExecutionResult(
observation=world.get_observation(),
action_applied="Waited one simulation tick",
tick_advanced=True,
)