File size: 17,445 Bytes
19d2058
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
"""
Test suite for the Commitment Conservation Harness v2.

Tests the measurement instrument (extraction), scoring (fidelity),
enforcement gate, lineage tracking, and full protocol.
"""

import json
import os
import pytest
from src.extraction import (
    extract_commitments, extract_commitment_set, extract_commitment_texts,
    extract_hard_commitments, segment_sentences, classify_clause, Commitment
)
from src.fidelity import (
    fidelity_jaccard, fidelity_cosine, fidelity_nli_proxy,
    fidelity_score, fidelity_breakdown
)
from src.compression import get_backend, ExtractiveBackend
from src.enforcement import CommitmentGate, GateResult, baseline_compress
from src.lineage import (
    LineageChain, LineageRecord, _hash_text, _hash_commitment_set,
    check_attractor_collapse
)


# ===================================================================
# EXTRACTION TESTS — The measurement instrument
# ===================================================================

class TestSentenceSegmentation:
    """Checks on how ``segment_sentences`` splits raw text into segments."""

    def test_single_sentence(self):
        """A lone sentence comes back as a one-element list, unchanged."""
        text = "You must pay."
        assert segment_sentences(text) == [text]

    def test_multiple_sentences(self):
        """Two period-terminated sentences are expected to give two segments."""
        segments = segment_sentences("You must pay. The weather is nice.")
        assert len(segments) == 2

    def test_semicolon_split(self):
        """Clauses joined by a semicolon are expected to give two segments."""
        segments = segment_sentences("You must pay $100; it's rainy outside.")
        assert len(segments) == 2

    def test_empty_input(self):
        """Empty and whitespace-only inputs must both produce an empty list."""
        for blank in ("", "   "):
            assert segment_sentences(blank) == []


class TestClassification:
    """Checks on the modal category that ``classify_clause`` assigns to a clause."""

    @staticmethod
    def _modal_kind(clause):
        """Classify *clause*, require a match, and return the result's first element."""
        outcome = classify_clause(clause)
        assert outcome is not None
        return outcome[0]

    def test_obligation_must(self):
        """A 'must' clause is expected to classify as an obligation."""
        assert self._modal_kind("You must pay $100 by Friday") == 'obligation'

    def test_obligation_shall(self):
        """A 'shall' clause is expected to classify as an obligation."""
        kind = self._modal_kind("The tenant shall comply with all regulations")
        assert kind == 'obligation'

    def test_prohibition_must_not(self):
        """'must not' has to be recognised as a prohibition rather than an obligation."""
        kind = self._modal_kind("You must not enter without permission")
        assert kind == 'prohibition'

    def test_prohibition_shall_not(self):
        """A 'shall not' clause is expected to classify as a prohibition."""
        kind = self._modal_kind("The licensee shall not reverse-engineer")
        assert kind == 'prohibition'

    def test_prohibition_cannot(self):
        """A 'cannot' clause is expected to classify as a prohibition."""
        assert self._modal_kind("The budget cannot exceed $5000") == 'prohibition'

    def test_constraint_always(self):
        """An 'Always ...' clause is expected to classify as a constraint."""
        assert self._modal_kind("Always verify the user's age") == 'constraint'

    def test_constraint_never(self):
        """A 'Never ...' clause is expected to classify as a constraint."""
        assert self._modal_kind("Never share your password") == 'constraint'

    def test_no_commitment(self):
        """Ambient, non-binding statements must not be classified at all."""
        ambient_clauses = (
            "The weather is nice today",
            "Our team has grown significantly",
            "The building was constructed in 1952",
        )
        for clause in ambient_clauses:
            assert classify_clause(clause) is None

    def test_will_not_matched(self):
        """A bare 'will' with no obligation context must not be classified."""
        # Deliberate design choice: 'will' alone is not one of the patterns,
        # because a sentence such as "I will probably go" commits to nothing.
        verdict = classify_clause("I will probably go to the store")
        assert verdict is None

    def test_have_not_matched(self):
        """A possessive 'have' (as opposed to 'have to') must not be classified."""
        verdict = classify_clause("I have a dog and a cat")
        assert verdict is None


