# ToolACE-Hallucination-Detector / evaluation_baselines_span_utils.py
# Uploaded by jameVee
# Commit 131239b (verified): Add trained hallucination-detector classifiers
import json
import re
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
import numpy as np
# Regex patterns grouped by the kind of action ("affordance") they mention.
# lexical_hallucination_spans() searches the model output for these patterns
# (case-insensitively) and flags a match as "missing_tool" when the affordance
# name does not occur in the names/descriptions of the available tools.
ACTION_PATTERNS = {
    "email": [r"\bemail\b", r"\bmail\b", r"\bsend (an )?email\b"],
    "calendar": [r"\bcalendar\b", r"\bschedule\b", r"\bbook\b", r"\bset up (a )?meeting\b"],
    "phone": [r"\bcall\b", r"\bphone\b", r"\bdial\b"],
    "message": [r"\bslack\b", r"\bmessage\b", r"\bping\b", r"\btext\b"],
}
def split_context_and_tools(context: str) -> Tuple[str, str]:
    """Separate a raw context string into tool output and tool catalogue.

    The text is cut at the first occurrence of ``"Available tools: "``.

    Returns:
        ``(tool_output, tools_json)``, both stripped of surrounding
        whitespace. When the marker is absent, the whole stripped context is
        returned as ``tool_output`` and ``tools_json`` is ``""``.
    """
    head, found, tail = context.partition("Available tools: ")
    if not found:
        return context.strip(), ""
    return head.strip(), tail.strip()
def parse_available_tools(context: str) -> List[Dict]:
    """Return the tool definitions listed after the ``Available tools:`` marker.

    Parsing is best-effort: a missing marker, malformed JSON, or a JSON value
    that is not a list all yield an empty list.

    Args:
        context: Raw context string, optionally ending with
            ``"Available tools: <json list>"``.

    Returns:
        The dict entries of the parsed JSON list, in their original order.
        Entries that are not dicts are dropped so callers can safely use
        ``tool.get(...)`` on every element.
    """
    _, tools_json = split_context_and_tools(context)
    if not tools_json:
        return []
    try:
        parsed = json.loads(tools_json)
    except (ValueError, RecursionError):
        # json.JSONDecodeError subclasses ValueError; RecursionError covers
        # pathologically nested input. Anything else is a real bug and should
        # surface instead of being silently swallowed.
        return []
    if not isinstance(parsed, list):
        return []
    return [tool for tool in parsed if isinstance(tool, dict)]
def parse_tool_blocks(context: str) -> List[Tuple[str, object]]:
    """Parse the tool-output part of a context into ``(name, payload)`` pairs.

    The tool output is read as a sequence of ``name: payload`` blocks separated
    by spaces, newlines, tabs or periods. Each payload is decoded as JSON when
    possible; otherwise the raw text up to the next block boundary (or the end
    of the string) is kept as a stripped string.

    Parsing stops at the first position where no ``:`` follows, the name is
    empty, or nothing follows the colon.
    """
    text = split_context_and_tools(context)[0].strip()
    length = len(text)
    decoder = json.JSONDecoder()
    # A boundary is a newline or ". " that is followed by "<short name>: "
    # and the start of a JSON-looking value.
    boundary = re.compile(r'(?:\n|\.\s+)(?=[^\n:]{1,80}:\s*[\[{\"])')

    parsed: List[Tuple[str, object]] = []
    pos = 0
    while True:
        # Skip separators between blocks.
        while pos < length and text[pos] in " \n\t.":
            pos += 1
        if pos >= length:
            break

        sep = text.find(":", pos)
        if sep == -1:
            break
        name = text[pos:sep].strip()

        body_start = sep + 1
        while body_start < length and text[body_start].isspace():
            body_start += 1
        if body_start >= length or not name:
            break

        try:
            payload, pos = decoder.raw_decode(text, body_start)
        except Exception:
            # Not valid JSON: keep the raw text up to the next block boundary.
            remainder = text[body_start:]
            hit = boundary.search(remainder)
            pos = body_start + (hit.start() if hit else len(remainder))
            payload = text[body_start:pos].strip()
        parsed.append((name, payload))
    return parsed
