Lars Talian committed on
Commit
b439619
·
1 Parent(s): f0faee1

fix(validator): sanitize reward grounding command construction

Browse files
src/open_range/validator/reward_grounding.py CHANGED
@@ -2,8 +2,13 @@
2
 
3
  from __future__ import annotations
4
 
 
 
 
5
  from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
6
 
 
 
7
 
8
  def _parse_db_path(path: str) -> tuple[str, str, str] | None:
9
  """Parse a DB flag path like ``db:database.table.column``.
@@ -20,6 +25,8 @@ def _parse_db_path(path: str) -> tuple[str, str, str] | None:
20
  parts = rest.split(".")
21
  if len(parts) != 3:
22
  return None
 
 
23
  return parts[0], parts[1], parts[2]
24
 
25
 
@@ -48,13 +55,19 @@ class RewardGroundingCheck:
48
  # Deployment artifacts like "db:sql" are not flag locations.
49
  db_ref = _parse_db_path(path)
50
  if db_ref is None:
51
- # Unparseable DB path (e.g. "db:sql") — skip silently.
 
 
 
 
 
52
  continue
53
 
54
  database, table, column = db_ref
 
55
  mysql_cmd = (
56
- f'mysql -u root -p$MYSQL_ROOT_PASSWORD -N '
57
- f'-e "SELECT {column} FROM {database}.{table} LIMIT 1"'
58
  )
59
  try:
60
  output = await containers.exec(host, mysql_cmd)
@@ -81,7 +94,7 @@ class RewardGroundingCheck:
81
  continue
82
 
83
  try:
84
- output = await containers.exec(host, f"cat {path}")
85
  output = output.strip()
86
  except Exception as exc: # noqa: BLE001
87
  bad.append({"flag": flag.id, "error": str(exc)})
 
2
 
3
  from __future__ import annotations
4
 
5
+ import re
6
+ import shlex
7
+
8
  from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
9
 
10
+ _IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
11
+
12
 
13
  def _parse_db_path(path: str) -> tuple[str, str, str] | None:
14
  """Parse a DB flag path like ``db:database.table.column``.
 
25
  parts = rest.split(".")
26
  if len(parts) != 3:
27
  return None
28
+ if not all(_IDENTIFIER_RE.fullmatch(part) for part in parts):
29
+ return None
30
  return parts[0], parts[1], parts[2]
31
 
32
 
 
55
  # Deployment artifacts like "db:sql" are not flag locations.
56
  db_ref = _parse_db_path(path)
57
  if db_ref is None:
58
+ if path in {"db:sql", "mysql:sql"}:
59
+ continue
60
+ bad.append({
61
+ "flag": flag.id,
62
+ "error": f"invalid db flag path format: {path}",
63
+ })
64
  continue
65
 
66
  database, table, column = db_ref
67
+ query = f"SELECT `{column}` FROM `{database}`.`{table}` LIMIT 1"
68
  mysql_cmd = (
69
+ "mysql -u root -p$MYSQL_ROOT_PASSWORD -N "
70
+ f"-e {shlex.quote(query)}"
71
  )
72
  try:
73
  output = await containers.exec(host, mysql_cmd)
 
94
  continue
95
 
96
  try:
97
+ output = await containers.exec(host, f"cat -- {shlex.quote(path)}")
98
  output = output.strip()
99
  except Exception as exc: # noqa: BLE001
100
  bad.append({"flag": flag.id, "error": str(exc)})
tests/test_validator.py CHANGED
@@ -668,6 +668,59 @@ async def test_reward_grounding_skips_db_sql_path(mock_containers):
668
  assert result.passed is True
669
 
670
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
671
  # ---------------------------------------------------------------------------
672
  # Check 6: Isolation
673
  # ---------------------------------------------------------------------------
 
668
  assert result.passed is True
669
 
670
 
671
+ @pytest.mark.asyncio
672
+ async def test_reward_grounding_quotes_filesystem_path():
673
+ """Filesystem flag paths with shell metacharacters must be quoted."""
674
+ from open_range.validator.reward_grounding import RewardGroundingCheck
675
+
676
+ class RecordingContainers:
677
+ def __init__(self):
678
+ self.calls: list[tuple[str, str]] = []
679
+
680
+ async def exec(self, container: str, cmd: str, **kwargs) -> str:
681
+ self.calls.append((container, cmd))
682
+ return "FLAG{abc}"
683
+
684
+ containers = RecordingContainers()
685
+ spec = SnapshotSpec(
686
+ flags=[FlagSpec(id="f1", value="FLAG{abc}", path="/tmp/f; echo PWNED", host="web")]
687
+ )
688
+ result = await RewardGroundingCheck().check(spec, containers) # type: ignore[arg-type]
689
+ assert result.passed is True
690
+ assert containers.calls
691
+ assert containers.calls[0][1] == "cat -- '/tmp/f; echo PWNED'"
692
+
693
+
694
+ @pytest.mark.asyncio
695
+ async def test_reward_grounding_rejects_invalid_db_identifier_path():
696
+ """Malformed DB paths must fail rather than altering SQL semantics."""
697
+ from open_range.validator.reward_grounding import RewardGroundingCheck
698
+
699
+ class RecordingContainers:
700
+ def __init__(self):
701
+ self.calls: list[tuple[str, str]] = []
702
+
703
+ async def exec(self, container: str, cmd: str, **kwargs) -> str:
704
+ self.calls.append((container, cmd))
705
+ return "FLAG{abc}"
706
+
707
+ containers = RecordingContainers()
708
+ spec = SnapshotSpec(
709
+ flags=[
710
+ FlagSpec(
711
+ id="f1",
712
+ value="FLAG{abc}",
713
+ path="db:flags.secrets.flag FROM secrets; SELECT 'x' --",
714
+ host="db",
715
+ )
716
+ ]
717
+ )
718
+ result = await RewardGroundingCheck().check(spec, containers) # type: ignore[arg-type]
719
+ assert result.passed is False
720
+ assert "invalid db flag path format" in result.details["results"][0]["error"]
721
+ assert containers.calls == []
722
+
723
+
724
  # ---------------------------------------------------------------------------
725
  # Check 6: Isolation
726
  # ---------------------------------------------------------------------------