[KM-567][AI] Planner agent: fix PII leak + placeholder/retry robustness
Browse filesReview follow-ups on the Planner agent:
- inputs.py: suppress a PII column's top_values when building CatalogSummary.
Previously only sample_values were nulled, so a low-cardinality PII column's
top_values (same class of data) could reach the planner prompt. Now both are
suppressed. (Regression test added.)
- validator.py: a ${tN} placeholder is now valid when its target is a
transitive ancestor, not only a direct depends_on. Cycle detection moved
ahead of the placeholder check so ancestors are well-defined on a DAG. This
stops valid plans (target reached via an intermediate task) being rejected.
- service.py: floor max_retries at 1 (mirrors QueryService) so a misconfigured
value can't skip the LLM call entirely.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
@@ -77,8 +77,15 @@ class CatalogSummary(BaseModel):
|
|
| 77 |
name=col.name,
|
| 78 |
data_type=col.data_type,
|
| 79 |
pii_flag=col.pii_flag,
|
|
|
|
|
|
|
|
|
|
| 80 |
sample_values=None if col.pii_flag else col.sample_values,
|
| 81 |
-
top_values=
|
|
|
|
|
|
|
|
|
|
|
|
|
| 82 |
)
|
| 83 |
for col in table.columns
|
| 84 |
],
|
|
|
|
| 77 |
name=col.name,
|
| 78 |
data_type=col.data_type,
|
| 79 |
pii_flag=col.pii_flag,
|
| 80 |
+
# PII columns leak nothing into the prompt: both
|
| 81 |
+
# sample_values and (low-cardinality) top_values are
|
| 82 |
+
# suppressed — top_values are the same class of data.
|
| 83 |
sample_values=None if col.pii_flag else col.sample_values,
|
| 84 |
+
top_values=(
|
| 85 |
+
None
|
| 86 |
+
if col.pii_flag or col.stats is None
|
| 87 |
+
else col.stats.top_values
|
| 88 |
+
),
|
| 89 |
)
|
| 90 |
for col in table.columns
|
| 91 |
],
|
|
@@ -87,7 +87,7 @@ class PlannerService:
|
|
| 87 |
) -> None:
|
| 88 |
self._chain = structured_chain
|
| 89 |
self._validator = validator or PlannerValidator()
|
| 90 |
-
self._max_retries = max_retries
|
| 91 |
|
| 92 |
def _ensure_chain(self) -> Runnable:
|
| 93 |
if self._chain is None:
|
|
|
|
| 87 |
) -> None:
|
| 88 |
self._chain = structured_chain
|
| 89 |
self._validator = validator or PlannerValidator()
|
| 90 |
+
self._max_retries = max(1, max_retries)
|
| 91 |
|
| 92 |
def _ensure_chain(self) -> Runnable:
|
| 93 |
if self._chain is None:
|
|
@@ -148,22 +148,28 @@ class PlannerValidator:
|
|
| 148 |
raise PlannerValidationError(
|
| 149 |
f"task {task.id}: depends_on includes itself"
|
| 150 |
)
|
| 151 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 152 |
for ref in _placeholder_refs(task):
|
| 153 |
if ref not in id_set:
|
| 154 |
raise PlannerValidationError(
|
| 155 |
f"task {task.id}: placeholder '${{{ref}}}' references unknown task"
|
| 156 |
)
|
| 157 |
-
if ref not in task.
|
| 158 |
raise PlannerValidationError(
|
| 159 |
f"task {task.id}: placeholder '${{{ref}}}' used but {ref!r} is "
|
| 160 |
-
f"not
|
| 161 |
)
|
| 162 |
|
| 163 |
-
cycle = _find_cycle(tasks_by_id)
|
| 164 |
-
if cycle:
|
| 165 |
-
raise PlannerValidationError(f"cycle detected in depends_on: {' -> '.join(cycle)}")
|
| 166 |
-
|
| 167 |
@staticmethod
|
| 168 |
def _validate_parallelism(tasks_by_id: dict, id_set: set[str]) -> None:
|
| 169 |
ancestors = _all_ancestors(tasks_by_id)
|
|
|
|
| 148 |
raise PlannerValidationError(
|
| 149 |
f"task {task.id}: depends_on includes itself"
|
| 150 |
)
|
| 151 |
+
|
| 152 |
+
cycle = _find_cycle(tasks_by_id)
|
| 153 |
+
if cycle:
|
| 154 |
+
raise PlannerValidationError(f"cycle detected in depends_on: {' -> '.join(cycle)}")
|
| 155 |
+
|
| 156 |
+
# On an acyclic graph, a placeholder is safe iff its target is a
|
| 157 |
+
# transitive ancestor — i.e. guaranteed to have completed before this
|
| 158 |
+
# task runs. Requiring a *direct* depends_on would wrongly reject valid
|
| 159 |
+
# plans that depend on the target through an intermediate task.
|
| 160 |
+
ancestors = _all_ancestors(tasks_by_id)
|
| 161 |
+
for task in tasks_by_id.values():
|
| 162 |
for ref in _placeholder_refs(task):
|
| 163 |
if ref not in id_set:
|
| 164 |
raise PlannerValidationError(
|
| 165 |
f"task {task.id}: placeholder '${{{ref}}}' references unknown task"
|
| 166 |
)
|
| 167 |
+
if ref not in ancestors[task.id]:
|
| 168 |
raise PlannerValidationError(
|
| 169 |
f"task {task.id}: placeholder '${{{ref}}}' used but {ref!r} is "
|
| 170 |
+
f"not a (transitive) dependency — add it to depends_on"
|
| 171 |
)
|
| 172 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 173 |
@staticmethod
|
| 174 |
def _validate_parallelism(tasks_by_id: dict, id_set: set[str]) -> None:
|
| 175 |
ancestors = _all_ancestors(tasks_by_id)
|