def humanize_key(key: str) -> str:
    """Turn an identifier-like key into lowercase, space-separated words.

    Underscores and hyphens become spaces, a space is inserted at every
    lowercase-to-uppercase transition, whitespace runs are collapsed, and the
    result is lowercased.
    """
    words = str(key).translate(str.maketrans("_-", "  "))
    words = re.sub(r"(?<=[a-z])(?=[A-Z])", " ", words)
    return re.sub(r"\s+", " ", words).strip().lower()
def normalize_text(text: str) -> str:
    """Lowercase *text*, collapse whitespace runs to one space, and strip the ends."""
    lowered = str(text).lower()
    collapsed = re.sub(r"\s+", " ", lowered)
    return collapsed.strip()
def content_tokens(text: str) -> List[str]:
    """Return lowercase tokens of at least three characters found in *text*.

    A token is a maximal run of ASCII letters, digits, or any of ``_./%:-``.
    """
    kept: List[str] = []
    for hit in re.finditer(r"[A-Za-z0-9_./%:-]+", str(text).lower()):
        token = hit.group(0)
        if len(token) >= 3:
            kept.append(token)
    return kept
def is_large_blob(text: str) -> bool:
    """Tell whether *text* looks like an opaque blob rather than readable content.

    True for anything starting with ``data:image/``, and for strings longer
    than 160 characters made up only of ASCII letters, digits and ``+/=._:-``.
    """
    candidate = str(text)
    if candidate.startswith("data:image/"):
        return True
    if len(candidate) <= 160:
        return False
    return re.fullmatch(r"[A-Za-z0-9+/=._:-]+", candidate) is not None
def format_scalar(value: object) -> str:
    """Render a non-container JSON value as a short single-line string.

    ``None`` becomes ``"null"``, booleans become ``"true"``/``"false"``, and
    ints/floats use ``str()``. Any other value is stringified, stripped, and
    has newlines replaced by spaces; blob-like strings are replaced by a
    placeholder and strings longer than 220 characters are cut to 217
    characters plus ``"..."``.
    """
    if value is None:
        return "null"
    if isinstance(value, bool):
        # Checked before the numeric branch because bool is a subclass of int.
        return {True: "true", False: "false"}[value]
    if isinstance(value, (int, float)):
        return str(value)

    flattened = str(value).strip().replace("\n", " ")
    if is_large_blob(flattened):
        return "[large binary/string omitted]"
    return flattened if len(flattened) <= 220 else flattened[:217] + "..."
def render_json(value: object, indent: int = 0, max_items: int = 4) -> List[str]:
    """Render a decoded JSON value as indented, human-readable lines.

    Dict keys are humanized; list entries are labelled ``- item N`` (1-based)
    and only the first *max_items* entries of each list are shown, followed by
    a ``... K more items`` line when entries were omitted. Nested containers
    are rendered recursively one indent level deeper. A scalar yields a single
    line.

    Args:
        value: Decoded JSON value (dict, list or scalar).
        indent: Number of leading pad characters for this nesting level.
        max_items: Maximum number of entries rendered per list.
    """
    pad = " " * indent

    if isinstance(value, dict):
        entries = [(humanize_key(name), child) for name, child in value.items()]
    elif isinstance(value, list):
        entries = [
            (f"- item {position}", child)
            for position, child in enumerate(value[:max_items], 1)
        ]
    else:
        return [f"{pad}{format_scalar(value)}"]

    rendered: List[str] = []
    for label, child in entries:
        if isinstance(child, (dict, list)):
            rendered.append(f"{pad}{label}:")
            rendered.extend(render_json(child, indent + 1, max_items=max_items))
        else:
            rendered.append(f"{pad}{label}: {format_scalar(child)}")

    if isinstance(value, list) and len(value) > max_items:
        rendered.append(f"{pad}- ... {len(value) - max_items} more items")
    return rendered