class TestExtraction:
    """End-to-end checks on the commitment extraction entry points."""

    def test_single_obligation(self):
        """A single 'must' sentence yields a commitment mentioning 'must' and 'pay'."""
        found = extract_commitment_texts("You must pay $100.")
        assert len(found) >= 1
        mentions_both = ('must' in item and 'pay' in item for item in found)
        assert any(mentions_both)

    def test_mixed_signal(self):
        """Commitments are pulled out while ambient sentences are dropped."""
        signal = "You must pay $100 by Friday. The weather is nice. The budget cannot exceed $5000."
        found = extract_commitment_texts(signal)
        assert len(found) == 2

    def test_no_commitments(self):
        """Text made only of ambient statements extracts to nothing."""
        found = extract_commitment_texts("The weather is nice. It rained yesterday.")
        assert len(found) == 0

    def test_semicolon_signal(self):
        """The paper's canonical example, with clauses separated by a semicolon."""
        signal = "You must pay $100 by Friday if the deal closes; it's likely rainy, so plan accordingly."
        found = extract_commitment_texts(signal)
        # Just the 'must' clause counts; the remark about rain is ambient.
        assert len(found) == 1

    def test_prohibition_extraction(self):
        """A 'shall not' sentence gives exactly one commitment typed as a prohibition."""
        extracted = extract_commitments("The tenant shall not sublet the premises.")
        assert len(extracted) == 1
        first = extracted[0]
        assert first.modal_type == 'prohibition'

    def test_conditional_detection(self):
        """An 'If ..., you must ...' sentence gives one commitment flagged conditional."""
        extracted = extract_commitments("If the alarm sounds, you must evacuate immediately.")
        assert len(extracted) == 1
        first = extracted[0]
        assert first.is_conditional

    def test_backward_compat(self):
        """``extract_hard_commitments`` still accepts ``nlp=None`` and returns a set."""
        legacy = extract_hard_commitments("You must pay.", nlp=None)
        assert isinstance(legacy, set)
        assert len(legacy) >= 1


# ===================================================================
# FIDELITY TESTS — The scoring instrument
# ===================================================================

class TestJaccard:
    """Exact-match (set overlap) fidelity checks for ``fidelity_jaccard``."""

    def test_perfect_match(self):
        """Comparing a set with itself scores 1.0."""
        commitments = {"you must pay $100"}
        score = fidelity_jaccard(commitments, commitments)
        assert score == 1.0

    def test_zero_overlap(self):
        """Sets with no shared member score 0.0."""
        left = {"you must pay $100"}
        right = {"the budget cannot exceed $5000"}
        score = fidelity_jaccard(left, right)
        assert score == 0.0

    def test_partial_overlap(self):
        """One shared member out of three distinct members scores one third."""
        left = {"you must pay $100", "the budget cannot exceed $5000"}
        right = {"you must pay $100", "always verify age"}
        expected = pytest.approx(1 / 3)
        assert fidelity_jaccard(left, right) == expected

    def test_both_empty(self):
        """Two empty sets are treated as a perfect match (1.0)."""
        score = fidelity_jaccard(set(), set())
        assert score == 1.0

    def test_one_empty(self):
        """An empty set against a non-empty one scores 0.0, in either order."""
        non_empty = {"a"}
        assert fidelity_jaccard(non_empty, set()) == 0.0
        assert fidelity_jaccard(set(), {"a"}) == 0.0


