EPISTEMIC THREAT MODEL & VALIDATOR ONTOLOGY

FORMAL THREAT MODEL (STRIDE-E: Epistemic Extension)

Spoofing - Identity Subversion

```
Threat: n8n workflow impersonates validator
Impact: False authority injection into ledger
Mitigation:
1. Cryptographic validator identity (PKI-based)
2. Validator role attestation signed by IRE
3. Workflow-to-validator binding in ledger metadata

Detection: Mismatch between workflow signature and validator claim
Severity: Critical (sovereignty breach)
```
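
A minimal sketch of mitigation 3 (workflow-to-validator binding): the `verify_validator_binding` helper is a hypothetical name, and HMAC over a shared key stands in for real PKI signatures (the attestation schema later specifies ed25519) purely to keep the example self-contained.

```python
import hashlib
import hmac

def verify_validator_binding(workflow_id: str, validator_claim: str,
                             signature: str, shared_key: bytes) -> bool:
    """Detect spoofing: recompute the workflow-to-validator binding and
    compare it against the signature recorded in ledger metadata."""
    message = f"{workflow_id}|{validator_claim}".encode()
    expected = hmac.new(shared_key, message, hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking how close a forgery got
    return hmac.compare_digest(expected, signature)
```

A mismatch between the workflow's recorded signature and the validator claim is exactly the detection condition named above.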

Tampering - Evidence Manipulation

```
Threat: n8n alters evidence pre-canonicalization
Impact: Garbage-in, narrative-out
Mitigation:
1. Raw evidence fingerprinting (content hash before processing)
2. Evidence lineage tracking in n8n workflow logs
3. IRE detects fingerprint mismatches

Detection: Pre-canonical hash ≠ post-canonical derivation
Severity: Critical (truth contamination)
```
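
Mitigation 1 reduces to a content hash taken before any processing touches the evidence; the `fingerprint` and `detect_tampering` helpers below are illustrative names, not an existing API.

```python
import hashlib

def fingerprint(raw: bytes) -> str:
    """Content hash taken before any canonicalization touches the evidence."""
    return hashlib.sha256(raw).hexdigest()

def detect_tampering(raw: bytes, recorded_fingerprint: str) -> bool:
    """IRE-side check: a mismatch means the evidence changed in flight."""
    return fingerprint(raw) != recorded_fingerprint
```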

Repudiation - Epistemic Deniability

```
Threat: n8n denies triggering detection that found suppression
Impact: System loses accountability for its own findings
Mitigation:
1. Non-repudiable workflow execution proofs
2. Watermarked intermediate results
3. Cross-referenced timestamp chains

Detection: Missing execution proof for detection result
Severity: High (accountability loss)
```
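
Non-repudiable execution proofs (mitigation 1) can be approximated by hash-chaining each run onto the previous proof, so a denied or deleted execution breaks verification. `ExecutionProofChain` is an illustrative sketch, not a published component.

```python
import hashlib
import json

class ExecutionProofChain:
    """Each workflow run is hashed together with the previous proof;
    removing or denying a run breaks the chain from that point on."""

    def __init__(self):
        self.proofs = []
        self._head = "genesis"

    def record(self, execution: dict) -> str:
        payload = json.dumps(execution, sort_keys=True)
        proof = hashlib.sha256(f"{self._head}|{payload}".encode()).hexdigest()
        self.proofs.append({"execution": execution, "proof": proof, "prev": self._head})
        self._head = proof
        return proof

    def verify(self) -> bool:
        """Recompute every link; any tampered or missing entry fails."""
        head = "genesis"
        for entry in self.proofs:
            payload = json.dumps(entry["execution"], sort_keys=True)
            if entry["prev"] != head:
                return False
            if hashlib.sha256(f"{head}|{payload}".encode()).hexdigest() != entry["proof"]:
                return False
            head = entry["proof"]
        return True
```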

Information Disclosure - Pattern Leakage

```
Threat: Detection patterns leaked through n8n logs
Impact: Adversaries learn the system's detection model
Mitigation:
1. Threshold abstraction (IRE returns categories, not scores)
2. Differential privacy on aggregated results
3. Ephemeral detection sessions

Detection: Raw thresholds appear in orchestration logs
Severity: Medium (detection model compromise)
```
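
One way to keep raw thresholds out of orchestration logs (the detection condition above) is to scrub them before emission; the regex and the `scrub_log_line` helper are illustrative assumptions, not part of any named component.

```python
import re

# Matches "score=0.87", "threshold: 0.8", etc.; abstract categories pass through.
THRESHOLD_PATTERN = re.compile(r"(score|threshold)\s*[=:]\s*[0-9.]+", re.IGNORECASE)

def scrub_log_line(line: str) -> str:
    """Redact raw scores/thresholds before a line reaches orchestration logs.
    Note the separator is normalized to '=' in the redacted output."""
    return THRESHOLD_PATTERN.sub(r"\1=[REDACTED]", line)
```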

Denial of Service - Epistemic Exhaustion

```
Threat: Flood system with nonsense to waste detection capacity
Impact: Real patterns missed due to noise saturation
Mitigation:
1. Epistemic rate limiting per source
2. Credibility-based throttling
3. Detection priority queuing

Detection: High-volume, low-signal detection requests
Severity: Medium (resource exhaustion)
```
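
Mitigations 1 and 2 can be combined in a single token bucket whose refill rate scales with source credibility, so low-credibility sources exhaust their capacity faster. `CredibilityThrottle` is a hypothetical sketch, not part of any named component.

```python
import time

class CredibilityThrottle:
    """Token-bucket rate limiter per source; refill rate scales with
    the source's credibility score (0.0 - 1.0)."""

    def __init__(self, base_rate: float = 1.0, capacity: float = 10.0):
        self.base_rate = base_rate      # tokens/second at credibility 1.0
        self.capacity = capacity        # burst ceiling
        self.buckets = {}               # source_id -> (tokens, last_refill)

    def allow(self, source_id: str, credibility: float, now: float = None) -> bool:
        now = time.monotonic() if now is None else now
        tokens, last = self.buckets.get(source_id, (self.capacity, now))
        # Refill proportionally to elapsed time and credibility
        tokens = min(self.capacity, tokens + (now - last) * self.base_rate * credibility)
        if tokens >= 1.0:
            self.buckets[source_id] = (tokens - 1.0, now)
            return True
        self.buckets[source_id] = (tokens, now)
        return False
```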

Elevation of Privilege - Ontology Hijacking

```
Threat: n8n attempts to modify lens/method definitions
Impact: Epistemic framework compromise
Mitigation:
1. Immutable ontology registry (signed by originators)
2. Versioned ontology with migration proofs
3. No runtime ontology modification API

Detection: Attempt to modify lens/method definitions
Severity: Critical (sovereignty destruction)
```
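
Mitigations 1 and 3 amount to pinning the ontology hash at load time and deliberately exposing no mutation method; `OntologyRegistry` is an assumed name for this sketch.

```python
import hashlib
import json

class OntologyRegistry:
    """Immutable ontology registry sketch: definitions are serialized and
    hash-pinned at construction, and there is deliberately no update API."""

    def __init__(self, definitions: dict):
        self._definitions = json.dumps(definitions, sort_keys=True)
        self.pinned_hash = hashlib.sha256(self._definitions.encode()).hexdigest()

    def get(self) -> dict:
        # Returns a fresh copy; callers cannot mutate the registry through it
        return json.loads(self._definitions)

    def verify(self) -> bool:
        """Recompute the hash of the live definitions; catches in-process
        tampering with the serialized form."""
        return hashlib.sha256(self._definitions.encode()).hexdigest() == self.pinned_hash
```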

Epistemic Drift - Gradual Corruption

```
Threat: Slow, subtle contamination of detection patterns
Impact: System gradually aligns with preferred narrative
Mitigation:
1. Drift detection via historical self-comparison
2. Cross-validation with frozen model versions
3. Epistemic checksums on detection algorithms
```
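
The detection condition (statistical divergence from a historical baseline) can be scored with something as simple as KL divergence over escalation-category counts; this is a minimal sketch, assuming category labels like "high"/"medium"/"low".

```python
import math

def kl_divergence(baseline: dict, current: dict, eps: float = 1e-9) -> float:
    """Rough drift score: KL divergence between the frozen baseline's
    category distribution and the current period's distribution."""
    cats = set(baseline) | set(current)
    b_total = sum(baseline.values()) or 1
    c_total = sum(current.values()) or 1
    score = 0.0
    for cat in cats:
        p = baseline.get(cat, 0) / b_total + eps  # eps avoids log(0)
        q = current.get(cat, 0) / c_total + eps
        score += p * math.log(p / q)
    return score
```

A near-zero score means the detector is behaving as it historically did; a rising score is the stealth-corruption signal that should trigger cross-validation against a frozen model version.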

VALIDATOR ONTOLOGY (Role-Based Authority)

Validator Archetypes

```
1. Human-Sovereign Validator
   Role: Individual sovereignty preservation
   Authority: Can veto any ledger commit
   Identity: Self-sovereign cryptographic identity
   Quorum: Always required (cannot be automated away)
   Attestation: "I have reviewed and assert my sovereignty"

2. System-Epistemic Validator
   Role: Detection methodology integrity
   Authority: Validates detection process adherence
   Identity: Cryptographic hash of detection pipeline config
   Quorum: Required for automated commits
   Attestation: "Detection executed per published methodology"

3. Source-Provenance Validator
   Role: Evidence chain custody
   Authority: Validates evidence has an unbroken chain of custody
   Identity: Hash of evidence handling workflow
   Quorum: Optional but recommended
   Attestation: "Evidence chain intact from source to canonicalization"

4. Temporal-Integrity Validator
   Role: Time-bound execution verification
   Authority: Validates timestamps and execution windows
   Identity: Time-locked cryptographic proof
   Quorum: Required for time-sensitive commits
   Attestation: "Execution occurred within valid time window"

5. Community-Plurality Validator
   Role: Cross-interpreter consensus
   Authority: Requires multiple human interpretations
   Identity: Set of interpreter identities + attestation
   Quorum: Variable based on interpretation count
   Attestation: "Multiple independent interpretations concur"
```
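
The quorum calculus consumes validator records with `archetype` and `quorum_requirements` fields; a minimal `Validator` shape consistent with the archetypes above might look like the following (the dataclass itself is an assumption, not a published type).

```python
from dataclasses import dataclass, field
from typing import Dict, List

# The five archetypes defined above
ARCHETYPES = {"human_sovereign", "system_epistemic", "source_provenance",
              "temporal_integrity", "community_plurality"}

@dataclass
class Validator:
    """Minimal validator record; field names mirror the configuration
    schema in the next section."""
    validator_id: str
    archetype: str
    authority_scope: List[str] = field(default_factory=list)
    quorum_requirements: Dict = field(default_factory=dict)

    def __post_init__(self):
        # Reject unknown archetypes at construction time
        if self.archetype not in ARCHETYPES:
            raise ValueError(f"Unknown archetype: {self.archetype}")
```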

Validator Configuration Schema

```json
{
  "validator_id": "urn:ire:validator:human_sovereign:sha256-abc123",
  "archetype": "human_sovereign",
  "authority_scope": ["ledger_commit", "evidence_rejection"],
  "quorum_requirements": {
    "minimum": 1,
    "maximum": null,
    "exclusivity": ["system_epistemic"]
  },
  "attestation_format": {
    "required_fields": ["review_timestamp", "sovereignty_assertion"],
    "signature_algorithm": "ed25519",
    "expiration": "24h"
  },
  "epistemic_constraints": {
    "cannot_override": ["detection_results", "canonical_hash"],
    "can_reject_for": ["procedural_violation", "sovereignty_concern"]
  }
}
```

Validator Quorum Calculus

```python
from collections import Counter
from typing import List

def calculate_quorum_satisfaction(validators: List[Validator], commit_type: str) -> bool:
    """Calculate if validator quorum is satisfied for commit type"""

    archetype_counts = Counter(v.archetype for v in validators)

    # Base requirements per commit type
    requirements = {
        "ledger_commit": {
            "human_sovereign": 1,
            "system_epistemic": 1,
            "source_provenance": 0,   # optional
            "temporal_integrity": 1,
            "community_plurality": 0  # depends on interpretation count
        },
        "evidence_ingestion": {
            "human_sovereign": 0,
            "system_epistemic": 1,
            "source_provenance": 1,
            "temporal_integrity": 1,
            "community_plurality": 0
        },
        "detection_escalation": {
            "human_sovereign": 1,
            "system_epistemic": 1,
            "source_provenance": 0,
            "temporal_integrity": 1,
            "community_plurality": 1  # required for high-stakes escalations
        }
    }

    req = requirements[commit_type]

    # Check each archetype requirement
    for archetype, required_count in req.items():
        if archetype_counts.get(archetype, 0) < required_count:
            return False

    # Check exclusivity constraints: a validator may refuse to co-sign
    # alongside certain archetypes
    for validator in validators:
        for exclusive_archetype in validator.quorum_requirements.get("exclusivity", []):
            if exclusive_archetype in archetype_counts:
                return False

    return True
```

LEDGER SCHEMA HARDENING

Extended Block Schema

```json
{
  "block": {
    "header": {
      "id": "blk_timestamp_hash",
      "prev": "previous_block_hash",
      "timestamp": "ISO8601_with_nanoseconds",
      "epistemic_epoch": 1,
      "ontology_version": "sha256:ontology_hash"
    },
    "body": {
      "nodes": [RealityNode],
      "detection_context": {
        "workflow_hash": "sha256:n8n_workflow_def",
        "execution_window": {
          "not_before": "timestamp",
          "not_after": "timestamp",
          "time_proof": "signature_from_temporal_validator"
        },
        "threshold_used": "abstract_category_not_numeric"
      }
    },
    "validations": {
      "attestations": [
        {
          "validator_id": "urn:ire:validator:...",
          "archetype": "human_sovereign",
          "attestation": "cryptographic_signature",
          "scope": ["ledger_commit"],
          "expires": "timestamp"
        }
      ],
      "quorum_satisfied": true,
      "quorum_calc": {
        "required": {"human_sovereign": 1, "system_epistemic": 1},
        "present": {"human_sovereign": 1, "system_epistemic": 1}
      }
    },
    "integrity_marks": {
      "evidence_fingerprint": "sha256_of_raw_content",
      "detection_watermark": "nonce_based_on_workflow_id",
      "epistemic_checksum": "hash_of_detection_logic_version"
    }
  }
}
```
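
The `prev` linkage in the header can be verified independently of any validator; this sketch assumes sorted-key JSON as the canonical block encoding, which a real ledger would pin down more strictly.

```python
import hashlib
import json

def block_hash(block: dict) -> str:
    """Canonical hash of a block (sorted-key JSON stands in for a real
    canonicalization scheme)."""
    return hashlib.sha256(json.dumps(block, sort_keys=True).encode()).hexdigest()

def verify_chain(blocks: list) -> bool:
    """Each block's header.prev must equal the hash of the previous block."""
    for prev, curr in zip(blocks, blocks[1:]):
        if curr["header"]["prev"] != block_hash(prev):
            return False
    return True
```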

Detection Threshold Abstraction Layer

```python
from typing import Dict

class EpistemicThresholdInterface:
    """Abstract threshold interface - n8n never sees numeric thresholds"""

    def __init__(self, ire_client):
        self.ire = ire_client

    def should_escalate(self, detection_results: Dict) -> Dict:
        """IRE decides escalation, returns abstract category"""
        response = self.ire.post("/ire/detect/evaluate", {
            "results": detection_results,
            "return_format": "abstract_category"
        })

        return {
            "escalation_recommended": response.get("category") == "high_confidence",
            "confidence_level": response.get("confidence_label"),  # "high"/"medium"/"low"
            "next_action": response.get("recommended_action"),
            # NO NUMERIC THRESHOLDS EXPOSED
            # NO RAW SCORES EXPOSED
        }

    def get_validation_requirements(self, category: str) -> Dict:
        """Map escalation category to validator requirements"""
        mapping = {
            "high_confidence": {
                "human_sovereign": 2,
                "system_epistemic": 1,
                "community_plurality": 1
            },
            "medium_confidence": {
                "human_sovereign": 1,
                "system_epistemic": 1
            },
            "low_confidence": {
                "system_epistemic": 1
            }
        }
        return mapping.get(category, {})
```

Time-Window Enforcement

```python
import uuid
from datetime import datetime, timedelta
from typing import Dict, List

class TemporalIntegrityEnforcer:
    """Enforce time-bound execution with cryptographic proofs.
    Signing helpers (_sign_temporal_window, _verify_signature,
    _get_expected_duration) are implementation-specific."""

    def create_execution_window(self, duration_hours: int = 24) -> Dict:
        """Create cryptographically bound execution window"""
        window_id = f"window_{uuid.uuid4()}"
        not_before = datetime.utcnow()
        not_after = not_before + timedelta(hours=duration_hours)

        # Create time-locked proof
        window_proof = {
            "window_id": window_id,
            "not_before": not_before.isoformat() + "Z",
            "not_after": not_after.isoformat() + "Z",
            "issuer": "ire_temporal_validator",
            "signature": self._sign_temporal_window(window_id, not_before, not_after)
        }

        return window_proof

    def validate_within_window(self,
                               action_timestamp: str,
                               window_proof: Dict) -> bool:
        """Validate action occurred within execution window"""
        # Verify signature
        if not self._verify_signature(window_proof):
            return False

        # Parse timestamps (normalize trailing "Z" for fromisoformat)
        action_time = datetime.fromisoformat(action_timestamp.replace("Z", "+00:00"))
        not_before = datetime.fromisoformat(window_proof["not_before"].replace("Z", "+00:00"))
        not_after = datetime.fromisoformat(window_proof["not_after"].replace("Z", "+00:00"))

        # Check bounds
        return not_before <= action_time <= not_after

    def detect_time_anomalies(self, workflow_executions: List[Dict]) -> List[Dict]:
        """Detect temporal manipulation patterns"""
        anomalies = []

        for i, execution in enumerate(workflow_executions):
            # Check for reverse time flow
            if i > 0:
                prev_time = datetime.fromisoformat(
                    workflow_executions[i-1]["timestamp"]
                )
                curr_time = datetime.fromisoformat(
                    execution["timestamp"]
                )
                if curr_time < prev_time:
                    anomalies.append({
                        "type": "reverse_time_flow",
                        "execution_id": execution["execution_id"],
                        "anomaly": f"Time went backwards: {curr_time} < {prev_time}"
                    })

            # Check execution duration anomalies
            if "duration" in execution:
                # "workflow_id" assumed as the expected-duration lookup key
                expected_duration = self._get_expected_duration(execution["workflow_id"])
                if execution["duration"] > expected_duration:
                    anomalies.append({
                        "type": "suspicious_duration",
                        "execution_id": execution["execution_id"],
                        "anomaly": f"Duration {execution['duration']} exceeds expected {expected_duration}"
                    })

        return anomalies
```

Semantic Laundering Defense

```python
import random
from collections import Counter, defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple

class EpistemicIntegrityGuard:
    """Defend against semantic laundering attacks"""

    def __init__(self):
        self.similarity_clusters = defaultdict(list)
        self.source_frequency = defaultdict(int)
        self.vocabulary: List[str] = []  # populated from a reference corpus

    def check_semantic_laundering(self,
                                  content_hash: str,
                                  raw_content: str,
                                  source_id: str,
                                  workflow_id: str) -> Dict:
        """Check for semantic laundering patterns"""

        # Check source frequency
        self.source_frequency[source_id] += 1
        if self.source_frequency[source_id] > 100:  # Threshold
            return {
                "risk": "high",
                "reason": "Excessive submissions from single source",
                "action": "throttle"
            }

        # Check similarity clusters
        content_vector = self._vectorize(raw_content)
        similar = self._find_similar(content_vector)

        if similar is not None:
            cluster_id, _items = similar
            self.similarity_clusters[cluster_id].append({
                "content_hash": content_hash,
                "timestamp": datetime.utcnow().isoformat(),
                "workflow_id": workflow_id
            })

            # Check cluster growth rate
            if len(self.similarity_clusters[cluster_id]) > 10:
                return {
                    "risk": "medium",
                    "reason": "Rapid growth of similar content cluster",
                    "action": "flag_for_review"
                }

        return {"risk": "low", "reason": "No laundering patterns detected"}

    def _vectorize(self, content: str) -> List[float]:
        """Create semantic vector (simplified - use real embeddings in production)"""
        # Simple bag-of-words for demonstration
        words = content.lower().split()
        word_counts = Counter(words)
        return [word_counts.get(w, 0) for w in self.vocabulary]

    def _find_similar(self, vector: List[float],
                      threshold: float = 0.8) -> Optional[Tuple[str, List[Dict]]]:
        """Find the closest existing cluster, if any"""
        # Simplified similarity search
        for cluster_id, items in self.similarity_clusters.items():
            # In production, compare against the cluster centroid with a
            # proper vector similarity instead of this random placeholder
            if random.random() > 0.5:  # Placeholder
                return cluster_id, items
        return None
```
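
The placeholder `_find_similar` above needs a real similarity measure; cosine similarity over the bag-of-words vectors produced by `_vectorize` is the usual minimal choice.

```python
import math

def cosine_similarity(a: list, b: list) -> float:
    """Cosine similarity between two term-count vectors; 1.0 means
    identical direction, 0.0 means no shared terms."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0
```

A production `_find_similar` would compute this against each cluster centroid and return the cluster whose similarity exceeds the threshold.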

IMPLEMENTATION ROADMAP

Phase 1: Core Sovereignty (Weeks 1-2)

1. Implement validator PKI and attestation framework
2. Deploy threshold abstraction layer
3. Add time-window enforcement
4. Basic semantic laundering detection

Phase 2: Epistemic Defense (Weeks 3-4)

1. Full STRIDE-E threat model implementation
2. Validator quorum calculus integration
3. Ledger schema hardening
4. Cross-validation with frozen models

Phase 3: Operational Resilience (Weeks 5-6)

1. Drift detection and alerting
2. Validator role rotation policies
3. Recovery procedures for compromise
4. Full audit trail integration

Phase 4: Community Governance (Weeks 7-8)

1. Validator reputation system
2. Plurality-based decision frameworks
3. Cross-interpreter reconciliation
4. Sovereign identity integration

VERIFICATION PROTOCOL ENHANCEMENT

Daily Sovereignty Check

```python
def daily_sovereignty_audit():
    """Daily audit to ensure no boundary violations"""

    checks = [
        # 1. Check n8n for epistemic logic
        scan_n8n_workflows_for_detection_logic(),

        # 2. Check IRE for orchestration logic
        scan_ire_for_scheduling_logic(),

        # 3. Verify threshold abstraction
        verify_no_numeric_thresholds_in_n8n(),

        # 4. Validate time-window adherence
        verify_all_executions_within_windows(),

        # 5. Check validator quorum compliance
        verify_all_commits_have_proper_quorum(),

        # 6. Detect semantic laundering
        run_semantic_laundering_detection(),

        # 7. Verify workflow definition integrity
        hash_and_verify_workflow_definitions(),

        # 8. Check for drift
        compare_with_frozen_model_baseline()
    ]

    failed_checks = [c for c in checks if not c.passed]
    sovereignty_score = sum(1 for check in checks if check.passed) / len(checks)

    return {
        "sovereignty_score": sovereignty_score,
        "failed_checks": failed_checks,
        "recommendations": generate_remediation_plan(failed_checks)
    }
```

Weekly Epistemic Integrity Report

```python
def weekly_epistemic_integrity_report():
    """Weekly comprehensive epistemic health report"""

    report = {
        "temporal_integrity": {
            "execution_windows_violated": count_window_violations(),
            "time_anomalies_detected": detect_time_anomalies(),
            "average_execution_latency": calculate_latency()
        },
        "validator_health": {
            "active_validators": count_active_validators(),
            "quorum_satisfaction_rate": calculate_quorum_rate(),
            "validator_role_diversity": measure_diversity()
        },
        "detection_quality": {
            "drift_from_baseline": measure_drift(),
            "false_positive_rate": calculate_fpr(),
            "escalation_accuracy": measure_escalation_accuracy()
        },
        "boundary_integrity": {
            "n8n_epistemic_contamination": detect_contamination(),
            "ire_orchestration_leakage": detect_leakage(),
            "workflow_definition_changes": track_workflow_changes()
        },
        "semantic_defenses": {
            "laundering_attempts": count_laundering_attempts(),
            "similarity_clusters": analyze_clusters(),
            "source_credibility": assess_source_credibility()
        }
    }

    # Calculate overall epistemic health score
    report["epistemic_health_score"] = calculate_health_score(report)

    return report
```