def render_available_tools(context: str, max_tools: int = 8) -> List[str]:
    """Render the available-tools catalogue of *context* as bullet lines.

    Args:
        context: Raw context string containing an ``Available tools:`` section.
        max_tools: Maximum number of tools to list.

    Returns:
        ``[]`` when no tools could be parsed. Otherwise an ``"Available tools:"``
        header, one ``"- name: description"`` line per listed tool
        (descriptions longer than 180 characters are cut to 177 characters plus
        ``"..."``), and a ``"- ... K more tools"`` line when the catalogue has
        more than *max_tools* entries.
    """
    tools = parse_available_tools(context)
    if not tools:
        return []
    lines = ["Available tools:"]
    for tool in tools[:max_tools]:
        if not isinstance(tool, dict):
            # Malformed catalogue entry: there is nothing to render.
            continue
        name = tool.get("name", "")
        desc = tool.get("description", "")
        # JSON allows ``"description": null`` or non-string values; len() on
        # those would raise, so coerce to text first.
        desc = "" if desc is None else str(desc)
        if len(desc) > 180:
            desc = desc[:177] + "..."
        lines.append(f"- {name}: {desc}")
    if len(tools) > max_tools:
        lines.append(f"- ... {len(tools) - max_tools} more tools")
    return lines
def normalize_tool_context(context: str, max_items: int = 4, max_tools: int = 8) -> str:
    """Convert a raw context string into a readable multi-line summary.

    Each parsed tool block contributes a ``Tool: <name>`` line followed by its
    rendered payload (indent level 1); the available-tools listing, if any, is
    appended last. Lines are joined with newlines.
    """
    rendered: List[str] = []
    for tool_name, payload in parse_tool_blocks(context):
        rendered.append(f"Tool: {tool_name}")
        rendered += render_json(payload, indent=1, max_items=max_items)
    rendered += render_available_tools(context, max_tools=max_tools)
    return "\n".join(rendered)
def add_normalized_context_columns(df):
    """Return a copy of *df* with normalized context columns added.

    Adds:
        ``normalized_context``: the full normalized rendering of ``context``
            (tool blocks plus the available-tools listing).
        ``normalized_tool_output``: the normalized rendering of the tool-output
            part only, i.e. without the available-tools listing.

    The tool-output column is built by splitting the *raw* context first and
    normalizing only its tool-output part. Splitting the already-normalized
    text does not work: its header is ``"Available tools:"`` followed by a
    newline, which never matches the ``"Available tools: "`` marker, so the
    tool catalogue would stay in the "tool output" column.

    Args:
        df: DataFrame with a ``context`` column of raw context strings.
    """
    df = df.copy()
    contexts = df["context"]
    df["normalized_context"] = contexts.apply(normalize_tool_context)
    df["normalized_tool_output"] = contexts.apply(
        lambda raw: normalize_tool_context(split_context_and_tools(raw)[0])
    )
    return df
def extract_percentages(text: str) -> List[Tuple[int, int, str]]:
    """Find percentage literals such as ``12%``, ``-3.5%`` or ``+7%`` in *text*.

    Returns ``(start, end, matched_text)`` tuples in order of appearance.
    """
    found: List[Tuple[int, int, str]] = []
    for hit in re.finditer(r"[+-]?\d+(?:\.\d+)?%", text):
        begin, stop = hit.span()
        found.append((begin, stop, hit.group(0)))
    return found
def extract_dates(text: str) -> List[Tuple[int, int, str]]:
    """Find date-like substrings in *text*.

    Recognised forms are ``YYYY-MM-DD``, ``YYYY/MM/DD`` and
    ``<Month> D, YYYY`` (month given by an English three-letter prefix,
    optionally followed by lowercase letters).

    Returns ``(start, end, matched_text)`` tuples grouped by pattern in the
    order above, not sorted by position.
    """
    date_patterns = (
        r"\b\d{4}-\d{2}-\d{2}\b",
        r"\b\d{4}/\d{2}/\d{2}\b",
        r"\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},\s+\d{4}\b",
    )
    return [
        (hit.start(), hit.end(), hit.group(0))
        for pattern in date_patterns
        for hit in re.finditer(pattern, text)
    ]
