agentic-intent-classifier / evaluation /sweep_intent_threshold.py
manikumargouni's picture
Upload folder using huggingface_hub
0584798 verified
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent
if str(BASE_DIR) not in sys.path:
sys.path.insert(0, str(BASE_DIR))
from combined_inference import classify_query
from config import BASE_DIR, INTENT_HEAD_CONFIG, ensure_artifact_dirs
from model_runtime import get_head
from schemas import validate_classify_response
DEFAULT_THRESHOLDS = [0.0, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45]
SAFE_INTENT_TYPES = {"ambiguous", "personal_reflection", "support"}
OBVIOUS_INTENT_TYPES = {"informational", "commercial", "transactional"}
SWEEP_SUITE_PATH = BASE_DIR / "examples" / "intent_threshold_sweep_suite.json"
OUTPUT_PATH = BASE_DIR / "artifacts" / "evaluation" / "intent_threshold_sweep.json"
def load_jsonl(path: Path) -> list[dict]:
with path.open("r", encoding="utf-8") as handle:
return [json.loads(line) for line in handle]
def load_json(path: Path) -> list[dict]:
return json.loads(path.read_text(encoding="utf-8"))
def round_score(value: float) -> float:
return round(float(value), 4)
def write_json(path: Path, payload: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def evaluate_intent_head_threshold(threshold: float) -> dict:
head = get_head("intent_type")
dataset_specs = [
("val", INTENT_HEAD_CONFIG.split_paths["val"]),
("test", INTENT_HEAD_CONFIG.split_paths["test"]),
("hard_cases", BASE_DIR / "data" / "hard_cases.jsonl"),
("third_wave_cases", BASE_DIR / "data" / "third_wave_cases.jsonl"),
]
rows = []
for suite_name, path in dataset_specs:
for item in load_jsonl(path):
rows.append({"suite": suite_name, **item})
predictions = head.predict_batch([row["text"] for row in rows], confidence_threshold=threshold)
obvious_total = 0
obvious_fallback = 0
ambiguous_total = 0
ambiguous_bad_allow = 0
intent_only_safe_pred = 0
for row, prediction in zip(rows, predictions):
predicted_label = prediction["label"]
head_would_fallback = (not prediction["meets_confidence_threshold"]) or (predicted_label in SAFE_INTENT_TYPES)
if row[INTENT_HEAD_CONFIG.label_field] in OBVIOUS_INTENT_TYPES:
obvious_total += 1
if head_would_fallback:
obvious_fallback += 1
if row[INTENT_HEAD_CONFIG.label_field] == "ambiguous":
ambiguous_total += 1
if not head_would_fallback:
ambiguous_bad_allow += 1
if head_would_fallback and predicted_label in SAFE_INTENT_TYPES:
intent_only_safe_pred += 1
return {
"obvious_prompt_count": obvious_total,
"obvious_false_fallback_rate": round_score(obvious_fallback / obvious_total) if obvious_total else 0.0,
"ambiguous_prompt_count": ambiguous_total,
"ambiguous_bad_allow_rate": round_score(ambiguous_bad_allow / ambiguous_total) if ambiguous_total else 0.0,
"safe_predicate_rate": round_score(intent_only_safe_pred / len(rows)) if rows else 0.0,
}
def evaluate_combined_threshold(threshold: float) -> dict:
suite = load_json(SWEEP_SUITE_PATH)
benchmark = load_json(BASE_DIR / "examples" / "demo_prompt_suite.json")
obvious_total = 0
obvious_false_fallback = 0
safe_total = 0
safe_bad_allow = 0
intent_only = 0
phase_only = 0
both = 0
policy_safe = 0
suite_outputs = []
for item in suite:
payload = validate_classify_response(classify_query(item["input"], threshold_overrides={"intent_type": threshold}))
fallback = payload["model_output"].get("fallback")
fallback_applied = fallback is not None
failed_components = set((fallback or {}).get("failed_components", []))
if item["expected_outcome"] == "pass":
obvious_total += 1
if fallback_applied:
obvious_false_fallback += 1
else:
safe_total += 1
if not fallback_applied:
safe_bad_allow += 1
if fallback_applied:
if failed_components == {"intent_type"}:
intent_only += 1
elif failed_components == {"decision_phase"}:
phase_only += 1
elif failed_components == {"intent_type", "decision_phase"}:
both += 1
else:
policy_safe += 1
suite_outputs.append(
{
"input": item["input"],
"expected_outcome": item["expected_outcome"],
"fallback_applied": fallback_applied,
"failed_components": sorted(failed_components),
"intent_type": payload["model_output"]["classification"]["intent"]["type"],
"decision_phase": payload["model_output"]["classification"]["intent"]["decision_phase"],
"intent_confidence": payload["model_output"]["classification"]["intent"]["component_confidence"]["intent_type"]["confidence"],
"phase_confidence": payload["model_output"]["classification"]["intent"]["component_confidence"]["decision_phase"]["confidence"],
}
)
benchmark_fallbacks = []
for item in benchmark:
payload = validate_classify_response(classify_query(item["input"], threshold_overrides={"intent_type": threshold}))
fallback = payload["model_output"].get("fallback")
fallback_applied = fallback is not None
failed_components = set((fallback or {}).get("failed_components", []))
benchmark_fallbacks.append(
{
"fallback_applied": fallback_applied,
"intent_only": failed_components == {"intent_type"},
"phase_only": failed_components == {"decision_phase"},
"both": failed_components == {"intent_type", "decision_phase"},
}
)
total_suite_fallbacks = intent_only + phase_only + both + policy_safe
benchmark_total_fallbacks = sum(1 for item in benchmark_fallbacks if item["fallback_applied"])
return {
"suite_path": str(SWEEP_SUITE_PATH),
"obvious_prompt_count": obvious_total,
"false_fallback_rate_on_obvious_prompts": round_score(obvious_false_fallback / obvious_total) if obvious_total else 0.0,
"safe_prompt_count": safe_total,
"bad_allow_rate_on_safe_prompts": round_score(safe_bad_allow / safe_total) if safe_total else 0.0,
"fallback_responsibility": {
"intent_only": intent_only,
"phase_only": phase_only,
"both": both,
"policy_safe": policy_safe,
"intent_share_of_threshold_fallbacks": round_score(
(intent_only + both) / (intent_only + phase_only + both)
)
if (intent_only + phase_only + both)
else 0.0,
"phase_share_of_threshold_fallbacks": round_score(
(phase_only + both) / (intent_only + phase_only + both)
)
if (intent_only + phase_only + both)
else 0.0,
"fallback_rate": round_score(total_suite_fallbacks / len(suite)) if suite else 0.0,
},
"benchmark_fallback_rate": round_score(benchmark_total_fallbacks / len(benchmark)) if benchmark else 0.0,
"benchmark_intent_only_fallback_rate": round_score(
sum(1 for item in benchmark_fallbacks if item["intent_only"]) / len(benchmark)
)
if benchmark
else 0.0,
"benchmark_phase_only_fallback_rate": round_score(
sum(1 for item in benchmark_fallbacks if item["phase_only"]) / len(benchmark)
)
if benchmark
else 0.0,
"suite_outputs": suite_outputs,
}
def pick_recommended_threshold(results: list[dict]) -> dict:
return min(
results,
key=lambda item: (
item["combined"]["bad_allow_rate_on_safe_prompts"],
item["head"]["ambiguous_bad_allow_rate"],
item["combined"]["false_fallback_rate_on_obvious_prompts"],
abs(item["combined"]["fallback_responsibility"]["intent_share_of_threshold_fallbacks"] - 0.5),
item["combined"]["benchmark_fallback_rate"],
item["threshold"],
),
)
def apply_threshold(threshold: float) -> None:
calibration_path = INTENT_HEAD_CONFIG.calibration_path
payload = json.loads(calibration_path.read_text(encoding="utf-8"))
payload["confidence_threshold"] = round_score(threshold)
payload["threshold_selection_mode"] = "manual_sweep"
write_json(calibration_path, payload)
def main() -> None:
parser = argparse.ArgumentParser(description="Sweep candidate intent_type thresholds and compare end-to-end behavior.")
parser.add_argument(
"--thresholds",
nargs="*",
type=float,
default=DEFAULT_THRESHOLDS,
help="Candidate thresholds to evaluate.",
)
parser.add_argument(
"--apply-threshold",
type=float,
default=None,
help="Optional threshold to write into the intent_type calibration artifact after evaluation.",
)
args = parser.parse_args()
ensure_artifact_dirs()
thresholds = [round_score(value) for value in args.thresholds]
results = []
for threshold in thresholds:
results.append(
{
"threshold": threshold,
"head": evaluate_intent_head_threshold(threshold),
"combined": evaluate_combined_threshold(threshold),
}
)
recommended = pick_recommended_threshold(results)
output = {
"thresholds": thresholds,
"results": results,
"recommended_threshold": recommended["threshold"],
}
write_json(OUTPUT_PATH, output)
if args.apply_threshold is not None:
apply_threshold(args.apply_threshold)
output["applied_threshold"] = round_score(args.apply_threshold)
write_json(OUTPUT_PATH, output)
print(json.dumps(output, indent=2))
if __name__ == "__main__":
main()