File size: 2,648 Bytes
63c75d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from collections import Counter, defaultdict


def detect_failure_patterns(
    session_scores: list[dict],
    spooled_entries: list[dict],
    *,
    min_tool_sessions: int = 3,
    min_flag_sessions: int = 5,
    max_sessions_listed: int = 5,
) -> list[dict]:
    """
    Scan session data for recurring failure patterns.

    Four kinds of pattern are reported, in this order:

    * ``empty_tool_result:<tool>`` - a tool produced empty (or whitespace-only)
      ``clean_text`` in at least ``min_tool_sessions`` distinct sessions.
    * ``tool_errors:<tool>`` - a tool produced a result with a truthy
      ``is_error`` in at least ``min_tool_sessions`` distinct sessions.
    * ``session_flag:<flag>`` - a flag was raised in at least
      ``min_flag_sessions`` distinct sessions.
    * ``session_multiple_flags`` - emitted once if any session record carries
      two or more distinct flags.

    Args:
        session_scores: Per-session dicts; reads ``flags`` (iterable of
            hashable flags, may be missing or ``None``) and ``session_id``
            (required only for sessions that have flags).
        spooled_entries: Entry dicts; only those with ``role == "toolResult"``
            are considered. Reads ``tool_name``, ``clean_text``, ``is_error``
            and ``session_id``.
        min_tool_sessions: Distinct-session threshold for the tool patterns.
        min_flag_sessions: Distinct-session threshold for the flag patterns.
        max_sessions_listed: Maximum number of session ids included in each
            pattern's ``sessions`` sample.

    Returns:
        A list of dicts with keys ``pattern``, ``count``, ``sessions`` and
        ``description``.

    Raises:
        KeyError: If a relevant entry or flagged session lacks ``session_id``.
    """
    patterns = []

    # Sets (not lists) so a tool failing repeatedly inside one session is
    # counted once per session.
    tool_empty_sessions = defaultdict(set)
    tool_error_sessions = defaultdict(set)
    for entry in spooled_entries:
        if entry.get("role") != "toolResult":
            continue
        tool_name = entry.get("tool_name") or "unknown"
        # `or ""` also covers an explicit None value, not just a missing key.
        clean_text = (entry.get("clean_text") or "").strip()
        if clean_text == "":
            tool_empty_sessions[tool_name].add(entry["session_id"])
        if entry.get("is_error"):
            tool_error_sessions[tool_name].add(entry["session_id"])

    # Both tool reports share the same shape; only the label and wording differ.
    tool_reports = (
        ("empty_tool_result", tool_empty_sessions, "returned empty output"),
        ("tool_errors", tool_error_sessions, "showed error-like output"),
    )
    for prefix, sessions_by_tool, symptom in tool_reports:
        for tool, session_ids in sessions_by_tool.items():
            uniq = sorted(session_ids)
            if len(uniq) >= min_tool_sessions:
                patterns.append({
                    "pattern": f"{prefix}:{tool}",
                    "count": len(uniq),
                    "sessions": uniq[:max_sessions_listed],
                    "description": f"Tool '{tool}' {symptom} across {len(uniq)} sessions",
                })

    flag_sessions = defaultdict(set)
    multi_flag_session_ids = []
    for sess in session_scores:
        # dict.fromkeys de-duplicates while keeping first-seen order, so a flag
        # listed twice on one session is neither double-counted nor mistaken
        # for "multiple flags"; `or []` tolerates flags=None.
        distinct_flags = list(dict.fromkeys(sess.get("flags") or []))
        for flag in distinct_flags:
            flag_sessions[flag].add(sess["session_id"])
        if len(distinct_flags) >= 2:
            multi_flag_session_ids.append(sess["session_id"])

    for flag, session_ids in flag_sessions.items():
        # Count distinct sessions so "count" agrees with the description text
        # and with the de-duplicated "sessions" sample.
        count = len(session_ids)
        if count >= min_flag_sessions:
            patterns.append({
                "pattern": f"session_flag:{flag}",
                "count": count,
                "sessions": sorted(session_ids)[:max_sessions_listed],
                "description": f"Flag '{flag}' appeared in {count} sessions",
            })

    if multi_flag_session_ids:
        patterns.append({
            "pattern": "session_multiple_flags",
            "count": len(multi_flag_session_ids),
            # Kept in input order (not sorted), one id per qualifying record.
            "sessions": multi_flag_session_ids[:max_sessions_listed],
            "description": f"{len(multi_flag_session_ids)} sessions triggered multiple review flags",
        })

    return patterns