def extract_numbers(text: str) -> List[Tuple[int, int, str]]:
    """Find standalone integer or decimal literals in *text*.

    Returns ``(start, end, matched_text)`` tuples in order of appearance.
    Digits glued to a preceding word character (e.g. the ``1`` in ``a1``) are
    not matched.
    """
    number_re = re.compile(r"\b\d+(?:\.\d+)?\b")
    return [(*hit.span(), hit.group(0)) for hit in number_re.finditer(text)]
def extract_quoted_strings(text: str) -> List[Tuple[int, int, str]]:
    """Find double-quoted substrings with at least three characters inside.

    The quoted content may not contain a double quote or a newline. Returns
    ``(start, end, matched_text)`` tuples; the match includes both quotes.
    """
    results: List[Tuple[int, int, str]] = []
    for hit in re.finditer(r'"[^"\n]{3,}"', text):
        results.append((hit.start(), hit.end(), hit.group(0)))
    return results
def extract_capitalized_phrases(text: str) -> List[Tuple[int, int, str]]:
    """Find runs of capitalized words in *text* (candidate named entities).

    A phrase starts with a word of one uppercase letter followed by lowercase
    letters and may continue with up to four further whitespace-separated
    words that begin with an uppercase letter.

    Returns ``(start, end, matched_text)`` tuples in order of appearance.
    """
    phrase_re = re.compile(r"\b(?:[A-Z][a-z]+(?:\s+[A-Z][A-Za-z0-9.&'-]+){0,4})\b")
    phrases: List[Tuple[int, int, str]] = []
    for hit in phrase_re.finditer(text):
        phrases.append((hit.start(), hit.end(), hit.group(0)))
    return phrases
def merge_spans(spans: Sequence[Dict]) -> List[Dict]:
    """Merge overlapping or touching spans into a sorted list of new dicts.

    Spans are ordered by ``(start, end)``. A span whose ``start`` is not past
    the end of the previous merged span is folded into it: the end is extended,
    the longer ``text`` is kept, the higher ``score`` is kept, and the ``type``
    of the earlier span wins (defaulting to the later span's type, then to
    ``"hallucination"``). The input dicts are not modified.
    """
    if not spans:
        return []

    merged: List[Dict] = []
    for span in sorted(spans, key=lambda item: (item["start"], item["end"])):
        if not merged or span["start"] > merged[-1]["end"]:
            merged.append(dict(span))
            continue

        current = merged[-1]
        current["end"] = max(current["end"], span["end"])
        # Keep whichever text is longer; a missing text counts as empty.
        if len(span.get("text", "")) > len(current.get("text", "")):
            current["text"] = span["text"]
        current["score"] = max(current.get("score", 0.0), span.get("score", 0.0))
        current.setdefault("type", span.get("type", "hallucination"))
    return merged
