File size: 3,821 Bytes
63c75d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from reviewer.scorer import score_session
from reviewer.pattern_detector import detect_failure_patterns
from reviewer import skill_analyzer

def test_score_session(sample_spooled_entries):
    """Score the shared sample entries and check counts, flags and the score."""
    outcome = score_session(sample_spooled_entries)

    assert "score" in outcome

    # Message tallies expected for the sample fixture.
    expected_counts = {
        "tool_result_count": 2,
        "assistant_msg_count": 1,
        "user_msg_count": 1,
    }
    for field, expected in expected_counts.items():
        assert outcome[field] == expected

    # Flags the sample entries are expected to trigger.
    raised_flags = outcome["flags"]
    expected_flags = (
        "excessively_long_tool_output",  # one output is 5000 chars long
        "empty_tool_results",  # one output is empty
        "meta_process_narration",  # assistant uses "I'll use the..."
    )
    for flag_name in expected_flags:
        assert flag_name in raised_flags

    # With flags raised, the score must fall below 1.0.
    assert outcome["score"] < 1.0

def test_score_session_fts_noise():
    """Three empty ``session_search`` results should raise ``fts_query_noise``."""
    no_hit_outputs = (
        "0 results found for query xyz",
        "no results found. try different terms.",
        "0 results",
    )
    # Every entry is a non-error tool result from the same session and tool;
    # only the cleaned output text varies.
    entries = [
        {
            "session_id": "s1",
            "role": "toolResult",
            "tool_name": "session_search",
            "clean_text": text,
            "is_error": 0,
        }
        for text in no_hit_outputs
    ]

    outcome = score_session(entries)

    assert "fts_query_noise" in outcome["flags"]

def test_score_session_empty():
    """Scoring no entries yields a 0.0 score and the ``no_data`` flag."""
    no_entries = []
    outcome = score_session(no_entries)

    assert outcome["score"] == 0.0
    assert "no_data" in outcome["flags"]

def test_detect_failure_patterns():
    """Check that recurring empty tool output and a recurring flag are reported."""
    # Dummy per-session scores: 'some_flag' appears in all five sessions.
    flags_by_session = {
        "s1": ["some_flag", "other_flag"],
        "s2": ["some_flag"],
        "s3": ["some_flag", "third_flag"],
        "s4": ["some_flag"],
        "s5": ["some_flag"],
    }
    session_scores = [
        {"session_id": sid, "flags": flags}
        for sid, flags in flags_by_session.items()
    ]

    # 'bad_tool' returns empty text, without error, in three distinct sessions.
    spooled_entries = [
        {
            "session_id": sid,
            "role": "toolResult",
            "tool_name": "bad_tool",
            "clean_text": "",
            "is_error": 0,
        }
        for sid in ("s1", "s2", "s3")
    ]

    patterns = detect_failure_patterns(session_scores, spooled_entries)

    def reported(pattern_name):
        # True as soon as one detected pattern carries the given name.
        return any(item["pattern"] == pattern_name for item in patterns)

    # 'bad_tool' had empty output in >= 3 sessions.
    assert reported("empty_tool_result:bad_tool")

    # 'some_flag' was triggered in 5 sessions.
    assert reported("session_flag:some_flag")


def test_find_skill_candidates_prefers_allowlist_update(tmp_path, monkeypatch):
    """An on-disk skill missing from the agent's list should be reused via allowlist.

    The test writes an ``ops-framework`` skill and a config whose ``azoth``
    agent lists only ``context7``, points ``skill_analyzer`` at both, and
    expects the result to reuse the existing skill and add it to the allowlist.
    """
    skills_root = tmp_path / "skills"
    framework_dir = skills_root / "ops-framework"
    framework_dir.mkdir(parents=True)

    # Skill definition: front matter followed by a short body.
    skill_markdown = (
        "---\n"
        "name: ops-framework\n"
        "description: Use for OpenClaw gateway troubleshooting and deterministic ops checks\n"
        "---\n"
        "# Ops Framework\n"
        "Run bounded diagnostics before changing gateway configuration.\n"
    )
    (framework_dir / "SKILL.md").write_text(skill_markdown, encoding="utf-8")

    # Agent config: 'azoth' currently lists 'context7' as its only skill.
    config_json = '{"agents":{"list":[{"id":"azoth","skills":["context7"]}]}}'
    config_path = tmp_path / "openclaw.json"
    config_path.write_text(config_json, encoding="utf-8")

    # Redirect the analyzer's module-level paths to the temporary fixtures.
    monkeypatch.setattr(skill_analyzer, "SKILLS_DIR", skills_root)
    monkeypatch.setattr(skill_analyzer, "OPENCLAW_CONFIG", config_path)

    query = "gateway troubleshooting deterministic ops checks"
    outcome = skill_analyzer.find_skill_candidates(query, agent_id="azoth")

    expected_fields = {
        "decision": "reuse-existing-skill",
        "allowlistAction": "add-existing-skill-to-agent-allowlist",
        "allowlistValue": "ops-framework",
    }
    for field, expected in expected_fields.items():
        assert outcome[field] == expected