| from collections import Counter, defaultdict | |
def detect_failure_patterns(
    session_scores: list[dict],
    spooled_entries: list[dict],
    *,
    min_tool_sessions: int = 3,
    min_flag_sessions: int = 5,
    max_sample_sessions: int = 5,
) -> list[dict]:
    """Scan session data for recurring failure patterns.

    Args:
        session_scores: Per-session records. Each record may carry a
            ``"flags"`` iterable (missing or ``None`` means no flags); a
            record with at least one flag must also carry ``"session_id"``.
        spooled_entries: Spooled entries. Only entries whose ``"role"`` is
            ``"toolResult"`` are inspected; such an entry must carry
            ``"session_id"`` when its ``"clean_text"`` is empty or its
            ``"is_error"`` value is truthy.
        min_tool_sessions: Minimum number of distinct sessions in which a
            tool must return empty output (or an error) to be reported.
        min_flag_sessions: Minimum number of distinct sessions in which a
            flag must appear to be reported.
        max_sample_sessions: Maximum number of session ids listed under
            ``"sessions"`` in each pattern.

    Returns:
        A list of pattern dicts with the keys ``"pattern"``, ``"count"``,
        ``"sessions"`` and ``"description"``, ordered as: empty tool
        results, tool errors, per-flag patterns, then the multi-flag
        summary (emitted only when at least one session qualifies).
    """
    patterns: list[dict] = []

    # Sets rather than lists: only distinct sessions matter for the counts.
    empty_by_tool = defaultdict(set)
    error_by_tool = defaultdict(set)
    for entry in spooled_entries:
        if entry.get("role") != "toolResult":
            continue
        tool_name = entry.get("tool_name") or "unknown"
        # ``clean_text`` may be present but None; treat that as empty output
        # instead of crashing on ``None.strip()``.
        clean_text = entry.get("clean_text") or ""
        if not clean_text.strip():
            empty_by_tool[tool_name].add(entry["session_id"])
        if entry.get("is_error"):
            error_by_tool[tool_name].add(entry["session_id"])

    # Both tool reports share one shape; only the label and wording differ.
    tool_reports = (
        ("empty_tool_result", empty_by_tool, "returned empty output"),
        ("tool_errors", error_by_tool, "showed error-like output"),
    )
    for label, by_tool, wording in tool_reports:
        for tool, session_ids in by_tool.items():
            total = len(session_ids)
            if total < min_tool_sessions:
                continue
            patterns.append({
                "pattern": f"{label}:{tool}",
                "count": total,
                "sessions": sorted(session_ids)[:max_sample_sessions],
                "description": f"Tool '{tool}' {wording} across {total} sessions",
            })

    sessions_by_flag: dict = {}
    multi_flag_sessions = []
    for sess in session_scores:
        # Tolerate ``flags: None`` and drop repeats within a session (keeping
        # first-seen order) so a duplicated flag cannot inflate the counts.
        distinct_flags = list(dict.fromkeys(sess.get("flags") or []))
        for flag in distinct_flags:
            sessions_by_flag.setdefault(flag, set()).add(sess["session_id"])
        if len(distinct_flags) >= 2:
            multi_flag_sessions.append(sess)

    for flag, session_ids in sessions_by_flag.items():
        # Count distinct sessions so the number matches the description and
        # the way the tool patterns above are counted.
        count = len(session_ids)
        if count < min_flag_sessions:
            continue
        patterns.append({
            "pattern": f"session_flag:{flag}",
            "count": count,
            "sessions": sorted(session_ids)[:max_sample_sessions],
            "description": f"Flag '{flag}' appeared in {count} sessions",
        })

    if multi_flag_sessions:
        sample = multi_flag_sessions[:max_sample_sessions]
        patterns.append({
            "pattern": "session_multiple_flags",
            "count": len(multi_flag_sessions),
            "sessions": [sess["session_id"] for sess in sample],
            "description": f"{len(multi_flag_sessions)} sessions triggered multiple review flags",
        })

    return patterns