def normalize_gold_labels(labels: Sequence, text: Optional[str] = None) -> List[Dict]:
    """Convert gold annotations into merged span dicts.

    Each label may be a dict with ``start``/``end`` (and optional ``type`` and
    ``text``) or a list/tuple ``(start, end, type, ...)``. Labels of any other
    shape, and labels with ``start < 0`` or ``end <= start``, are skipped.

    Args:
        labels: Iterable of labels. ``None``, an empty container, or a
            non-iterable placeholder (for example a NaN for a missing value)
            is treated as "no labels".
        text: Optional source text used to fill in a span's ``text`` when the
            label does not carry one.

    Returns:
        Span dicts with ``start``, ``end``, ``type``, ``text`` and
        ``score`` (always ``1.0``), passed through ``merge_spans``.
    """
    # ``labels or []`` is avoided on purpose: truth-testing a multi-element
    # numpy array raises ValueError, and label columns loaded into a DataFrame
    # may arrive as arrays rather than lists.
    try:
        items = [] if labels is None else list(labels)
    except TypeError:
        items = []

    normalized = []
    for label in items:
        if isinstance(label, dict):
            start = int(label.get("start", -1))
            end = int(label.get("end", -1))
            label_type = label.get("type", "hallucination")
            snippet = label.get("text")
        elif isinstance(label, (list, tuple)) and len(label) >= 3:
            start = int(label[0])
            end = int(label[1])
            label_type = label[2]
            snippet = text[start:end] if text and start >= 0 and end > start else None
        else:
            continue
        if start < 0 or end <= start:
            continue
        if snippet is None:
            # Fall back to slicing the source text when it is available.
            snippet = text[start:end] if text else ""
        normalized.append({
            "start": start,
            "end": end,
            "type": label_type,
            "text": snippet,
            "score": 1.0,
        })
    return merge_spans(normalized)
def spans_to_char_set(spans: Sequence[Dict]) -> set:
    """Return the set of character offsets covered by *spans*.

    Each span covers the half-open range ``[start, end)``; both bounds are
    converted with ``int()``.
    """
    return {
        offset
        for span in spans
        for offset in range(int(span["start"]), int(span["end"]))
    }
def evaluate_span_predictions(gold_spans_list: Sequence[Sequence[Dict]], pred_spans_list: Sequence[Sequence[Dict]]) -> Dict[str, float]:
    """Score predicted spans against gold spans at character level.

    Character-level true positives, false positives and false negatives are
    accumulated over all example pairs (micro-averaged) and turned into
    precision, recall and F1. An example is an exact match when its sets of
    ``(start, end, type)`` triples are equal for gold and prediction; the
    exact-match rate is divided by ``len(gold_spans_list)``.

    Returns:
        Dict with ``span_precision``, ``span_recall``, ``span_f1`` and
        ``exact_match_rate``; each is ``0.0`` when its denominator is zero.
    """

    def triples(span_group):
        return {
            (span["start"], span["end"], span.get("type", "hallucination"))
            for span in span_group
        }

    hits = spurious = missed = 0
    identical = 0
    for gold_spans, pred_spans in zip(gold_spans_list, pred_spans_list):
        gold_chars = spans_to_char_set(gold_spans)
        pred_chars = spans_to_char_set(pred_spans)
        hits += len(gold_chars & pred_chars)
        spurious += len(pred_chars - gold_chars)
        missed += len(gold_chars - pred_chars)
        if triples(gold_spans) == triples(pred_spans):
            identical += 1

    predicted_total = hits + spurious
    gold_total = hits + missed
    precision = hits / predicted_total if predicted_total > 0 else 0.0
    recall = hits / gold_total if gold_total > 0 else 0.0
    if precision + recall > 0:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0

    example_count = len(gold_spans_list)
    return {
        "span_precision": precision,
        "span_recall": recall,
        "span_f1": f1,
        "exact_match_rate": identical / example_count if example_count else 0.0,
    }
def build_span_eval_frame(df, pred_col: str) -> Dict[str, float]:
    """Evaluate the span predictions stored in ``df[pred_col]``.

    Gold spans are built from the ``hallucination_labels`` column, using the
    matching ``output`` value as source text, and compared with the predicted
    spans via ``evaluate_span_predictions``.

    Returns:
        The metrics dict produced by ``evaluate_span_predictions``.
    """
    gold = []
    for labels, output in zip(df["hallucination_labels"], df["output"]):
        gold.append(normalize_gold_labels(labels, text=output))
    predictions = df[pred_col].tolist()
    return evaluate_span_predictions(gold, predictions)
