"""
Real-world CLI command parser for DIME.
Converts kubectl / AWS CLI command strings into ``InfraAction`` objects
using regex pattern matching. This proves the environment can train
models on production-grade syntax rather than abstract JSON schemas.
"""
from __future__ import annotations
import json
import re
from server.models import InfraAction
class CommandParseError(Exception):
"""Raised when a raw CLI command cannot be parsed."""
# ---------------------------------------------------------------------------
# Pattern table: (compiled regex, handler function)
# ---------------------------------------------------------------------------
_PATTERNS: list[tuple[re.Pattern, callable]] = []
_REASONING_JSON_RE = re.compile(
r"^\s*.+?\s*(\{.*\})\s*$",
re.IGNORECASE | re.DOTALL,
)
def _register(pattern: str):
"""Decorator to register a regex → handler mapping."""
compiled = re.compile(pattern, re.IGNORECASE)
def decorator(fn):
_PATTERNS.append((compiled, fn))
return fn
return decorator
# ---- kubectl scale → scale_up ----
@_register(r"kubectl\s+scale\s+deployment\s+\S+\s+--replicas[=\s]+(\d+)")
def _handle_kubectl_scale(match: re.Match) -> InfraAction:
return InfraAction(action_type="scale_up")
# ---- aws autoscaling → scale_up ----
@_register(
r"aws\s+autoscaling\s+set-desired-capacity\s+.*--desired-capacity[=\s]+(\d+)"
)
def _handle_aws_scale(match: re.Match) -> InfraAction:
return InfraAction(action_type="scale_up")
# ---- kubectl delete pod + apply restart → restart_node ----
@_register(r"kubectl\s+(?:delete\s+pod|rollout\s+restart)\s+.*node[_-]?(\d+)")
def _handle_kubectl_restart(match: re.Match) -> InfraAction:
target = int(match.group(1))
return InfraAction(action_type="restart_node", target=target)
# ---- kubectl restart via apply -f restart.yaml ----
@_register(r"kubectl\s+apply\s+-f\s+restart.*node[_-]?(\d+)")
def _handle_kubectl_apply_restart(match: re.Match) -> InfraAction:
target = int(match.group(1))
return InfraAction(action_type="restart_node", target=target)
# ---- istio/envoy traffic shift → reroute_traffic ----
@_register(
r"kubectl\s+exec.*(?:istio|envoy).*traffic\s+shift\s+--from[=\s]+(\d+)\s+--to[=\s]+(\d+)"
)
def _handle_traffic_shift(match: re.Match) -> InfraAction:
return InfraAction(
action_type="reroute_traffic",
from_node=int(match.group(1)),
to_node=int(match.group(2)),
)
# ---- kubectl logs → query_logs ----
@_register(r"kubectl\s+logs\s+.*node[_-]?(\d+)")
def _handle_kubectl_logs(match: re.Match) -> InfraAction:
target = int(match.group(1))
return InfraAction(action_type="query_logs", target=target)
# ---- kubectl throttle / rate-limit ingress ----
@_register(r"kubectl\s+(?:throttle|annotate)\s+ingress\s+.*--rate[=\s]+([\d.]+)")
def _handle_throttle(match: re.Match) -> InfraAction:
rate = float(match.group(1))
rate = max(0.0, min(1.0, rate))
return InfraAction(action_type="throttle", rate=rate)
# ---- explicit no_op / observe ----
@_register(r"(?:no[_-]?op|observe|noop|kubectl\s+get\s+pods)")
def _handle_noop(match: re.Match) -> InfraAction:
return InfraAction(action_type="no_op")
def _extract_reasoning_json(raw: str) -> dict | None:
"""Return the JSON body after a block, if present and valid."""
match = _REASONING_JSON_RE.match(raw)
if not match:
return None
try:
payload = json.loads(match.group(1))
except json.JSONDecodeError as exc:
raise CommandParseError(f"Invalid JSON after : {exc}") from exc
if not isinstance(payload, dict):
raise CommandParseError("JSON command body must be an object.")
return payload
def has_reasoning_json_format(raw: str) -> bool:
"""Whether raw output uses the required XML + valid JSON shape."""
try:
return _extract_reasoning_json(raw.strip()) is not None
except CommandParseError:
return False
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def parse_command(raw: str) -> InfraAction:
"""
Parse a raw CLI command string into an ``InfraAction``.
Accepts the legacy plain kubectl/AWS command format, and also the CoT
format:
...
{"command": "kubectl ..."}
The JSON body may use ``command``/``raw_command`` for CLI syntax or a full
structured ``InfraAction`` object.
Raises
------
CommandParseError
If no pattern matches the input string.
"""
raw = raw.strip()
payload = _extract_reasoning_json(raw)
if payload is not None:
command = payload.get("command", payload.get("raw_command"))
if isinstance(command, str):
raw = command.strip()
else:
try:
return InfraAction.model_validate(payload)
except Exception as exc:
raise CommandParseError(
"JSON body must contain a string 'command'/'raw_command' "
"or a valid InfraAction object."
) from exc
# --- Graceful normalization: strip hallucinated 'node-' prefixes ---
# LLMs frequently write --from=node-5 instead of --from=5.
# Also handles Node-, NODE-, node_, etc.
clean_raw = re.sub(r"(?i)\bnode[-_](\d+)", r"\1", raw)
for pattern, handler in _PATTERNS:
m = pattern.search(clean_raw)
if m:
return handler(m)
# Fallback: try original raw in case normalization broke something
if clean_raw != raw:
for pattern, handler in _PATTERNS:
m = pattern.search(raw)
if m:
return handler(m)
raise CommandParseError(
f"Unrecognised command: '{raw[:120]}'. "
"Expected kubectl or AWS CLI syntax. Examples:\n"
" kubectl scale deployment frontend --replicas=10\n"
" kubectl delete pod node-3\n"
" kubectl logs node-2\n"
" kubectl throttle ingress --rate=0.8\n"
" kubectl exec -it istio-proxy -- traffic shift --from=1 --to=3"
)