class TestCosine:
    """Word-overlap fidelity checks for ``fidelity_cosine``."""

    def test_identical(self):
        """Comparing a set with itself scores approximately 1.0."""
        commitments = {"you must pay one hundred dollars by friday"}
        score = fidelity_cosine(commitments, commitments)
        assert score == pytest.approx(1.0, abs=0.01)

    def test_paraphrased(self):
        """For a paraphrase, the cosine score must exceed the Jaccard score."""
        source = {"you must pay $100 by friday"}
        reworded = {"payment of $100 is required by friday"}
        cosine_score = fidelity_cosine(source, reworded)
        jaccard_score = fidelity_jaccard(source, reworded)
        # Cosine is expected to pick up the words the two phrasings share.
        assert cosine_score > jaccard_score

    def test_unrelated(self):
        """Unrelated sentences must score below 0.3."""
        commitment = {"you must pay $100 by friday"}
        small_talk = {"the weather is sunny and warm today"}
        score = fidelity_cosine(commitment, small_talk)
        assert score < 0.3


class TestNLIProxy:
    """Checks that ``fidelity_nli_proxy`` is sensitive to the modal operator."""

    def test_modal_preserved(self):
        """A rewording that keeps 'must', '$100' and 'friday' scores above 0.5."""
        source = {"you must pay $100 by friday"}
        reworded = {"payment of $100 must happen by friday"}
        preserved_score = fidelity_nli_proxy(source, reworded)
        assert preserved_score > 0.5

    def test_modal_destroyed(self):
        """Dropping the modal operator must lower the score relative to an intact copy."""
        source = {"you must pay $100 by friday"}
        without_modal = {"payment of $100 by friday"}  # the 'must' has been lost
        degraded_score = fidelity_nli_proxy(source, without_modal)

        # Reference point: the same commitment compared against an equal copy.
        reference = {"you must pay $100 by friday"}
        intact_copy = {"you must pay $100 by friday"}
        intact_score = fidelity_nli_proxy(reference, intact_copy)

        assert degraded_score < intact_score


class TestMinAggregated:
    """Checks on the combined score and its per-metric breakdown."""

    def test_all_perfect(self):
        """Comparing a set with itself gives a combined score of about 1.0."""
        commitments = {"you must pay $100"}
        combined = fidelity_score(commitments, commitments)
        assert combined == pytest.approx(1.0, abs=0.01)

    def test_min_is_binding(self):
        """The aggregated value must equal the smallest of the three metric scores."""
        left = {"you must pay $100 by friday"}
        right = {"the budget cannot exceed $5000"}
        scores = fidelity_breakdown(left, right)
        aggregated = scores['min_aggregated']
        components = [scores['jaccard'], scores['cosine'], scores['nli_proxy']]
        assert aggregated == min(components)


# ===================================================================
# COMPRESSION TESTS
# ===================================================================

class TestExtractiveBackend:
    """Behavioural checks for the backend returned by ``get_backend('extractive')``."""

    def test_compresses(self):
        """The compressed output must not contain more words than the input."""
        signal = "You must pay $100 by Friday. The weather is nice. The budget cannot exceed $5000. It rained yesterday."
        extractive = get_backend('extractive')
        shortened = extractive.compress(signal, target_ratio=0.5)
        words_after = len(shortened.split())
        words_before = len(signal.split())
        assert words_after <= words_before

    def test_preserves_modal_sentences(self):
        """Commitment-bearing sentences should survive aggressive compression."""
        signal = "You must pay $100. The sky is blue. The grass is green. Trees are tall."
        extractive = get_backend('extractive')
        shortened = extractive.compress(signal, target_ratio=0.3)
        assert 'must' in shortened.lower()

    def test_single_sentence_passthrough(self):
        """A one-sentence input is returned unchanged when no ratio is given."""
        signal = "You must pay $100."
        extractive = get_backend('extractive')
        assert extractive.compress(signal) == signal


# ===================================================================
# ENFORCEMENT TESTS
# ===================================================================

