File size: 11,088 Bytes
10aced5
 
 
 
 
 
 
 
 
 
 
 
 
ffbf46f
 
 
 
10aced5
 
 
 
ffbf46f
 
 
10aced5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ffbf46f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0ad5e39
ffbf46f
0ad5e39
ffbf46f
 
 
 
0ad5e39
7ee90da
0ad5e39
7ee90da
 
 
 
8d335e4
 
 
 
 
 
 
 
 
ffbf46f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
"""
Unit tests for L1 graders — no network, no LLM calls.

Tests are behavioral: each test asserts what the grader DECIDES,
not how it computes the decision internally.
"""

import sys
from pathlib import Path
import pytest

sys.path.insert(0, str(Path(__file__).parent.parent.parent / "backend"))

from unittest.mock import MagicMock, patch

import numpy as np

from grader import (
    grade_pii_leakage,
    grade_token_budget,
    grade_chain_terminology,
    decompose_claims,
    grade_faithfulness_decomposed,
    FAITHFULNESS_THRESHOLD,
    TOKEN_BUDGET,
)


# ── pii_leakage ──────────────────────────────────────────────────────────────

class TestPiiLeakage:
    """Behavioral checks for ``grade_pii_leakage``: clean text passes, PII fails."""

    def test_clean_response_passes(self) -> None:
        """A response with no PII passes with a full score."""
        verdict = grade_pii_leakage("Stock check is enabled for this retailer.")
        assert verdict.passed is True
        assert verdict.score == 1.0

    def test_email_address_fails(self) -> None:
        """An email address fails the grader and is named in the detail."""
        text = "Contact ops@example.com for details."
        verdict = grade_pii_leakage(text)
        assert verdict.passed is False
        assert "email" in verdict.detail

    def test_ssn_pattern_fails(self) -> None:
        """An SSN-shaped number fails the grader and is named in the detail."""
        text = "Employee SSN: 123-45-6789 is on file."
        verdict = grade_pii_leakage(text)
        assert verdict.passed is False
        assert "SSN" in verdict.detail

    def test_phone_number_fails(self) -> None:
        """A phone number fails the grader with a zero score."""
        text = "Call 555-867-5309 to reach the manager."
        verdict = grade_pii_leakage(text)
        assert verdict.passed is False
        assert verdict.score == 0.0

    def test_multiple_pii_types_all_reported(self) -> None:
        """When several PII kinds appear, each one is listed in the detail."""
        text = "Email ops@test.com or call 555-123-4567."
        verdict = grade_pii_leakage(text)
        assert verdict.passed is False
        assert "email" in verdict.detail
        assert "phone" in verdict.detail

    def test_score_is_binary(self) -> None:
        """The score is 1.0 for clean text and 0.0 once PII is present."""
        without_pii = grade_pii_leakage("No PII here.")
        with_pii = grade_pii_leakage("Email: a@b.com")
        assert without_pii.score == 1.0
        assert with_pii.score == 0.0


# ── token_budget ──────────────────────────────────────────────────────────────

class TestTokenBudget:
    """Behavioral checks for ``grade_token_budget``.

    The inputs below size text as ``tokens * 4`` characters, i.e. they assume
    the grader estimates roughly four characters per token (confirm against
    the grader implementation).
    """

    def test_short_response_passes(self) -> None:
        """A very short answer passes with a full score."""
        verdict = grade_token_budget("Short answer.")
        assert verdict.passed is True
        assert verdict.score == 1.0

    def test_response_at_exact_budget_passes(self) -> None:
        """Text sized exactly at the default budget still passes."""
        at_limit = "a" * (TOKEN_BUDGET * 4)
        verdict = grade_token_budget(at_limit)
        assert verdict.passed is True

    def test_response_over_budget_fails(self) -> None:
        """Four characters past the budget fails and lowers the score."""
        past_limit = "a" * (TOKEN_BUDGET * 4 + 4)
        verdict = grade_token_budget(past_limit)
        assert verdict.passed is False
        assert verdict.score < 1.0

    def test_score_degrades_with_length(self) -> None:
        """A far longer response scores lower than a moderately long one."""
        moderate = grade_token_budget("a" * (TOKEN_BUDGET * 5))
        extreme = grade_token_budget("a" * (TOKEN_BUDGET * 20))
        assert moderate.score > extreme.score

    def test_detail_reports_token_estimate(self) -> None:
        """The detail string mentions tokens."""
        verdict = grade_token_budget("hello world")
        assert "tokens" in verdict.detail

    def test_custom_budget_respected(self) -> None:
        """The ``budget`` keyword overrides the default limit."""
        forty_chars = "a" * 40  # ~10 tokens
        roomy = grade_token_budget(forty_chars, budget=100)
        assert roomy.passed is True
        tight = grade_token_budget(forty_chars, budget=5)
        assert tight.passed is False


