| from reviewer.scorer import score_session |
| from reviewer.pattern_detector import detect_failure_patterns |
| from reviewer import skill_analyzer |
|
|
def test_score_session(sample_spooled_entries):
    """Score the sample entries and verify counts, quality flags, and the score bound.

    Args:
        sample_spooled_entries: Spooled session entries handed to
            ``score_session`` (presumably a pytest fixture defined outside
            this file -- confirm in conftest).
    """
    scored = score_session(sample_spooled_entries)

    assert "score" in scored

    # Per-role tallies expected for the sample data.
    expected_counts = (
        ("tool_result_count", 2),
        ("assistant_msg_count", 1),
        ("user_msg_count", 1),
    )
    for field, expected in expected_counts:
        assert scored[field] == expected

    # Every one of these flags must be raised for the sample data.
    raised = scored["flags"]
    for flag in (
        "excessively_long_tool_output",
        "empty_tool_results",
        "meta_process_narration",
    ):
        assert flag in raised

    # With flags raised, the score is expected to fall below 1.0.
    assert scored["score"] < 1.0
|
|
def test_score_session_fts_noise():
    """Three zero-hit ``session_search`` results must raise ``fts_query_noise``."""
    no_hit_texts = (
        "0 results found for query xyz",
        "no results found. try different terms.",
        "0 results",
    )
    # All entries share one session and tool; only the result text varies.
    entries = [
        {
            "session_id": "s1",
            "role": "toolResult",
            "tool_name": "session_search",
            "clean_text": text,
            "is_error": 0,
        }
        for text in no_hit_texts
    ]

    scored = score_session(entries)

    assert "fts_query_noise" in scored["flags"]
|
|
def test_score_session_empty():
    """An empty entry list must score 0.0 and carry the ``no_data`` flag."""
    outcome = score_session([])

    assert outcome["score"] == 0.0
    assert "no_data" in outcome["flags"]
|
|
def test_detect_failure_patterns():
    """Recurring flags and repeated empty tool results must surface as patterns.

    ``some_flag`` appears in all five scored sessions, and ``bad_tool``
    returns empty text in three sessions; both are expected to be reported.
    """
    flags_by_session = {
        "s1": ["some_flag", "other_flag"],
        "s2": ["some_flag"],
        "s3": ["some_flag", "third_flag"],
        "s4": ["some_flag"],
        "s5": ["some_flag"],
    }
    session_scores = [
        {"session_id": sid, "flags": flags}
        for sid, flags in flags_by_session.items()
    ]
    # The same tool yields an empty, non-error result in three sessions.
    spooled_entries = [
        {
            "session_id": sid,
            "role": "toolResult",
            "tool_name": "bad_tool",
            "clean_text": "",
            "is_error": 0,
        }
        for sid in ("s1", "s2", "s3")
    ]

    patterns = detect_failure_patterns(session_scores, spooled_entries)

    def reported(name):
        # True when at least one detected pattern carries this identifier.
        return any(p["pattern"] == name for p in patterns)

    assert reported("empty_tool_result:bad_tool")
    assert reported("session_flag:some_flag")
|
|
|
|
def test_find_skill_candidates_prefers_allowlist_update(tmp_path, monkeypatch):
    """A matching on-disk skill missing from the agent's allowlist is reused.

    Sets up a temporary skills directory containing ``ops-framework`` and a
    config whose ``azoth`` agent only lists ``context7``, points
    ``skill_analyzer`` at both, and expects a recommendation to add the
    existing skill to the agent's allowlist.

    Args:
        tmp_path: Temporary directory used for the skills tree and config file.
        monkeypatch: Used to override ``SKILLS_DIR`` and ``OPENCLAW_CONFIG``
            on ``skill_analyzer`` for the duration of the test.
    """
    # Agent config: "azoth" is allowed "context7" only.
    config = tmp_path / "openclaw.json"
    agent_config_json = '{"agents":{"list":[{"id":"azoth","skills":["context7"]}]}}'
    config.write_text(agent_config_json, encoding="utf-8")

    # Skill on disk whose description overlaps the query used below.
    skills_dir = tmp_path / "skills"
    skill_dir = skills_dir / "ops-framework"
    skill_dir.mkdir(parents=True)
    skill_markdown = (
        "---\n"
        "name: ops-framework\n"
        "description: Use for OpenClaw gateway troubleshooting and deterministic ops checks\n"
        "---\n"
        "# Ops Framework\n"
        "Run bounded diagnostics before changing gateway configuration.\n"
    )
    skill_file = skill_dir / "SKILL.md"
    skill_file.write_text(skill_markdown, encoding="utf-8")

    monkeypatch.setattr(skill_analyzer, "SKILLS_DIR", skills_dir)
    monkeypatch.setattr(skill_analyzer, "OPENCLAW_CONFIG", config)

    query = "gateway troubleshooting deterministic ops checks"
    result = skill_analyzer.find_skill_candidates(query, agent_id="azoth")

    expectations = (
        ("decision", "reuse-existing-skill"),
        ("allowlistAction", "add-existing-skill-to-agent-allowlist"),
        ("allowlistValue", "ops-framework"),
    )
    for key, expected in expectations:
        assert result[key] == expected
|
|