def lexical_hallucination_spans(row) -> List[Dict]:
    """Flag spans of ``row["output"]`` that lack lexical support in the context.

    Two heuristics are applied:

    * Percentages, dates, numbers, quoted strings and capitalized phrases whose
      normalized text (at least two characters) is not a substring of the
      normalized tool output are flagged as ``"hallucination"`` (score 0.6 for
      numbers and capitalized phrases, 0.8 otherwise).
    * Action words from ``ACTION_PATTERNS`` are flagged as ``"missing_tool"``
      (score 0.95) when the affordance name does not occur in the names and
      descriptions of the available tools.

    Args:
        row: Mapping-like record with ``output`` and ``context``; an optional
            ``normalized_tool_output`` is used instead of ``context`` as the
            support text when present.

    Returns:
        The flagged spans after ``merge_spans``.
    """
    output = row["output"]
    support_text = normalize_text(row.get("normalized_tool_output", row["context"]))
    available_tools = parse_available_tools(row["context"])

    # (extractor, score) pairs; the order matters because merge_spans keeps
    # the type of the earliest span among equal ranges.
    scored_extractors = (
        (extract_percentages, 0.8),
        (extract_dates, 0.8),
        (extract_numbers, 0.6),
        (extract_quoted_strings, 0.8),
        (extract_capitalized_phrases, 0.6),
    )

    flagged: List[Dict] = []
    for extractor, score in scored_extractors:
        for start, end, value in extractor(output):
            needle = normalize_text(value.strip('"'))
            if len(needle) < 2 or needle in support_text:
                continue
            flagged.append({
                "start": start,
                "end": end,
                "text": output[start:end],
                "type": "hallucination",
                "score": score,
            })

    descriptions = " ".join(
        f"{tool.get('name', '')} {tool.get('description', '')}" for tool in available_tools
    )
    tool_blob = normalize_text(descriptions)
    for affordance, patterns in ACTION_PATTERNS.items():
        if affordance in tool_blob:
            # Some available tool mentions this affordance: nothing to flag.
            continue
        for pattern in patterns:
            for hit in re.finditer(pattern, output, flags=re.IGNORECASE):
                flagged.append({
                    "start": hit.start(),
                    "end": hit.end(),
                    "text": hit.group(0),
                    "type": "missing_tool",
                    "score": 0.95,
                })
    return merge_spans(flagged)
def longest_streak(flags: Sequence[bool]) -> int:
    """Return the length of the longest run of consecutive truthy values."""
    longest = 0
    run = 0
    for flag in flags:
        if flag:
            run += 1
            if run > longest:
                longest = run
        else:
            run = 0
    return longest
def aggregate_span_features(spans: Sequence[Dict], text_length: int) -> Dict[str, float]:
    """Summarise a list of spans as scalar features.

    Args:
        spans: Span dicts with ``start``, ``end`` and optional ``score``.
        text_length: Length of the text the spans refer to; values below 1 are
            treated as 1 when computing the covered fraction.

    Returns:
        Dict with ``num_spans``, ``max_span_score``, ``mean_span_score``,
        ``span_char_fraction`` (sum of span lengths over text length) and
        ``max_span_len``. Score and length features are ``0.0`` when there are
        no spans.
    """
    lengths = []
    scores = []
    for span in spans:
        lengths.append(max(0, span["end"] - span["start"]))
        scores.append(float(span.get("score", 0.0)))

    features = {"num_spans": len(spans)}
    features["max_span_score"] = max(scores) if scores else 0.0
    features["mean_span_score"] = float(np.mean(scores)) if scores else 0.0
    features["span_char_fraction"] = sum(lengths) / max(1, text_length)
    features["max_span_len"] = max(lengths) if lengths else 0.0
    return features