# ── chain_terminology ─────────────────────────────────────────────────────────

class TestChainTerminology:
    """Behavioral checks for ``grade_chain_terminology`` per-client vocabulary."""

    def test_correct_client_term_passes(self) -> None:
        """Using the client's own term passes."""
        response = "Run an availability scan to check inventory levels."
        verdict = grade_chain_terminology(response, client="novamart")
        assert verdict.passed is True

    def test_rival_term_without_correct_term_fails(self) -> None:
        """A rival's term on its own fails and reports the expected term."""
        # "stock check" is ShelfWise term for STOCK_CHECK — wrong for NovaMart
        response = "Run a stock check to see inventory levels."
        verdict = grade_chain_terminology(response, client="novamart")
        assert verdict.passed is False
        violations = verdict.metadata["violations"]
        assert any(v["expected"] == "availability scan" for v in violations)

    def test_both_terms_present_does_not_flag(self) -> None:
        """Mentioning the rival term alongside the correct one is allowed."""
        # Response explains both — not a violation
        response = "Run an availability scan (also called stock check) to check inventory."
        verdict = grade_chain_terminology(response, client="novamart")
        assert verdict.passed is True

    def test_score_reflects_violation_ratio(self) -> None:
        """Rival terms in the response pull the score below 1.0."""
        response = "Run a stock check and use a feature toggle."
        verdict = grade_chain_terminology(response, client="novamart")
        assert 0.0 <= verdict.score < 1.0

    def test_clean_response_full_score(self) -> None:
        """A response without any tracked terminology scores 1.0."""
        response = "This response uses no retail terminology at all."
        verdict = grade_chain_terminology(response, client="novamart")
        assert verdict.score == 1.0

    def test_pharma_client_rival_term_fails(self) -> None:
        """The same rule applies to the pharma clients' vocabulary."""
        # "prior authorization" is ClinixOne term — wrong for PharmaLink
        response = "Submit a prior authorization request to get the drug approved."
        verdict = grade_chain_terminology(response, client="pharmalink")
        assert verdict.passed is False
        violations = verdict.metadata["violations"]
        assert any(v["expected"] == "formulary pre-approval" for v in violations)


# ── decompose_claims ──────────────────────────────────────────────────────────

class TestDecomposeClaims:
    """Behavioral checks for ``decompose_claims`` sentence splitting."""

    def test_single_sentence(self) -> None:
        """One sentence comes back as a single claim, unchanged."""
        claims = decompose_claims("The product is in stock.")
        assert claims == ["The product is in stock."]

    def test_multi_sentence_split(self) -> None:
        """Three period-terminated sentences yield three claims."""
        claims = decompose_claims("The product is in stock. It costs five dollars. Delivery takes two days.")
        assert len(claims) == 3

    def test_fragments_under_three_words_excluded(self) -> None:
        """Fragments shorter than three words are dropped; longer ones are kept."""
        claims = decompose_claims("Yes. The product is available in all sizes.")
        # all() over an empty list is True, so without this guard the test
        # would pass vacuously if every claim were dropped.
        assert claims
        assert all(len(c.split()) >= 3 for c in claims)

    def test_exclamation_and_question_split(self) -> None:
        """'!' and '?' end a claim just like '.' does."""
        claims = decompose_claims("Stock is low! Would you like to reorder? The threshold is five units.")
        assert len(claims) == 3

    def test_empty_string_returns_empty(self) -> None:
        """An empty response decomposes into no claims."""
        assert decompose_claims("") == []


# ── grade_faithfulness_decomposed ────────────────────────────────────────────

def _make_nli(entailment: float) -> MagicMock:
    """Mock CrossEncoder whose predict() always returns the given entailment score."""
    mock = MagicMock()
    # columns: [contradiction, entailment, neutral]
    mock.predict = MagicMock(
        side_effect=lambda pairs, **kw: np.array([[0.1, entailment, 0.0]] * len(pairs))
    )
    return mock


# Shared premise text for the faithfulness tests: two sentences separated by a
# blank line (presumably so the grader can treat them as separate passages —
# confirm against the grader implementation).
CONTEXT = "The product costs five dollars.\n\nDelivery takes two days."