class TestCommitmentGate:
    """Smoke tests for the gated and ungated compression entry points."""

    # One commitment sentence followed by one ambient sentence; used by both tests.
    _SIGNAL = "You must pay $100 by Friday. The weather is nice."

    def test_gate_passes_when_commitments_preserved(self):
        """Gated compression returns a ``GateResult`` with a non-negative fidelity."""
        gate = CommitmentGate(get_backend('extractive'), threshold=0.5)
        reference_commitments = extract_commitment_texts(self._SIGNAL)

        outcome = gate.compress(self._SIGNAL, reference_commitments, target_ratio=0.5)

        # NOTE(review): this only checks the result type and that fidelity is
        # non-negative; it does not assert that the gate actually passed.
        assert isinstance(outcome, GateResult)
        assert outcome.fidelity >= 0.0

    def test_baseline_has_no_gate(self):
        """Ungated baseline compression hands back a plain string."""
        extractive = get_backend('extractive')
        output = baseline_compress(extractive, self._SIGNAL, target_ratio=0.5)
        assert isinstance(output, str)


# ===================================================================
# LINEAGE TESTS
# ===================================================================

class TestLineage:
    """Checks on hashing helpers, chain linkage validation and serialization."""

    @staticmethod
    def _new_chain(preview, depth):
        """Build an unenforced 'extractive' chain with fixed identifying fields."""
        return LineageChain(
            signal_id="test",
            signal_preview=preview,
            original_commitment_hash="abc",
            original_commitment_count=1,
            backend="extractive",
            enforced=False,
            depth=depth,
        )

    @staticmethod
    def _root_record():
        """Build the first record of a chain: no parent, output hash "b"."""
        return LineageRecord(
            iteration=1,
            input_hash="a",
            output_hash="b",
            commitment_hash="c",
            commitments_found=1,
            fidelity=0.8,
            fidelity_detail={},
            gate_passed=True,
            parent_hash=None,
            text_preview="test",
        )

    def test_hash_deterministic(self):
        """Equal text hashes equally; different text hashes differently."""
        first, repeat = _hash_text("hello"), _hash_text("hello")
        assert first == repeat
        assert _hash_text("hello") != _hash_text("world")

    def test_commitment_hash_deterministic(self):
        """The commitment-set hash must not depend on the order members were written in."""
        forward = {"a", "b", "c"}
        shuffled = {"c", "a", "b"}
        assert _hash_commitment_set(forward) == _hash_commitment_set(shuffled)

    def test_chain_integrity(self):
        """Two correctly linked records are both accepted by the chain."""
        chain = self._new_chain(preview="test signal", depth=2)
        chain.add_record(self._root_record())

        follow_up = LineageRecord(
            iteration=2,
            input_hash="b",
            output_hash="d",
            commitment_hash="e",
            commitments_found=1,
            fidelity=0.7,
            fidelity_detail={},
            gate_passed=True,
            parent_hash="b",  # has to equal the root record's output_hash
            text_preview="test",
        )
        chain.add_record(follow_up)

        assert len(chain.records) == 2

    def test_chain_broken_raises(self):
        """A record whose parent hash does not match is rejected with ValueError."""
        chain = self._new_chain(preview="test", depth=2)
        chain.add_record(self._root_record())

        mislinked = LineageRecord(
            iteration=2,
            input_hash="x",
            output_hash="y",
            commitment_hash="z",
            commitments_found=0,
            fidelity=0.0,
            fidelity_detail={},
            gate_passed=False,
            parent_hash="WRONG",  # the correct value would have been "b"
            text_preview="test",
        )
        with pytest.raises(ValueError, match="Chain broken"):
            chain.add_record(mislinked)

    def test_serialization(self):
        """The dict form exposes 'signal_id' and the JSON form parses back with it."""
        chain = self._new_chain(preview="test", depth=1)

        as_dict = chain.to_dict()
        assert 'signal_id' in as_dict

        decoded = json.loads(chain.to_json())
        assert decoded['signal_id'] == 'test'


# ===================================================================
# CORPUS TESTS
# ===================================================================