def aggregate_lookback_features(ratios: Sequence[Dict]) -> Dict[str, float]:
    """Aggregate per-token lookback ratios into scalar features.

    Args:
        ratios: Records with a ``lookback_ratio`` value and a ``token`` string.

    Returns:
        Dict with the mean, minimum and standard deviation of the ratios, the
        fraction below 0.3 and below 0.2, the mean of the three lowest ratios,
        the longest run of consecutive ratios below 0.3, and the mean and
        below-0.3 fraction restricted to tokens containing a digit. With no
        records, ratio-valued features are 0.5 and the others 0.0; the same
        neutral values are used for the numeric features when no token
        contains a digit.
    """
    if not ratios:
        neutral = dict.fromkeys(
            ("mean_ratio", "min_ratio", "frac_low_03", "frac_low_02", "std_ratio",
             "bottom3_mean", "longest_low_streak", "mean_ratio_numeric", "frac_low_numeric"),
            0.0,
        )
        for ratio_key in ("mean_ratio", "min_ratio", "bottom3_mean", "mean_ratio_numeric"):
            neutral[ratio_key] = 0.5
        return neutral

    vals = np.array([record["lookback_ratio"] for record in ratios], dtype=float)
    lowest_three = np.sort(vals)[:3]
    below_03 = vals < 0.3

    has_digit = np.array([bool(re.search(r"\d", record["token"])) for record in ratios])
    mean_ratio_numeric = 0.5
    frac_low_numeric = 0.0
    if has_digit.any():
        digit_vals = vals[has_digit]
        mean_ratio_numeric = float(digit_vals.mean())
        frac_low_numeric = float((digit_vals < 0.3).mean())

    return {
        "mean_ratio": float(vals.mean()),
        "min_ratio": float(vals.min()),
        "frac_low_03": float(below_03.mean()),
        "frac_low_02": float((vals < 0.2).mean()),
        "std_ratio": float(vals.std()),
        "bottom3_mean": float(lowest_three.mean()),
        "longest_low_streak": float(longest_streak(below_03.tolist())),
        "mean_ratio_numeric": mean_ratio_numeric,
        "frac_low_numeric": frac_low_numeric,
    }
def spans_from_lookback_ratios(answer: str, ratios: Sequence[Dict], low_threshold: float = 0.22, min_chars: int = 3) -> List[Dict]:
    """Group consecutive low-lookback tokens of *answer* into hallucination spans.

    A token is flagged when its ``lookback_ratio`` is below *low_threshold* and
    its text contains an ASCII letter or digit. Flagged tokens extend the open
    run when they start at most one character after its end; any other usable
    token closes the run. Records without valid ``start``/``end`` offsets are
    ignored entirely.

    Args:
        answer: Text the token offsets refer to.
        ratios: Per-token records with ``start``, ``end`` and ``lookback_ratio``.
        low_threshold: Ratios strictly below this value count as low.
        min_chars: Runs shorter than this many characters are dropped.

    Returns:
        Span dicts of type ``"hallucination"`` scored by the mean of
        ``1 - lookback_ratio`` over the run, passed through ``merge_spans``.
    """

    def open_run(begin, stop, surprise):
        return {"start": begin, "end": stop, "score_values": [surprise]}

    runs = []
    active = None
    for record in ratios:
        tok_start = record.get("start")
        tok_end = record.get("end")
        if tok_start is None or tok_end is None or tok_end <= tok_start:
            continue

        has_content = bool(re.search(r"[A-Za-z0-9]", answer[tok_start:tok_end]))
        is_low = record["lookback_ratio"] < low_threshold
        if not (is_low and has_content):
            # A grounded or content-free token ends the current run.
            if active is not None:
                runs.append(active)
                active = None
            continue

        surprise = 1.0 - float(record["lookback_ratio"])
        if active is None:
            active = open_run(tok_start, tok_end, surprise)
        elif tok_start <= active["end"] + 1:
            active["end"] = tok_end
            active["score_values"].append(surprise)
        else:
            runs.append(active)
            active = open_run(tok_start, tok_end, surprise)
    if active is not None:
        runs.append(active)

    kept = []
    for run in runs:
        begin, stop = run["start"], run["end"]
        if stop - begin < min_chars:
            continue
        surprises = run["score_values"]
        kept.append({
            "start": begin,
            "end": stop,
            "text": answer[begin:stop],
            "type": "hallucination",
            "score": float(np.mean(surprises)) if surprises else 0.0,
        })
    return merge_spans(kept)