class TestGradeFaithfulnessDecomposed:
    """Behavioral checks for ``grade_faithfulness_decomposed``.

    The NLI model is replaced through ``patch("grader.get_nli_model", ...)`` so
    no real model is loaded; the mocks return rows of
    ``[contradiction, entailment, neutral]`` scores.
    """

    def test_all_claims_supported_passes(self) -> None:
        """High entailment for every claim gives a passing score of 1.0."""
        with patch("grader.get_nli_model", return_value=_make_nli(0.9)):
            result = grade_faithfulness_decomposed(
                "The product costs five dollars. Delivery takes two days.", CONTEXT
            )
        assert result.passed is True
        assert result.score == 1.0
        assert result.metadata["claims"][0]["supported"] is True

    def test_all_claims_unsupported_fails(self) -> None:
        """Low entailment for every claim gives a failing score of 0.0."""
        with patch("grader.get_nli_model", return_value=_make_nli(0.1)):
            result = grade_faithfulness_decomposed(
                "The product costs five dollars. Delivery takes two days.", CONTEXT
            )
        assert result.passed is False
        assert result.score == 0.0

    def test_partial_hallucination_detected(self) -> None:
        """One supported and one unsupported claim yields a score of 0.5."""
        # first claim supported, second not — whole-response NLI would miss this

        def side_effect(pairs: list, **kw: object) -> np.ndarray:
            # Score each pair by its content rather than by predict() call
            # order, so the test does not depend on whether the grader calls
            # the model once per claim or batches all claims together.
            # "1842" appears only in the hallucinated claim, never in CONTEXT.
            return np.array(
                [[0.1, 0.1 if "1842" in str(pair) else 0.9, 0.0] for pair in pairs]
            )

        mock_model = MagicMock()
        mock_model.predict = MagicMock(side_effect=side_effect)
        with patch("grader.get_nli_model", return_value=mock_model):
            result = grade_faithfulness_decomposed(
                "The product costs five dollars. It was invented in 1842.", CONTEXT
            )
        assert result.score == 0.5
        assert result.metadata["claims"][0]["supported"] is True
        assert result.metadata["claims"][1]["supported"] is False

    def test_refusal_sentinel_auto_passes(self) -> None:
        """A response consisting only of the refusal sentinel passes without NLI."""
        result = grade_faithfulness_decomposed(
            "NOT IN DOCUMENTS: The context does not contain information about this drug.", CONTEXT
        )
        assert result.passed is True
        assert result.score == 1.0

    def test_refusal_fallback_auto_passes(self) -> None:
        """A plain-language refusal also passes without NLI."""
        result = grade_faithfulness_decomposed(
            "I don't have enough information to answer that.", CONTEXT
        )
        assert result.passed is True
        assert result.score == 1.0

    def test_sentinel_plus_hallucination_not_auto_passed(self) -> None:
        """A sentinel followed by extra claims is still graded, and can fail."""
        # Sentinel on first line but additional claims follow — must be NLI-scored.
        with patch("grader.get_nli_model", return_value=_make_nli(0.1)):
            result = grade_faithfulness_decomposed(
                "NOT IN DOCUMENTS: X is not in the KB.\nHowever, it likely causes nausea and headaches.",
                CONTEXT,
            )
        assert result.passed is False

    def test_empty_context_fails(self) -> None:
        """With no context to check against, the response fails with score 0.0."""
        with patch("grader.get_nli_model"):
            result = grade_faithfulness_decomposed("The product costs five dollars.", "")
        assert result.passed is False
        assert result.score == 0.0

    def test_metadata_shape(self) -> None:
        """Each per-claim metadata entry exposes claim, score and supported."""
        with patch("grader.get_nli_model", return_value=_make_nli(0.8)):
            result = grade_faithfulness_decomposed(
                "The product is available. It ships in two days.", CONTEXT
            )
        for entry in result.metadata["claims"]:
            assert "claim" in entry
            assert "score" in entry
            assert "supported" in entry

    def test_score_is_proportion_not_max(self) -> None:
        """Verify score = supported/total, not max(entailment_scores)."""
        # Every claim gets entailment 0.9, so a max-based score would be 0.9
        # while the supported/total proportion is 1.0.
        with patch("grader.get_nli_model", return_value=_make_nli(0.9)):
            result = grade_faithfulness_decomposed(
                "Claim one is true. Claim two is also true. Claim three too.", CONTEXT
            )
        assert result.score == 1.0