Spaces:
No application file
No application file
File size: 1,670 Bytes
57c06cb dfc56a2 57c06cb dfc56a2 57c06cb dfc56a2 57c06cb dfc56a2 57c06cb dfc56a2 57c06cb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 | from .scale_action import ScaleAction
from .patch_action import PatchAction
from .delete_pod_action import DeletePodAction
from .rollout_action import RolloutRestartAction
from .hpa_action import SetHPAAction
from .drain_action import DrainNodeAction
from .describe_action import DescribeAction
from .wait_action import WaitAction
from typing import Union, Any, Dict, Literal
KubeAction = Union[
ScaleAction,
PatchAction,
DeletePodAction,
RolloutRestartAction,
SetHPAAction,
DrainNodeAction,
DescribeAction,
WaitAction,
]
ActionType = Literal["scale", "patch", "delete_pod", "rollout_restart", "set_hpa", "drain_node", "describe", "wait"]
def parse_action(data: Dict[str, Any]) -> KubeAction:
if not isinstance(data, dict):
raise ValueError(f"Expected dict, got {type(data)}")
action_type = data.get("action_type")
if not action_type:
raise ValueError("Missing 'action_type' field")
action_map = {
"scale": ScaleAction,
"patch": PatchAction,
"delete_pod": DeletePodAction,
"rollout_restart": RolloutRestartAction,
"set_hpa": SetHPAAction,
"drain_node": DrainNodeAction,
"describe": DescribeAction,
"wait": WaitAction,
}
action_class = action_map.get(action_type)
if not action_class:
raise ValueError(f"Unknown action_type: {action_type}")
return action_class(**data)
__all__ = [
"ScaleAction",
"PatchAction",
"DeletePodAction",
"RolloutRestartAction",
"SetHPAAction",
"DrainNodeAction",
"DescribeAction",
"WaitAction",
"KubeAction",
"parse_action",
]
|