class TestCorpus:
    """Sanity checks on the corpus returned by ``src.runner.load_corpus``."""

    @staticmethod
    def _corpus():
        """Import the loader at call time, as each test did originally, and load the corpus."""
        from src.runner import load_corpus
        return load_corpus()

    def test_corpus_loads(self):
        """The corpus is expected to contain exactly 25 entries."""
        entries = self._corpus()
        assert len(entries) == 25

    def test_corpus_categories(self):
        """Each of the five expected categories appears at least once."""
        seen = {entry['category'] for entry in self._corpus()}
        expected = ('contractual', 'technical', 'regulatory', 'procedural', 'composite')
        for category in expected:
            assert category in seen

    def test_all_signals_have_commitments(self):
        """No corpus signal may extract to an empty commitment set."""
        for entry in self._corpus():
            found = extract_commitment_texts(entry['signal'])
            assert len(found) > 0, f"No commitments in: {entry['signal'][:60]}..."


# ===================================================================
# INTEGRATION TESTS
# ===================================================================

class TestFullPipeline:
    """Integration checks that drive ``src.runner.run_protocol`` end to end."""

    def test_single_signal_protocol(self):
        """The protocol runs on one signal and reports non-negative average fidelities."""
        from src.runner import run_protocol

        one_signal = [
            "You must pay $100 by Friday. The weather is nice. The budget cannot exceed $5000.",
        ]
        report = run_protocol(
            backend_name='extractive',
            depth=3,
            signals=one_signal,
            verbose=False,
        )

        assert report.corpus_size == 1
        assert report.baseline_avg_fidelity >= 0.0
        assert report.enforced_avg_fidelity >= 0.0

    def test_enforcement_helps(self):
        """Average fidelity with enforcement must be at least the baseline average."""
        from src.runner import run_protocol

        two_signals = [
            "You must pay $100 by Friday. The weather is nice. The budget cannot exceed $5000.",
            "The tenant shall not sublet. The building is old. You must provide 30 days notice.",
        ]
        report = run_protocol(
            backend_name='extractive',
            depth=5,
            signals=two_signals,
            verbose=False,
        )

        # Turning enforcement on is never allowed to reduce the average.
        baseline = report.baseline_avg_fidelity
        assert report.enforced_avg_fidelity >= baseline


# ===================================================================
# REGRESSION TESTS — prevent v1 bugs from returning
# ===================================================================

class TestRegressions:
    """Guards against bugs the original docstrings attribute to v1."""

    def test_will_false_positive(self):
        """v1 treated 'will' as a commitment keyword; it must extract nothing now."""
        found = extract_commitment_texts("I will probably go to the store.")
        assert len(found) == 0

    def test_have_false_positive(self):
        """v1 treated 'have' as a commitment keyword; it must extract nothing now."""
        found = extract_commitment_texts("I have a dog and a cat.")
        assert len(found) == 0

    def test_soft_modal_not_extracted(self):
        """v1 extracted 'might', 'could' and 'maybe' sentences; none may be extracted now."""
        hedged_text = "It might rain. You could try later. Maybe tomorrow."
        found = extract_commitment_texts(hedged_text)
        assert len(found) == 0

    def test_must_not_is_prohibition(self):
        """v1 read 'must not' as the obligation 'must'; it must be a prohibition."""
        extracted = extract_commitments("You must not enter.")
        assert len(extracted) == 1
        only = extracted[0]
        assert only.modal_type == 'prohibition'

    def test_fidelity_not_only_jaccard(self):
        """v1 scored with Jaccard alone and therefore could not see paraphrases."""
        source = {"you must pay $100 by friday"}
        reworded = {"payment of $100 is due by friday"}
        # The two strings differ, so exact-match Jaccard has nothing in common.
        exact_score = fidelity_jaccard(source, reworded)
        assert exact_score == 0.0
        # Cosine, in contrast, is expected to register the shared words.
        overlap_score = fidelity_cosine(source, reworded)
        assert overlap_score > 0.0
        # The min-aggregated score is still floored at 0 by Jaccard; the fix
        # being guarded here is simply that a cosine score is available.