File size: 2,648 Bytes
63c75d5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | from collections import Counter, defaultdict
def detect_failure_patterns(
    session_scores: list[dict],
    spooled_entries: list[dict],
    *,
    min_tool_sessions: int = 3,
    min_flag_count: int = 5,
    max_sample_sessions: int = 5,
) -> list[dict]:
    """Scan session data for recurring failure patterns.

    Four kinds of pattern are reported, in this order:

    * ``empty_tool_result:<tool>`` -- a tool whose ``toolResult`` entries had
      empty (or whitespace-only) ``clean_text`` in at least
      ``min_tool_sessions`` distinct sessions.
    * ``tool_errors:<tool>`` -- a tool whose ``toolResult`` entries had a
      truthy ``is_error`` in at least ``min_tool_sessions`` distinct sessions.
    * ``session_flag:<flag>`` -- a flag that occurs at least
      ``min_flag_count`` times across ``session_scores``.
    * ``session_multiple_flags`` -- emitted once if any session carries two
      or more flags.

    Args:
        session_scores: Per-session dicts; ``flags`` (optional, may be
            ``None``) is read from each, and ``session_id`` is required for
            any session that has flags.
        spooled_entries: Entry dicts; only those with ``role == "toolResult"``
            are inspected. ``session_id`` is required on entries that are
            empty or flagged as errors.
        min_tool_sessions: Minimum number of distinct sessions for a tool
            pattern to be reported.
        min_flag_count: Minimum number of occurrences for a flag pattern to
            be reported.
        max_sample_sessions: Maximum number of session ids listed under
            ``sessions`` in each pattern.

    Returns:
        A list of dicts with keys ``pattern``, ``count``, ``sessions`` and
        ``description``. For tool patterns ``count`` is the number of
        distinct sessions; for flag patterns it is the number of flag
        occurrences (a flag repeated inside one session is counted each
        time).

    Raises:
        KeyError: If an entry or session that contributes to a pattern has
            no ``session_id`` key.
    """
    empty_by_tool = defaultdict(list)
    error_by_tool = defaultdict(list)
    for entry in spooled_entries:
        if entry.get("role") != "toolResult":
            continue
        tool_name = entry.get("tool_name") or "unknown"
        # The key may be present with a None value; treat that as empty
        # output rather than crashing on None.strip().
        clean_text = (entry.get("clean_text") or "").strip()
        if not clean_text:
            empty_by_tool[tool_name].append(entry["session_id"])
        if entry.get("is_error"):
            error_by_tool[tool_name].append(entry["session_id"])

    patterns = []
    patterns.extend(
        _tool_patterns(
            empty_by_tool,
            "empty_tool_result",
            "returned empty output",
            min_tool_sessions,
            max_sample_sessions,
        )
    )
    patterns.extend(
        _tool_patterns(
            error_by_tool,
            "tool_errors",
            "showed error-like output",
            min_tool_sessions,
            max_sample_sessions,
        )
    )

    flag_counts = Counter()
    flag_sessions = defaultdict(list)
    multi_flag_sessions = []
    for sess in session_scores:
        # "flags" may be missing or explicitly None; both mean "no flags".
        flags = sess.get("flags") or []
        for flag in flags:
            flag_counts[flag] += 1
            flag_sessions[flag].append(sess["session_id"])
        if len(flags) >= 2:
            multi_flag_sessions.append(sess)

    for flag, count in flag_counts.items():
        if count >= min_flag_count:
            patterns.append({
                "pattern": f"session_flag:{flag}",
                "count": count,
                "sessions": sorted(set(flag_sessions[flag]))[:max_sample_sessions],
                "description": f"Flag '{flag}' appeared in {count} sessions",
            })

    if multi_flag_sessions:
        patterns.append({
            "pattern": "session_multiple_flags",
            "count": len(multi_flag_sessions),
            "sessions": [
                sess["session_id"]
                for sess in multi_flag_sessions[:max_sample_sessions]
            ],
            "description": (
                f"{len(multi_flag_sessions)} sessions triggered multiple review flags"
            ),
        })
    return patterns


def _tool_patterns(
    sessions_by_tool: dict,
    label: str,
    phrase: str,
    min_sessions: int,
    max_sample: int,
) -> list[dict]:
    """Build one pattern per tool seen in at least *min_sessions* distinct sessions."""
    found = []
    for tool, sessions in sessions_by_tool.items():
        # The same session may contribute several entries; count it once.
        uniq = sorted(set(sessions))
        if len(uniq) >= min_sessions:
            found.append({
                "pattern": f"{label}:{tool}",
                "count": len(uniq),
                "sessions": uniq[:max_sample],
                "description": f"Tool '{tool}' {phrase} across {len(uniq)} sessions",
            })
    return found
|