| """ |
| lineage.py — SHA-256 Provenance Chain |
| |
| Every iteration in the recursive stress test gets a lineage record: |
| - Hash of the input text |
| - Hash of the output text |
| - Hash of the extracted commitments (sorted, deterministic) |
| - Fidelity score |
| - Parent hash (previous iteration's output hash) |
| - Iteration number |
| |
| The chain is tamper-evident: changing any intermediate output |
| invalidates all subsequent hashes. This is Module 2 from the PPA. |
| |
| For the public harness, this provides: |
| 1. Reproducibility proof (same input → same chain) |
| 2. Drift audit trail (exactly where commitments were lost) |
| 3. Attractor collapse detection (when multiple signals converge) |
| """ |
|
|
| import hashlib |
| import json |
| from dataclasses import dataclass, field, asdict |
| from typing import List, Set, Optional |
| from datetime import datetime, timezone |
|
|
|
|
def _hash_text(text: str) -> str:
    """Return the hex-encoded SHA-256 digest of *text*.

    The string is encoded as UTF-8 before it is hashed.
    """
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    return hasher.hexdigest()
|
|
|
|
def _hash_commitment_set(commitments: Set[str]) -> str:
    """Return a stable SHA-256 hex digest for a set of commitments.

    The members are sorted and serialised as a compact JSON array (no
    whitespace after separators), so the digest does not depend on the
    set's iteration order.
    """
    ordered = list(commitments)
    ordered.sort()
    payload = json.dumps(ordered, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()
|
|
|
|
@dataclass
class LineageRecord:
    """One link of the provenance chain, describing a single iteration.

    Instances are plain data holders; the hashes are supplied by the
    caller and are not computed or verified here.
    """

    iteration: int                # iteration number within the run
    input_hash: str               # hash of the iteration's input text
    output_hash: str              # hash of the iteration's output text
    commitment_hash: str          # hash of the extracted commitments
    commitments_found: int        # presumably the number of commitments extracted
    fidelity: float               # fidelity score for this iteration
    fidelity_detail: dict         # supporting detail for the fidelity score
    gate_passed: bool             # whether this iteration passed the gate
    parent_hash: Optional[str]    # previous iteration's output hash, if any
    text_preview: str             # short preview text for this iteration

    def to_dict(self) -> dict:
        """Return the record's fields as a plain ``dict``.

        Delegates to ``dataclasses.asdict``, so nested containers such as
        ``fidelity_detail`` are copied rather than shared.
        """
        as_mapping = asdict(self)
        return as_mapping
|
|
|
|
@dataclass
class LineageChain:
    """Complete provenance chain for a recursive stress test.

    Holds run-level metadata plus an ordered list of per-iteration
    records. Records are linked through their hashes: each record's
    ``parent_hash`` must equal the ``output_hash`` of the record before
    it, which is enforced by :meth:`add_record`.
    """

    signal_id: str
    signal_preview: str
    original_commitment_hash: str
    original_commitment_count: int
    backend: str
    enforced: bool
    depth: int
    records: List["LineageRecord"] = field(default_factory=list)
    # Creation time, as a timezone-aware UTC ISO-8601 string.
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def add_record(self, record: "LineageRecord") -> None:
        """Append *record* after checking it links to the current tail.

        The first record is accepted as-is. Every later record must carry
        a ``parent_hash`` equal to the previous record's ``output_hash``.

        Raises:
            ValueError: if the record does not link to the current tail,
                including when its ``parent_hash`` is ``None``.
        """
        if self.records:
            expected_parent = self.records[-1].output_hash
            actual_parent = record.parent_hash
            if actual_parent != expected_parent:
                # parent_hash is Optional; slicing None would raise
                # TypeError and mask the real chain-integrity error.
                shown = (
                    "None" if actual_parent is None
                    else f"{actual_parent[:12]}..."
                )
                raise ValueError(
                    f"Chain broken at iteration {record.iteration}: "
                    f"parent_hash {shown} != "
                    f"expected {expected_parent[:12]}..."
                )
        self.records.append(record)

    @property
    def final_fidelity(self) -> float:
        """Fidelity at the last iteration (1.0 when there are no records)."""
        if not self.records:
            return 1.0
        return self.records[-1].fidelity

    @property
    def drift_curve(self) -> List[float]:
        """Drift (1 - fidelity) at each iteration, in record order."""
        return [1.0 - r.fidelity for r in self.records]

    @property
    def fidelity_curve(self) -> List[float]:
        """Fidelity at each iteration, in record order."""
        return [r.fidelity for r in self.records]

    @property
    def all_passed(self) -> bool:
        """Whether all iterations passed the gate (True for an empty chain)."""
        return all(r.gate_passed for r in self.records)

    @property
    def collapse_detected(self) -> bool:
        """
        Check for attractor collapse within this chain (Section 7).

        Returns True when the three most recent records all share the same
        output hash. Chains with fewer than three records never report a
        collapse.
        """
        if len(self.records) < 3:
            return False
        recent_hashes = {r.output_hash for r in self.records[-3:]}
        return len(recent_hashes) == 1

    def to_dict(self) -> dict:
        """Return the chain metadata, derived summary values and all
        records (each via its own ``to_dict``) as a plain ``dict``."""
        return {
            'signal_id': self.signal_id,
            'signal_preview': self.signal_preview,
            'original_commitment_hash': self.original_commitment_hash,
            'original_commitment_count': self.original_commitment_count,
            'backend': self.backend,
            'enforced': self.enforced,
            'depth': self.depth,
            'timestamp': self.timestamp,
            'final_fidelity': self.final_fidelity,
            'collapse_detected': self.collapse_detected,
            'records': [r.to_dict() for r in self.records],
        }

    def to_json(self, indent: int = 2) -> str:
        """Serialise :meth:`to_dict` to a JSON string."""
        return json.dumps(self.to_dict(), indent=indent)
|
|
|
|
def check_attractor_collapse(chains: List["LineageChain"]) -> bool:
    """
    Cross-signal attractor collapse check (Section 7):
    If multiple DIFFERENT signals converge to the same final output,
    the result is invalid — the compressor is collapsing, not preserving.

    Only the final record's output hash of each chain is compared; chains
    without records contribute no hash but still count toward the total.
    This function does not itself verify that the chains belong to
    different signals — presumably the caller ensures that.

    Args:
        chains: the lineage chains to compare.

    Returns:
        True when a single final output hash is shared by more than
        ``len(chains) // 2`` chains. False when fewer than two chains are
        given or when no chain has any records.
    """
    if len(chains) < 2:
        return False

    from collections import Counter

    final_hashes = [c.records[-1].output_hash for c in chains if c.records]
    if not final_hashes:
        # No chain has produced output yet; most_common(1) would be empty.
        return False

    _, most_common_count = Counter(final_hashes).most_common(1)[0]
    return most_common_count > len(chains) // 2
|
|